Source code for seas.domains

#!/usr/bin/env python3
'''
Functions for creating and manipulating domain maps, created from maximum projections of independent components detected from mesoscale calcium imaging videos. 

Authors: Sydney C. Weiser
Date: 2019-06-16
'''
import numpy as np
import os
import scipy.ndimage
import cv2

from seas.hdf5manager import hdf5manager
from seas.video import save, rescale, rotate
from seas.ica import rebuild_mean_roi_timecourse, filter_mean
from seas.rois import make_mask
from seas.colormaps import save_colorbar, REGION_COLORMAP, DEFAULT_COLORMAP


[docs]def get_domain_map(components: dict, blur: int = 21, min_size_ratio: float = 0.1, map_only: bool = True, apply_filter_mean: bool = True, max_loops: int = 2, ignore_small: bool = True): ''' Creates a domain map from extracted independent components. A pixelwise maximum projection of the blurred signal components is taken through the n_components axis, to create a flattened representation of where a domain was maximally significant across the cortical surface. Components with multiple noncontiguous significant regions are counted as two distinct domains. Arguments: components: The dictionary of components returned from seas.ica.project. Domains are most interesting if artifacts has already been assigned through seas.gui.run_gui. blur: An odd integer kernel Gaussian blur to run before segmenting. Domains look smoother with larger blurs, but you can lose some smaller domains. map_only: If true, compute the map only, do not rebuild time courses under each domain. apply_filter_mean: Whether to compute the filtered mean when calculating ROI rebuild timecourses. min_size_ratio: The minimum size ratio of the mean component size to allow for a component. If a the size of a component is under (min_size_ratio x mean_domain_size), and the next most significant domain over the pixel would result in a larger size domain, this next domain is chosen. max_loops: The number of times to check if the next most significant domain would result in a larger domain size. To entirely disable this, set max_loops to 0. ignore_small: If True, assign undersize domains that were not reassigned during max_loops to np.nan. Returns: output: a dictionary containing the results of the operation, containing the following keys domain_blur: The Gaussian blur value used when generating the map component_assignment: A map showing the index of which *component* was maximally significant over a given pixel. Here, This is in contrast to the domain map, where each domain is a unique integer. domain_ROIs: The computed np.array domain map (x,y). Each domain is represented by a unique integer, and represents a discrete continuous unit. Values that are masked, or where large enough domains were not detected are set to np.nan. if not map_only, the following are also included in the output dictionary: ROI_timecourses: The time courses rebuilt from the video under each ROI. The frame mean is not included in this calculation, and must be re-added from mean_filtered. mean_filtered: The frame mean, filtered by the default method. ''' print('\nExtracting Domain ROIs\n-----------------------') output = {} output['domain_blur'] = blur eig_vec = components['eig_vec'].copy() shape = components['shape'] shape = (shape[1], shape[2]) if 'roimask' in components.keys() and components['roimask'] is not None: roimask = components['roimask'] maskind = np.where(roimask.flat == 1)[0] else: roimask = None if 'artifact_components' in components.keys(): artifact_components = components['artifact_components'] print('Switching to signal indices only for domain detection') if 'noise_components' in components.keys(): noise_components = components['noise_components'] signal_indices = np.where((artifact_components + noise_components) == 0)[0] else: print('no noise components found') signal_indices = np.where(artifact_components == 0)[0] eig_vec = eig_vec[:, signal_indices] if blur: print('blurring domains...') assert type(blur) is int, 'blur was not valid' if blur % 2 != 1: blur += 1 eigenbrain = np.empty(shape) eigenbrain[:] = np.NAN for index in range(eig_vec.shape[1]): if roimask is not None: eigenbrain.flat[maskind] = eig_vec.T[index] blurred = cv2.GaussianBlur(eigenbrain, (blur, blur), 0) eig_vec.T[index] = blurred.flat[maskind] else: eigenbrain.flat = eig_vec.T[index] blurred = cv2.GaussianBlur(eigenbrain, (blur, blur), 0) eig_vec.T[index] = blurred.flat domain_ROIs_vector = np.argmax(np.abs(eig_vec), axis=1).astype('float16') if blur: domain_ROIs_vector[np.isnan(eig_vec[:, 0])] = np.nan if roimask is not None: domain_ROIs = np.empty(shape) domain_ROIs[:] = np.NAN domain_ROIs.flat[maskind] = domain_ROIs_vector else: domain_ROIs = np.reshape(domain_ROIs_vector, shape) output['component_assignment'] = domain_ROIs.copy() # remove small domains, separate if more than one domain per component ndomains = np.nanmax(domain_ROIs) print('domain_ROIs max:', ndomains) _, size = np.unique(domain_ROIs[~np.isnan(domain_ROIs)].astype('uint16'), return_counts=True) meansize = size.mean() minsize = meansize * min_size_ratio def replaceindex(): if n_loops < max_loops: if roimask is not None: roislice = np.delete(eig_vec[np.where(cc.flat[maskind] == n + 1)[0], :], i, axis=1) else: roislice = np.delete(eig_vec[np.where(cc.flat == n + 1)[0], :], i, axis=1) newindices = np.argmax(np.abs(roislice), axis=1) newindices[newindices > i] += 1 domain_ROIs[np.where(cc == n + 1)] = newindices else: if ignore_small: domain_ROIs[np.where(cc == n + 1)] = np.nan n_loops = 0 while n_loops < max_loops: n_found = 0 for i in np.arange(np.nanmax(domain_ROIs) + 1, dtype='uint16'): roi = np.zeros(domain_ROIs.shape, dtype='uint8') roi[np.where(domain_ROIs == i)] = 1 cc, n_objects = scipy.ndimage.measurements.label(roi) if n_objects > 1: objects = scipy.ndimage.measurements.find_objects(cc) for n, obj in enumerate(objects): domain_size = np.where(cc[obj] == n + 1)[0].size if domain_size < minsize: n_found += 1 replaceindex() elif n_objects == 0: continue else: objects = scipy.ndimage.measurements.find_objects(cc) domain_size = np.where(roi == 1)[0].size if domain_size < minsize: n = 0 obj = objects[0] n_found += 1 replaceindex() n_loops += 1 print('n undersize objects found:', n_found, '\n') print('n domains', np.unique(domain_ROIs[~np.isnan(domain_ROIs)]).size) print('nanmax:', np.nanmax(domain_ROIs)) # split components with multiple centroids for i in np.arange(np.nanmax(domain_ROIs) + 1, dtype='uint16'): roi = np.zeros(domain_ROIs.shape, dtype='uint8') roi[np.where(domain_ROIs == i)] = 1 cc, n_objects = scipy.ndimage.measurements.label(roi) if n_objects > 1: objects = scipy.ndimage.measurements.find_objects(cc) for n, obj in enumerate(objects): if n > 0: ind = np.where(cc == n + 1) domain_ROIs[ind] = np.nanmax(domain_ROIs) + 1 print('n domains', np.unique(domain_ROIs[~np.isnan(domain_ROIs)]).size) print('nanmax:', np.nanmax(domain_ROIs)) # adjust indexing to remove domains with no spatial footprint domain_offset = np.diff(np.unique(domain_ROIs[~np.isnan(domain_ROIs)])) adjust_indices = np.where(domain_offset > 1)[0] for i in adjust_indices: domain_ROIs[np.where(domain_ROIs > i + 1)] -= (domain_offset[i] - 1) domain_offset = np.diff(np.unique(domain_ROIs[~np.isnan(domain_ROIs)])) if ('expmeta' in components.keys()): if 'rois' in components['expmeta'].keys(): padmask = get_padded_borders( domain_ROIs, blur, components['expmeta']['rois'], components['expmeta']['n_roi_rotations'], components['expmeta']['bounding_box']) domain_ROIs[np.where(padmask == 0)] = np.nan else: print('Couldnt make padded mask') output['domain_ROIs'] = domain_ROIs if not map_only: timecourseresults = get_domain_rebuilt_timecourses( domain_ROIs, components, apply_filter_mean=apply_filter_mean) for key in timecourseresults: output[key] = timecourseresults[key] else: print('not calculating domain time courses') return output
[docs]def save_domain_map(domain_ROIs: np.ndarray, basepath: str, blur_level: int, n_rotations: int = 0): ''' Saves domain maps to pngs for visualization. Two files are saved to basepath_xb.png and basepath_xb_edges.png. One is the visualization of the domain indices, saved in black and white, the other is just the edge visualization. Arguments: domain_ROIs: basepath: The path to save at, including everything but the file extension. blur_level: The Gaussian blur kernel size, only used for generating the image name n_rotations: The number of CCW rotations to implement before saving images Returns: Nothing. ''' if blur_level is not None: blurname = str(blur_level) else: blurname = '?' savepath = basepath + blurname + 'b.png' edges = get_domain_edges(domain_ROIs) domain_ROIs = rotate(domain_ROIs, n_rotations) edges = rotate(edges, n_rotations) save(domain_ROIs.copy() + edges, savepath, apply_cmap=False, rescale_range=True) save(edges, savepath.replace('.png', '_edges.png'), apply_cmap=False, rescale_range=True)
[docs]def get_domain_rebuilt_timecourses(domain_ROIs: np.ndarray, components: dict, apply_filter_mean: bool = True): ''' Get time courses for each domain ROI. The filtered movie is rebuilt under each ROI (one at a time). The mean is taken under each domain ROI. The ROI_timecourses and mean_filtered are returned in output. Arguments: domain_ROIs: The domain ROI map built by get_domain_map components : The results of seas.ica.project, including all eigenvector components. Used for rebuilding the movie. apply_filter_mean: Whether to calculate the filtered mean Returns: output, a dictionary with the following keys: ROI_timecourses: A np array of shape (n_domains, t), with the mean of a given domain, i, under ROI_timecourses[i]. mean_filtered: The frame mean, of shape (t), filtered by the default filtering method. ''' output = {} print('\nExtracting Domain ROI Timecourses\n-----------------------') ROI_timecourses = rebuild_mean_roi_timecourse(components, mask=domain_ROIs) output['ROI_timecourses'] = ROI_timecourses if apply_filter_mean: mean_filtered = filter_mean(components['mean']) output['mean_filtered'] = mean_filtered return output
[docs]def get_domain_edges(domain_ROIs: np.ndarray, clear_bg: bool = False, linepad: int = None): ''' Get the edges of the domain map using canny edge detection. Arguments: domain_ROIs: The domain map built by get_domain_map. clear_bg: True if background values should be nan, otherwise background values are 0. linepad: Line padding kernel for thicker borders. Must be an odd integer. Returns: output, a dictionary with the following keys: ROI_timecourses: A np array of shape (n_domains, t), with the mean of a given domain, i, under ROI_timecourses[i]. mean_filtered: The frame mean, of shape (t), filtered by the default filtering method. ''' # Make sure domains are detected when there are more than 256 of them, by adding an offset # Avoid a RuntimeWarning by explicitly converting to uint8 with no np.nan. domain_ROIs = domain_ROIs.copy() + 1 domain_ROIs[np.where(np.isnan(domain_ROIs))] = 0 domain_ROIs = domain_ROIs.astype('uint8') edges = cv2.Canny(domain_ROIs, 1, 1) edges += cv2.Canny((domain_ROIs + 1), 1, 1) if linepad is not None: assert type( linepad) is int, 'invalid line pad. Provide an odd integer.' kernel = np.ones((linepad, linepad), np.uint8) edges = cv2.filter2D(edges, -1, kernel) if clear_bg: edges = edges.astype('float64') edges[np.where(edges == 0)] = np.nan return edges
[docs]def get_padded_borders(domain_ROIs: np.ndarray, blur: int, rois: dict, n_roi_rotations: int = 0, bounding_box: bool = None): ''' Since a gaussian blur between a value with a number and a nan border will result in an increasingly large nan border, get this new border roi boundary. Rois are used to separate hemispheres rather than blur together. Arguments: domain_ROIs: The domain map built by get_domain_map. blur: True if background values should be nan, otherwise background values are 0. rois: The roi dict used to generate the roimask. This should be saved in components['expmeta'] during decomposition. n_roi_rotations: The number of times the rois were rotated when loading. bounding_box: The bounding box applied before ICA decomposition. Returns: output, a dictionary with the following keys: ROI_timecourses: A np array of shape (n_domains, t), with the mean of a given domain, i, under ROI_timecourses[i]. mean_filtered: The frame mean, of shape (t), filtered by the default filtering method. ''' shape = domain_ROIs.shape padmask = np.zeros((shape[0], shape[1]), dtype='uint8') for i, roi in enumerate(rois): mask = make_mask(rois[roi], shape, bounding_box) mask = np.pad(mask.astype('float64'), 1, 'constant', constant_values=np.nan) mask[np.where(mask == 0)] = np.nan blurred = cv2.GaussianBlur(mask, (blur, blur), 0) blurred = blurred[1:-1, 1:-1] padmask[np.where(~np.isnan(blurred))] = 1 padmask = rotate(padmask, n_roi_rotations) return padmask
[docs]def domain_map(domain_ROIs: np.ndarray, values: np.ndarray = None): ''' Used to generate a domain map with a specific coloration scheme. If values are a calculated metric, such as pearson correlation, each domain i in the domain map will be colored by its value values[i] in the input vector. Arguments: domain_ROIs: The domain map built by get_domain_map. values: An array of values to color the domain_ROIs. It must be the same length as the number of unique indices in domain_ROIs. If no values are provided, the domain_ROIs are returned as-is. Returns: domain_ROIs_colored: the domain_ROIs, with each domain reassigned from its index, to its value given by input 'values'. ''' if values is not None: domainmap = np.zeros(domain_ROIs.shape) domainmap[np.where(np.isnan(domain_ROIs))] = np.nan if values.ndim > 1: domainmap = np.tile(domainmap[:, :, None], values.shape[1]) for i in np.arange(np.nanmax(domain_ROIs) + 1).astype('uint16'): domainmap[np.where(domain_ROIs == i)] = values[i] else: domainmap = domain_ROIs return domainmap
[docs]def mosaic_movie(domain_ROIs: np.ndarray, ROI_timecourses: np.ndarray, savepath: str = None, t_start: int = None, t_stop: int = None, n_rotations: int = 0, colormap: np.ndarray = None, resize_factor: int = 1, codec: str = None, speed: int = 1, fps: int = 10): ''' Creates a mosaic movie, where the original movie is played back as a series of domain timecourses, each displayed over its original domain. Arguments: domain_ROIs: The domain map built by get_domain_map. ROI_timecourses: The time series of each ROI, built by get_domain_map if map_only is False, or reconstructed with 'get_domain_rebuilt_timecourses' savepath: Where to save the movie to. In most cases, should be an avi or mp4 file path. t_start: Frame number to start the movie at. If not provided, movie starts at 0. t_stop: Frame number to stop the movie at. If not provided, movie ends at last frame. n_rotations: Number of CCW rotations to apply before saving. colormap: The colormap to apply. If not provided, loads the default from the defaults config file. resize_factor: The factor to resize by when writing the video. Returns: Nothing. ''' print('\nRebuilding Mosiac Movie\n-----------------------') if colormap is None: colormap = DEFAULT_COLORMAP t, x, y = (ROI_timecourses.shape[1], domain_ROIs.shape[0], domain_ROIs.shape[1]) if (t_start is not None) or (t_stop is not None): frames = np.arange(t) frames = frames[t_start:t_stop] t = frames.size movie = np.zeros((domain_ROIs.size, t), dtype='float16') for roi in np.arange(ROI_timecourses.shape[0]): roi_ind = np.where(domain_ROIs.flat == roi)[0] movie[roi_ind, :] = ROI_timecourses[roi, t_start:t_stop] movie = movie.T.reshape((t, x, y)) overlay = np.isnan(domain_ROIs).astype('uint8') movie = rotate(movie, n_rotations) overlay = rotate(overlay, n_rotations) if savepath is None: print('Finished rebuilding. Returning movie...') return movie else: print('Finished rebuilding. Saving file...') save(movie, savepath, rescale_range=True, save_cbar=True, overlay=overlay, colormap=colormap, resize_factor=1, codec=None, speed=1, fps=10)
[docs]def rolling_mosaic_movie(domain_ROIs: np.ndarray, ROI_timecourses: np.ndarray, savepath: str, t_start: int = None, t_stop: int = None, n_rotations: int = 0, colormap: np.ndarray = None, resize_factor: int = 1, codec: str = None, speed: int = 1, fps: int = 10): ''' A low memory version of mosaic_movie. The functionality is the same, except each frame is written one at at a time. Movie scale values may be slightly different, since they are calculated from the first frame instead of on the entire movie. Creates a mosaic movie, where the original movie is played back as a series of domain timecourses, each displayed over its original domain. Arguments: domain_ROIs: The domain map built by get_domain_map. ROI_timecourses: The time series of each ROI, built by get_domain_map if map_only is False, or reconstructed with 'get_domain_rebuilt_timecourses' savepath: Where to save the movie to. In most cases, should be an avi or mp4 file path. t_start: Frame number to start the movie at. If not provided, movie starts at 0. t_stop: Frame number to stop the movie at. If not provided, movie ends at last frame. n_rotations: Number of CCW rotations to apply before saving. colormap: The colormap to apply. If not provided, loads the default from the defaults config file. Returns: Nothing. ''' print('\nWriting Rolling Mosiac Movie\n-----------------------') if colormap is None: colormap = DEFAULT_COLORMAP # Initialize Parameters. resize_factor = 1 / resize_factor x, y = domain_ROIs.shape n_domains = ROI_timecourses.shape[0] t = np.arange(ROI_timecourses.shape[1]) t = t[t_start:t_stop] # Set up resizing factors. w = int(x // resize_factor) h = int(y // resize_factor) # Find codec to use, if not specified. if codec is None: if savepath.endswith('.mp4'): if os.name == 'posix': codec = 'X264' elif os.name == 'nt': codec = 'XVID' else: if os.name == 'posix': codec = 'MP4V' elif os.name == 'nt': codec = 'XVID' else: raise TypeError('Unknown os type: {0}'.format(os.name)) # Initialize movie writer. display_speed = fps * speed fourcc = cv2.VideoWriter_fourcc(*codec) out = cv2.VideoWriter(savepath, fourcc, display_speed, (h, w), isColor=True) def write_frame(frame): # rescale and convert to uint8. frame = rescale(frame, min_max=(scale['min'], scale['max']), cap=False, verbose=False).astype('uint8') frame = cv2.resize(frame, (h, w), interpolation=cv2.INTER_AREA) frame = rotate(frame, n_rotations) # Apply colormap, write frame to .avi. if colormap is not None: frame = cv2.applyColorMap(frame, colormap) else: frame = np.repeat(frame[:, :, None], 3, axis=2) out.write(frame) print('Saving dfof video to: ' + savepath) frame = np.empty((x, y)) for f in t: if f % 10 == 0: print('on frame:', f, '/', t.size) frame[:] = np.nan for i in range(n_domains): frame[np.where(domain_ROIs == i)] = ROI_timecourses[i, f] # If first frame, calculate scaling parameters. if (f == 0): mean = np.nanmean(frame) std = np.nanstd(frame) fmin = mean - 3 * std fmax = mean + 7 * std scale = {'scale': 255.0 / (fmax - fmin), 'min': fmin, 'max': fmax} write_frame(frame) out.release() print('Video saved to:', savepath) cbarpath = os.path.splitext(savepath)[0] + '_colorbar.pdf' print('Saving Colorbar to:' + cbarpath) save_colorbar(scale, cbarpath, colormap=colormap)