from cicada.preprocessing.convert_to_nwb import ConvertToNWB
from cicada.preprocessing.utils import load_tiff_movie_in_memory, update_frames_to_add
from PIL import ImageSequence
import PIL
import PIL.Image as pil_image
import time
from pynwb.ophys import TwoPhotonSeries, OpticalChannel, CorrectedImageStack
import yaml
import numpy as np
import os
import hdf5storage
from pynwb.base import TimeSeries
from pynwb.device import Device
[docs]class ConvertCiMovieToNWB(ConvertToNWB):
"""Class to convert Calcium Imaging movies to NWB"""
def __init__(self, nwb_file):
"""
Initialize some parameters
Args:
nwb_file (NWB.File) : NWB file object
"""
super().__init__(nwb_file)
self.ci_sampling_rate = None
# dict with key an int representing the frame index after which add frames.
# the value is the number of frames to add (integer)
self.frames_to_add = dict()
[docs] def convert(self, **kwargs):
"""Convert the data and add to the nwb_file
Args:
**kwargs: arbitrary arguments
"""
super().convert(**kwargs)
# ### setting parameters ####
formats_implemented = ["external", "tiff"]
if not kwargs.get("format"):
raise Exception(f"'format' argument should be pass to convert function in class {self.__class__.__name__}")
elif kwargs["format"] not in formats_implemented:
raise Exception(f"'format' argument should have one of these values {formats_implemented} "
f"for the convert function in class {self.__class__.__name__}")
movie_format = kwargs["format"]
if not kwargs.get("motion_corrected_file_name"):
raise Exception(f"'motion_corrected_file_name' attribute should be pass to convert "
f"function in class {self.__class__.__name__}")
motion_corrected_file_name = kwargs["motion_corrected_file_name"]
if "original_movie_file_name" in kwargs:
original_movie_file_name = kwargs["original_movie_file_name"]
else:
original_movie_file_name = None
if "xy_translation_file_name" in kwargs:
xy_translation_file_name = kwargs["xy_translation_file_name"]
else:
xy_translation_file_name = None
# Open YAML file with metadata if existing then dump all data in a dict
if ("yaml_file_name" in kwargs) and kwargs["yaml_file_name"] is not None:
with open(kwargs["yaml_file_name"], 'r') as stream:
yaml_data = yaml.safe_load(stream)
else:
raise Exception(f"'yaml_file_name' attribute should be pass to convert "
f"function in class {self.__class__.__name__}")
# a calcium imaging rate has to be given, either trought the yaml file, either as argument
# self.ci_sampling_rate can be obtained by the abf_converter
if "ci_sampling_rate" in kwargs and (kwargs["ci_sampling_rate"] is not None):
self.ci_sampling_rate = kwargs["ci_sampling_rate"]
elif "imaging_rate" in yaml_data:
self.ci_sampling_rate = yaml_data["imaging_rate"]
else:
raise Exception(f"No 'imaging_rate' provided for movie {motion_corrected_file_name} in the yaml file "
f"{kwargs['yaml_file_name']} or throught argument 'ci_sampling_rate' to function convert() "
f"of the class {self.__class__.__name__}")
print(f"self.ci_sampling_rate {self.ci_sampling_rate}")
print(f"ConvertCiMovieToNWB: motion_corrected_file_name {motion_corrected_file_name}")
if "indicator" in yaml_data:
indicator = yaml_data["indicator"]
else:
raise Exception(f"No 'indicator' provided for movie {motion_corrected_file_name} in the yaml file "
f"{kwargs['yaml_file_name']}")
if "excitation_lambda" in yaml_data:
excitation_lambda = yaml_data["excitation_lambda"]
else:
raise Exception(f"No 'excitation_lambda' provided for movie {motion_corrected_file_name} in the yaml file "
f"{kwargs['yaml_file_name']}")
if "emission_lambda" in yaml_data:
emission_lambda = yaml_data["emission_lambda"]
else:
raise Exception(f"No 'emission_lambda' provided for movie {motion_corrected_file_name} in the yaml file "
f"{kwargs['yaml_file_name']}")
if "image_plane_location" in yaml_data:
image_plane_location = yaml_data["image_plane_location"]
else:
raise Exception(
f"No 'image_plane_location' provided for movie {motion_corrected_file_name} in the yaml file "
f"{kwargs['yaml_file_name']}")
update_frames_to_add(frames_to_add=self.frames_to_add, nwb_file=self.nwb_file,
ci_sampling_rate=self.ci_sampling_rate)
# ### end setting parameters ####
device = Device('2P_device')
self.nwb_file.add_device(device)
optical_channel = OpticalChannel('my_optchan', 'description', emission_lambda)
imaging_plane = self.nwb_file.create_imaging_plane(name='my_imgpln',
optical_channel=optical_channel,
description='a very interesting part of the brain',
device=device,
excitation_lambda=excitation_lambda,
imaging_rate=float(self.ci_sampling_rate),
indicator=indicator,
location=image_plane_location)
if movie_format != "external":
# not using self.frames_to_add
tiff_movie = load_tiff_movie_in_memory(motion_corrected_file_name,
frames_to_add=None)
n_frames, n_lines, n_cols = tiff_movie.shape
print(f"Movie dimensions n_frames, n_lines, n_cols :{n_frames, n_lines, n_cols}")
motion_corrected_img_series = TwoPhotonSeries(name='motion_corrected_ci_movie',
dimension=[n_frames, n_lines, n_cols],
data=tiff_movie,
imaging_plane=imaging_plane,
starting_frame=[0], format=movie_format,
rate=float(self.ci_sampling_rate),
unit="lux")
if original_movie_file_name is not None:
original_tiff_movie = load_tiff_movie_in_memory(original_movie_file_name,
frames_to_add=self.frames_to_add)
n_frames, n_lines, n_cols = tiff_movie.shape
print(f"Movie dimensions n_frames, n_lines, n_cols :{n_frames, n_lines, n_cols}")
original_img_series = TwoPhotonSeries(name='original_ci_movie',
dimension=[n_frames, n_lines, n_cols],
data=original_tiff_movie,
imaging_plane=imaging_plane,
starting_frame=[0], format=movie_format,
rate=float(self.ci_sampling_rate),
unit="lux")
else:
im = PIL.Image.open(motion_corrected_file_name)
n_frames = len(list(ImageSequence.Iterator(im)))
n_lines, n_cols = np.array(im).shape
print(f"Movie dimensions n_frames, n_lines, n_cols :{n_frames, n_lines, n_cols}")
motion_corrected_img_series = TwoPhotonSeries(name='motion_corrected_ci_movie',
dimension=[n_frames, n_lines, n_cols],
external_file=[motion_corrected_file_name],
imaging_plane=imaging_plane,
starting_frame=[0], format=movie_format,
rate=float(self.ci_sampling_rate),
unit="lux")
if original_movie_file_name is not None:
im = PIL.Image.open(original_movie_file_name)
n_lines, n_cols = np.array(im).shape
print(f"Movie dimensions n_frames, n_lines, n_cols :{n_frames, n_lines, n_cols}")
original_img_series = TwoPhotonSeries(name='original_ci_movie',
dimension=[n_frames, n_lines, n_cols],
external_file=[original_movie_file_name],
imaging_plane=imaging_plane,
starting_frame=[0], format=movie_format,
rate=float(self.ci_sampling_rate),
unit="lux")
self.nwb_file.add_acquisition(motion_corrected_img_series)
if original_movie_file_name is not None:
self.nwb_file.add_acquisition(original_img_series)
if xy_translation_file_name is not None:
if xy_translation_file_name.endswith(".mat"):
mvt_x_y = hdf5storage.loadmat(os.path.join(xy_translation_file_name))
x_shifts = mvt_x_y['xshifts'][0]
y_shifts = mvt_x_y['yshifts'][0]
elif xy_translation_file_name.endswith(".npy"):
ops = np.load(os.path.join(xy_translation_file_name))
ops = ops.item()
x_shifts = ops['xoff']
y_shifts = ops['yoff']
xy_translation = np.zeros((n_frames, 2), dtype="int16")
frame_index = 0
for frame in np.arange(len(x_shifts)):
xy_translation[frame_index, 0] = x_shifts[frame]
xy_translation[frame_index, 1] = y_shifts[frame]
# adding frames is necessary, in case the movie would be a concatenation of movie for exemple
if frame in self.frames_to_add:
frame_index += self.frames_to_add[frame]
xy_translation_time_series = TimeSeries(name="xy_translation", data=xy_translation)
corrected_image_stack = CorrectedImageStack(name="CorrectedImageStack",
corrected=motion_corrected_img_series,
original=original_img_series,
xy_translation=xy_translation_time_series)
self.nwb_file.add_acquisition(corrected_image_stack)