# Source code for mlca.utilities

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# utilities.py
"""
Utilities for DVHA-MLCA
"""
# Copyright (c) 2016-2021 Dan Cutright
# This file is part of DVH Analytics MLC Analyzer, released under a BSD license
#    See the file LICENSE included with this distribution, also
#    available at https://github.com/cutright/DVHA-MLCA

import argparse
from datetime import datetime
from mlca._version import __version__
import numpy as np
import pydicom
from os import walk
from os.path import join
from mlca.options import DEFAULT_OPTIONS
from multiprocessing import Pool
from tqdm import tqdm
import warnings
import csv


def get_xy_path_lengths(shapely_object):
    """Get the x and y path lengths of a Shapely object

    Parameters
    ----------
    shapely_object : GeometryCollection, MultiPolygon, Polygon
        A shapely polygon-like object

    Returns
    -------
    list
        Perimeter lengths in the x and y directions
    """
    path = np.array([0.0, 0.0])
    # ``geom_type`` replaces the deprecated ``.type`` attribute
    # (deprecated in Shapely 2.0, removed in 2.1; available in 1.x too)
    geom_type = shapely_object.geom_type
    if geom_type == "GeometryCollection":
        for geometry in shapely_object.geoms:
            # Only polygon-like members contribute path length
            if geometry.geom_type in {"MultiPolygon", "Polygon"}:
                path = np.add(path, get_xy_path_lengths(geometry))
    elif geom_type == "MultiPolygon":
        # Iterating a MultiPolygon directly was removed in Shapely 2.0;
        # ``.geoms`` works in both Shapely 1.x and 2.x
        for shape in shapely_object.geoms:
            path = np.add(path, get_xy_path_lengths(shape))
    elif geom_type == "Polygon":
        x = np.array(shapely_object.exterior.xy[0])
        y = np.array(shapely_object.exterior.xy[1])
        # Sum of absolute coordinate steps = perimeter length per axis
        path = np.array(
            [np.sum(np.abs(np.diff(x))), np.sum(np.abs(np.diff(y)))]
        )
    return path.tolist()
def flatten_list_of_lists(some_list, remove_duplicates=False, sort=False):
    """Convert a list of lists into one list of all values

    Parameters
    ----------
    some_list : list
        a list such that each element is a list
    remove_duplicates : bool, optional
        if True, return a unique list, otherwise keep duplicated values
    sort : bool, optional
        if True, sort the list

    Returns
    -------
    list
        A new list containing all values in ``some_list``
    """
    data = [item for sublist in some_list for item in sublist]
    if remove_duplicates:
        if sort:
            # BUG FIX: was ``list(set(data))`` — set iteration order is
            # arbitrary, so the result was not actually sorted
            return sorted(set(data))
        # Preserve first-occurrence order (also supports unhashable values)
        ans = []
        for value in data:
            if value not in ans:
                ans.append(value)
        return ans
    if sort:
        return sorted(data)
    return data
def get_file_paths(init_dir):
    """Find all files in a directory and sub-directories

    Parameters
    ----------
    init_dir : str
        Top-level directory to search for files

    Returns
    -------
    list
        Absolute file paths
    """
    # walk() yields every directory under init_dir; collect each file it lists
    return [
        join(dir_path, base_name)
        for dir_path, _sub_dirs, base_names in walk(init_dir)
        for base_name in base_names
    ]
def is_file_dicom(file_path, modality=None, verbose=False):
    """Check whether a file is DICOM, optionally of a given Modality

    Parameters
    ----------
    file_path : str
        File path to potential DICOM file
    modality : str, optional
        Return False if file is not this Modality (0008,0060)
    verbose : bool, optional
        Print results to terminal

    Returns
    -------
    bool
        True if file_path points to a DICOM file, will return False if
        SOPClassUID (0008,0016) is not found
    """
    kwargs = {"stop_before_pixels": True, "force": True}
    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            # dcmread replaces pydicom.read_file, which was deprecated in
            # pydicom 2.x and removed in pydicom 3.0
            ds = pydicom.dcmread(file_path, **kwargs)
        if modality is None:
            # Assuming SOPClassUID is a required tag
            if "SOPClassUID" in ds:
                if verbose:
                    print("DICOM File Found: %s" % file_path)
                return True
        elif ds.Modality.upper() == modality.upper():
            # Missing Modality raises AttributeError -> handled below
            if verbose:
                print("DICOM %s File Found: %s" % (modality, file_path))
            return True
    except Exception as e:
        if verbose:
            print("Non-DICOM File Found: %s" % file_path)
            print(str(e))
    return False
def get_dicom_files(file_paths, modality=None, verbose=False, processes=1):
    """Find all DICOM-RT Plan files in a list of file paths

    Parameters
    ----------
    file_paths : list
        A list of file paths
    modality : str, optional
        Specify Modality (0008,0060)
    verbose : bool, optional
        Print results to terminal
    processes : int
        Number of processes for multiprocessing.

    Returns
    -------
    list
        Absolute file paths to DICOM-RT Plans
    """
    if processes == 1:
        # Serial path: avoid multiprocessing overhead entirely
        return [
            path
            for path in file_paths
            if is_file_dicom(path, modality, verbose)
        ]
    # Parallel path: fan the checks out across worker processes
    tasks = [(path, modality, verbose) for path in file_paths]
    results = run_multiprocessing(_get_dicom_files_worker, tasks, processes)
    # Workers return None for non-DICOM files; keep only real hits
    return [path for path in results if path is not None]
def _get_dicom_files_worker(args):
    """Worker for get_dicom_files: return the file path if it is a
    matching DICOM file, otherwise None"""
    file_path = args[0]
    if is_file_dicom(*args):
        return file_path
    return None
def run_multiprocessing(worker, queue, processes):
    """Parallel processing

    Parameters
    ----------
    worker : callable
        single parameter function to be called on each item in queue
    queue : iterable
        A list of arguments for worker
    processes : int
        Number of processes for multiprocessing.Pool

    Returns
    -------
    list
        List of returns from worker
    """
    bar_kwargs = {
        "total": len(queue),
        "bar_format": "{desc:<5.5}{percentage:3.0f}%|{bar:30}{r_bar}",
    }
    results = []
    with Pool(processes=processes) as pool:
        with tqdm(**bar_kwargs) as progress_bar:
            # imap_unordered yields results as workers finish, so the
            # progress bar advances in real time (order is not preserved)
            for result in pool.imap_unordered(worker, queue):
                results.append(result)
                progress_bar.update()
    return results
def create_cmd_parser():
    """Get an argument parser for mlca.main

    Returns
    -------
    argparse.ArgumentParser
        argument parser
    """
    cmd_parser = argparse.ArgumentParser(
        description="Command line DVHA MLC Analyzer"
    )
    cmd_parser.add_argument(
        "init_dir",
        nargs="?",
        help="Directory containing DICOM-RT Plan files",
        default=None,
    )
    cmd_parser.add_argument(
        "-of",
        "--output-file",
        dest="output_file",
        help="Output will be saved as dvha_mlca_<version>_results_"
        "<time-stamp>.csv by default.",
        default=None,
    )
    # FIX: numeric options now declare type= so CLI-supplied values are
    # parsed to float/int instead of arriving as str while the defaults
    # (from DEFAULT_OPTIONS) are already numeric
    cmd_parser.add_argument(
        "-xw",
        "--x-weight",
        dest="complexity_weight_x",
        type=float,
        help="Complexity coefficient for x-dimension: default = %0.1f"
        % DEFAULT_OPTIONS["complexity_weight_x"],
        default=DEFAULT_OPTIONS["complexity_weight_x"],
    )
    cmd_parser.add_argument(
        "-yw",
        "--y-weight",
        dest="complexity_weight_y",
        type=float,
        help="Complexity coefficient for y-dimension: default = %0.1f"
        % DEFAULT_OPTIONS["complexity_weight_y"],
        default=DEFAULT_OPTIONS["complexity_weight_y"],
    )
    cmd_parser.add_argument(
        "-xs",
        "--x-max-field-size",
        dest="max_field_size_x",
        type=float,
        help="Maximum field size in the x-dimension: default = %0.1f (mm)"
        % DEFAULT_OPTIONS["max_field_size_x"],
        default=DEFAULT_OPTIONS["max_field_size_x"],
    )
    cmd_parser.add_argument(
        "-ys",
        "--y-max-field-size",
        dest="max_field_size_y",
        type=float,
        help="Maximum field size in the y-dimension: default = %0.1f (mm)"
        % DEFAULT_OPTIONS["max_field_size_y"],
        default=DEFAULT_OPTIONS["max_field_size_y"],
    )
    cmd_parser.add_argument(
        "-ver",
        "--version",
        dest="print_version",
        help="Print the DVHA-MLCA version",
        default=False,
        action="store_true",
    )
    cmd_parser.add_argument(
        "-v",
        "--verbose",
        dest="verbose",
        help="Print final results and plan summaries as they are analyzed",
        default=False,
        action="store_true",
    )
    cmd_parser.add_argument(
        "-n",
        "--processes",
        dest="processes",
        type=int,  # FIX: Pool(processes=...) requires an int, not str
        help="Enable multiprocessing, set number of parallel processes",
        default=1,
    )
    return cmd_parser
def get_default_output_filename():
    """Get the default output file name for mlca.main.process

    Returns
    -------
    str
        dvha_mlca_<version>_results_<timestamp>.csv
    """
    # strftime already returns str; timestamp is filesystem-safe
    time_stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    return "dvha_mlca_{}_results_{}.csv".format(__version__, time_stamp)
def write_csv(file_path, rows, mode="w", newline=""):
    r"""Create csv.writer, call writerows(rows)

    Parameters
    ----------
    file_path : str
        path to file
    rows : list, iterable
        Items to be written to file_pointer (input for csv.writer.writerows)
    mode : str
        optional string that specifies the mode in which the file is opened
    newline : str
        controls how universal newlines mode works. It can be None, '',
        '\n', '\r', and '\r\n'
    """
    writer_kwargs = {
        "delimiter": ",",
        "quotechar": '"',
        "quoting": csv.QUOTE_MINIMAL,
    }
    with open(file_path, mode, encoding="utf-8", newline=newline) as file_obj:
        csv.writer(file_obj, **writer_kwargs).writerows(rows)