Source code for cicada.preprocessing.convert_suite_2p_rois_to_nwb

from cicada.preprocessing.convert_to_nwb import ConvertToNWB
from cicada.preprocessing.utils import load_tiff_movie_in_memory, update_frames_to_add
from pynwb.ophys import ImageSegmentation, Fluorescence
import numpy as np
from PIL import ImageSequence
import PIL
import PIL.Image as pil_image
import os


[docs]class ConvertSuite2pRoisToNWB(ConvertToNWB): """Class to convert ROIs data from Suite2P to NWB if raw_traces from Suite2p are available they are loaded. Otherwise if the movie is available, build the raw_traces. create_roi_response_series """ def __init__(self, nwb_file): super().__init__(nwb_file)
[docs] def convert(self, **kwargs): """Convert the data and add to the nwb_file Args: **kwargs: arbitrary arguments """ super().convert(**kwargs) if "suite2p_dir" not in kwargs: raise Exception(f"'suite2p_dir' argument should be pass to convert " f"function in class {self.__class__.__name__}") suite2p_dir = kwargs["suite2p_dir"] # looking for the motion_corrected_ci_movie, return None if it doesn't exists # TODO: take in consideration the movie is not available # then don't construct image mask and don't build raw-traces, use F.npy is available image_series = self.nwb_file.acquisition.get("motion_corrected_ci_movie") mod = self.nwb_file.create_processing_module('ophys', 'contains optical physiology processed data') img_seg = ImageSegmentation(name="segmentation_suite2p") mod.add_data_interface(img_seg) imaging_plane = self.nwb_file.get_imaging_plane("my_imgpln") ci_sampling_rate = imaging_plane.imaging_rate # description, imaging_plane, name=None ps = img_seg.create_plane_segmentation(description='output from segmenting', imaging_plane=imaging_plane, name='my_plane_seg', reference_images=image_series) stat = np.load(os.path.join(suite2p_dir, "stat.npy"), allow_pickle=True) is_cell = np.load(os.path.join(suite2p_dir, "iscell.npy"), allow_pickle=True) # TODO: load f.npy for raw_traces if available if image_series.format == "tiff": dim_y, dim_x = image_series.data.shape[1:] n_frames = image_series.data.shape[0] print(f"dim_y, dim_x: {image_series.data.shape[1:]}") elif image_series.format == "external": im = PIL.Image.open(image_series.external_file[0]) n_frames = len(list(ImageSequence.Iterator(im))) dim_y, dim_x = np.array(im).shape print(f"dim_y, dim_x: {np.array(im).shape}") else: raise Exception(f"Format of calcium movie imaging {image_series.format} not yet implemented") n_cells = 0 # Add rois for cell in np.arange(len(stat)): if is_cell[cell][0] == 0: continue n_cells += 1 pix_mask = [(x, y, 1) for x, y in zip(stat[cell]["xpix"], stat[cell]["ypix"])] image_mask = np.zeros((dim_y, dim_x)) for pix in pix_mask: image_mask[pix[1], pix[0]] = pix[2] # we can id to identify the cell (int) otherwise it will be incremented at each step ps.add_roi(pixel_mask=pix_mask, image_mask=image_mask) fl = Fluorescence(name="fluorescence_suite2p") mod.add_data_interface(fl) rt_region = ps.create_roi_table_region('all cells', region=list(np.arange(n_cells))) if image_series.format == "external": print(f"external: {image_series.external_file[0]}") if image_series.external_file[0].endswith(".tiff") or \ image_series.external_file[0].endswith(".tif"): frames_to_add = dict() # print(f"ci_sampling_rate {ci_sampling_rate}") update_frames_to_add(frames_to_add=frames_to_add, nwb_file=self.nwb_file, ci_sampling_rate=ci_sampling_rate) ci_movie = load_tiff_movie_in_memory(image_series.external_file[0], frames_to_add=frames_to_add) else: raise Exception(f"Calcium imaging format not supported yet {image_series.external_file[0]}") else: ci_movie = image_series.data if ci_movie is not None: raw_traces = np.zeros((n_cells, ci_movie.shape[0])) for cell in np.arange(n_cells): img_mask = ps['image_mask'][cell] img_mask = img_mask.astype(bool) raw_traces[cell, :] = np.mean(ci_movie[:, img_mask], axis=1) print(f"######## fl.create_roi_response_series raw_traces") rrs = fl.create_roi_response_series(name='raw_traces', data=raw_traces, unit='lumens', rois=rt_region, timestamps=np.arange(n_frames), description="raw traces")