Source code for monailabel.utils.async_tasks.task

# Copyright (c) MONAI Consortium
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from monailabel.interfaces.utils.app import app_instance
from monailabel.utils.async_tasks.utils import processes, run_background_task, stop_background_task, tasks

logger = logging.getLogger(__name__)


[docs]class AsyncTask:
[docs] @staticmethod def run(method: str, request=None, params=None, force_sync=False, enqueue=False): if len(processes(method)) and not enqueue: description = f"++++++++++ {method.capitalize()} Task is Already Running" logger.info(description) return None, description instance = app_instance() config = instance.info().get("config", {}).get(method, {}) request = request if request else {} request.update(config) params = params if params is not None else {} request.update(params) # Support bundle/scripts over windows bundle_path = instance.bundle_path(request.get("model")) if bundle_path: request["bundle_path"] = bundle_path logger.info(f"{method.capitalize()} request: {request}") if force_sync: if method == "batch_infer": return instance.batch_infer(request), None if method == "scoring": return instance.scoring(request), None if method == "train": return instance.train(request), None return run_background_task(request, method), None
[docs] @staticmethod def status(method: str, all: bool = False, check_if_running: bool = False): batch_process = processes(method) batch_tasks = tasks(method) if check_if_running: if len(batch_process) == 0: description = f"No {method.capitalize()} Tasks are currently Running" logger.debug(description) return None, description task_id = next(iter(batch_process)) return [task for task in batch_tasks if task["id"] == task_id][0], None task = batch_tasks[-1] if len(batch_tasks) else None if task is None: description = f"No {method.capitalize()} Tasks Found" logger.debug(description) return None, description ret = batch_tasks if all else task return ret, None
[docs] @staticmethod def stop(method): return stop_background_task(method)