Source code for cicada.preprocessing.utils

import os
import sys
import ntpath
import numpy as np
from itertools import groupby
import collections
from operator import itemgetter
from pynwb import NWBHDF5IO, NWBContainer
from pynwb.file import Subject
from PIL import ImageSequence
from ScanImageTiffReader import ScanImageTiffReader
import PIL.Image as pil_image
import time

def get_subfiles(current_path, relative_path=False, depth=1):
    # Get all files in the last directory of the path
    subfiles = []
    for (dirpath, dirnames, filenames) in os.walk(current_path):
        if relative_path:
            filenames = [os.path.join(dirpath, filename) for filename in filenames]
        subfiles.extend(filenames)
        depth -= 1
        if depth == 0:
            break
    return subfiles


def get_subdirs(current_path, depth=1):
    # Get all directories in the last directory of the path
    subdirs = []
    for (dirpath, dirnames, filenames) in os.walk(current_path):
        subdirs.extend(dirnames)
        depth -= 1
        if depth == 0:
            break
    return subdirs

def path_leaf(path):
    head, tail = ntpath.split(path)
    return tail or ntpath.basename(head)

[docs]def get_continous_time_periods(binary_array): """ take a binary array and return a list of tuples representing the first and last position(included) of continuous positive period :param binary_array: :return: """ binary_array = np.copy(binary_array).astype("int8") n_times = len(binary_array) d_times = np.diff(binary_array) # show the +1 and -1 edges pos = np.where(d_times == 1)[0] + 1 neg = np.where(d_times == -1)[0] + 1 if (pos.size == 0) and (neg.size == 0): if len(np.nonzero(binary_array)[0]) > 0: return [(0, n_times-1)] else: return [] elif pos.size == 0: # i.e., starts on an spike, then stops return [(0, neg[0])] elif neg.size == 0: # starts, then ends on a spike. return [(pos[0], n_times-1)] else: if pos[0] > neg[0]: # we start with a spike pos = np.insert(pos, 0, 0) if neg[-1] < pos[-1]: # we end with aspike neg = np.append(neg, n_times - 1) # NOTE: by this time, length(pos)==length(neg), necessarily h = np.matrix([pos, neg]) # print(f"len(h[1][0]) {len(h[1][0])} h[1][0] {h[1][0]} h.size {h.size}") if np.any(h): result = [] for i in np.arange(h.shape[1]): if h[1, i] == n_times-1: result.append((h[0, i], h[1, i])) else: result.append((h[0, i], h[1, i]-1)) return result return []
[docs]def merging_time_periods(time_periods, min_time_between_periods): """ Take a list of pair of values representing intervals (periods) and a merging thresholdd represented by min_time_between_periods. If the time between 2 periods are under this threshold, then we merge those periods. It returns a new list of periods. :param time_periods: list of list of 2 integers or floats. The second value represent the end of the period, the value being included in the period. :param min_time_between_periods: a float or integer value :return: a list of pair of list. """ n_periods = len(time_periods) merged_time_periods = [] index = 0 while index < n_periods: time_period = time_periods[index] if len(merged_time_periods) == 0: merged_time_periods.append([time_period[0], time_period[1]]) index += 1 continue # we check if the time between both is superior at min_time_between_periods last_time_period = time_periods[index - 1] beg_time = last_time_period[1] end_time = time_period[0] if (end_time - beg_time) <= min_time_between_periods: # then we merge them merged_time_periods[-1][1] = time_period[1] index += 1 continue else: merged_time_periods.append([time_period[0], time_period[1]]) index += 1 return merged_time_periods
[docs]def class_name_to_module_name(class_name): """ Transform the string representing a class_name, by removing the upper case letters, and inserting before them an underscore if 2 upper case letters don't follow. Underscore are also inserted before numbers ex: ConvertAbfToNWB -> convert_abf_to_nwb :param class_name: string :return: """ if len(class_name) == 1: return class_name.lower() new_class_name = class_name[0] for index in range(1, len(class_name)): letter = class_name[index] if letter.isdigit(): # first we check if the previous letter was not a digit if class_name[index - 1].isupper(): new_class_name = new_class_name + letter continue new_class_name = new_class_name + "_" + letter continue if not letter.isupper(): new_class_name = new_class_name + letter continue # first we check if the previous letter was not upper if class_name[index - 1].isupper(): new_class_name = new_class_name + letter continue new_class_name = new_class_name + "_" + letter return new_class_name.lower()
[docs]def module_name_to_class_name(module_name): """ Transform the string representing a module_name, by removing underscores, , and transforming as upper cases the following letter. ex: convert_abf_to_nwb -> ConvertAbfToNwb :param module_name: string :return: """ if len(module_name) == 1: return module_name.upper() new_module_name = module_name[0].upper() for index in range(1, len(module_name)): letter = module_name[index] if letter == "_": continue if letter.isdigit(): new_module_name = new_module_name + letter continue if letter.islower(): if module_name[index - 1] == "_": new_module_name = new_module_name + letter.upper() continue new_module_name = new_module_name + letter return new_module_name
# def init_group_and_sort(nwb_path_list, param_list): # # """ # # Args: # nwb_path_list (list): List of absolute path to NWB files # param_list (list): List of parameters to sort/group by # # Returns: # result (list): values of param used to sort/group by # # """ # # param_map = ["age", "sex", "genotype", "species", "subject_id", "weight", "date_of_birth", # "session_start_time", "file_create_date", "experimenter", "session_id", "institution", "keywords", # "pharmacology", "protocol", "related_pulication", "surgery", "virus", "lab"] # # data_map = ["twophotonseries", "fluorescence", "roiresponseseries", "imagesegmentation", "planesegmentation", # "device", "imagingplan", "opticalchannel", "raster"] # # data_list = list() # # def check_containers(container): # # recursive research of all containers (with 'children' field) # children = getattr(container, 'children', None) # for child in children: # if isinstance(child, NWBContainer) and not isinstance(child, Subject): # data_list.append(str(type(child)).split("'")[1].split(".")[-1].lower()) # check_containers(child) # # # Extract data from NWB and then sort it # result = [] # for nwb_path in nwb_path_list: # nwb_result = [] # io = NWBHDF5IO(nwb_path, 'r') # nwb_file = io.read() # check_containers(nwb_file) # to get all containers # # for param in param_list: # if param in param_map: # param_in_metadata = getattr(nwb_file, param, None) # param_in_subject_metadata = None # if getattr(nwb_file, 'subject', None): # param_in_subject_metadata = getattr(nwb_file.subject, param, None) # # if param_in_metadata: # attrib = param_in_metadata # elif param_in_subject_metadata: # attrib = param_in_subject_metadata # else: # attrib = None # # elif param.lower() in data_map: # if param.lower() in data_list: # attrib = True # else: # attrib = False # # else: # attrib = None # # nwb_result.append(attrib) # nwb_result.append(nwb_file.identifier) # io.close() # result.append(nwb_result) # # return result # # def group_by_param(nwb_path_list, param_list): # # """ # Group NWB files depending on a list of parameters # # Args: # nwb_path_list (list): List of absolute path to NWB files # param_list (list): List of parameters to group by # # Returns: # grouped_list (list): List of NWB files grouped # param_value_list (list) : List of values of the parameter that decided each group # # """ # # result = init_group_and_sort(nwb_path_list, param_list) # # grouped_list = [] # param_value_list = [] # sorted_list = sorted(result, key=lambda x: (x is None, x)) # for k, g in groupby(sorted_list, itemgetter(0)): # t = list(zip(*g)) # param_value_list.append(t[0][0]) # grouped_list.append(list(t[len(t)-1])) # return grouped_list, param_value_list
[docs]class ComparableItem: """ Make it possible to sort a list of items of different types, such as int and string """ def __init__(self, value): self.value = value def __lt__(self, other): if isinstance(self.value, int) or isinstance(self.value, float): if isinstance(other.value, int) or isinstance(other.value, float): return self.value < other.value if isinstance(other.value, str): return True if isinstance(self.value, str): if isinstance(other.value, int) or isinstance(other.value, float): return False if isinstance(other.value, str): return self.value < other.value return True def __str__(self): return str(self.value) def __eq__(self, other): return self.value == other.value def __hash__(self): return self.value.__hash__()
[docs]def sort_by_param(nwb_path_list, param_list): """ Sort NWB files depending on a list of parameters Args: nwb_path_list (list): List of absolute path to NWB files param_list (list): List of parameters to sort by Returns: nwb_sorted_list (list): List of NWB files sorted """ result = init_group_and_sort(nwb_path_list, param_list) sorted_list = sorted(result, key=lambda x: (x is None, x)) nwb_sorted_list = [nwb[len(sorted_list[0])-1] for nwb in sorted_list] return nwb_sorted_list
[docs]def flatten(list): """ Flatten a nested list no matter the nesting level Args: list (list): List to flatten Returns: List without nest Examples: >>> flatten([1,2,[[3,4],5],[7]]) [1,2,3,4,5,7] """ if isinstance(list, collections.Iterable) and not isinstance(list, (str, bytes)): return [a for i in list for a in flatten(i)] else: return [list]
[docs]def load_tiff_movie_in_memory_using_pil(tif_movie_file_name, frames_to_add=None): """ Load tiff movie from filename using PIL library Args: tif_movie_file_name (str) : Absolute path to tiff movie frames_to_add: dict with key an int representing the frame index after which add frames. the value is the number of frames to add (integer) Returns: tiff_movie (array) : Tiff movie as 3D-array """ if frames_to_add is None: frames_to_add = dict() start_time_timer = time.time() im = pil_image.open(tif_movie_file_name) n_frames = len(list(ImageSequence.Iterator(im))) dim_y, dim_x = np.array(im).shape print(f"n_frames {n_frames}, dim_x {dim_x}, dim_y {dim_y}") if len(frames_to_add) > 0: n_frames += np.sum(list(frames_to_add.values())) tiff_movie = np.zeros((n_frames, dim_y, dim_x), dtype="uint16") frame_index = 0 for frame, page in enumerate(ImageSequence.Iterator(im)): tiff_movie[frame_index] = np.array(page) frame_index += 1 # adding blank frames if frame in frames_to_add: frame_index += frames_to_add[frame] stop_time_timer = time.time() print(f"Time for loading movie: " f"{np.round(stop_time_timer - start_time_timer, 3)} s") return tiff_movie
[docs]def load_tiff_movie_in_memory(tif_movie_file_name, frames_to_add=None): """ Load tiff movie from filename using Scan Image Tiff Args: tif_movie_file_name (str) : Absolute path to tiff movie Returns: tiff_movie (array) : Tiff movie as 3D-array """ if tif_movie_file_name is not None: print(f"Loading movie") try: if (frames_to_add is not None) and (len(frames_to_add) > 0): return load_tiff_movie_in_memory_using_pil(tif_movie_file_name, frames_to_add) else: raise AttributeError() except AttributeError: try: start_time = time.time() tiff_movie = ScanImageTiffReader(tif_movie_file_name).data() stop_time = time.time() print(f"Time for loading movie with scan_image_tiff: " f"{np.round(stop_time - start_time, 3)} s") except Exception as e: return load_tiff_movie_in_memory_using_pil(tif_movie_file_name) return tiff_movie
[docs]def update_frames_to_add(frames_to_add, nwb_file, ci_sampling_rate): """ Update frames_to_add (dict), based on pause_intervals and ci_frames_time_series Args: frames_to_add: dict, with key an int representing the frame index after which add frames. the value is the number of frames to add (integer) nwb_file: nwb file, will get nwb_file.intervals['ci_recording_on_pause'] and nwb_file.get_acquisition("ci_frames"), to update frames_to_add Returns: """ try: pause_intervals = nwb_file.intervals['ci_recording_on_pause'] pause_intervals_df = pause_intervals.to_dataframe() start_times = pause_intervals_df.loc[:, "start_time"] stop_times = pause_intervals_df.loc[:, "stop_time"] try: ci_frames_time_series = nwb_file.get_acquisition("ci_frames") ci_frames = np.where(ci_frames_time_series.data)[0] ci_frames_timestamps = ci_frames_time_series.timestamps[ci_frames] for i, start_time in enumerate(start_times): frame_index = np.searchsorted(a=ci_frames_timestamps, v=start_time) n_frames_to_add = (stop_times[i] - start_time) * ci_sampling_rate frames_to_add[frame_index] = int(n_frames_to_add) except KeyError: pass except KeyError: # if no 'ci_recording_on_pause' in nwb_file.intervals pass