Developer Guide

MONAI Stream pipelines are composed of three types of components: sources, filters, and sinks, represented respectively by StreamSourceComponent, StreamFilterComponent, and StreamSinkComponent.

In the next section we summarize the most commonly used MONAI Stream components, and then elaborate on how they can be combined to create streaming inference apps.

MONAI Stream Pipeline Components

URISource

URISource is a source component which generates data when provided with a URI to a data source such as a local video file, a video stream (e.g. via http://), or a live stream.

URISource(uri="file:///my/videos/video.mp4"),

Note

Feeds from multiple URISource components can be aggregated via NVAggregatedSourcesBin (see below).

AJAVideoSource

AJAVideoSource provides support for live streaming from AJA capture devices connected to x86 or Clara AGX systems.
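
As an illustrative sketch, the AJA capture example later in this guide declares the source as shown below; the mode, input, and resolution values are taken from that example and should be adjusted to match your capture setup (the inline comments are our interpretation of the parameters).

AJAVideoSource(
   mode="UHDp30-rgba",     # capture mode of the AJA device
   input_mode="hdmi",      # capture from the HDMI input
   is_nvmm=True,           # keep frames in GPU (NVMM) memory for RDMA
   output_width=1920,
   output_height=1080,
),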

Warning

AJAVideoSource is currently not compatible with NVAggregatedSourcesBin.

FakeSource

FakeSource is a source component that allows the developer to run a MONAI Stream pipeline without a real data source; it is mainly useful for testing.

NVAggregatedSourcesBin

NVAggregatedSourcesBin is a special type of source component which can aggregate data from multiple sources and present it as a single unit of data concatenated in the batch dimension. For instance, it may combine multiple URISource components as shown below, resizing all streams to the same dimensions and stacking the data in the batch dimension.

The code below shows an example declaration of NVAggregatedSourcesBin with two URISource components, the output of which is resized to 864 x 480 and stacked in the batch dimension.

NVAggregatedSourcesBin(
   [
      URISource(uri="file:///my/videos/video1.mp4"),
      URISource(uri="file:///my/videos/video2.mp4"),
   ],
   output_width=864,
   output_height=480,
)
stateDiagram-v2 state "URISource" as URISource_1 state "URISource" as URISource_2 state NVAggregatedSourcesBin { URISource_1 --> BatchData URISource_2 --> BatchData BatchData --> [*] }

NVVideoConvert

NVVideoConvert is a filter component that allows the developer to convert the format and size of the upstream data.

For example, we may create an NVVideoConvert component that converts data to RGBA with size 864 x 480.

NVVideoConvert(
      FilterProperties(
         format="RGBA",
         width=864,
         height=480,
      )
)

NVInferServer

NVInferServer receives the output of NVVideoConvert and runs a configured AI model to produce results (e.g. segmentation, classification) in the form of user metadata. In other words, the primary output of NVInferServer is the original input, with the inference results attached as user metadata, so the downstream component must take care to select the correct data.

For NVInferServer, the developer must specify a configuration using the InferServerConfiguration object. In the example below, NVInferServer uses the default configuration with minor modifications: the path to the model repository (/app/models), the model name (us_unet_256x256), the model version (-1, referring to the latest), and the inference server log verbosity.

infer_server_config = NVInferServer.generate_default_config()
infer_server_config.infer_config.backend.trt_is.model_repo.root = "/app/models"
infer_server_config.infer_config.backend.trt_is.model_name = "us_unet_256x256"
infer_server_config.infer_config.backend.trt_is.version = "-1"
infer_server_config.infer_config.backend.trt_is.model_repo.log_level = 0

...

NVInferServer(
   config=infer_server_config,
)

The inference server receives the data provided by the upstream component (e.g. NVVideoConvert) and performs inference using the configured models in the model repository. The results are stored in the "user metadata", so the primary output of NVInferServer remains the original data stream, with the inference results attached as user metadata. We will see how to access the user metadata in the TransformChainComponent section.

stateDiagram-v2
   state NVInferServer {
      [*] --> Model
      Model --> Model_Output_1
      Model --> Model_Output_...
      Model --> Model_Output_N
      [*] --> [*]
      Model_Output_1 --> User_Metadata[1..N]
      Model_Output_... --> User_Metadata[1..N]
      Model_Output_N --> User_Metadata[1..N]
      User_Metadata[1..N] --> [*]
   }

TransformChainComponent

TransformChainComponent is a filter component which allows the developer to apply MONAI transformations to streaming data coming from any other MONAI Stream source or filter. When placed after an NVInferServer component, it takes all the inputs (original data and user metadata), presents them to the MONAI transformations specified in the `transform_chain` parameter, and outputs the result selected by the `output_label` parameter. The inputs to the transform chain are labelled as follows:

  • the original stream is always present in the inputs with key ORIGINAL_IMAGE (a minimal standalone example is sketched after this list),

  • additional inputs to the transform chain are available only when TransformChainComponent follows NVInferServer; in that case, the key of each model output matches the output name declared in the model configuration (see code below).
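
As a minimal sketch of the standalone case (not taken from the official examples), a TransformChainComponent that operates only on the original stream could be declared as follows; we assume here that output_label may refer back to the (transformed) ORIGINAL_IMAGE key.

# minimal sketch: TransformChainComponent without a preceding NVInferServer;
# only "ORIGINAL_IMAGE" is available to the transform chain
TransformChainComponent(
   output_label="ORIGINAL_IMAGE",
   transform_chain=Compose(
      [
         # operate on the original stream; the output size must equal the input size
         ScaleIntensityd(keys=["ORIGINAL_IMAGE"], minv=0, maxv=255),
         CastToTyped(keys=["ORIGINAL_IMAGE"], dtype=np.uint8),
      ]
   ),
)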

Warning

Currently, TransformChainComponent has limitations on the size of the input and output. Specifically, the size of the output in the transform_chain must be the same as the size of the input.

In the example below, TransformChainComponent outputs the data under the key given by output_label="CONCAT_IMAGE". Here, the input keys to the transform_chain are "ORIGINAL_IMAGE" and "OUTPUT__0", where the latter is the output name of the model configured for NVInferServer in the previous section.

# define a color-blending function to be used in the transform chain below
def color_blender(img: torch.Tensor):
   # show background segmentation as red
   img[..., 1] -= img[..., 1] * (1.0 - img[..., 4])
   img[..., 2] -= img[..., 2] * (1.0 - img[..., 4])

   # show foreground segmentation as blue
   img[..., 0] -= img[..., 0] * img[..., 5]
   img[..., 1] -= img[..., 1] * img[..., 5]

   return img[..., :4]

...

TransformChainComponent(
   # choose the label in the transform chain which we want to output
   output_label="CONCAT_IMAGE",
   # specify transformation to be applied to data
   transform_chain=Compose(
      [
         # apply post-transforms to segmentation model output `OUTPUT__0`
         Activationsd(keys=["OUTPUT__0"], sigmoid=True),
         AsDiscreted(keys=["OUTPUT__0"]),
         AsChannelLastd(keys=["OUTPUT__0"]),
         # concatenate segmentation and original image
         CastToTyped(keys=["ORIGINAL_IMAGE"], dtype=np.float),
         ConcatItemsd(keys=["ORIGINAL_IMAGE", "OUTPUT__0"], name="CONCAT_IMAGE", dim=2),
         # blend the original image and segmentation
         Lambdad(keys=["CONCAT_IMAGE"], func=color_blender),
         ScaleIntensityd(keys=["CONCAT_IMAGE"], minv=0, maxv=256),
         CastToTyped(keys=["CONCAT_IMAGE"], dtype=np.uint8),
      ]
   ),
)
stateDiagram-v2
   state TransformChainComponent {
      [*] --> ImplicitInputMapping
      state "CastToTyped" as CastToTypedFLOAT
      state "CastToTyped" as CastToTypedINT
      state ImplicitInputMapping {
         state "[ Input[0], Input[1] ]" as IMInputs
         state "{<br>'ORIGINAL_IMAGE': Input[0],<br> 'OUTPUT__0': Input[1]<br>}" as IMOutputs
         [*] --> IMInputs
         IMInputs --> IMOutputs: Map List to Dict
         IMOutputs --> [*]
      }
      ImplicitInputMapping --> Activationsd
      Activationsd --> AsDiscreted
      AsDiscreted --> AsChannelLastd
      AsChannelLastd --> CastToTypedFLOAT
      CastToTypedFLOAT --> ConcatItemsd
      ConcatItemsd --> Lambdad
      Lambdad --> ScaleIntensityd
      ScaleIntensityd --> CastToTypedINT
      CastToTypedINT --> ImplicitOutputMapping
      state ImplicitOutputMapping {
         state "{<br>'ORIGINAL_IMAGE': Output[0],<br> 'OUTPUT__0': Output[1],<br>'CONCAT_IMAGE': Output[2]<br/>}" as OMInputs
         state "Output[2]" as OMOutputs
         [*] --> OMInputs
         OMInputs --> OMOutputs: Select "CONCAT_IMAGE"
         OMOutputs --> [*]
      }
      ImplicitOutputMapping --> [*]
   }

TransformChainComponentCupy

TransformChainComponentCupy is a filter component which allows the developer to insert custom data transformations that employ CuPy. It is a temporary counterpart to TransformChainComponent intended mainly for applications expected to run on Clara AGX devices, as PyTorch (and by extension the MONAI SDK) is currently not supported on Clara AGX devices.

TransformChainComponentCupy works in the same fashion as TransformChainComponent; however, it passes a Dict[str, cupy.ndarray] to the transform_chain.

# color blender function used in `TransformChainComponentCupy`
def color_blender(inputs: Dict[str, cupy.ndarray]):
   img = inputs["ORIGINAL_IMAGE"]
   mask = inputs["OUTPUT__0"]

   mask = cupy.cudnn.activation_forward(mask, cupy.cuda.cudnn.CUDNN_ACTIVATION_SIGMOID)

   # Ultrasound model outputs two channels, so modify only the red
   # and green channel in-place to apply mask.
   img[..., 1] = cupy.multiply(cupy.multiply(mask[0, ...], 1.0 - mask[1, ...]), img[..., 1])
   img[..., 2] = cupy.multiply(mask[0, ...], img[..., 2])
   img[..., 0] = cupy.multiply(1.0 - mask[1, ...], img[..., 0])

   return {"BLENDED_IMAGE": img}

...

# we select the "BLENDED_IMAGE" output from `color_blender`
TransformChainComponentCupy(transform_chain=color_blender, output_label="BLENDED_IMAGE"),

NVEglGlesSink

NVEglGlesSink is a component that allows developers to visualize the outputs of their pipelines when data is streamed via NVIDIA GPU.
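
In the example pipelines later in this guide, NVEglGlesSink is placed as the final component and constructed with sync=True, as shown below.

# render the pipeline output on screen (used as the last component in a pipeline)
NVEglGlesSink(sync=True),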

FakeSink

FakeSink is a sink component that allows the developer to end the MONAI Stream pipeline without visualizing data. FakeSink is useful for unit testing, and for cases where a TransformChainComponent writes its results to disk and provides no output other than the original data stream.
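
As a sketch, FakeSink can take the place of NVEglGlesSink at the end of a pipeline when no display is needed; we assume here that FakeSink requires no constructor arguments (check the API reference for the exact signature).

# hypothetical: terminate the pipeline without rendering (e.g. in unit tests);
# FakeSink() with no arguments is an assumption -- consult the API reference
FakeSink(),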

MONAI Stream Pipelines by Example

A MONAI Stream pipeline is a chained composition of MONAI Stream components that begins with one or more StreamSourceComponent instances, ends with a StreamSinkComponent, and in between uses StreamFilterComponent instances to manipulate the data, e.g. by applying transformations and running AI inference.
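
Before walking through the full examples, here is a minimal sketch assembled only from components introduced above: a single URISource wrapped in an NVAggregatedSourcesBin, an NVVideoConvert filter, and an NVEglGlesSink (the video path is a placeholder).

# minimal sketch: source -> filter -> sink
pipeline = StreamCompose(
   [
      # source: read and resize a local video (placeholder path)
      NVAggregatedSourcesBin(
         [URISource(uri="file:///my/videos/video.mp4")],
         output_width=864,
         output_height=480,
      ),
      # filter: convert the stream to RGBA
      NVVideoConvert(
         FilterProperties(format="RGBA", width=864, height=480)
      ),
      # sink: display the result
      NVEglGlesSink(sync=True),
   ]
)

# execute the pipeline
pipeline()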

MONAI Stream with Aggregated Sources

Let us walk through a simple example, monaistream-pytorch-pp-app, whose pipeline can be visualized as shown below.

stateDiagram-v2
   NVAggregatedSourcesBin --> NVVideoConvert: BatchData Output
   NVVideoConvert --> NVInferServer: RGBA Output
   NVInferServer --> TransformChainComponent: ORIGINAL_IMAGE
   NVInferServer --> TransformChainComponent: OUTPUT__0
   TransformChainComponent --> NVEglGlesSink: CONCAT_IMAGE

We can create this streaming inference app using the following code.

 1# generate a default configuration for `NVInferServer`
 2infer_server_config = NVInferServer.generate_default_config()
 3
 4# update default configuration with
 5#   - model repo path
 6#   - model name
 7#   - model version
 8#   - NVInferServer log verbosity
 9infer_server_config.infer_config.backend.trt_is.model_repo.root = "/app/models"
10infer_server_config.infer_config.backend.trt_is.model_name = "us_unet_256x256"
11infer_server_config.infer_config.backend.trt_is.version = "-1"
12infer_server_config.infer_config.backend.trt_is.model_repo.log_level = 0
13
14# simple color blender function to use in `Lambdad` MONAI transform
15def color_blender(img: torch.Tensor):
16   # show background segmentation as red
17   img[..., 1] -= img[..., 1] * (1.0 - img[..., 4])
18   img[..., 2] -= img[..., 2] * (1.0 - img[..., 4])
19
20   # show foreground segmentation as blue
21   img[..., 0] -= img[..., 0] * img[..., 5]
22   img[..., 1] -= img[..., 1] * img[..., 5]
23
24   return img[..., :4]
25
26pipeline = StreamCompose(
27   [
28      # read from local video file using `URISource` and use
29      # `NVAggregatedSourcesBin` to apply sizing transformations
30      NVAggregatedSourcesBin(
31            [
32               URISource(uri="file:///app/videos/Q000_04_tu_segmented_ultrasound_256.avi"),
33            ],
34            output_width=256,
35            output_height=256,
36      ),
37      # convert video stream to RGBA
38      NVVideoConvert(
39            FilterProperties(
40               format="RGBA",
41               width=256,
42               height=256,
43            )
44      ),
45      # chain output to `NVInferServer`
46      NVInferServer(
47            config=infer_server_config,
48      ),
49      # use `TransformChainComponent` to blend the original image with the segmentation
50      # output from `NVInferServer`
51      TransformChainComponent(
52            output_label="CONCAT_IMAGE",
53            transform_chain=Compose(
54               [
55                  # apply post-transforms to segmentation
56                  Activationsd(keys=["OUTPUT__0"], sigmoid=True),
57                  AsDiscreted(keys=["OUTPUT__0"]),
58                  AsChannelLastd(keys=["OUTPUT__0"]),
59                  # concatenate segmentation and original image
60                  CastToTyped(keys=["ORIGINAL_IMAGE"], dtype=np.float),
61                  ConcatItemsd(keys=["ORIGINAL_IMAGE", "OUTPUT__0"], name="CONCAT_IMAGE", dim=2),
62                  # blend the original image and segmentation
63                  Lambdad(keys=["CONCAT_IMAGE"], func=color_blender),
64                  ScaleIntensityd(keys=["CONCAT_IMAGE"], minv=0, maxv=256),
65                  CastToTyped(keys=["CONCAT_IMAGE"], dtype=np.uint8),
66               ]
67            ),
68      ),
69      # display output for `TransformChainComponent`
70      NVEglGlesSink(sync=True),
71   ]
72)
73
74# execute pipeline
75pipeline()

Looking more closely at lines 9 and 10, the NVInferServer component expects a model named us_unet_256x256 under /app/models with the directory structure and model configuration file expected by the Triton Inference Server.

In this case, the directory structure of the model required by the pipeline above is

/app/models
└── us_unet_256x256
   ├── 1
   │   └── monai_unet.engine
   └── config.pbtxt

and the model configuration file config.pbtxt describes the model metadata, as shown below.

name: "us_unet_256x256"
platform: "tensorrt_plan"
default_model_filename: "monai_unet.engine"
max_batch_size: 1
input [
   {
      name: "INPUT__0"
      data_type: TYPE_FP32
      dims: [ 3, 256, 256 ]
   }
]
output [
   {
      name: "OUTPUT__0"
      data_type: TYPE_FP32
      dims: [ 2, 256, 256]
   }
]

# Specify GPU instance.
instance_group {
   kind: KIND_GPU
   count: 1
   gpus: 0
}

The model configuration file specifies the model type as a TensorRT plan, as well as its expected inputs and outputs. The output entry of the model configuration shows the (in this case, one and only) model output OUTPUT__0 that is passed from NVInferServer to TransformChainComponent. Following the pipeline code snippet above, it is apparent that the name OUTPUT__0 in the model configuration matches the key of the object manipulated in the transform_chain starting at line 53.

AJA Video Capture app

MONAI Stream provides native support for AJA capture cards with GPU direct memory access. A simple example is provided in monaistream-rdma-capture-app where the pipeline can be visualized as shown below.

stateDiagram-v2
   AJAVideoSource --> NVEglGlesSink: RGBA 1080p in GPU

The visualized pipeline is built using the code below.

# create a MONAI Stream pipeline for AJA capture with GPU RDMA
pipeline = StreamCompose(
   [
      AJAVideoSource(
         mode="UHDp30-rgba",
         input_mode="hdmi",
         is_nvmm=True,
         output_width=1920,
         output_height=1080,
      ),
      NVEglGlesSink(sync=True),
   ]
)

# start pipeline
pipeline()

While the pipeline here is simple, developers can add NVInferServer and TransformChainComponent to perform live streaming inference using AJA video capture cards on x86 or Clara AGX, as sketched below.
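
As an illustrative sketch only (not one of the official sample apps), such a pipeline could combine the AJA source above with the inference and post-processing components introduced earlier; the model configuration, resolutions, and the CuPy color_blender transform are placeholders borrowed from the ultrasound example and would need to match the actual capture format and model.

# hypothetical sketch: AJA capture followed by streaming inference;
# model name, sizes, and the CuPy color_blender are placeholders from earlier sections
infer_server_config = NVInferServer.generate_default_config()
infer_server_config.infer_config.backend.trt_is.model_repo.root = "/app/models"
infer_server_config.infer_config.backend.trt_is.model_name = "us_unet_256x256"
infer_server_config.infer_config.backend.trt_is.version = "-1"

pipeline = StreamCompose(
   [
      # capture from the AJA device directly into GPU memory
      AJAVideoSource(
         mode="UHDp30-rgba",
         input_mode="hdmi",
         is_nvmm=True,
         output_width=1920,
         output_height=1080,
      ),
      # convert and resize the capture to the model input resolution
      NVVideoConvert(
         FilterProperties(format="RGBA", width=256, height=256)
      ),
      # run inference with the configured model repository
      NVInferServer(config=infer_server_config),
      # post-process with the CuPy-based chain (PyTorch is not available on Clara AGX);
      # color_blender is the function defined in the TransformChainComponentCupy section
      TransformChainComponentCupy(transform_chain=color_blender, output_label="BLENDED_IMAGE"),
      # display the blended output
      NVEglGlesSink(sync=True),
   ]
)

# execute the pipeline
pipeline()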