Source code for cicada.analysis.cicada_analysis_nwb_wrapper

from cicada.analysis.cicada_analysis_format_wrapper import CicadaAnalysisFormatWrapper
from pynwb.ophys import ImageSegmentation, TwoPhotonSeries, Fluorescence, PlaneSegmentation, DfOverF
from pynwb.image import ImageSeries
from pynwb.base import TimeSeries
import os
from pynwb import NWBHDF5IO
import numpy as np

[docs]class CicadaAnalysisNwbWrapper(CicadaAnalysisFormatWrapper): DATA_FORMAT = "CI_DATA" WRAPPER_ID = "NWB_CI" """ Allows to communicate with the nwb format """ def __init__(self, data_ref, load_data=True): CicadaAnalysisFormatWrapper.__init__(self, data_ref=data_ref, data_format="nwb", load_data=load_data) self._nwb_data = None if self.load_data_at_init: self.load_data()
[docs] @staticmethod def is_data_valid(data_ref): """ Check if the data can be an input for this wrapper as data_ref Args: data_ref: file or directory Returns: a boolean """ if not os.path.isfile(data_ref): return False if data_ref.endswith(".nwb"): return True return False
[docs] def load_data(self): CicadaAnalysisFormatWrapper.load_data(self) io = NWBHDF5IO(self._data_ref, 'r') self._nwb_data =
# if we close it, then later on we have an exception such as: ValueError: Not a dataset (not a dataset) # io.close()
[docs] @staticmethod def grouped_by(): """ Indicate by which factor the data can be grouped ex: age, species, contains behavior, categories of age.. :return: a dictionary with key the name of the group (str) that will be displayed in the GUI, and as value the a sring representing either an argument or a method of the wrapper class (hasattr() and callable() will be use to get them) """ groups_dict = dict() groups_dict["Age"] = "age_sort" groups_dict["Genotype"] = "genotype" groups_dict["Species"] = "species" groups_dict["Subject ID"] = "subject_id" groups_dict["Sex"] = "sex" groups_dict["Weight"] = "weight" # with or without behavior and if from piezzo or camera and manual or citnps groups_dict["Behavior epochs status"] = "behavior_epochs_status" groups_dict["Raster status"] = "raster_status" groups_dict["Cell type status"] = "cell_types_status" groups_dict["CI pause status"] = "ci_pause_status" groups_dict["With red ins cell type"] = "with_red_ins_cell_type" groups_dict["ABF status"] = "abf_status" groups_dict["FOV plan location"] = "image_plane_location" groups_dict["Belt type"] = "belt_type" return groups_dict
# -------------------------------------------------------- # -------------------------------------------------------- # Methods for menu grouping sessions # -------------------------------------------------------- # -------------------------------------------------------- def behavior_epochs_status(self): behavior_epochs_names = self.get_behavioral_epochs_names() if len(behavior_epochs_names) == 0: return "No behavior epochs" # check if there is a behavior movie behavior_movies = self.get_behavior_movies() if len(behavior_movies) == 0: return "Behavior epochs from piezo" # then we look in epoch names from "twitches" it means it comes from CITNPS if "twitches" in behavior_epochs_names: return "Behavior epochs from CITNPS" return "Behavior epochs with manual GT"
[docs] def abf_status(self): """ Using neuronal data and timestamps to infer if the abf has been used to create the NWB :return: """ roi_response_series_dict = self.get_roi_response_series() for ophys_name, ophys_dict in roi_response_series_dict.items(): for fluo_name, roi_response_series in ophys_dict.items(): for roi_response_name in roi_response_series: if "raster" in roi_response_name.lower(): neuronal_data_timestamps = self.get_roi_response_serie_timestamps(keys=[ophys_name, fluo_name, roi_response_name], verbose=False) if (len(neuronal_data_timestamps) > 2) and \ (neuronal_data_timestamps[1] - neuronal_data_timestamps[0] != 1): return "With ABF" return "No ABF"
def with_red_ins_cell_type(self): cell_types = self.get_all_cell_types() if len(cell_types) == 0: return "Without red INs" if "red_ins" in cell_types: return "With red INs" return "Without red INs" def ci_pause_status(self): if 'ci_recording_on_pause' in self.get_intervals_names(): return "CI on pause epochs" return "No CI on pause" def raster_status(self): roi_response_series_dict = self.get_roi_response_series() for ophys_name, ophys_dict in roi_response_series_dict.items(): for fluo_name, roi_response_series in ophys_dict.items(): for roi_response_name in roi_response_series: if "raster" in roi_response_name.lower(): return "Raster available" return "no raster"
[docs] def cell_types_status(self): """ Return the cells types in a string, or a message saying no cell types :return: """ cell_types = self.get_all_cell_types() if len(cell_types) == 0: return "NA" return ", ".join(cell_types)
def image_plane_location(self): img_plan = self.get_imaging_plan_location(only_2_photons=True) if img_plan is None: return "NA" return img_plan def belt_type(self): belt_type = self.get_belt_type() return belt_type def age_sort(self): nwb_age = self._nwb_data.subject.age return nwb_age # -------------------------------------------------------- # -------------------------------------------------------- # End methods for menu grouping sessions # -------------------------------------------------------- # -------------------------------------------------------- @property def identifier(self): return self._nwb_data.identifier @property def age(self): """ Age of the subject (int), in days or months :return: None if age unknown, int otherwise, months or days is found with age_unit """ nwb_age = self._nwb_data.subject.age if nwb_age in [None, '', ' ']: return None else: int_age = nwb_age[1: -1] try: return int(int_age) except ValueError: return int_age @property def age_unit(self): """ Unit for the animal age :return:'D' for days, 'M' for months """ nwb_age = self._nwb_data.subject.age if nwb_age in [None, '', ' ']: return None else: age_scale = nwb_age[-1] return age_scale @property def genotype(self): """ Genotype of the subject :return: None if age unknown """ return self._nwb_data.subject.genotype @property def species(self): """ Species of the subject :return: None if age unknown """ return self._nwb_data.subject.species @property def subject_id(self): """ Id of the subject :return: None if subject_id unknown """ return self._nwb_data.subject.subject_id @property def session_id(self): """ Id of the subject :return: None if subject_id unknown """ return self._nwb_data.session_id @property def weight(self): """ Id of the subject :return: None if weight unknown """ weight = self._nwb_data.subject.weight if weight in [None, '', ' ']: return None else: return weight @property def sex(self): """ Sex (gender) of the subject :return: None if sex unknown """ return @property def lab(self): return self._nwb_data.lab @property def experimenter(self): if self._nwb_data.experimenter is not None: return self._nwb_data.experimenter[0]
[docs] def get_segmentations(self): """ Returns: a dict that for each step till plane_segmentation represents the different option. First dict will have as keys the name of the modules, then for each modules the value will be a new dict with keys the ImageSegmentation names and then the value will be a list representing the segmentation plane """ segmentation_dict = dict() for name_mod, mod in self._nwb_data.modules.items(): segmentation_dict[name_mod] = dict() no_keys_added = True for key, value in mod.data_interfaces.items(): # we want to check that the object in Module is an Instance of ImageSegmentation if isinstance(value, ImageSegmentation): no_keys_added = False image_seg = value # key is the name of segmentation, and value a list of plane_segmentation segmentation_dict[name_mod][key] = [] # print(f"get_segmentations {name_mod} key {key}") for plane_seg_name in image_seg.plane_segmentations.keys(): # print(f"get_segmentations plane_seg_name {plane_seg_name}") segmentation_dict[name_mod][key].append(plane_seg_name) if no_keys_added: del segmentation_dict[name_mod] # it could be empty, but if it would matter, it should have been check by method check_data in CicadaAnalysis return segmentation_dict
[docs] def get_signals_info(self): """ Returns: a dict that for each step till the TimeSeries name represents the different option. First dict will have as keys the name of the modules, then for each modules the value will be the name of the TimeSeries representing the signal """ signal_dict = dict() for name_mod, mod in self._nwb_data.modules.items(): signal_dict[name_mod] = [] no_keys_added = True for key, value in mod.data_interfaces.items(): # we want to check that the object in Module is an Instance of Timeseries if isinstance(value, TimeSeries): signal_dict[name_mod].append(key) no_keys_added = False if no_keys_added: del signal_dict[name_mod] return signal_dict
[docs] def get_signal_by_keyword(self, keyword, exact_keyword=False): """ Look for a signal with this keyword. Returns the first instance found that matches it. Args: keyword: (str) exact_keyword: (bool) if True, the name of the TimeSeries representing the signal should be the same as keyword, otherwise keyword should be in the name of the TimeSeries Returns: a two 1d array representing a signal and its timestamps. If no sginal with this keyword found, return None, None """ for name_mod, mod in self._nwb_data.modules.items(): for key, value in mod.data_interfaces.items(): # we want to check that the object in Module is an Instance of TimeSeries if isinstance(value, TimeSeries): valid_key = False if exact_keyword: if keyword == key: valid_key = True else: if keyword in key: valid_key = True if valid_key: data = np.array( timestamps = value.timestamps if timestamps is None: timestamps = (np.arange(0, len(data)) * (1/value.rate)) + value.starting_time return np.transpose(data), np.array(timestamps) return None, None
[docs] def get_signal_keys(self): """ Return a list of str representing the key of the signals available. Signals are TimeSeries instance registered as data interfaces in modules. Returns: """ keys = [] for name_mod, mod in self._nwb_data.modules.items(): for key, value in mod.data_interfaces.items(): # we want to check that the object in Module is an Instance of TimeSeries if isinstance(value, TimeSeries): keys.append(key) return keys
[docs] def get_roi_response_serie_data(self, keys): """ Args: keys: lsit of string allowing to get the roi repsonse series wanted Returns: """ if len(keys) < 3: return None if keys[0] not in self._nwb_data.modules: return None if keys[1] not in self._nwb_data.modules[keys[0]].data_interfaces: return None if keys[2] not in self._nwb_data.modules[keys[0]].data_interfaces[keys[1]].roi_response_series: return None return np.transpose(np.array(self._nwb_data.modules[keys[0]]. data_interfaces[keys[1]].roi_response_series[keys[2]].data))
[docs] def get_roi_response_serie_data_by_keyword(self, keys, keyword): """ Return a dict with other last key data Args: keys: list of string allowing to get the roi response series in data_interfaces keyword: Returns: """ if len(keys) < 2: return dict() if keys[0] not in self._nwb_data.modules: return dict() if keys[1] not in self._nwb_data.modules[keys[0]].data_interfaces: return dict() result_dict = dict() for key_data, rrs in self._nwb_data.modules[keys[0]].data_interfaces[keys[1]].roi_response_series.items(): if isinstance(keyword, str): if keyword in key_data: result_dict[key_data] = np.transpose(np.array( all_key_in_key_data = [] if isinstance(keyword, list): for key in keyword: if key in key_data: all_key_in_key_data.append('True') continue else: all_key_in_key_data.append('False') if all(all_key_in_key_data): result_dict[key_data] = np.transpose(np.array( return result_dict else: return dict() return result_dict
[docs] def get_all_cell_types(self): """ Return a list of all cell types identified in this session. If the list is empty, it means the type of the cells is not identified Returns: """ cell_types = [] for name_mod, mod in self._nwb_data.modules.items(): for key, fluorescence in mod.data_interfaces.items(): # we want to check that the object in Module is an Instance of pynwb.ophys.Fluorescence if isinstance(fluorescence, Fluorescence): for name_rrs, rrs in fluorescence.roi_response_series.items(): cell_type_names = rrs.control_description if cell_type_names is not None: cell_types.extend(list(cell_type_names)) # keeping only unique values return list(set(cell_types))
[docs] def get_cell_indices_by_cell_type(self, roi_serie_keys): """ Return a dict with key the cell_type name and value an array of int representing the cell indices of this type Args: roi_serie_keys: Returns: """ rrs = self._get_roi_response_serie(keys=roi_serie_keys) if rrs is None: return {} # rrs.control is an array (uint8) as long as n_cells, with a code for each cell type # rrs.control_description is the list of cell type names, as long as n_cells if rrs.control_description is not None: cell_type_names = list(set(rrs.control_description)) else: cell_type_names = [] code_by_cell_type = dict() for cell_type_name in cell_type_names: index = list(rrs.control_description).index(cell_type_name) code_by_cell_type[cell_type_name] = rrs.control[index] cell_type_names.sort() cell_indices_by_cell_type = dict() for cell_type_name in cell_type_names: code = code_by_cell_type[cell_type_name] cell_indices_by_cell_type[cell_type_name] = np.where(np.array(rrs.control) == code)[0] return cell_indices_by_cell_type
def _get_roi_response_serie(self, keys): """ Args: keys: list of string allowing to get the roi repsonse series wanted Returns: """ if len(keys) < 3: return None if keys[0] not in self._nwb_data.modules: return None if keys[1] not in self._nwb_data.modules[keys[0]].data_interfaces: return None if keys[2] not in self._nwb_data.modules[keys[0]].data_interfaces[keys[1]].roi_response_series: return None return self._nwb_data.modules[keys[0]].data_interfaces[keys[1]].roi_response_series[keys[2]]
[docs] def get_roi_response_serie_timestamps(self, keys, verbose=True): """ Args: keys: lsit of string allowing to get the roi repsonse series wanted verbose: print info Returns: """ if len(keys) < 3: return None if keys[0] not in self._nwb_data.modules: return None if keys[1] not in self._nwb_data.modules[keys[0]].data_interfaces: return None if keys[2] not in self._nwb_data.modules[keys[0]].data_interfaces[keys[1]].roi_response_series: return None rrs = self._nwb_data.modules[keys[0]].data_interfaces[keys[1]].roi_response_series[keys[2]] rrs_ts = rrs.timestamps if rrs_ts is not None: if verbose: print(f"Timestamps directly provided for this RoiResponseSeries ({keys[2]})") return rrs_ts else: if verbose: print(f"Timestamps not directly provided for this RoiResponseSeries ({keys[2]})") rrs_start_time = rrs.starting_time rrs_rate = rrs.rate if (rrs_rate is not None) and (rrs_rate < 1): if verbose: print(f"Found a rate of {np.round(rrs_rate, decimals=3)} Hz, assume it is in fact 1/rate. " f"New rate: {np.round(1 / rrs_rate, decimals=2)} Hz.") rrs_rate = 1 / rrs_rate if (rrs_start_time is not None) and (rrs_rate is not None): if verbose: print(f"Build timestamps from starting time ({rrs_start_time} s) and " f"rate ({np.round(rrs_rate, decimals=2)} Hz)") n_times =[0] rrs_ts = (np.arange(0, n_times) * (1/rrs_rate)) + rrs_start_time return rrs_ts else: if verbose: print(f"Starting time and rate not provided neither, no timestamps can be returned") return None
[docs] def get_roi_response_series(self, keywords_to_exclude=None): """ param: keywords_to_exclude: if not None, list of str, if one of neuronal data has this keyword, then we don't add it to the choices Returns: a list or dict of objects representing all roi response series (rrs) names rrs could represents raw traces, or binary raster, and its link to a given segmentation. The results returned should allow to identify the segmentation associated. Object could be strings, or a list of strings, that identify a rrs and give information how to get there. """ rrs_dict = dict() for name_mod, mod in self._nwb_data.modules.items(): rrs_dict[name_mod] = dict() for key, fluorescence in mod.data_interfaces.items(): # we want to check that the object in Module is an Instance of pynwb.ophys.Fluorescence if (isinstance(fluorescence, Fluorescence)) or (isinstance(fluorescence, DfOverF)): rrs_dict[name_mod][key] = [] for name_rrs, rrs in fluorescence.roi_response_series.items(): if keywords_to_exclude is not None: to_exclude = False for keyword in keywords_to_exclude: if keyword in name_rrs: to_exclude = True break if to_exclude: continue rrs_dict[name_mod][key].append(name_rrs) if len(rrs_dict[name_mod][key]) == 0: del rrs_dict[name_mod][key] if len(rrs_dict[name_mod]) == 0: del rrs_dict[name_mod] # then we remove modules without Fluorescence instances # keys_to_remove = [] # for key, value_dict in rrs_dict.items(): # if len(value_dict) == 0: # keys_to_remove.append(key) # for key in keys_to_remove: # del rrs_dict[key] return rrs_dict
[docs] def get_roi_response_series_list(self, keywords_to_exclude=None): """ param: keywords_to_exclude: if not None, list of str, if one of neuronal data has this keyword, then we don't add it to the list Returns: a list with all roi response series (rrs) names """ rrs_names_list = [] for name_mod, mod in self._nwb_data.modules.items(): for key, fluorescence in mod.data_interfaces.items(): # we want to check that the object in Module is an Instance of pynwb.ophys.Fluorescence if (isinstance(fluorescence, Fluorescence)) or (isinstance(fluorescence, DfOverF)): for name_rrs, rrs in fluorescence.roi_response_series.items(): if keywords_to_exclude is not None: to_exclude = False for keyword in keywords_to_exclude: if keyword in name_rrs: to_exclude = True break if to_exclude: continue rrs_names_list.append(name_rrs) rrs_list_names = np.unique(rrs_names_list) return rrs_list_names
[docs] def get_pixel_mask(self, segmentation_info): """ Return pixel_mask which is a list of list of pair of integers representing the pixels coordinate (x, y) for each cell. the list length is the same as the number of cells. Args: segmentation_info: a list of 3 elements: first one being the name of the module, then the name of image_segmentation and then the name of the segmentation plane. Returns: """ if len(segmentation_info) < 3: return None name_module = segmentation_info[0] mod = self._nwb_data.modules[name_module] name_mode = segmentation_info[1] name_plane_seg = segmentation_info[2] plane_seg = mod[name_mode].get_plane_segmentation(name_plane_seg) if 'pixel_mask' in plane_seg: print("Found pixel mask in plane segmentation") return plane_seg['pixel_mask'] else: print("No pixel mask found in plane segmentation") if 'image_mask' not in plane_seg: print(f"No image mask found in plane segmentation") return None else: print("Found image mask in plane segmentation: Convert it to pixel mask") img_mask = plane_seg['image_mask'] img_mask_data = img_mask[:] image_masks = list(img_mask_data) n_cells = len(image_masks) pixel_masks = [] print(f"{n_cells} image masks to convert to pixel masks") for cell in range(n_cells): if (cell > 0) and (cell % 250 == 0): print(f"{cell} cells converted") img_mask = image_masks[cell] pixel_mask = PlaneSegmentation.image_to_pixel(img_mask) pixel_masks.append(pixel_mask) return pixel_masks
[docs] def contains_ci_movie(self, consider_only_2_photons): """ Indicate if the data object contains at least one calcium imaging movie represented by an instance of pynwb.image.ImageSeries Args: consider_only_2_photons: boolean, it True means we consider only 2 photons calcium imaging movies, if other exists but not 2 photons, then False will be return Returns: True if it's the case, False otherwise """ # a TwoPhotonSeries is an instance of ImageSeries has_one = False for key, acquisition_data in self._nwb_data.acquisition.items(): if consider_only_2_photons: if isinstance(acquisition_data, TwoPhotonSeries): has_one = True else: if isinstance(acquisition_data, ImageSeries): has_one = True if not has_one: return False return True
[docs] def get_behavior_movies(self, key_to_identify="behavior"): """ Return a dict with as key a string identifying the movie, and as value a dict of behavior movies a string as file_name if external, or a 3d array Args: key_to_identify: string, key to identify that a movie is a behavior movie Returns: """ behavior_movies_dict = dict() # print(f"self._nwb_data.acquisition.keys() {list(self._nwb_data.acquisition.keys())}") for key, acquisition_data in self._nwb_data.acquisition.items(): if key_to_identify not in key: continue if isinstance(acquisition_data, ImageSeries): image_series = acquisition_data if image_series.format == "external": movie_file_name = image_series.external_file[0] movie_data = movie_file_name behavior_movies_dict[key] = movie_data return behavior_movies_dict
[docs] def get_ci_movies(self, only_2_photons): """ Return a dict with as key a string identifying the movie, and as value a dict of CI movies a string as file_name if external, or a 3d array Args: only_2_photons: return only the 2 photon movies Returns: """ ci_movies_dict = dict() for key, acquisition_data in self._nwb_data.acquisition.items(): if only_2_photons: if isinstance(acquisition_data, ImageSeries) and \ (not isinstance(acquisition_data, TwoPhotonSeries)): continue if isinstance(acquisition_data, ImageSeries): image_series = acquisition_data if image_series.format == "external": movie_file_name = image_series.external_file[0] movie_data = movie_file_name else: movie_data = ci_movies_dict[key] = movie_data return ci_movies_dict
[docs] def get_ci_movies_sample(self, only_2_photons, n_frames=2000): """ Return a dict with as key a string identifying the movie, and as value a dict of CI movies a string as file_name if external, or a 3d array Args: only_2_photons: return only the 2 photon movies n_frames: number of frames to take from movie to serve as data sample Returns: """ ci_movies_dict = dict() for key, acquisition_data in self._nwb_data.acquisition.items(): if only_2_photons: if isinstance(acquisition_data, ImageSeries) and \ (not isinstance(acquisition_data, TwoPhotonSeries)): continue if isinstance(acquisition_data, ImageSeries): image_series = acquisition_data if image_series.format == "external": movie_file_name = image_series.external_file[0] movie_data = movie_file_name else: if n_frames < len([:, 0, 0]): movie_data =[0:n_frames, :, :] else: movie_data =[:] ci_movies_dict[key] = movie_data return ci_movies_dict
[docs] def get_ci_movie_sampling_rate(self, only_2_photons=False, ci_movie_name=None): """ Args: only_2_photons: if True only 2 photons one are considere ci_movie_name: (string) if not None, return the sampling rate for a given ci_movie, otherwise the first one found Returns: (float) sampling rate of the movie, return None if no movie is found """ for key, acquisition_data in self._nwb_data.acquisition.items(): if ci_movie_name is not None and (key != ci_movie_name): continue if only_2_photons: if isinstance(acquisition_data, ImageSeries) and \ (not isinstance(acquisition_data, TwoPhotonSeries)): continue if isinstance(acquisition_data, ImageSeries): image_series = acquisition_data rate = image_series.rate if rate is not None: print(f"Sampling rate : {np.round(rate, 2)}") return image_series.rate return None
[docs] def get_rrs_sampling_rate(self, keys): """ Args: Returns: (float) sampling rate of the movie, return None if no sampling rate is found """ rrs = self._get_roi_response_serie(keys=keys) sampling_rate = rrs.rate if sampling_rate is not None: print(f"Sampling rate is directly provided in {keys[2]}") print(f"Sampling rate: {sampling_rate} Hz") return sampling_rate else: print(f"Sampling rate is not directly provided in {keys[2]}:" f" Estimate rate from timestamps of first 100 frames") rrs_ts_sample = rrs.timestamps[0: 100] if rrs_ts_sample is None: print(f"Found neither rate nor timestamps") return None else: sampling_rate = np.round(1 / (np.nanmedian(np.diff(rrs_ts_sample))), decimals=2) print(f"Sampling rate: {sampling_rate} Hz") return sampling_rate
[docs] def get_identifier(self, session_data): """ Get the identifier of one of the data to analyse Args: session_data: Data we want to know the identifier Returns: A hashable object identfying the data """ return session_data.identifier
[docs] def get_intervals_names(self): """ Return a list representing the intervals contains in this data Returns: """ if self._nwb_data.intervals is None: return [] # TODO: include that it can be a generic table with different stimulation epochs with the same name ie 'epochs' # each name can contain a table with different epochs # (see NWBs from Allen) intervals = [] for name_interval in self._nwb_data.intervals.keys(): intervals.append(name_interval) return intervals
[docs] def get_interval_as_data_frame(self, interval_name): """ Return an interval time as a pandas data frame. Args: interval_name: Name of the interval to retrieve Returns: None if the interval doesn't exists or a pandas data frame otherwise """ if interval_name not in self._nwb_data.intervals: return None return self._nwb_data.intervals[interval_name].to_dataframe()
[docs] def get_interval_times(self, interval_name): """ Return an interval times (start and stop in seconds) as a numpy array of 2*n_times. Args: interval_name: Name of the interval to retrieve Returns: None if the interval doesn't exists or a 2d array """ if interval_name not in self._nwb_data.intervals: return None df = self._nwb_data.intervals[interval_name].to_dataframe() # TODO: See to make it more modulable in case someone will use another name if ("start_time" not in df) or \ ("stop_time" not in df): return None # TODO: include that it can be a generic table with different stimulation epochs (see NWBs from Allen) # time series start_time_ts = df["start_time"] stop_time_ts = df["stop_time"] # it shouldn't be the case if len(start_time_ts) != len(stop_time_ts): print(f"len(start_time_ts) {len(start_time_ts)} != {len(stop_time_ts)} len(stop_time_ts)") return None data = np.zeros((2, len(start_time_ts))) data[0] = np.array(start_time_ts) data[1] = np.array(stop_time_ts) return data
[docs] def get_behavioral_epochs_names(self): """ The name of the different behavioral Returns: """ if 'behavior' not in self._nwb_data.processing: if 'Behavior' not in self._nwb_data.processing: return [] try: behavior_nwb_module = self._nwb_data.processing['behavior'] except KeyError: behavior_nwb_module = self._nwb_data.processing['Behavior'] try: behavioral_epochs = behavior_nwb_module.get(name='BehavioralEpochs') except KeyError: return [] # a dictionary containing the IntervalSeries in this BehavioralEpochs container interval_series = behavioral_epochs.interval_series return list(interval_series.keys())
[docs] def get_behavioral_epochs_times(self, epoch_name): """ Return an interval times (start and stop in seconds) as a numpy array of 2*n_times. Args: epoch_name: Name of the interval to retrieve Returns: None if the interval doesn't exists or a 2d array """ if 'behavior' not in self._nwb_data.processing: if 'Behavior' not in self._nwb_data.processing: return [] try: behavior_nwb_module = self._nwb_data.processing['behavior'] except KeyError: behavior_nwb_module = self._nwb_data.processing['Behavior'] try: behavioral_epochs = behavior_nwb_module.get(name='BehavioralEpochs') except KeyError: return None # a dictionary containing the IntervalSeries in this BehavioralEpochs container interval_series = behavioral_epochs.interval_series if epoch_name not in interval_series: return None interval_serie = interval_series[epoch_name] # data: >0 if interval started, <0 if interval ended. # timestamps: Timestamps for samples stored in data # so far we use only one type of integer, but otherwise as describe in the doc: """ Stores intervals of data. The timestamps field stores the beginning and end of intervals. The data field stores whether the interval just started (>0 value) or ended (<0 value). Different interval types can be represented in the same series by using multiple key values (eg, 1 for feature A, 2 for feature B, 3 for feature C, etc). The field data stores an 8-bit integer. This is largely an alias of a standard TimeSeries but that is identifiable as representing time intervals in a machine-readable way. """ data = time_stamps = interval_serie.timestamps data = np.zeros((2, int(len(time_stamps) / 2))) index_data = 0 for i in np.arange(0, len(time_stamps), 2): data[0, index_data] = time_stamps[i] data[1, index_data] = time_stamps[i+1] index_data += 1 return data
[docs] def get_interval_original_frames(self, interval_name): """ Return an interval times (start and stop in frames) as a numpy array of 2*n_times. Args: interval_name: Name of the interval to retrieve Returns: None if the interval doesn't exists or a 2d array """ if interval_name not in self._nwb_data.intervals: return None df = self._nwb_data.intervals[interval_name].to_dataframe() # TODO: See to make it more modulable in case someone will use another name if ("start_original_frame" not in df) or \ ("stop_original_frame" not in df): return None # time series start_frame = df["start_original_frame"] stop_frame = df["stop_original_frame"] # it shouldn't be the case if len(start_frame) != len(stop_frame): print(f"len(start_time_ts) {len(start_frame)} != {len(stop_frame)} len(stop_time_ts)") return None data = np.zeros((2, len(start_frame))) data[0] = np.array(start_frame) data[1] = np.array(stop_frame) return data
def __str__(self): """ Return a string representing the session. Here session.identifier :return: """ return self._nwb_data.identifier
[docs] def get_behaviors_movie_time_stamps(self): """ return a dict with key the cam id and value np.array with the timestamps of each frame of the behavior movie return None if non available Returns: """ time_stamps_dict = dict() for name, acquisition_data in self._nwb_data.acquisition.items(): if name.startswith("cam_"): time_stamps_dict[name] = np.array(acquisition_data.timestamps) return time_stamps_dict
[docs] def get_ci_movie_time_stamps(self): """ return a np.array with the timestamps of each frame of the CI movie return None if non available Returns: """ if "ci_frames" not in self._nwb_data.acquisition: print(f"No 'ci frames' time serie in this NWB") return None ci_frames_time_serie = self._nwb_data.acquisition["ci_frames"] ci_frames_ts = ci_frames_time_serie.timestamps ci_frames_ts = ci_frames_ts[:] return ci_frames_ts
[docs] def get_timestamps_range(self): """ Return a tuple of float representing the first and last time stamp with movie recording (behavior or ci movie) Returns: """ min_time_stamp = None max_time_stamp = 0 ci_movie_time_stamps = self.get_ci_movie_time_stamps() if ci_movie_time_stamps is not None: max_time_stamp = max(max_time_stamp, np.max(ci_movie_time_stamps)) min_time_stamp = np.min(ci_movie_time_stamps) behavior_time_stamps_dict = self.get_behaviors_movie_time_stamps() for behavior_name, behavior_time_stamps in behavior_time_stamps_dict.items(): max_time_stamp = max(max_time_stamp, np.max(behavior_time_stamps)) if min_time_stamp is None: min_time_stamp = np.min(behavior_time_stamps) else: min_time_stamp = min(min_time_stamp, np.min(behavior_time_stamps)) return min_time_stamp, max_time_stamp
[docs] def get_mouse_speed_info(self): """ Returns: """ # First start with what we were doing (changed 17/02/2022 in convert_abf_to_nwb) scan_nwb_file = False if "running_speed" in self._nwb_data.acquisition: print("Found 'running_speed' in acquisitions") mouse_speed = self._nwb_data.acquisition["running_speed"] # print(f"mouse_speed: {mouse_speed}") if mouse_speed.description is not None: print(f"Description: {mouse_speed.description}") speed = # print(f"speed: {speed}") speed_by_time = speed[:] if speed_by_time is not None: return np.transpose(speed_by_time) else: return None # Else look in 'BehavioralTimeSeries' (inspired by NWB from Giocomo lab NWBs found on DANDI) else: print("No 'speed' found as acquisition: Look for 'Speed' in " "BehavioralTimeSeries in a 'behavior' processsing module") bhv_time_series_names = self.get_behavioral_time_series_names() good_name = None for bhv_time_series_name in bhv_time_series_names: if 'speed' in bhv_time_series_name: good_name = bhv_time_series_name break if good_name is not None: print(f"Get the '{good_name}' time series from 'BehavioralTimeSeries'") bhv = self._nwb_data.processing['behavior'] bhv_timeseries = bhv.get(name='BehavioralTimeSeries') time_series = bhv_timeseries.time_series speed_times_series = time_series.get(good_name) if speed_times_series.description is not None: print(f"Description: {speed_times_series.description}") speed_unit = speed_times_series.unit speed_data =[:] if speed_unit in ['m', 'm/s', 'm.s-1']: speed_data = speed_data * 100 return np.transpose(speed_data) else: print(f"No 'speed' found yet. Scan the NWB file") scan_nwb_file = True if scan_nwb_file: nwb_objects = self._nwb_data.objects objects_list = [data for key, data in nwb_objects.items()] data_to_take = None for ind, obj in enumerate(objects_list): if 'speed' in data_to_take = obj break else: continue if data_to_take is None: print(f"No object with 'speed' in its name found, return None") return None else: speed_data =[:] prt = data_to_take.parent prt_name = prt_type = prt.neurodata_type prt_names_list = [prt_name] prt_type_list = [prt_type] while prt_type != "NWBFile": up_prt = prt.parent prt_names_list.append( prt_type_list.append(up_prt.neurodata_type) prt_type = up_prt.neurodata_type prt = up_prt print(f"Found a '{}' {data_to_take.neurodata_type}. " f"From 'parents' named : {prt_names_list} with type: {prt_type_list}") return np.transpose(speed_data)
[docs] def get_mouse_position_info(self): """ Returns: """ if 'behavior' not in self._nwb_data.processing: return None scan_nwb_file = False # Start by what we are doing using the SpatialSeries in Position behavior_nwb_module = self._nwb_data.processing['behavior'] try_from_bhv_timeseries = False try: position_spatial_series = behavior_nwb_module.get(name='Position') except KeyError: try_from_bhv_timeseries = True position_spatial_series = None if try_from_bhv_timeseries is False: mouse_position_spatial_series = position_spatial_series.get_spatial_series(name="mouse_position") print(f"Get the 'mouse_position' spatial series from 'SpatialSeries' in 'Position'") if mouse_position_spatial_series.description is not None: print(f"Description: {mouse_position_spatial_series.description}") position_by_time =[:] if len(position_by_time) == 0: print(f"Length of found spatial series is 0, return None") return None return np.transpose(position_by_time) # Look for position in 'BehavioralTimeSeries' (Giocomo lab do this way) else: bhv_time_series_names = self.get_behavioral_time_series_names() good_name = None for bhv_time_series_name in bhv_time_series_names: if 'pos' in bhv_time_series_name: good_name = bhv_time_series_name break bhv = self._nwb_data.processing['behavior'] try: bhv_timeseries = bhv.get(name='BehavioralTimeSeries') except KeyError: print(f"No 'BehavioralTimeSeries' return None") return None time_series = bhv_timeseries.time_series if good_name is not None: print(f"Get the '{good_name}' time series from 'BehavioralTimeSeries'") position_times_series = time_series.get(good_name) if position_times_series.description is not None: print(f"Description: {position_times_series.description}") position =[:] if position_times_series.unit in ['m', 'm/s', 'm.s-1']: position = position * 100 return np.transpose(position) else: print(f"No 'Position' found yet. Scan the NWB file") scan_nwb_file = True if scan_nwb_file: nwb_objects = self._nwb_data.objects objects_list = [data for key, data in nwb_objects.items()] data_to_take = None for ind, obj in enumerate(objects_list): if 'position' in data_to_take = obj break else: continue if data_to_take is None: print(f"No object with 'position' in its name found, return None") return None else: position_data =[:] prt = data_to_take.parent prt_name = prt_type = prt.neurodata_type prt_names_list = [prt_name] prt_type_list = [prt_type] while prt_type != "NWBFile": up_prt = prt.parent prt_names_list.append( prt_type_list.append(up_prt.neurodata_type) prt_type = up_prt.neurodata_type prt = up_prt print(f"Found a '{}' {data_to_take.neurodata_type}. " f"From 'parents' named : {prt_names_list} with type: {prt_type_list}") return np.transpose(position_data)
[docs] def get_opto_stimulation_time_serie(self): # TODO: put ogen in a much proper way when we'll do ogen experiment """ Returns: """ if "ogen_stim" not in self._nwb_data.acquisition: return None ogen_stim = self._nwb_data.acquisition["ogen_stim"] opto_stim = opto_stm_ts = ogen_stim.timestamps optogenetic_stim = opto_stim[:] optogenetic_stim_ts = opto_stm_ts[:] return optogenetic_stim, optogenetic_stim_ts
[docs] def get_acquisition_names(self): """ Returns: """ acquisitions = self._nwb_data.acquisition names = acquisitions.keys() names_list = list(names) return names_list
def get_ci_movie_dimension(self, only_2_photons=False, ci_movie_name=None): # Try to get dimensions from TwoPhotonSeries.dimension for key, acquisition_data in self._nwb_data.acquisition.items(): if ci_movie_name is not None and (key != ci_movie_name): continue if only_2_photons: if isinstance(acquisition_data, ImageSeries) and \ (not isinstance(acquisition_data, TwoPhotonSeries)): continue if isinstance(acquisition_data, ImageSeries): image_series = acquisition_data dimensions = image_series.dimension[:] # dimensions should be n_times x n_lines x n_cols ('z', 'y', 'x' when open in Fiji) if len(dimensions) == 3: print(f"Found 3 dimensions in 'TwoPhotonSeries.dimension': assume it is n_times x n_lines x n_cols") n_lines = image_series.dimension[1] n_cols = image_series.dimension[2] if len(dimensions) == 2: # mean it is likely that number of frames is not given print(f"Found only 2 dimensions in 'TwoPhotonSeries.dimension': " f"take them as n_lines x n_cols if it is a square") n_lines = image_series.dimension[0] n_cols = image_series.dimension[1] if n_lines != n_cols: print(f"The 2 dimensions are not the same: not sure which one is 'n_lines' / 'n_cols'") n_lines = None n_cols = None if (n_lines is not None) and (n_cols is not None): print(f"Dimensions extracted") return n_lines, n_cols else: print(f"Try to get the dimensions from 'tiff' movie shape") n_lines =[0, :, :].shape[0] n_cols =[0, :, :].shape[1] if (n_lines is not None) and (n_cols is not None): print(f"Dimensions of CI movie given from shape of 'tiff' data") return n_lines, n_cols else: print(f"No dimension returned") return None, None def get_ci_dimension_from_img_mask(self, segmentation_info): if len(segmentation_info) < 3: return None name_module = segmentation_info[0] mod = self._nwb_data.modules[name_module] name_mode = segmentation_info[1] name_plane_seg = segmentation_info[2] plane_seg = mod[name_mode].get_plane_segmentation(name_plane_seg) if 'image_mask' not in plane_seg: print("No 'image_mask' provided to extract image size") return None, None else: img_mask = plane_seg['image_mask'] img_mask_data = img_mask[:] image_masks = list(img_mask_data) first_mask = image_masks[0] n_lines, n_cols = first_mask.shape print(f"Dimensions of CI movie given by shape of the 'image_mask' of the cell 0") return n_lines, n_cols
[docs] def get_imaging_plan_location(self, only_2_photons=False, ci_movie_name=None): """ Args: only_2_photons: if True only 2 photons one are considere ci_movie_name: (string) if not None, return the sampling rate for a given ci_movie, otherwise the first one found Returns: (str) imaging plan location """ for key, acquisition_data in self._nwb_data.acquisition.items(): if ci_movie_name is not None and (key != ci_movie_name): continue if only_2_photons: if isinstance(acquisition_data, ImageSeries) and \ (not isinstance(acquisition_data, TwoPhotonSeries)): continue if isinstance(acquisition_data, ImageSeries): image_series = acquisition_data return image_series.imaging_plane.location return None
def get_belt_type(self): session_description = self._nwb_data.session_description if "uncued" in session_description: return "Uncued belt" elif "cued" in session_description: return "Cued belt" else: return 'NA' def get_sessions_info(self): info_dict = dict() if self.identifier is not None: session_identifier = str(self.identifier) else: session_identifier = 'Unknown identifier' info_dict['identifier'] = session_identifier if self.subject_id is not None: animal_id = self.subject_id else: animal_id = 'Unknown subject' info_dict['subject_id'] = animal_id if self.session_id is not None: session_id = self.session_id else: session_id = 'Unknown session' info_dict['session_id'] = session_id if self.age is not None: try: animal_age = int(self.age) except ValueError: animal_age = int(0) else: animal_age = int(0) info_dict['age'] = animal_age if self.age_unit is not None: age_unit = self.age_unit else: age_unit = 'NS' info_dict['age_unit'] = age_unit if self.weight is not None: animal_weight = self.weight if animal_weight[-1] == 'g': animal_weight = animal_weight[0: -1] if self.weight in ['na', 'NA']: animal_weight = float(0) animal_weight = float(animal_weight) else: animal_weight = float(0) info_dict['weight'] = animal_weight return info_dict
[docs] def get_behavioral_time_series_names(self): """ The name of the different behavioral Returns: """ if 'behavior' not in self._nwb_data.processing: return [] behavior_nwb_module = self._nwb_data.processing['behavior'] try: behavioral_timeseries = behavior_nwb_module.get(name='BehavioralTimeSeries') except KeyError: return [] # a dictionary containing the IntervalSeries in this BehavioralEpochs container time_series = behavioral_timeseries.time_series return list(time_series.keys())
def get_stimulus_names(self): # Based on Giocomo example not sure it is how it should be done if self._nwb_data.stimulus is None: print(f"No 'stimulus' found in this NWB") return None else: stim_names = self._nwb_data.stimulus.keys() return stim_names def has_epoch_table(self): nwb_objects = self._nwb_data.objects objects_list = [data for key, data in nwb_objects.items()] data_to_take = None for ind, obj in enumerate(objects_list): if 'epoch' in data_to_take = obj break else: continue if data_to_take is None: return False else: return True def get_epochs_names_from_table(self): if not self.has_epoch_table(): return None else: # Get the epoch table object nwb_objects = self._nwb_data.objects objects_list = [data for key, data in nwb_objects.items()] data_to_take = None for ind, obj in enumerate(objects_list): if 'epoch' in data_to_take = obj break else: continue column_names = data_to_take.colnames variable_columns = [] variable_columns_values = [] epoch_dicts = [] for col_index, col_name in enumerate(column_names): if col_name == 'start_time' or col_name == 'stop_time': continue else: col_val = data_to_take.get(col_name) col_val_data =[:] unique_col_val_data = np.unique(col_val_data[~np.isnan(col_val_data)]) is_variable = len(unique_col_val_data) > 1 is_index = len(np.unique(col_val_data[~np.isnan(col_val_data)])) == len([:]) is_times_series = 'timeseries' in col_name unique_col_val_data = list(unique_col_val_data) if is_variable and (not is_index) and (not is_times_series): variable_columns.append(col_name) variable_columns_values.append(unique_col_val_data) epoch_dict = dict(zip(variable_columns, variable_columns_values)) epoch_dicts.append(epoch_dict) return epoch_dicts def get_epochs_timestamps_from_table(self, requirements_dict): if not self.has_epoch_table(): return None else: # Get the epoch table object nwb_objects = self._nwb_data.objects objects_list = [data for key, data in nwb_objects.items()] data_to_take = None for ind, obj in enumerate(objects_list): if 'epoch' in data_to_take = obj break else: continue epoch_data_frame = data_to_take.to_dataframe() for column_name, column_requirements in requirements_dict.items(): column_values_type = type(epoch_data_frame[column_name].values[0]) column_requirements = [column_values_type(requirement) for requirement in column_requirements] epoch_data_frame = epoch_data_frame.loc[epoch_data_frame[column_name].isin(column_requirements)] n_epochs = len(epoch_data_frame.index) epochs_timestamps = np.zeros((2, n_epochs)) epochs_timestamps[0, :] = np.array(epoch_data_frame.get('start_time').values[:]) epochs_timestamps[1, :] = np.array(epoch_data_frame.get('stop_time').values[:]) # Try to see if it is really timestamps and not frames epoch_start_timestamps_frac = [value % 1 for value in epochs_timestamps[0, :]] is_all_zero_starts = all(number == 0 for number in epoch_start_timestamps_frac) epoch_stop_timestamps_frac = [value % 1 for value in epochs_timestamps[1, :]] is_all_zero_stops = all(number == 0 for number in epoch_stop_timestamps_frac) if is_all_zero_starts and is_all_zero_stops: print(f"Seems like 'start_time' and 'stop_time' in epoch table are given in frames instead of seconds") time_unit = "frames" else: time_unit = "seconds" return epochs_timestamps, time_unit def has_trial_table(self): nwb_objects = self._nwb_data.objects objects_list = [data for key, data in nwb_objects.items()] data_to_take = None for ind, obj in enumerate(objects_list): if 'trial' in data = obj if isinstance(data, TimeSeries): continue else: data_to_take = data break else: continue if data_to_take is None: return False else: return True def get_trial_table(self): if self.has_trial_table() is False: return None else: # Get the trial table object nwb_objects = self._nwb_data.objects objects_list = [data for key, data in nwb_objects.items()] data_to_take = None trial_dicts = [] for ind, obj in enumerate(objects_list): if 'trial' in data = obj if isinstance(data, TimeSeries): continue else: data_to_take = data break else: continue trial_data_frame = data_to_take.to_dataframe() return trial_data_frame def get_trials_names_from_table(self): if self.has_trial_table() is False: return None else: # Get the trial table object nwb_objects = self._nwb_data.objects objects_list = [data for key, data in nwb_objects.items()] data_to_take = None trial_dicts = [] for ind, obj in enumerate(objects_list): if 'trial' in data = obj if isinstance(data, TimeSeries): continue else: data_to_take = data break else: continue column_names = data_to_take.colnames variable_columns = [] variable_columns_values = [] for col_index, col_name in enumerate(column_names): if col_name == 'start_time' or col_name == 'stop_time' or col_name == 'reaction_time': continue else: col_val = data_to_take.get(col_name) col_val_data =[:] if isinstance(col_val_data[0], str): unique_col_val_data = np.unique(col_val_data) else: unique_col_val_data = np.unique(col_val_data[~np.isnan(col_val_data)]) is_variable = len(unique_col_val_data) > 1 is_index = len(unique_col_val_data) == len([:]) is_times_series = 'timeseries' in col_name unique_col_val_data = list(unique_col_val_data) if is_variable and (not is_index) and (not is_times_series): variable_columns.append(col_name) variable_columns_values.append(unique_col_val_data) trial_dict = dict(zip(variable_columns, variable_columns_values)) trial_dicts.append(trial_dict) return trial_dicts def get_trials_timestamps_from_table(self, requirements_dict): if not self.has_trial_table(): return None else: # Get the epoch table object nwb_objects = self._nwb_data.objects objects_list = [data for key, data in nwb_objects.items()] data_to_take = None for ind, obj in enumerate(objects_list): if 'trial' in data = obj if isinstance(data, TimeSeries): continue else: data_to_take = data break else: continue trial_data_frame = data_to_take.to_dataframe() for column_name, column_requirements in requirements_dict.items(): column_values_type = type(trial_data_frame[column_name].values[0]) column_requirements = [column_values_type(requirement) for requirement in column_requirements] trial_data_frame = trial_data_frame.loc[trial_data_frame[column_name].isin(column_requirements)] n_epochs = len(trial_data_frame.index) epochs_timestamps = np.zeros((2, n_epochs)) epochs_timestamps[0, :] = np.array(trial_data_frame.get('start_time').values[:]) epochs_timestamps[1, :] = np.array(trial_data_frame.get('stop_time').values[:]) # Try to see if it is really timestamps and not frames epoch_start_timestamps_frac = [value % 1 for value in epochs_timestamps[0, :]] is_all_zero_starts = all(number == 0 for number in epoch_start_timestamps_frac) epoch_stop_timestamps_frac = [value % 1 for value in epochs_timestamps[1, :]] is_all_zero_stops = all(number == 0 for number in epoch_stop_timestamps_frac) if is_all_zero_starts and is_all_zero_stops: print(f"Seems like 'start_time' and 'stop_time' in trial table are given in frames instead of seconds") time_unit = "frames" else: time_unit = "seconds" return epochs_timestamps, time_unit
[docs] def get_behavioral_events_names(self): """ The name of the different behavioral Returns: """ if 'behavior' not in self._nwb_data.processing: if 'Behavior' not in self._nwb_data.processing: return [] try: behavior_nwb_module = self._nwb_data.processing['behavior'] except KeyError: behavior_nwb_module = self._nwb_data.processing['Behavior'] try: behavioral_events = behavior_nwb_module.get(name='BehavioralEvents') except KeyError: return [] # a dictionary containing the TimeSeries in this BehavioralEvents container time_series = behavioral_events.time_series return list(time_series.keys())
[docs] def get_behavioral_events_times(self, event_name): """ The name of the different behavioral Returns: """ if 'behavior' not in self._nwb_data.processing: if 'Behavior' not in self._nwb_data.processing: return [] try: behavior_nwb_module = self._nwb_data.processing['behavior'] except KeyError: behavior_nwb_module = self._nwb_data.processing['Behavior'] try: behavioral_events = behavior_nwb_module.get(name='BehavioralEvents') except KeyError: return [] # a dictionary containing the TimeSeries in this BehavioralEvents container time_series = behavioral_events.time_series events_time_serie = time_series[event_name] one_d_event_timestamps = events_time_serie.timestamps[:] event_timestamps = np.tile(one_d_event_timestamps, (2, 1)) return event_timestamps