Source code for monai.deploy.core.executors.single_process_executor

# Copyright 2021 MONAI Consortium
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import os
import shutil
from pathlib import Path

from colorama import Fore

from monai.deploy.core.domain.datapath import DataPath, NamedDataPath
from monai.deploy.core.execution_context import BaseExecutionContext, ExecutionContext
from monai.deploy.core.io_type import IOType
from monai.deploy.core.models import ModelFactory
from monai.deploy.core.operator_info import IO
from monai.deploy.exceptions import IOMappingError

from .executor import Executor

# Name of the scratch directory that `SingleProcessExecutor.run()` creates, relative to the
# process's current working directory, when the app context does not provide a `workdir`.
TEMP_WORKDIR = ".monai_workdir"  # working directory name used when no `workdir` is specified


class SingleProcessExecutor(Executor):
    """Executor that runs all operators of a MONAI App sequentially in the current process."""

    def run(self):
        """Run the app.

        Operators are executed in the order yielded by the graph's ``gen_worklist()``
        (topological order):

        - If an operator is a root node, each of its inputs declared as
          (DataPath, IOType.DISK) is bound to the app's input path.
        - If an operator is a leaf node, each of its outputs declared as
          (DataPath, IOType.DISK) is bound to the app's output path.
        - After an operator has executed, its outputs are forwarded to the inputs of
          the next operators according to the graph's IO map.

        While an operator runs, the process's current working directory is the
        operator's own output folder under the working directory. The original working
        directory is always restored afterwards, and a temporary working directory
        created by this method (when the app context has no ``workdir``) is always
        removed, even if an operator raises.

        Raises:
            IOMappingError: If the graph has an edge between two operators without any
                IO mapping.
        """
        app_context = self.app.context

        # Take paths as absolute paths: the current working directory changes per operator.
        models = ModelFactory.create(os.path.abspath(app_context.model_path))
        input_path = os.path.abspath(app_context.input_path)
        output_path = os.path.abspath(app_context.output_path)

        # Create the output directory if it does not exist
        if not os.path.exists(output_path):
            os.makedirs(output_path, exist_ok=True)

        # Store old pwd so that it can be restored after the run
        old_pwd = os.getcwd()

        # If workdir is not specified, create a fresh temporary one (.monai_workdir).
        # Remember whether this run owns the directory so that only a directory created
        # here is deleted during cleanup.
        use_temp_workdir = not app_context.workdir
        if use_temp_workdir:
            workdir = os.path.abspath(TEMP_WORKDIR)
            if os.path.exists(workdir):
                shutil.rmtree(workdir)
            os.mkdir(workdir)
        else:
            # Absolute path of the working directory
            workdir = os.path.abspath(app_context.workdir)

        try:
            # Create execution context
            # Currently, we only allow a single input/output path (with empty label: "").
            # TODO(gigony): Supports multiple inputs/outputs (#87)
            exec_context = BaseExecutionContext(
                self.datastore,
                input=NamedDataPath({"": DataPath(input_path, read_only=True)}),
                output=NamedDataPath({"": DataPath(output_path, read_only=True)}),
                models=models,
            )

            g = self.app.graph

            for op in g.gen_worklist():
                op_exec_context = ExecutionContext(exec_context, op)
                op_name = op.__class__.__name__

                # Root operators read from the app's input path.
                if g.is_root(op):
                    self._bind_disk_paths(op, IO.INPUT, op_exec_context.input_context, input_path)

                # Leaf operators write to the app's output path.
                if g.is_leaf(op):
                    self._bind_disk_paths(op, IO.OUTPUT, op_exec_context.output_context, output_path)

                # Change the current working directory to the working directory of the operator
                # op_output_folder == f"{workdir}/operators/{op.uid}/{op_exec_context.get_execution_index()}/{IO.OUTPUT}"
                # The group path is made relative to "/" so that it can be re-rooted under `workdir`.
                relative_output_path = Path(op_exec_context.output_context.get_group_path(IO.OUTPUT)).relative_to("/")
                op_output_folder = str(Path(workdir, relative_output_path))
                os.makedirs(op_output_folder, exist_ok=True)
                os.chdir(op_output_folder)

                # Execute pre_compute()
                print(Fore.BLUE + f"Going to initiate execution of operator {op_name}" + Fore.RESET)
                op.pre_compute()

                # Execute compute()
                print(
                    Fore.GREEN
                    + f"Executing operator {op_name} "
                    + Fore.YELLOW
                    + f"(Process ID: {os.getpid()}, Operator ID: {op.uid})"
                    + Fore.RESET
                )
                op.compute(op_exec_context.input_context, op_exec_context.output_context, op_exec_context)

                # Execute post_compute()
                print(Fore.BLUE + f"Done performing execution of operator {op_name}\n" + Fore.RESET)
                op.post_compute()

                # Set input to next operator
                for next_op in g.gen_next_operators(op):
                    io_map = g.get_io_map(op, next_op)
                    if not io_map:
                        import inspect

                        raise IOMappingError(
                            f"No IO mappings found for {op.name} -> {next_op.name} in "
                            f"{inspect.getabsfile(self.app.__class__)}"
                        )

                    next_op_exec_context = ExecutionContext(exec_context, next_op)
                    for out_label, in_labels in io_map.items():
                        output = op_exec_context.output_context.get(out_label)
                        for in_label in in_labels:
                            next_op_exec_context.input_context.set(output, in_label)
        finally:
            # Always restore pwd even if an exception is raised (This logic can be run in an IPython environment).
            # This must happen before the cleanup below because the current directory is inside `workdir`.
            os.chdir(old_pwd)

            # Remove the temporary workdir created by this run, on success and on failure alike.
            if use_temp_workdir and os.path.exists(workdir):
                shutil.rmtree(workdir)

    @staticmethod
    def _bind_disk_paths(op, io_kind, io_context, path):
        """Bind `path` to every label of `op` declared as (DataPath, IOType.DISK) for `io_kind`.

        Args:
            op: Operator whose `op_info` describes its labels, data types and storage types.
            io_kind: `IO.INPUT` or `IO.OUTPUT`, selecting which side of the operator to inspect.
            io_context: The operator's input or output context on which the path is set.
            path: Absolute path wrapped in a read-only `DataPath` for each matching label.
        """
        op_info = op.op_info
        for label in op_info.get_labels(io_kind):
            data_type = op_info.get_data_type(io_kind, label)
            storage_type = op_info.get_storage_type(io_kind, label)
            if issubclass(data_type, DataPath) and storage_type == IOType.DISK:
                io_context.set(DataPath(path, read_only=True), label)