# Source code for monai.networks.nets.dynunet

```
# Copyright (c) MONAI Consortium
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# isort: dont-add-import: from __future__ import annotations
from typing import List, Optional, Sequence, Tuple, Union
import torch
import torch.nn as nn
from torch.nn.functional import interpolate
from monai.networks.blocks.dynunet_block import UnetBasicBlock, UnetOutBlock, UnetResBlock, UnetUpBlock
__all__ = ["DynUNet", "DynUnet", "Dynunet"]
class DynUNetSkipLayer(nn.Module):
"""
Defines a layer in the UNet topology which combines the downsample and upsample pathways with the skip connection.
The member `next_layer` may refer to instances of this class or the final bottleneck layer at the bottom the UNet
structure. The purpose of using a recursive class like this is to get around the Torchscript restrictions on
looping over lists of layers and accumulating lists of output tensors which must be indexed. The `heads` list is
shared amongst all the instances of this class and is used to store the output from the supervision heads during
forward passes of the network.
"""
heads: Optional[List[torch.Tensor]]
def __init__(self, index, downsample, upsample, next_layer, heads=None, super_head=None):
super().__init__()
self.downsample = downsample
self.next_layer = next_layer
self.upsample = upsample
self.super_head = super_head
self.heads = heads
self.index = index
def forward(self, x):
downout = self.downsample(x)
nextout = self.next_layer(downout)
upout = self.upsample(nextout, downout)
if self.super_head is not None and self.heads is not None and self.index > 0:
self.heads[self.index - 1] = self.super_head(upout)
return upout
[docs]class DynUNet(nn.Module):
"""
This reimplementation of a dynamic UNet (DynUNet) is based on:
`Automated Design of Deep Learning Methods for Biomedical Image Segmentation <https://arxiv.org/abs/1904.08128>`_.
`nnU-Net: Self-adapting Framework for U-Net-Based Medical Image Segmentation <https://arxiv.org/abs/1809.10486>`_.
`Optimized U-Net for Brain Tumor Segmentation <https://arxiv.org/pdf/2110.03352.pdf>`_.
This model is more flexible compared with ``monai.networks.nets.UNet`` in three
places:
- Residual connection is supported in conv blocks.
- Anisotropic kernel sizes and strides can be used in each layers.
- Deep supervision heads can be added.
The model supports 2D or 3D inputs and is consisted with four kinds of blocks:
one input block, `n` downsample blocks, one bottleneck and `n+1` upsample blocks. Where, `n>0`.
The first and last kernel and stride values of the input sequences are used for input block and
bottleneck respectively, and the rest value(s) are used for downsample and upsample blocks.
Therefore, pleasure ensure that the length of input sequences (``kernel_size`` and ``strides``)
is no less than 3 in order to have at least one downsample and upsample blocks.
To meet the requirements of the structure, the input size for each spatial dimension should be divisible
by the product of all strides in the corresponding dimension. In addition, the minimal spatial size should have
at least one dimension that has twice the size of the product of all strides.
For example, if `strides=((1, 2, 4), 2, 2, 1)`, the spatial size should be divisible by `(4, 8, 16)`,
and the minimal spatial size is `(8, 8, 16)` or `(4, 16, 16)` or `(4, 8, 32)`.
The output size for each spatial dimension equals to the input size of the corresponding dimension divided by the
stride in strides[0].
For example, if `strides=((1, 2, 4), 2, 2, 1)` and the input size is `(64, 32, 32)`, the output size is `(64, 16, 8)`.
For backwards compatibility with old weights, please set `strict=False` when calling `load_state_dict`.
Usage example with medical segmentation decathlon dataset is available at:
https://github.com/Project-MONAI/tutorials/tree/master/modules/dynunet_pipeline.
Args:
spatial_dims: number of spatial dimensions.
in_channels: number of input channels.
out_channels: number of output channels.
kernel_size: convolution kernel size.
strides: convolution strides for each blocks.
upsample_kernel_size: convolution kernel size for transposed convolution layers. The values should
equal to strides[1:].
filters: number of output channels for each blocks. Different from nnU-Net, in this implementation we add
this argument to make the network more flexible. As shown in the third reference, one way to determine
this argument is like:
``[64, 96, 128, 192, 256, 384, 512, 768, 1024][: len(strides)]``.
The above way is used in the network that wins task 1 in the BraTS21 Challenge.
If not specified, the way which nnUNet used will be employed. Defaults to ``None``.
dropout: dropout ratio. Defaults to no dropout.
norm_name: feature normalization type and arguments. Defaults to ``INSTANCE``.
`INSTANCE_NVFUSER` is a faster version of the instance norm layer, it can be used when:
1) `spatial_dims=3`, 2) CUDA device is available, 3) `apex` is installed and 4) non-Windows OS is used.
act_name: activation layer type and arguments. Defaults to ``leakyrelu``.
deep_supervision: whether to add deep supervision head before output. Defaults to ``False``.
If ``True``, in training mode, the forward function will output not only the final feature map
(from `output_block`), but also the feature maps that come from the intermediate up sample layers.
In order to unify the return type (the restriction of TorchScript), all intermediate
feature maps are interpolated into the same size as the final feature map and stacked together
(with a new dimension in the first axis)into one single tensor.
For instance, if there are two intermediate feature maps with shapes: (1, 2, 16, 12) and
(1, 2, 8, 6), and the final feature map has the shape (1, 2, 32, 24), then all intermediate feature maps
will be interpolated into (1, 2, 32, 24), and the stacked tensor will has the shape (1, 3, 2, 32, 24).
When calculating the loss, you can use torch.unbind to get all feature maps can compute the loss
one by one with the ground truth, then do a weighted average for all losses to achieve the final loss.
deep_supr_num: number of feature maps that will output during deep supervision head. The
value should be larger than 0 and less than the number of up sample layers.
Defaults to 1.
res_block: whether to use residual connection based convolution blocks during the network.
Defaults to ``False``.
trans_bias: whether to set the bias parameter in transposed convolution layers. Defaults to ``False``.
"""
def __init__(
self,
spatial_dims: int,
in_channels: int,
out_channels: int,
kernel_size: Sequence[Union[Sequence[int], int]],
strides: Sequence[Union[Sequence[int], int]],
upsample_kernel_size: Sequence[Union[Sequence[int], int]],
filters: Optional[Sequence[int]] = None,
dropout: Optional[Union[Tuple, str, float]] = None,
norm_name: Union[Tuple, str] = ("INSTANCE", {"affine": True}),
act_name: Union[Tuple, str] = ("leakyrelu", {"inplace": True, "negative_slope": 0.01}),
deep_supervision: bool = False,
deep_supr_num: int = 1,
res_block: bool = False,
trans_bias: bool = False,
):
super().__init__()
self.spatial_dims = spatial_dims
self.in_channels = in_channels
self.out_channels = out_channels
self.kernel_size = kernel_size
self.strides = strides
self.upsample_kernel_size = upsample_kernel_size
self.norm_name = norm_name
self.act_name = act_name
self.dropout = dropout
self.conv_block = UnetResBlock if res_block else UnetBasicBlock
self.trans_bias = trans_bias
if filters is not None:
self.filters = filters
self.check_filters()
else:
self.filters = [min(2 ** (5 + i), 320 if spatial_dims == 3 else 512) for i in range(len(strides))]
self.input_block = self.get_input_block()
self.downsamples = self.get_downsamples()
self.bottleneck = self.get_bottleneck()
self.upsamples = self.get_upsamples()
self.output_block = self.get_output_block(0)
self.deep_supervision = deep_supervision
self.deep_supr_num = deep_supr_num
# initialize the typed list of supervision head outputs so that Torchscript can recognize what's going on
self.heads: List[torch.Tensor] = [torch.rand(1)] * self.deep_supr_num
if self.deep_supervision:
self.deep_supervision_heads = self.get_deep_supervision_heads()
self.check_deep_supr_num()
self.apply(self.initialize_weights)
self.check_kernel_stride()
def create_skips(index, downsamples, upsamples, bottleneck, superheads=None):
"""
Construct the UNet topology as a sequence of skip layers terminating with the bottleneck layer. This is
done recursively from the top down since a recursive nn.Module subclass is being used to be compatible
with Torchscript. Initially the length of `downsamples` will be one more than that of `superheads`
since the `input_block` is passed to this function as the first item in `downsamples`, however this
shouldn't be associated with a supervision head.
"""
if len(downsamples) != len(upsamples):
raise ValueError(f"{len(downsamples)} != {len(upsamples)}")
if len(downsamples) == 0: # bottom of the network, pass the bottleneck block
return bottleneck
if superheads is None:
next_layer = create_skips(1 + index, downsamples[1:], upsamples[1:], bottleneck)
return DynUNetSkipLayer(index, downsample=downsamples[0], upsample=upsamples[0], next_layer=next_layer)
super_head_flag = False
if index == 0: # don't associate a supervision head with self.input_block
rest_heads = superheads
else:
if len(superheads) > 0:
super_head_flag = True
rest_heads = superheads[1:]
else:
rest_heads = nn.ModuleList()
# create the next layer down, this will stop at the bottleneck layer
next_layer = create_skips(1 + index, downsamples[1:], upsamples[1:], bottleneck, superheads=rest_heads)
if super_head_flag:
return DynUNetSkipLayer(
index,
downsample=downsamples[0],
upsample=upsamples[0],
next_layer=next_layer,
heads=self.heads,
super_head=superheads[0],
)
return DynUNetSkipLayer(index, downsample=downsamples[0], upsample=upsamples[0], next_layer=next_layer)
if not self.deep_supervision:
self.skip_layers = create_skips(
0, [self.input_block] + list(self.downsamples), self.upsamples[::-1], self.bottleneck
)
else:
self.skip_layers = create_skips(
0,
[self.input_block] + list(self.downsamples),
self.upsamples[::-1],
self.bottleneck,
superheads=self.deep_supervision_heads,
)
def check_kernel_stride(self):
kernels, strides = self.kernel_size, self.strides
error_msg = "length of kernel_size and strides should be the same, and no less than 3."
if len(kernels) != len(strides) or len(kernels) < 3:
raise ValueError(error_msg)
for idx, k_i in enumerate(kernels):
kernel, stride = k_i, strides[idx]
if not isinstance(kernel, int):
error_msg = f"length of kernel_size in block {idx} should be the same as spatial_dims."
if len(kernel) != self.spatial_dims:
raise ValueError(error_msg)
if not isinstance(stride, int):
error_msg = f"length of stride in block {idx} should be the same as spatial_dims."
if len(stride) != self.spatial_dims:
raise ValueError(error_msg)
def check_deep_supr_num(self):
deep_supr_num, strides = self.deep_supr_num, self.strides
num_up_layers = len(strides) - 1
if deep_supr_num >= num_up_layers:
raise ValueError("deep_supr_num should be less than the number of up sample layers.")
if deep_supr_num < 1:
raise ValueError("deep_supr_num should be larger than 0.")
def check_filters(self):
filters = self.filters
if len(filters) < len(self.strides):
raise ValueError("length of filters should be no less than the length of strides.")
else:
self.filters = filters[: len(self.strides)]
[docs] def forward(self, x):
out = self.skip_layers(x)
out = self.output_block(out)
if self.training and self.deep_supervision:
out_all = [out]
for feature_map in self.heads:
out_all.append(interpolate(feature_map, out.shape[2:]))
return torch.stack(out_all, dim=1)
return out
def get_input_block(self):
return self.conv_block(
self.spatial_dims,
self.in_channels,
self.filters[0],
self.kernel_size[0],
self.strides[0],
self.norm_name,
self.act_name,
dropout=self.dropout,
)
def get_bottleneck(self):
return self.conv_block(
self.spatial_dims,
self.filters[-2],
self.filters[-1],
self.kernel_size[-1],
self.strides[-1],
self.norm_name,
self.act_name,
dropout=self.dropout,
)
def get_output_block(self, idx: int):
return UnetOutBlock(self.spatial_dims, self.filters[idx], self.out_channels, dropout=self.dropout)
def get_downsamples(self):
inp, out = self.filters[:-2], self.filters[1:-1]
strides, kernel_size = self.strides[1:-1], self.kernel_size[1:-1]
return self.get_module_list(inp, out, kernel_size, strides, self.conv_block) # type: ignore
def get_upsamples(self):
inp, out = self.filters[1:][::-1], self.filters[:-1][::-1]
strides, kernel_size = self.strides[1:][::-1], self.kernel_size[1:][::-1]
upsample_kernel_size = self.upsample_kernel_size[::-1]
return self.get_module_list(
inp, # type: ignore
out, # type: ignore
kernel_size,
strides,
UnetUpBlock, # type: ignore
upsample_kernel_size,
trans_bias=self.trans_bias,
)
def get_module_list(
self,
in_channels: List[int],
out_channels: List[int],
kernel_size: Sequence[Union[Sequence[int], int]],
strides: Sequence[Union[Sequence[int], int]],
conv_block: nn.Module,
upsample_kernel_size: Optional[Sequence[Union[Sequence[int], int]]] = None,
trans_bias: bool = False,
):
layers = []
if upsample_kernel_size is not None:
for in_c, out_c, kernel, stride, up_kernel in zip(
in_channels, out_channels, kernel_size, strides, upsample_kernel_size
):
params = {
"spatial_dims": self.spatial_dims,
"in_channels": in_c,
"out_channels": out_c,
"kernel_size": kernel,
"stride": stride,
"norm_name": self.norm_name,
"act_name": self.act_name,
"dropout": self.dropout,
"upsample_kernel_size": up_kernel,
"trans_bias": trans_bias,
}
layer = conv_block(**params)
layers.append(layer)
else:
for in_c, out_c, kernel, stride in zip(in_channels, out_channels, kernel_size, strides):
params = {
"spatial_dims": self.spatial_dims,
"in_channels": in_c,
"out_channels": out_c,
"kernel_size": kernel,
"stride": stride,
"norm_name": self.norm_name,
"act_name": self.act_name,
"dropout": self.dropout,
}
layer = conv_block(**params)
layers.append(layer)
return nn.ModuleList(layers)
def get_deep_supervision_heads(self):
return nn.ModuleList([self.get_output_block(i + 1) for i in range(self.deep_supr_num)])
@staticmethod
def initialize_weights(module):
if isinstance(module, (nn.Conv3d, nn.Conv2d, nn.ConvTranspose3d, nn.ConvTranspose2d)):
module.weight = nn.init.kaiming_normal_(module.weight, a=0.01)
if module.bias is not None:
module.bias = nn.init.constant_(module.bias, 0)
DynUnet = Dynunet = DynUNet
```