Source code for seas.filemanager

#!/usr/bin/env python3
'''
Functions used for loading and finding files for brain analysis.

Authors: Sydney C. Weiser
Date: 2017-07-28
'''
import os
import re
import sys
import time
from subprocess import call
import yaml
from typing import List


def find_files(folder_path: str,
               match_string: str,
               subdirectories: bool = False,
               suffix: bool = False,
               regex: bool = False,
               verbose: bool = True) -> List[str]:
    '''
    Finds files in folder_path whose file name matches match_string.

    The match is always tested against the file's basename, never the
    directory part of the path.  Three matching modes are available:

    * regex=True: match_string is a regular expression applied with
      re.match, i.e. anchored at the start of the file name.
    * regex=False, suffix=True: the file name must end with match_string.
    * regex=False, suffix=False: match_string may occur anywhere in the
      file name.

    Arguments:
        folder_path: The folder to search for files in.
        match_string: The string (or regular expression) to search for.
        subdirectories: Whether to also search all nested subdirectories.
        suffix: Whether match_string must appear at the end of the file
            name.  Ignored when regex is True.
        regex: Whether to interpret match_string as a regular expression.
        verbose: Whether to print every file examined and every match.

    Returns:
        results: A list of matching file paths, each prefixed with
            folder_path (or the subdirectory it was found in).

    Raises:
        AssertionError: If folder_path is not an existing directory.
    '''
    assert os.path.isdir(folder_path), 'Folder input was not a valid directory'

    if subdirectories:
        # os.walk yields (root, dirnames, filenames) for every directory.
        files = [
            os.path.join(root, file)
            for root, _, file_list in os.walk(folder_path)
            for file in file_list
        ]
    else:
        files = [
            os.path.join(folder_path, file) for file in os.listdir(folder_path)
        ]

    if verbose:
        print("all files found in folder '{0}':".format(folder_path))

    results = []
    for filepath in files:
        # os.listdir also returns directories; only regular files qualify.
        if not os.path.isfile(filepath):
            continue

        file = os.path.basename(filepath)
        if verbose:
            print('\t', file)

        if regex:
            matched = re.match(match_string, file) is not None
        elif suffix:
            matched = file.endswith(match_string)
        else:
            matched = match_string in file

        if matched:
            results.append(filepath)

    if verbose:
        print('matching files found in folder:')
        for filepath in results:
            print('\t', os.path.basename(filepath))

    return results
def movie_sorter(pathlist: List[str],
                 matchstr: str = None,
                 verbose: bool = True) -> dict:
    '''
    Takes list of paths, sorts into experiments, and orders files by
    extension number.  Returns dict of experiments with associated files.

    Paths whose basename does not match matchstr are ignored.

    Arguments:
        pathlist: A list of paths, generally produced by find_files.
        matchstr: Regular expression applied (with re.match) to each
            basename.  It must define exactly two capture groups: the
            experiment name and the file number.  The file number group may
            be unmatched, in which case the file sorts as number 0.
            Defaults to the YYMMDD_EE[@NNNN|-NNNN].tif movie pattern.
        verbose: Whether to print the resulting experiment listing.

    Returns:
        experiments: A dict, containing top level entries representing
            unique results of the matchstring search, each containing a
            list of tif files which matched that experiment, ordered by
            file number.
    '''
    # Only match movie files that have a specific file format.
    if matchstr is None:
        matchstr = r'(\d{6}_\d{2})(?:[@-](\d{4}))?\.tif'

    # Keep each matching path next to its own experiment / file number, so
    # that non-matching entries in pathlist cannot shift the bookkeeping.
    grouped = {}
    for position, path in enumerate(pathlist):
        match = re.match(matchstr, os.path.basename(path))
        if match is None:
            continue
        exp, fnum = match.groups()
        grouped.setdefault(exp, []).append((fnum, position, path))

    def sort_key(entry):
        fnum, position, _ = entry
        # Files without a number extension come first; ties keep input order.
        return (0 if fnum is None else int(fnum), position)

    experiments = {}
    for exp, entries in grouped.items():
        if len(entries) > 1:
            entries = sorted(entries, key=sort_key)
        experiments[exp] = [path for _, _, path in entries]

    if verbose:
        print('\nExperiments\n-----------------------')
        for exp in experiments:
            print(exp + ':')
            for fname in experiments[exp]:
                print('\t', fname)

    return experiments
def experiment_sorter(folder_path: str,
                      experimentstr: str = None,
                      verbose: bool = True) -> dict:
    '''
    Finds all files associated with an experiment in a particular folder,
    organizes them by filetype: movie files, processed files, metadata
    files, etc.

    Arguments:
        folder_path: A path specifying which folder to search for
            experiment files within.  Subdirectories are not searched.
        experimentstr: Which experiment(s) to collect.  Either a single
            experiment name 'YYMMDD_EE', a span 'YYMMDD_EE-EE' (expanded to
            every experiment number in the inclusive range), or None to
            collect files for any experiment name.
        verbose: Whether to produce verbose output.

    Returns:
        experiment_files: A dictionary with the keys 'movies', 'meta',
            'processed', 'ica', 'roi', 'dfof', 'body', 'oflow' and
            'videodata', each holding a list of matching file paths.  Keys
            with no matching files hold an empty list.

    Raises:
        AssertionError: If folder_path is not a directory, or experimentstr
            is given but is neither a valid experiment name nor a span.
    '''
    assert os.path.isdir(folder_path), 'Folder input was not a valid directory'

    # Determine whether experimentstr matches the expected format.
    if experimentstr is not None:
        span = re.match(r'^(\d{6})_(\d{2})-(\d{2})$', experimentstr)
        if span is not None:
            print('matching multiple experiments')
            date, first, last = span.groups()
            experimentlist = [
                date + '_{:02d}'.format(i)
                for i in range(int(first), int(last) + 1)
            ]
        else:
            assert re.match(r'^\d{6}_\d{2}$', experimentstr) is not None, \
                'experimentstr input was not a valid YYMMDD_EE experiment name'
            experimentlist = [experimentstr]
    else:
        # No experiment requested: match any experiment name.
        experimentlist = [r'\d{6}_\d{2}']

    files = os.listdir(folder_path)

    if verbose:
        print("all matching found in folder '{0}':".format(folder_path))

    experiment_files = {
        'movies': [],
        'meta': [],
        'processed': [],
        'ica': [],
        'roi': [],
        'dfof': [],
        'body': [],
        'oflow': [],
        'videodata': []
    }

    for exp_pattern in experimentlist:
        movies_unsorted = []
        moviestr = exp_pattern + r'(?:[@-](\d{4}))?\.tif'

        # Order matters: a file is filed under the first pattern it matches.
        patterns = [
            ('meta', exp_pattern + r'_meta\.yaml'),
            ('ica', exp_pattern + r'_(.*)(ica|pca)\.hdf5'),
            ('processed', exp_pattern + r'_(ica|pca)(.+)\.hdf5'),
            ('roi', exp_pattern + r'_roiset\.zip'),
            ('dfof', exp_pattern + r'_(\d+x)_dfof\.mp4'),
            ('body', exp_pattern + r'_c(\d)-body_cam\.mp4'),
            ('oflow', exp_pattern + r'_(\w+)OpticFlow\.hdf5'),
            ('videodata', exp_pattern + r'_videodata\.hdf5'),
        ]

        for file in files:
            filepath = os.path.join(folder_path, file)

            if verbose and re.match(exp_pattern, file):
                print('\t', file)

            if re.match(moviestr, file, re.IGNORECASE):
                movies_unsorted.append(filepath)
                continue

            for key, pattern in patterns:
                if re.match(pattern, file, re.IGNORECASE):
                    experiment_files[key].append(filepath)
                    break

        # movie_sorter keys its result by the experiment name found in each
        # file name.  Indexing it with exp_pattern raised KeyError whenever
        # no movie was found, and always when exp_pattern is the generic
        # regex, so collect whatever experiments it returns instead.
        # NOTE: movie_sorter's default pattern is case-sensitive, so movie
        # files with an upper-case extension are still dropped here.
        sorted_movies = movie_sorter(movies_unsorted, verbose=False)
        for exp in sorted(sorted_movies):
            experiment_files['movies'].extend(sorted_movies[exp])

    if verbose:
        print('Matches:')
        for key in experiment_files:
            if len(experiment_files[key]) > 0:
                print('\t' + key + ':')
                for item in experiment_files[key]:
                    print('\t\t' + os.path.basename(item))

    return experiment_files
def sort_experiments(files: List[str],
                     experiment_format_string: str = None,
                     verbose: bool = True) -> dict:
    '''
    Given a list of files, sort them into relevant experiments.

    Each file's basename is tested with re.match against the experiment
    format string; files that do not match are left out of the result.

    Arguments:
        files: A list of files to search for a given experiment format
            string.
        experiment_format_string: The experiment match string.  It must
            start with a YYMMDD_EE experiment name.  If it contains a
            capture group, the first group is used as the experiment key;
            otherwise the whole matched text is used.  Defaults to a
            pattern matching any YYMMDD_EE name.
        verbose: Whether to print the experiments and their files.

    Returns:
        experiments_found: A dictionary mapping each unique experiment
            found by the format string to the list of files belonging to
            it, in input order.

    Raises:
        AssertionError: If experiment_format_string is given but does not
            start with a YYMMDD_EE experiment name.
    '''
    if verbose:
        print('\nSorting Keys\n-----------------------')

    if experiment_format_string is not None:
        assert re.match(r'\d{6}_\d{2}', experiment_format_string) is not None, \
            'experiment_format_string input was not a valid YYMMDD_EE experiment name'
    else:
        experiment_format_string = r'(\d{6}_\d{2})'

    pattern = re.compile(experiment_format_string)
    # A plain experiment name such as '170728_01' has no capture group;
    # fall back to the whole match rather than failing on groups()[0].
    key_group = 1 if pattern.groups else 0

    experiments_found = {}
    for file in files:
        match = pattern.match(os.path.basename(file))
        if match is not None:
            experiments_found.setdefault(match.group(key_group),
                                         []).append(file)

    if verbose:
        for expname in experiments_found:
            print(expname)
            for key in experiments_found[expname]:
                print('\t', key)

    return experiments_found
def get_exp_span_string(experiments: List[str]) -> str:
    '''
    Creates a formatted string based on the experiments found in the
    experiments list.

    A single experiment is simply reduced to its basename.  For any other
    number of entries, the experiment numbers are grouped by date, sorted
    within each date and joined with '-'; the per-date strings are then
    joined with '_'.  Entries that do not start with a YYMMDD_EE name are
    skipped.

    Args:
        experiments: A list of experiments found.
            e.g. 120244_12, 120244_14, 120244_13.

    Returns:
        experiment_span_string: A string representing the experiment name
            and every experiment number found for it.
            e.g. 120244_12-13-14.
    '''
    if len(experiments) == 1:
        (only_experiment,) = experiments
        return get_basename(only_experiment)

    numbers_by_date = {}
    for entry in experiments:
        found = re.match(r'(\d{6})_(\d{2})', entry)
        if found is None:
            continue
        day, number = found.groups()
        numbers_by_date.setdefault(day, []).append(number)

    spans = []
    for day, numbers in numbers_by_date.items():
        # Experiment numbers are zero-padded strings, sorted as text.
        spans.append(day + '_' + '-'.join(sorted(numbers)))

    return '_'.join(spans)
def get_basename(path: str):
    '''
    Get the experiment basename, stripping any extensions or tiff file
    count extensions.

    Arguments:
        path: The unformatted filepath.
            e.g.: ./example/directory/filename@0001.tif

    Returns:
        name: The formatted name.
            e.g.: filename, from the example above.
    '''
    cleanup_patterns = (
        r'(\.)(\w){3,4}$',  # trailing 3-4 character extension
        r'([@-])(\d){4}',  # @0001 / -0001 file count marker
    )

    name = os.path.basename(path)
    for pattern in cleanup_patterns:
        name = re.sub(pattern, '', name)
    return name
def read_yaml(path: str) -> dict:
    '''
    Loads nested dictionaries from .yaml formatted files.

    Parsing errors are printed rather than raised, and an empty dictionary
    is returned in that case.  An empty file also yields an empty
    dictionary.

    Arguments:
        path: the path to read yaml data from.

    Returns:
        yaml_contents: The contents of the yaml file.
    '''
    yaml_contents = dict()

    with open(path, 'r') as data:
        try:
            # safe_load is used because yaml.load without an explicit
            # Loader is rejected by current PyYAML releases and, on older
            # ones, can construct arbitrary Python objects from the file.
            # NOTE(review): files relying on python-specific YAML tags
            # would need an explicit Loader instead - confirm none exist.
            loaded = yaml.safe_load(data)
        except yaml.YAMLError as exc:
            print(exc)
        else:
            # An empty document parses to None; keep the dict contract.
            if loaded is not None:
                yaml_contents = loaded

    return yaml_contents