# Source code for monailabel.tasks.scoring.epistemic_v2

# Copyright (c) MONAI Consortium
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import multiprocessing
import os
import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import torch
from monai.metrics.active_learning_metrics import VarianceMetric

from monailabel.interfaces.datastore import Datastore
from monailabel.interfaces.tasks.scoring import ScoringMethod
from monailabel.tasks.infer.basic_infer import BasicInferTask
from monailabel.utils.others.generic import name_to_device

logger = logging.getLogger(__name__)


class EpistemicScoring(ScoringMethod):
    """
    First version of Epistemic computation used as active learning strategy.

    For every unlabeled image the inference task is executed several times, the
    predictions are stacked and reduced to a single scalar (mean entropy, or mean
    variance when ``use_variance`` is set).  The scalar and the model timestamp
    are written to the datastore under ``key_output_entropy`` / ``key_output_ts``.
    """

    def __init__(
        self,
        infer_task: BasicInferTask,
        max_samples=0,
        simulation_size=5,
        use_variance=False,
        key_output_entropy="epistemic_entropy",
        key_output_ts="epistemic_ts",
    ):
        """
        :param infer_task: inference task that is run repeatedly to obtain the predictions
        :param max_samples: default cap on the number of images scored per call (0 means no cap)
        :param simulation_size: default number of inference runs per image
        :param use_variance: score with ``variance_volume`` instead of ``entropy_volume``
        :param key_output_entropy: image-info key under which the score is stored
        :param key_output_ts: image-info key under which the model timestamp is stored
        """
        super().__init__(f"Compute initial score based on dropout - {infer_task.description}")
        self.infer_task = infer_task
        self.dimension = infer_task.dimension

        self.max_samples = max_samples
        self.simulation_size = simulation_size
        self.use_variance = use_variance
        self.key_output_entropy = key_output_entropy
        self.key_output_ts = key_output_ts

    def entropy_volume(self, vol_input):
        """
        Compute the entropy map of repeated predictions.

        The input is assumed with repetitions, channels and then volumetric data,
        i.e. 5 axes when ``self.dimension == 3`` and 4 axes otherwise.  An input
        with any other number of axes is treated as having no channel axis.

        :param vol_input: ``torch.Tensor`` or array-like with an ``astype`` method; it is not modified
        :return: float32 array holding ``-mean * log(mean)`` (mean taken over the
            repetition axis), summed over the channel axis when one is present
        """
        if isinstance(vol_input, torch.Tensor):
            vol_input = vol_input.cpu().detach().numpy()
        # astype() returns a copy, so the clamping below never touches the caller's data
        vol_input = vol_input.astype(dtype="float32")
        reps = vol_input.shape[0]

        # Threshold values less than or equal to zero (keeps the logarithm finite)
        threshold = 0.00005
        vol_input[vol_input <= 0] = threshold

        # Average over the repetition axis only.  No squeeze here: squeezing could
        # silently drop the repetition axis when it has size 1.
        t_avg = np.divide(np.sum(vol_input, axis=0), reps)
        t_entropy = -np.multiply(t_avg, np.log(t_avg))

        # NOTE: the parentheses matter.  `len(dims) == 5 if cond else 4` would parse as
        # `(len(dims) == 5) if cond else 4`, which is always truthy for 2D.
        expected_ndim = 5 if self.dimension == 3 else 4
        if vol_input.ndim == expected_ndim:
            # Each channel is a class: accumulate the per-class entropies
            return np.sum(t_entropy, axis=0)

        # No channel axis present
        return t_entropy

    def variance_volume(self, vol_input, ignore_nans=True):
        """
        Compute the variance map of repeated predictions.

        :param vol_input: stacked predictions; repetitions on axis 0 and, in the
            ``ignore_nans`` path, the axis summed afterwards is axis 1 of the input
        :param ignore_nans: when true use ``np.nanvar`` on a private NumPy copy of the
            input, otherwise delegate to MONAI's ``VarianceMetric``
        :return: variance map; two leading singleton axes are added when ``self.dimension == 3``
        """
        if ignore_nans:
            if isinstance(vol_input, torch.Tensor):
                vol_input = vol_input.cpu().detach().numpy()
            # Work on a copy so that the caller's data is left untouched
            vol_input = np.array(vol_input, copy=True)
            vol_input[vol_input <= 0] = 0.0005
            vari = np.nanvar(vol_input, axis=0)  # torch.var vs np.nanvar (ignore_nans)
            variance = np.sum(vari, axis=0)
        else:
            variance_metric = VarianceMetric(threshold=0.0005, spatial_map=True, scalar_reduction="sum")
            variance = variance_metric(vol_input)

        if self.dimension == 3:
            variance = np.expand_dims(variance, axis=0)
            variance = np.expand_dims(variance, axis=0)
        return variance

    def __call__(self, request, datastore: Datastore):
        """
        Score the unlabeled images of ``datastore``.

        Images whose stored timestamp (``self.key_output_ts``) equals the current
        model timestamp are skipped.  Recognised request keys: ``max_samples``,
        ``simulation_size``, ``max_workers``, ``multi_gpu``, ``gpus``, ``device``.

        :return: dict with the keys ``total``, ``skipped``, ``executed`` and ``latency``
        """
        logger.info("Starting Epistemic Uncertainty scoring")

        model_file = self.infer_task.get_path()
        model_ts = int(os.stat(model_file).st_mtime) if model_file and os.path.exists(model_file) else 1
        self.infer_task.clear_cache()

        # Performing Epistemic for all unlabeled images
        skipped = 0
        unlabeled_images = datastore.get_unlabeled_images()
        max_samples = request.get("max_samples", self.max_samples)
        simulation_size = request.get("simulation_size", self.simulation_size)
        if simulation_size < 2:
            simulation_size = 2
            logger.warning("EPISTEMIC:: Fixing 'simulation_size=2' as min 2 simulations are needed to compute entropy")

        logger.info(f"EPISTEMIC:: Total unlabeled images: {len(unlabeled_images)}; max_samples: {max_samples}")
        t_start = time.time()

        image_ids = []
        for image_id in unlabeled_images:
            image_info = datastore.get_image_info(image_id)
            # Read the same key that run_scoring() writes; a hard-coded "epistemic_ts"
            # would never match when a custom key_output_ts is configured.
            prev_ts = image_info.get(self.key_output_ts, 0)
            if prev_ts == model_ts:
                skipped += 1
                continue
            image_ids.append(image_id)

        image_ids = image_ids[:max_samples] if max_samples else image_ids

        max_workers = request.get("max_workers", 2)
        multi_gpu = request.get("multi_gpu", False)
        multi_gpus = request.get("gpus", "all")
        gpus = (
            list(range(torch.cuda.device_count())) if not multi_gpus or multi_gpus == "all" else multi_gpus.split(",")
        )
        device = name_to_device(request.get("device", "cuda"))
        # The device list is only reported in the log line below
        device_ids = [f"cuda:{gpu}" for gpu in gpus] if multi_gpu else [device]

        cpu_count = multiprocessing.cpu_count()
        max_workers = max_workers if max_workers else max(1, cpu_count // 2)
        max_workers = min(max_workers, cpu_count)

        if len(image_ids) > 1 and max_workers > 1:
            logger.info(f"MultiGpu: {multi_gpu}; Using Device(s): {device_ids}; Max Workers: {max_workers}")
            with ThreadPoolExecutor(max_workers, thread_name_prefix="ScoreInfer") as executor:
                futures = [
                    executor.submit(self.run_scoring, image_id, simulation_size, model_ts, datastore)
                    for image_id in image_ids
                ]
                # result() re-raises any exception raised inside a worker
                for future in futures:
                    future.result()
        else:
            for image_id in image_ids:
                self.run_scoring(image_id, simulation_size, model_ts, datastore)

        summary = {
            "total": len(unlabeled_images),
            "skipped": skipped,
            "executed": len(image_ids),
            "latency": round(time.time() - t_start, 3),
        }

        logger.info(f"EPISTEMIC:: {summary}")
        self.infer_task.clear_cache()
        return summary

    def run_scoring(self, image_id, simulation_size, model_ts, datastore):
        """
        Run ``simulation_size`` inferences for one image and store the resulting score.

        When no inference yields a prediction, a warning is logged and the datastore
        is left untouched, so the image is picked up again on the next call.

        :param image_id: id of the image in ``datastore``
        :param simulation_size: number of inference runs
        :param model_ts: model timestamp stored alongside the score
        :param datastore: datastore used to resolve the image and to persist the score
        """
        start = time.time()
        request = {
            "image": datastore.get_image_uri(image_id),
            "logging": "error",
            "cache_transforms": False,
        }

        accum_unl_outputs = []
        for i in range(simulation_size):
            data = self.infer_task(request=request)
            pred = data[self.infer_task.output_label_key] if isinstance(data, dict) else None
            if pred is not None:
                # Only pay for the full-volume sum when debug logging is actually on
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f"EPISTEMIC:: {image_id} => {i} => pred: {pred.shape}; sum: {np.sum(pred)}")
                accum_unl_outputs.append(pred)
            else:
                logger.info(f"EPISTEMIC:: {image_id} => {i} => pred: None")

        if not accum_unl_outputs:
            # torch.stack() raises on an empty list; skip this image instead
            logger.warning(f"EPISTEMIC:: {image_id} => no predictions available; skipping score update")
            return

        accum = torch.stack(accum_unl_outputs)
        # Drop singleton axes but always keep the leading repetition axis; a plain
        # squeeze would also remove it when only one prediction was collected.
        accum = accum.reshape(accum.shape[0], *[size for size in accum.shape[1:] if size != 1])

        # Accum Expected shape for 2D images is (N, C, H, W) for 3D (N, C, H, W, D)
        # To handle cases where only a single class of segmentation is present, an extra dimension is added
        if (self.dimension, accum.ndim) in ((2, 3), (3, 4)):
            accum = torch.unsqueeze(accum, dim=1)

        entropy = self.variance_volume(accum) if self.use_variance else self.entropy_volume(accum)
        entropy = float(np.nanmean(entropy))

        latency = time.time() - start
        logger.info(
            f"EPISTEMIC:: {image_id} => iters: {simulation_size}; "
            f"entropy: {round(entropy, 4)}; latency: {round(latency, 3)};"
        )

        # Add epistemic_entropy in datastore
        info = {self.key_output_entropy: entropy, self.key_output_ts: model_ts}
        datastore.update_image_info(image_id, info)