Source code for monai.deploy.operators.stl_conversion_operator

# Copyright 2022-2023 MONAI Consortium
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import shutil
import tempfile
from pathlib import Path
from typing import Dict, Optional, Union

import numpy as np

from monai.deploy.utils.importutil import optional_import

nib, _ = optional_import("nibabel")
sitk, _ = optional_import("SimpleITK")
label, _ = optional_import("skimage.measure", name="label")
measure, _ = optional_import("skimage", name="measure")
mesh, _ = optional_import("stl", name="mesh")
resize, _ = optional_import("skimage.transform", name="resize")
trimesh, _ = optional_import("trimesh")

from monai.deploy.core import ConditionType, Fragment, Image, Operator, OperatorSpec

__all__ = ["STLConversionOperator", "STLConverter"]


# nibabel is required by the dependent class STLConverter.
# @md.env(
#     pip_packages=["numpy>=1.21", "nibabel >= 3.2.1", "numpy-stl>=2.12.0", "scikit-image>=0.17.2", "trimesh>=3.8.11"]
# )
[docs]class STLConversionOperator(Operator): """Converts volumetric image to surface mesh in STL format. If a file path is provided, the STL binary will be saved in the said output folder. This operator also save the STL file as bytes in memory, idenfied by the named output. Being optional, this output does not require any downstream receiver. Named inputs: image: Image object for which to generate surface mesh. output_file: Optional, the path of the file to save the mesh in STL format. If provided, this will overrider the output file path set on the object. Named output: stl_bytes: Bytes of the surface mesh STL file. Optional, not requiring a downstram receiver. """
[docs] def __init__( self, fragment: Fragment, *args, output_file: Union[Path, str], class_id=None, is_smooth=True, keep_largest_connected_component=True, **kwargs, ) -> None: """Creates an object to generate a surface mesh and saves it as an STL file if the path is provided. Args: fragment (Fragment): An instance of the Application class which is derived from Fragment. output_file ([Path,str], optional): output STL file path. None for no file output. class_id (array, optional): Class label ids. Defaults to None. is_smooth (bool, optional): smoothing or not. Defaults to True. keep_largest_connected_component (bool, optional): Defaults to True. """ self._logger = logging.getLogger("{}.{}".format(__name__, type(self).__name__)) self._class_id = class_id self._is_smooth = is_smooth self._keep_largest_connected_component = keep_largest_connected_component self._output_file = Path(output_file) if output_file and len(str(output_file)) > 0 else None self._converter = STLConverter(*args, **kwargs) self.input_name_image = "image" self.input_name_output_file = "output_file" self.output_name_stl_bytes = "stl_bytes" super().__init__(fragment, *args, **kwargs)
[docs] def setup(self, spec: OperatorSpec): spec.input(self.input_name_image) spec.input(self.input_name_output_file).condition(ConditionType.NONE) # Optional, set as needed. spec.output(self.output_name_stl_bytes).condition(ConditionType.NONE) # No receivers required.
[docs] def compute(self, op_input, op_output, context): """Gets the input (image), processes it and sets results in the output. This function sets the mesh in STL bytes in its named output, which require no receivers. If provided, the mesh will be saved to the file in STL format. Args: op_input (InputContext): An input context for the operator. op_output (OutputContext): An output context for the operator. context (ExecutionContext): An execution context for the operator. """ input_image = op_input.receive(self.input_name_image) if not input_image: raise ValueError("Input image is not received.") _output_file = op_input.receive(self.input_name_output_file) if not _output_file: # Use the object's attribute to get the STL output path, if any. if self._output_file and len(str(self._output_file)) > 0: _output_file = Path(self._output_file) _output_file.parent.mkdir(parents=True, exist_ok=True) self._logger.info(f"Output will be saved in file {_output_file}.") stl_bytes = self._convert(input_image, _output_file) op_output.emit(stl_bytes, self.output_name_stl_bytes)
def _convert(self, image: Image, output_file: Optional[Path] = None): """ Args: image (Image): object with the image (ndarray in DHW) and its metadata dictionary. output_file (Path, optional): output file path. Default None for no file output. Returns: Bytes: Bytes of the binary of STL file """ # Use path in the output_file arg if provided. if isinstance(output_file, Path): output_file.parent.mkdir(exist_ok=True) return self._converter.convert( image=image, output_file=output_file, class_ids=self._class_id, is_smooth=self._is_smooth, keep_largest_connected_component=self._keep_largest_connected_component, )
[docs]class STLConverter: """Converts volumetric image to surface mesh in STL"""
[docs] def __init__(self, *args, **kwargs): """Creates an instance to generate a surface mesh in STL with an Image object.""" self._logger = logging.getLogger("{}.{}".format(__name__, type(self).__name__))
[docs] def convert( self, image: Image, output_file: Optional[Path] = None, class_ids=None, is_smooth=True, keep_largest_connected_component=True, ): """ Args: image (Image): object with the image (ndarray of DHW index order) and its metadata dictionary. output_file (str): output STL file path. Default to None for not saving output file. class_id (array, optional): Class label id. Defaults to None. is_smooth (bool, optional): smoothing or not. Defaults to True. keep_largest_connected_component (bool, optional): Defaults to True. Returns: Bytes of the binary STL file. """ if not image or not isinstance(image, Image): raise ValueError("image is not a Image object.") if isinstance(output_file, Path): output_file.parent.mkdir(parents=True, exist_ok=True) s_image = self.SpatialImage(image) nda = s_image.image_array self._logger.info(f"Image ndarray shape:{nda.shape}") if keep_largest_connected_component: nda = STLConverter.get_largest_cc(nda) res = s_image.spacing if res is None: raise ValueError("Image spacing/resolution is missing.") # In case image has been re-oriented from the original affine = s_image.original_affine if ( affine is not None and s_image.affine is not None and np.sum(np.abs(s_image.original_affine - s_image.affine)) > 1e-7 ): codes = nib.orientations.axcodes2ornt(nib.orientations.aff2axcodes(np.linalg.inv(affine))) nda = nib.orientations.apply_orientation(np.squeeze(nda), codes) new_nda = np.zeros(shape=nda.shape, dtype=np.uint8) if class_ids is None: new_nda += (nda > 0).astype(np.uint8) elif isinstance(class_ids, list): for class_id in class_ids: new_nda += (nda == class_id).astype(np.uint8) else: try: new_nda += (nda == class_ids).astype(np.uint8) except ValueError as err: err_msg = "That was no valid value for class_id." self._logger.error(err_msg) raise ValueError(err_msg) from err max_res = np.amin(res) target_shape = [] for _j in range(3): length = float(nda.shape[_j]) * res[_j] / max_res length = int(np.round(length)) target_shape.append(length) new_nda = STLConverter.resize_volume(nda, output_shape=target_shape) verts, faces, _, _ = measure.marching_cubes(new_nda, level=0.5, step_size=5) for _j in range(3): verts[:, _j] = (verts[:, _j] + 0.5) * float(nda.shape[_j]) / float(new_nda.shape[_j]) - 0.5 itk_image = s_image.itk_image for _j in range(verts.shape[0]): vert = (float(verts[_j, 0]), float(verts[_j, 1]), float(verts[_j, 2])) vert = itk_image.TransformContinuousIndexToPhysicalPoint(vert) verts[_j, :] = np.array(vert) # Write out the STL file, and then load into trimesh try: temp_folder = tempfile.mkdtemp() raw_stl_filename = os.path.join(temp_folder, "temp.stl") STLConverter.write_stl(verts, faces, raw_stl_filename) mesh_data = trimesh.load(raw_stl_filename) if is_smooth: trimesh.smoothing.filter_taubin(mesh_data, iterations=20) final_file_path = output_file if output_file else os.path.join(temp_folder, "surface_mesh.stl") mesh_data.export(final_file_path) with open(str(final_file_path), "rb") as r_file: stl_bytes = r_file.read() finally: shutil.rmtree(temp_folder) return stl_bytes
# Helper functions @staticmethod def get_largest_cc(nda): logging.debug("ndarray shape: {}".format(nda.shape)) labels = label(nda) # assume at least 1 CC assert labels.max() != 0 largest_cc = labels == np.argmax(np.bincount(labels.flat)[1:]) + 1 largest_cc = largest_cc.astype(np.uint8) return largest_cc @staticmethod def resize_volume(nda, output_shape, order=1, preserve_range=True, anti_aliasing=False): return resize( nda, output_shape, order=order, mode="constant", preserve_range=preserve_range, anti_aliasing=anti_aliasing ) @staticmethod def write_stl(verts, faces, filename): # Create the mesh cube = mesh.Mesh(np.zeros(faces.shape[0], dtype=mesh.Mesh.dtype)) for i, f in enumerate(faces): for j in range(3): cube.vectors[i][j] = verts[f[j], :] cube.save(os.path.splitext(filename)[0] + ".stl") # Helper class for wrapping the App SDK Image object #
[docs] class SpatialImage: """Object encapsulating a spatial volume image instance of Image. Channel is not supported in this version. """
[docs] def __init__(self, image: Image, dtype=np.float32): """Creates an instance. Args: image(Image): An instance of Image. dtype (Numpy type, optional): Defaults to np.float32. """ self._logger = logging.getLogger("{}.{}".format(__name__, type(self).__name__)) if not image or not isinstance(image, Image): raise ValueError("Argument is not a Image object.") self._image = image self._dtype = dtype self._props: Dict = {} """ Properties may include some or all of the following img_array shape spacing original_affine affine """ self._read_from_in_mem_image(self._image)
@property def image_array(self): """Image data in Numpy array, or None""" return self._props.get("img_array", None) @property def itk_image(self): """ITK image object created from the encapsulated image object, or None""" return self._props.get("itk_image", None) @property def shape(self): """Shape of image array, or None""" return self._props.get("shape", None) @property def spacing(self): """Pixel spacing of original image, aka resolution, or None""" return self._props.get("spacing", None) @property def original_affine(self): """Original affine of the image, or None""" return self._props.get("original_affine", None) @property def affine(self): """Affine of the re-oriented image data, or None""" return self._props.get("affine", None)
[docs] def set_property(self, key: str, value): """Sets an image property Args: key (str): key of the property value: value of the property """ self._props[key] = value
[docs] def get_property(self, key: str, default=None): """Gets value of the specified property Args: key (str): key of the property default: default value if the property does not exist. Returns: the value of the property, or the default value if property does not exist. """ return self._props.get(key, default)
[docs] def get_data(self): """Returns the image array in ndarray""" return self._props.get("img_array", None)
def _load_data(self, image): img_array = image.asnumpy() img_meta_dict = image.metadata() shape = np.asarray(image.asnumpy().shape) spacing = np.asarray( ( img_meta_dict["row_pixel_spacing"], img_meta_dict["col_pixel_spacing"], img_meta_dict["depth_pixel_spacing"], ) ) original_affine = img_meta_dict["nifti_affine_transform"] affine = original_affine itk_image = sitk.GetImageFromArray(img_array) itk_image.SetSpacing( [ float(img_meta_dict["row_pixel_spacing"]), float(img_meta_dict["col_pixel_spacing"]), float(img_meta_dict["depth_pixel_spacing"]), ] ) direction = [] direction.extend(img_meta_dict["row_direction_cosine"]) direction.extend(img_meta_dict["col_direction_cosine"]) direction.extend(img_meta_dict["depth_direction_cosine"]) itk_image.SetDirection(direction) return img_array, affine, original_affine, shape, spacing, itk_image def _read_from_in_mem_image(self, image): """Parse the in-memory image for the attributes. Args: image (Image): App SDK Image instance. Returns: An instance of SpacialImage. """ img_array, affine, original_affine, shape, spacing, itk_image = self._load_data(image) num_dims = len(img_array.shape) img_array = img_array.astype(self._dtype) if num_dims == 2: self._logger.info("2D image") elif num_dims == 3: self._logger.info("3D image") elif num_dims <= 5: # if 4d data, we assume 4th dimension is channels. # if 5d data, try to squeeze 5th dimension. if num_dims == 5: img_array = np.squeeze(img_array) if len(img_array.shape) != 4: raise ValueError("Cannot squeeze 5D image to 4D; object doesn't support time based data.") if self.is_channels_first: self._logger.info("4D image, channel first") else: self._logger.info("4D image, channel last") else: raise NotImplementedError("Object does not support image of dims {}".format(num_dims)) self._props["original_affine"] = original_affine self._props["affine"] = affine self._props["spacing"] = spacing self._props["shape"] = shape self._props["img_array"] = img_array self._props["itk_image"] = itk_image
def test(): from monai.deploy.operators.dicom_data_loader_operator import DICOMDataLoaderOperator from monai.deploy.operators.dicom_series_selector_operator import DICOMSeriesSelectorOperator from monai.deploy.operators.dicom_series_to_volume_operator import DICOMSeriesToVolumeOperator current_file_dir = Path(__file__).parent.resolve() data_path = current_file_dir.joinpath("../../../inputs/spleen_ct/dcm") output_path = Path.cwd() / "output_stl/test.stl" fragment = Fragment() loader = DICOMDataLoaderOperator(fragment, name="dcm_loader") series_selector = DICOMSeriesSelectorOperator(fragment, name="series_selector") dcm_to_volume_op = DICOMSeriesToVolumeOperator(fragment, name="dcm_to_vol") stl_writer = STLConversionOperator(fragment, output_file=output_path, name="stl_writer") # Testing with the main entry functions study_list = loader.load_data_to_studies(data_path.absolute()) study_selected_series_list = series_selector.filter(None, study_list) image = dcm_to_volume_op.convert_to_image(study_selected_series_list) stl_writer._convert(image, Path("output_stl/test.stl")) if __name__ == "__main__": test()