diff --git a/ibllib/ephys/ephysqc.py b/ibllib/ephys/ephysqc.py
index 675265cb2..fb0567296 100644
--- a/ibllib/ephys/ephysqc.py
+++ b/ibllib/ephys/ephysqc.py
@@ -110,7 +110,8 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None,
         if self.data.ap_meta:
             rms_file = self.probe_path.joinpath("_iblqc_ephysChannels.apRMS.npy")
             if rms_file.exists() and not overwrite:
-                _logger.warning(f'File {rms_file} already exists and overwrite=False. Skipping RMS compute.')
+                _logger.warning(f'RMS map already exists for .ap data in {self.probe_path}, skipping. '
+                                f'Use the overwrite option to recompute.')
                 median_rms = np.load(rms_file)
             else:
                 rl = self.data.ap_meta.fileTimeSecs
diff --git a/ibllib/io/extractors/base.py b/ibllib/io/extractors/base.py
index 7f34d2e8a..082bde5c1 100644
--- a/ibllib/io/extractors/base.py
+++ b/ibllib/io/extractors/base.py
@@ -167,6 +167,18 @@ def _get_task_types_json_config():
     return task_types
 
 
+def get_task_protocol(session_path):
+    try:
+        settings = load_settings(get_session_path(session_path))
+    except json.decoder.JSONDecodeError:
+        _logger.error(f"Can't read settings for {session_path}")
+        return
+    if settings:
+        return settings.get('PYBPOD_PROTOCOL', None)
+    else:
+        return
+
+
 def get_task_extractor_type(task_name):
     """
     Returns the task type string from the full pybpod task name:
@@ -176,13 +188,8 @@ def get_task_extractor_type(task_name):
     :return: one of ['biased', 'habituation', 'training', 'ephys', 'mock_ephys', 'sync_ephys']
     """
     if isinstance(task_name, Path):
-        try:
-            settings = load_settings(get_session_path(task_name))
-        except json.decoder.JSONDecodeError:
-            return
-        if settings:
-            task_name = settings.get('PYBPOD_PROTOCOL', None)
-        else:
+        task_name = get_task_protocol(task_name)
+        if task_name is None:
             return
     task_types = _get_task_types_json_config()
     task_type = next((task_types[tt] for tt in task_types if tt in task_name), None)
@@ -225,7 +232,7 @@ def _get_pipeline_from_task_type(stype):
     :param stype: session_type or task extractor type
     :return:
     """
-    if 'ephys' in stype:
+    if stype in ['ephys_biased_opto', 'ephys', 'ephys_training', 'mock_ephys', 'sync_ephys']:
         return 'ephys'
     elif stype in ['habituation', 'training', 'biased', 'biased_opto']:
         return 'training'
diff --git a/ibllib/io/extractors/ephys_fpga.py b/ibllib/io/extractors/ephys_fpga.py
index 9410197b9..9b7d0f3b3 100644
--- a/ibllib/io/extractors/ephys_fpga.py
+++ b/ibllib/io/extractors/ephys_fpga.py
@@ -544,6 +544,20 @@ def _get_all_probes_sync(session_path, bin_exists=True):
     return ephys_files
 
 
+def get_wheel_positions(sync, chmap):
+    """
+    Gets the wheel position from synchronisation pulses
+    :param sync: dictionary of sync pulses, with keys ('times', 'channels', 'polarities')
+    :param chmap: dictionary mapping channel names to the sync channel index
+    :return: wheel: dictionary with keys ('timestamps', 'position')
+             moves: dictionary with keys ('intervals', 'peakAmplitude')
+    """
+    ts, pos = extract_wheel_sync(sync=sync, chmap=chmap)
+    moves = extract_wheel_moves(ts, pos)
+    wheel = {'timestamps': ts, 'position': pos}
+    return wheel, moves
+
+
 def get_main_probe_sync(session_path, bin_exists=False):
     """
     From 3A or 3B multiprobe session, returns the main probe (3A) or nidq sync pulses
@@ -561,7 +575,6 @@ def get_main_probe_sync(session_path, bin_exists=False):
     elif version == '3B':
         # the sync master is the nidq breakout box
         sync_box_ind = np.argmax([1 if ef.get('nidq') else 0 for ef in ephys_files])
-        sync = ephys_files[sync_box_ind].sync
     sync = ephys_files[sync_box_ind].sync
     sync_chmap = ephys_files[sync_box_ind].sync_map
     return sync, sync_chmap
@@ -684,16 +697,16 @@ def _extract(self, sync=None, chmap=None, **kwargs):
         out.update({k: self.bpod2fpga(bpod_trials[k][ibpod]) for k in bpod_rsync_fields})
         out.update({k: fpga_trials[k][ifpga] for k in sorted(fpga_trials.keys())})
         # extract the wheel data
+        wheel, moves = get_wheel_positions(sync=sync, chmap=chmap)
         from ibllib.io.extractors.training_wheel import extract_first_movement_times
-        ts, pos = extract_wheel_sync(sync=sync, chmap=chmap)
-        moves = extract_wheel_moves(ts, pos)
         settings = raw_data_loaders.load_settings(session_path=self.session_path)
         min_qt = settings.get('QUIESCENT_PERIOD', None)
         first_move_onsets, *_ = extract_first_movement_times(moves, out, min_qt=min_qt)
         out.update({'firstMovement_times': first_move_onsets})
         assert tuple(filter(lambda x: 'wheel' not in x, self.var_names)) == tuple(out.keys())
-        return [out[k] for k in out] + [ts, pos, moves['intervals'], moves['peakAmplitude']]
+        return [out[k] for k in out] + [wheel['timestamps'], wheel['position'],
+                                        moves['intervals'], moves['peakAmplitude']]
 
 
 def extract_all(session_path, save=True, bin_exists=False):
diff --git a/ibllib/io/extractors/extractor_types.json b/ibllib/io/extractors/extractor_types.json
index 4a58b9d7e..e69e65270 100644
--- a/ibllib/io/extractors/extractor_types.json
+++ b/ibllib/io/extractors/extractor_types.json
@@ -1,5 +1,4 @@
-{
-  "ksocha_ephysOptoStimulation": "ephys_passive_opto",
+{"ksocha_ephysOptoStimulation": "ephys_passive_opto",
  "ksocha_ephysOptoChoiceWorld": "ephys_biased_opto",
  "passiveChoiceWorld": "ephys_replay",
  "opto_ephysChoiceWorld": "ephys_biased_opto",
diff --git a/ibllib/io/extractors/opto_trials.py b/ibllib/io/extractors/opto_trials.py
index da44966e0..c29eadae3 100644
--- a/ibllib/io/extractors/opto_trials.py
+++ b/ibllib/io/extractors/opto_trials.py
@@ -11,8 +11,8 @@ class LaserBool(BaseBpodTrialsExtractor):
     """
     Extracts the laser probabilities from the bpod jsonable
     """
-    save_names = ('_ibl_trials.laser_stimulation.npy', '_ibl_trials.laser_probability.npy')
-    var_names = ('laser_stimulation', 'laser_probability')
+    save_names = ('_ibl_trials.laserStimulation.npy', '_ibl_trials.laserProbability.npy')
+    var_names = ('laserStimulation', 'laserProbability')
 
     def _extract(self, **kwargs):
         _logger.info('Extracting laser datasets')
@@ -41,11 +41,11 @@ def _extract(self, **kwargs):
 
         if np.all(np.isnan(lprob)):
             # this prevents the file from being saved when no data
-            self.save_names = ('_ibl_trials.laser_stimulation.npy', None)
+            self.save_names = ('_ibl_trials.laserStimulation.npy', None)
             _logger.warning('No laser probability found in bpod data')
         if np.all(np.isnan(lstim)):
             # this prevents the file from being saved when no data
-            self.save_names = (None, '_ibl_trials.laser_probability.npy')
+            self.save_names = (None, '_ibl_trials.laserProbability.npy')
             _logger.warning('No laser stimulation found in bpod data')
         return lstim, lprob
diff --git a/ibllib/pipes/ephys_preprocessing.py b/ibllib/pipes/ephys_preprocessing.py
index c7c8c2665..b41096698 100644
--- a/ibllib/pipes/ephys_preprocessing.py
+++ b/ibllib/pipes/ephys_preprocessing.py
@@ -58,7 +58,7 @@ class RawEphysQC(tasks.Task):
    io_charge = 30  # this jobs reads raw ap files
    priority = 10  # a lot of jobs depend on this one
    level = 0  # this job doesn't depend on anything
-    input_files = signatures.RAWEPHYSQC
+    signature = {'input_files': signatures.RAWEPHYSQC, 'output_files': ()}
 
     def _run(self, overwrite=False):
         eid = self.one.path2eid(self.session_path)
@@ -69,19 +69,27 @@
         pids = [p['id'] for p in create_alyx_probe_insertions(self.session_path, one=self.one)]
         qc_files = []
         for pid in pids:
-            eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
-            qc_files.extend(eqc.run(update=True, overwrite=overwrite))
+            try:
+                eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
+                qc_files.extend(eqc.run(update=True, overwrite=overwrite))
+            except AssertionError:
+                _logger.error(f'Failed to run EphysQC for probe insertion {pid}')
+                self.status = -1
+                continue
         return qc_files
 
 
 class EphysAudio(tasks.Task):
     """
-    Computes raw electrophysiology QC
+    Compresses the microphone wav file into a lossless flac file
     """
     cpu = 2
     priority = 10  # a lot of jobs depend on this one
     level = 0  # this job doesn't depend on anything
+    signature = {'input_files': (('_iblrig_micData.raw.wav', 'raw_behavior_data', True),),
+                 'output_files': (('_iblrig_micData.raw.flac', 'raw_behavior_data', True),),
+                 }
 
     def _run(self, overwrite=False):
         command = "ffmpeg -i {file_in} -y -nostdin -c:a flac -nostats {file_out}"
@@ -106,7 +114,7 @@ class SpikeSorting(tasks.Task):
     )
     SPIKE_SORTER_NAME = 'pykilosort'
     PYKILOSORT_REPO = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/pykilosort')
-    input_files = signatures.SPIKESORTING
+    signature = {'input_files': signatures.SPIKESORTING, 'output_files': ()}
 
     @staticmethod
     def _sample2v(ap_file):
@@ -285,7 +293,7 @@ def _run(self, **kwargs):
 class EphysTrials(tasks.Task):
     priority = 90
     level = 1
-    input_files = signatures.EPHYSTRIALS
+    signature = {'input_files': signatures.EPHYSTRIALS, 'output_files': ()}
 
     def _behaviour_criterion(self):
         """
@@ -454,7 +462,7 @@ class EphysPassive(tasks.Task):
     cpu = 1
     io_charge = 90
     level = 1
-    input_files = signatures.EPHYSPASSIVE
+    signature = {'input_files': signatures.EPHYSPASSIVE, 'output_files': ()}
 
     def _run(self):
         """returns a list of pathlib.Paths. """
diff --git a/ibllib/pipes/local_server.py b/ibllib/pipes/local_server.py
index 4c7443d89..08bf72bb1 100644
--- a/ibllib/pipes/local_server.py
+++ b/ibllib/pipes/local_server.py
@@ -10,12 +10,30 @@
 
 from one.api import ONE
 
-from ibllib.io.extractors.base import get_session_extractor_type, get_pipeline
-from ibllib.pipes import ephys_preprocessing, training_preprocessing, tasks
+from ibllib.io.extractors.base import get_pipeline, get_task_protocol, get_session_extractor_type
+from ibllib.pipes import tasks, training_preprocessing, ephys_preprocessing
 from ibllib.time import date2isostr
 import ibllib.oneibl.registration as registration
 
 _logger = logging.getLogger('ibllib')
+LARGE_TASKS = ['EphysVideoCompress', 'TrainingVideoCompress', 'EphysDLC', 'TrainingDLC', 'SpikeSorting']
+
+
+def _get_pipeline_class(session_path, one):
+    pipeline = get_pipeline(session_path)
+    if pipeline == 'training':
+        PipelineClass = training_preprocessing.TrainingExtractionPipeline
+    elif pipeline == 'ephys':
+        PipelineClass = ephys_preprocessing.EphysExtractionPipeline
+    else:
+        # look for a custom extractor in the personal projects extraction classes
+        import projects.base
+        task_type = get_session_extractor_type(session_path)
+        PipelineClass = projects.base.get_pipeline(task_type)
+    _logger.info(f"Using {PipelineClass} pipeline for {session_path}")
+    if PipelineClass is None:
+        return None
+    return PipelineClass(session_path=session_path, one=one)
 
 
 def _get_lab(one):
@@ -101,16 +119,11 @@ def job_creator(root_path, one=None, dry=False, rerun=False, max_md5_size=None):
                 session_path, one=one, max_md5_size=max_md5_size)
             if dsets is not None:
                 all_datasets.extend(dsets)
-            pipeline = get_pipeline(session_path)
-            if pipeline == 'training':
-                pipe = training_preprocessing.TrainingExtractionPipeline(session_path, one=one)
-            # only start extracting ephys on a raw_session.flag
-            elif pipeline == 'ephys' and flag_file.name == 'raw_session.flag':
-                pipe = ephys_preprocessing.EphysExtractionPipeline(session_path, one=one)
-            else:
-                _logger.info(f'Session type {get_session_extractor_type(session_path)}'
-                             f'as no matching pipeline pattern {session_path}')
-                continue
+            pipe = _get_pipeline_class(session_path, one)
+            if pipe is None:
+                task_protocol = get_task_protocol(session_path)
+                _logger.info(f'Session task protocol {task_protocol} has no matching pipeline pattern {session_path}')
+                continue
             if rerun:
                 rerun__status__in = '__all__'
             else:
@@ -125,11 +138,12 @@ def job_creator(root_path, one=None, dry=False, rerun=False, max_md5_size=None):
     return all_datasets
 
 
-def job_runner(subjects_path, lab=None, dry=False, one=None, count=5):
+def job_runner(subjects_path, mode='all', lab=None, dry=False, one=None, count=5):
     """
     Function to be used as a process to run the jobs as they are created on the database
     This will query waiting jobs from the specified Lab
     :param subjects_path: on servers: /mnt/s0/Data/Subjects. Contains sessions
+    :param mode: 'all' (default), 'small' or 'large'; 'large' covers only video compression, DLC and spike sorting
     :param lab: lab name as per Alyx
     :param dry:
     :param count:
@@ -141,8 +155,18 @@
         lab = _get_lab(one)
     if lab is None:
         return  # if the lab is none, this will return empty tasks each time
-    tasks = one.alyx.rest('tasks', 'list', status='Waiting',
-                          django=f'session__lab__name__in,{lab}')
+    # Filter for tasks
+    if mode == 'all':
+        tasks = one.alyx.rest('tasks', 'list', status='Waiting',
+                              django=f'session__lab__name__in,{lab}', no_cache=True)
+    elif mode == 'small':
+        tasks_all = one.alyx.rest('tasks', 'list', status='Waiting',
+                                  django=f'session__lab__name__in,{lab}', no_cache=True)
+        tasks = [t for t in tasks_all if t['name'] not in LARGE_TASKS]
+    elif mode == 'large':
+        tasks = one.alyx.rest('tasks', 'list', status='Waiting',
+                              django=f'session__lab__name__in,{lab},name__in,{LARGE_TASKS}', no_cache=True)
+
     tasks_runner(subjects_path, tasks, one=one, count=count, time_out=3600, dry=dry)
diff --git a/ibllib/pipes/tasks.py b/ibllib/pipes/tasks.py
index 470f2504f..6f758ae1d 100644
--- a/ibllib/pipes/tasks.py
+++ b/ibllib/pipes/tasks.py
@@ -38,7 +38,7 @@ class Task(abc.ABC):
     time_out_secs = None
     version = version.ibllib()
     log = ''
-    input_files = None
+    signature = {'input_files': (), 'output_files': ()}  # tuples of (filename, collection, required_flag)
 
     def __init__(self, session_path, parents=None, taskid=None, one=None,
                  machine=None, clobber=True, aws=None, location='server'):
@@ -254,12 +254,9 @@ def _getData(self):
         :return:
         """
         assert self.one
-
-        # This will be improved by Olivier new filters
-        session_datasets = self.one.list_datasets(self.one.path2eid(self.session_path),
-                                                  details=True)
+        session_datasets = self.one.list_datasets(self.one.path2eid(self.session_path), details=True)
         df = pd.DataFrame(columns=self.one._cache.datasets.columns)
-        for file in self.input_files:
+        for file in self.signature['input_files']:
             df = df.append(filter_datasets(session_datasets, filename=file[0],
                                            collection=file[1], wildcards=True, assert_unique=False))
         return df
@@ -278,6 +275,24 @@ def _cleanUp_SDSC(self):
         assert SDSC_PATCH_PATH.parts[0:4] == self.session_path.parts[0:4]
         shutil.rmtree(self.session_path)
 
+    def assert_expected_outputs(self):
+        """
+        After a run, asserts that all signature files are present at least once in the output files
+        Mainly useful for integration tests
+        :return:
+        """
+        assert self.status == 0
+        everything_is_fine = True
+        for expected_file in self.signature['output_files']:
+            actual_files = list(self.session_path.rglob(str(Path(expected_file[1]).joinpath(expected_file[0]))))
+            if len(actual_files) == 0:
+                everything_is_fine = False
+                _logger.error(f"Signature file expected {expected_file} not found in the output")
+        if not everything_is_fine:
+            for out in self.outputs:
+                _logger.error(f"{out}")
+            raise FileNotFoundError("Missing outputs after task completion")
+
 
 class Pipeline(abc.ABC):
     """
diff --git a/ibllib/tests/test_pipes.py b/ibllib/tests/test_pipes.py
index 03bf36558..443448f5a 100644
--- a/ibllib/tests/test_pipes.py
+++ b/ibllib/tests/test_pipes.py
@@ -32,6 +32,7 @@ def test_task_to_pipeline(self):
             ("mock_ephys", "ephys"),
             ("sync_ephys", "ephys"),
             ("ephys", "ephys"),
+            ("ephys_passive_opto", "ephys_passive_opto"),
         ]
         for typ, exp in pipe_out:
             assert ibllib.io.extractors.base._get_pipeline_from_task_type(typ) == exp
diff --git a/release_notes.md b/release_notes.md
index 513065a80..7e38a9379 100644
--- a/release_notes.md
+++ b/release_notes.md
@@ -9,6 +9,9 @@
 ### Release Notes 2.1.2 2021-10-14
 - Fix issue with RawEphysQC that was not looking in local Subjects folder for data
 - Fix ensure_required_data in DlcQc
+### Release Notes 2.1.3 2021-10-19
+- Split the jobs.py run function in two: one process runs the large tasks (video compression, DLC, spike sorting), the other runs everything else
+- Ensure RawEphysQC still runs on the remaining probes if one probe fails
 ## Release Notes 2.0
 ### Release Notes 2.0.1 2021-08-07
diff --git a/setup.py b/setup.py
index 221116253..ddf332c01 100644
--- a/setup.py
+++ b/setup.py
@@ -24,7 +24,7 @@
 setup(
     name='ibllib',
-    version='2.1.2',
+    version='2.1.3',
     python_requires='>={}.{}'.format(*REQUIRED_PYTHON),
     description='IBL libraries',
     license="MIT",
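---

Usage sketches for the APIs touched above. These are illustrative only: the session paths and subject names are made up, and the snippets have not been run against a real Alyx database.

`get_wheel_positions` bundles the `extract_wheel_sync` / `extract_wheel_moves` pair that `FpgaTrials._extract` previously called inline, so wheel data can now be extracted standalone:

```python
from pathlib import Path
from ibllib.io.extractors.ephys_fpga import get_main_probe_sync, get_wheel_positions

session_path = Path('/mnt/s0/Data/Subjects/SW_023/2021-10-19/001')  # hypothetical session

# sync pulses and channel map from the main probe (3A) or the nidq breakout box (3B)
sync, chmap = get_main_probe_sync(session_path, bin_exists=False)
wheel, moves = get_wheel_positions(sync=sync, chmap=chmap)
print(wheel['timestamps'], wheel['position'])      # interpolated wheel trace
print(moves['intervals'], moves['peakAmplitude'])  # detected movement epochs
```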
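The `signature` attribute generalises the old bare `input_files` tuple: inputs and outputs are both declared as `(filename, collection, required_flag)` triples (note the trailing comma that makes each declaration a tuple of tuples), which lets `_getData` filter input datasets and lets `assert_expected_outputs` verify the produced files after a run. A minimal sketch with a hypothetical subclass; the `touch` call stands in for real work, and depending on `Task.setUp` a ONE instance may also be required:

```python
from pathlib import Path
from ibllib.pipes import tasks

class AudioCompression(tasks.Task):  # hypothetical task mirroring EphysAudio
    signature = {'input_files': (('_iblrig_micData.raw.wav', 'raw_behavior_data', True),),
                 'output_files': (('_iblrig_micData.raw.flac', 'raw_behavior_data', True),)}

    def _run(self):
        # stand-in for the actual ffmpeg flac compression; assumes the collection folder exists
        out_file = self.session_path.joinpath('raw_behavior_data', '_iblrig_micData.raw.flac')
        out_file.touch()
        return [out_file]

task = AudioCompression(session_path=Path('/mnt/s0/Data/Subjects/SW_023/2021-10-19/001'))
task.run()                      # sets task.status (0 on success) and fills task.outputs
task.assert_expected_outputs()  # raises FileNotFoundError if a declared output is missing
```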
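The new `mode` argument of `job_runner` makes it possible to split a single task runner into two processes, so that quick jobs are no longer queued behind video compression, DLC or spike sorting. A hypothetical deployment on a lab server:

```python
from ibllib.pipes.local_server import job_runner

# one process polls for everything except the LARGE_TASKS...
job_runner('/mnt/s0/Data/Subjects', mode='small', count=5)

# ...while a second process (e.g. on the GPU machine) handles only those
job_runner('/mnt/s0/Data/Subjects', mode='large', count=1)
```

Each call queries Alyx once and runs up to `count` waiting tasks; in production the two calls would presumably each sit inside their own polling loop.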