Source code for monai.deploy.core.application

# Copyright 2021 MONAI Consortium
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, Optional, Set, Type, Union

from monai.deploy.cli.main import LOG_CONFIG_FILENAME, parse_args, set_up_logging
from monai.deploy.core.graphs.factory import GraphFactory
from monai.deploy.core.models import ModelFactory
from monai.deploy.exceptions import IOMappingError
from monai.deploy.utils.importutil import get_class_file_path, get_docstring, is_subclass
from monai.deploy.utils.sizeutil import convert_bytes
from monai.deploy.utils.version import get_sdk_semver

from .app_context import AppContext
from .datastores import DatastoreFactory
from .env import BaseEnv
from .executors import ExecutorFactory
from .graphs.graph import Graph
from .operator import Operator
from .operator_info import IO
from .runtime_env import RuntimeEnv


[docs]class Application(ABC): """This is the base application class. All applications should be extended from this Class. The application class provides support for chaining up operators, as well as mechanism to execute the application. """ # Application's name. <class name> if not specified. name: str = "" # Application's description. <class docstring> if not specified. description: str = "" # Application's version. <git version tag> or '0.0.0' if not specified. version: str = "" # Special attribute to identify the application. # Used by the CLI executing get_application() or is_subclass() from deploy.utils.importutil to # determine the application to run. # This is needed to identify Application class across different environments (e.g. by `runpy.run_path()`). _class_id: str = "monai.application" _env: Optional["ApplicationEnv"] = None
[docs] def __init__( self, runtime_env: Optional[RuntimeEnv] = None, do_run: bool = False, path: Optional[Union[str, Path]] = None, ): """Constructor for the application class. It initializes the application's graph, the runtime environment and the application context. if `do_run` is True, it would accept user's arguments from the application's context and execute the application. Args: runtime_env (Optional[RuntimeEnv]): The runtime environment to use. do_run (bool): Whether to run the application. path (Optional[Union[str, Path]]): The path to the application (Python file path). This path is used for launching the application to get the package information from `monai.deploy.utils.importutil.get_application` method. """ # Setup app description if not self.name: self.name = self.__class__.__name__ if not self.description: self.description = get_docstring(self.__class__) if not self.version: try: from monai.deploy._version import get_versions self.version = get_versions()["version"] except ImportError: self.version = "0.0.0" # Set the application path if path: self.path = Path(path) else: self.path = get_class_file_path(self.__class__) # Set the runtime environment if str(self.path) == "ipython": self.in_ipython = True else: self.in_ipython = False # Setup program arguments if path is not None or self.in_ipython: # If `path` is specified, it means that it is called by # monai.deploy.utils.importutil.get_application() to get the package info. # If `self.in_ipython` is True, it means that it is called by ipython environment. # In both cases, we should not parse the arguments from the command line. argv = [sys.executable, str(self.path)] # use default parameters else: argv = sys.argv # Parse the command line arguments args = parse_args(argv, default_command="exec") context = AppContext(args.__dict__, runtime_env) self._context: AppContext = context self._graph: Graph = GraphFactory.create(context.graph) # Execute the builder to set up the application self._builder() # Compose operators self.compose() if do_run: self.run(log_level=args.log_level)
@classmethod def __subclasshook__(cls, c: Type) -> bool: return is_subclass(c, cls._class_id) def _builder(self): """This method is called by the constructor of Application to set up the operator. This method returns `self` to allow for method chaining and new `_builder()` method is chained by decorators. Returns: An instance of Application. """ return self @property def context(self) -> AppContext: """Returns the context of this application.""" return self._context @property def graph(self) -> Graph: """Gives access to the DAG. Returns: Instance of the DAG """ return self._graph @property def env(self): """Gives access to the environment. This sets a default value for the application's environment if not set. Returns: An instance of ApplicationEnv. """ if self._env is None: self._env = ApplicationEnv() return self._env
[docs] def add_operator(self, operator: Operator): """Adds an operator to the graph. Args: operator (Operator): An instance of the operator of type Operator. """ # Ensure that the operator is valid operator.ensure_valid() self._graph.add_operator(operator)
[docs] def add_flow( self, source_op: Operator, destination_op: Operator, io_map: Optional[Dict[str, Union[str, Set[str]]]] = None ): """Adds a flow from source to destination. An output port of the source operator is connected to one of the input ports of a destination operators. Args: source_op (Operator): An instance of the source operator of type Operator. destination_op (Operator): An instance of the destination operator of type Operator. io_map (Optional[Dict[str, Union[str, Set[str]]]]): A dictionary of mapping from the source operator's label to the destination operator's label(s). """ # Ensure that the source and destination operators are valid source_op.ensure_valid() destination_op.ensure_valid() op_output_labels = source_op.op_info.get_labels(IO.OUTPUT) op_input_labels = destination_op.op_info.get_labels(IO.INPUT) if not io_map: if len(op_output_labels) > 1: raise IOMappingError( f"The source operator has more than one output port " f"({', '.join(op_output_labels)}) so mapping should be specified explicitly!" ) if len(op_input_labels) > 1: raise IOMappingError( f"The destination operator has more than one output port ({', '.join(op_input_labels)}) " f"so mapping should be specified explicitly!" ) io_map = {"": {""}} # Convert io_map's values to the set of strings. io_maps: Dict[str, Set[str]] = io_map # type: ignore for k, v in io_map.items(): if isinstance(v, str): io_maps[k] = {v} # Verify that the source & destination operator have the input and output ports specified by the io_map output_labels = list(io_maps.keys()) if len(op_output_labels) == 1 and len(output_labels) != 1: raise IOMappingError( f"The source operator({source_op.name}) has only one port with label " f"'{next(iter(op_output_labels))}' but io_map specifies {len(output_labels)} " f"labels({', '.join(output_labels)}) to the source operator's output port" ) for output_label in output_labels: if output_label not in op_output_labels: if len(op_output_labels) == 1 and len(output_labels) == 1 and output_label == "": # Set the default output port label. io_maps[next(iter(op_output_labels))] = io_maps[output_label] del io_maps[output_label] break raise IOMappingError( f"The source operator({source_op.name}) has no output port with label '{output_label}'. " f"It should be one of ({', '.join(op_output_labels)})." ) output_labels = list(io_maps.keys()) # re-evaluate output_labels for output_label in output_labels: input_labels = io_maps[output_label] if len(op_input_labels) == 1 and len(input_labels) != 1: raise IOMappingError( f"The destination operator({destination_op.name}) has only one port with label " f"'{next(iter(op_input_labels))}' but io_map specifies {len(input_labels)} " f"labels({', '.join(input_labels)}) to the destination operator's input port" ) for input_label in input_labels: if input_label not in op_input_labels: if len(op_input_labels) == 1 and len(input_labels) == 1 and input_label == "": # Set the default input port label. input_labels.clear() input_labels.add(next(iter(op_input_labels))) break raise IOMappingError( f"The destination operator({destination_op.name}) has no input port with label '{input_label}'. " f"It should be one of ({', '.join(op_input_labels)})." ) self._graph.add_flow(source_op, destination_op, io_maps)
[docs] def get_package_info(self, model_path: Union[str, Path] = "") -> Dict: """Returns the package information of this application. Args: model_path (Union[str, Path]): The path to the model directory. Returns: A dictionary containing the package information of this application. """ app_path = self.path.name command = f"python3 -u /opt/monai/app/{app_path}" resource = self.context.resource # Get model name/path list # - If no model files are found at `model_path`, None will be returned by the ModelFactory.create method and # the `model_list` will be an empty list. # - If the path represents a model repository (one or more model objects. Necessary condition is model_path is # a folder), then `model_list` will abe a list of model objects (name and path). # - If only one model is found at model_path or model_path is a valid model file, `model_list` would be a # single model object list. model_list = [] if model_path: models = ModelFactory.create(model_path) if models: model_list = models.get_model_list() # Get pip requirement list spec_list = self.env.pip_packages for op in self.graph.get_operators(): spec_list.extend(op.env.pip_packages) spec_set = set() pip_requirement_list = [] for p in spec_list: spec = p.strip().lower() if spec not in spec_set: pip_requirement_list.append(spec) spec_set.add(spec) return { "app-name": self.name, "app-version": self.version, "sdk-version": get_sdk_semver(), "command": command, "resource": { "cpu": resource.cpu, "gpu": resource.gpu, "memory": convert_bytes(resource.memory), }, "models": model_list, "pip-packages": pip_requirement_list, }
[docs] def run( self, log_level: Optional[str] = None, input: Optional[str] = None, output: Optional[str] = None, model: Optional[str] = None, workdir: Optional[str] = None, datastore: Optional[str] = None, executor: Optional[str] = None, ) -> None: """Runs the application. This method accepts `log_level` to set the log level of the application. Other arguments are used to specify the `input`, `output`, `model`, `workdir`, `datastore`, and `executor`. (Cannot set `graph` because it is set and used by the constructor.) If those arguments are not specified, values in the application context will be used. This method is useful when you want to interactively run the application inside IPython (Jupyter Notebook). For example, you can run the following code in a notebook: >>> from pathlib import Path >>> import monai.deploy.core as md >>> from monai.deploy.core import ( >>> Application, >>> DataPath, >>> ExecutionContext, >>> InputContext, >>> IOType, >>> Operator, >>> OutputContext, >>> ) >>> >>> @md.input("path", DataPath, IOType.DISK) >>> @md.output("path", DataPath, IOType.DISK) >>> class FirstOperator(Operator): >>> def compute(self, op_input: InputContext, op_output: OutputContext, context: ExecutionContext): >>> print(f"First Operator. input:{op_input.get().path}, model:{context.models.get().path}") >>> output_path = Path("output_first.txt") >>> output_path.write_text("first output\\n") >>> output.set(DataPath(output_path)) >>> >>> @md.input("path", DataPath, IOType.DISK) >>> @md.output("path", DataPath, IOType.DISK) >>> class SecondOperator(Operator): >>> def compute(self, op_input: InputContext, op_output: OutputContext, context: ExecutionContext): >>> print(f"First Operator. output:{op_output.get().path}, model:{context.models.get().path}") >>> # The leaf operators can only read output DataPath and should not set output DataPath. >>> output_path = op_output.get().path / "output_second.txt" >>> output_path.write_text("second output\\n") >>> >>> class App(Application): >>> def compose(self): >>> first_op = FirstOperator() >>> second_op = SecondOperator() >>> >>> self.add_flow(first_op, second_op) >>> >>> if __name__ == "__main__": >>> App(do_run=True) >>> app = App() >>> app.run(input="inp", output="out", model="model.pt") >>> !ls out Args: log_level (Optional[str]): A log level. input (Optional[str]): An input data path. output (Optional[str]): An output data path. model (Optional[str]): A model path. workdir (Optional[str]): A working directory path. datastore (Optional[str]): A datastore path. executor (Optional[str]): An executor name. """ # Set arguments args = {} if input is not None: args["input"] = input if output is not None: args["output"] = output if model is not None: args["model"] = model if workdir is not None: args["workdir"] = workdir if datastore is not None: args["datastore"] = datastore if executor is not None: args["executor"] = executor # If no arguments are specified and if runtime is in IPython, do not run the application. if len(args) == 0 and self.in_ipython: return # Update app context app_context = self.context app_context.update(args) # Set up logging (try to load `LOG_CONFIG_FILENAME` in the application folder) # and run the application app_log_config_path = self.path.parent / LOG_CONFIG_FILENAME set_up_logging(log_level, config_path=app_log_config_path) datastore_obj = DatastoreFactory.create(app_context.datastore) executor_obj = ExecutorFactory.create(app_context.executor, {"app": self, "datastore": datastore_obj}) executor_obj.run()
[docs] @abstractmethod def compose(self): """This is a method that needs to implemented by all subclasses. Derived appications will chain up the operators inside this compose method. """ pass
class ApplicationEnv(BaseEnv): """Settings for the application environment. This class is used to specify the environment settings for the application. """ pass