Merge branch 'release/2.1.3'
juhuntenburg committed Oct 19, 2021
2 parents 91abb56 + 8b2c162 commit 93be6b9
Showing 11 changed files with 115 additions and 48 deletions.
3 changes: 2 additions & 1 deletion ibllib/ephys/ephysqc.py
@@ -110,7 +110,8 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None,
if self.data.ap_meta:
rms_file = self.probe_path.joinpath("_iblqc_ephysChannels.apRMS.npy")
if rms_file.exists() and not overwrite:
-                _logger.warning(f'File {rms_file} already exists and overwrite=False. Skipping RMS compute.')
+                _logger.warning(f'RMS map already exists for .ap data in {self.probe_path}, skipping. '
+                                f'Use overwrite option.')
median_rms = np.load(rms_file)
else:
rl = self.data.ap_meta.fileTimeSecs
23 changes: 15 additions & 8 deletions ibllib/io/extractors/base.py
@@ -167,6 +167,18 @@ def _get_task_types_json_config():
return task_types


+def get_task_protocol(session_path):
+    try:
+        settings = load_settings(get_session_path(session_path))
+    except json.decoder.JSONDecodeError:
+        _logger.error(f"Can't read settings for {session_path}")
+        return
+    if settings:
+        return settings.get('PYBPOD_PROTOCOL', None)
+    else:
+        return


def get_task_extractor_type(task_name):
"""
Returns the task type string from the full pybpod task name:
@@ -176,13 +188,8 @@ def get_task_extractor_type(task_name):
:return: one of ['biased', 'habituation', 'training', 'ephys', 'mock_ephys', 'sync_ephys']
"""
if isinstance(task_name, Path):
-        try:
-            settings = load_settings(get_session_path(task_name))
-        except json.decoder.JSONDecodeError:
-            return
-        if settings:
-            task_name = settings.get('PYBPOD_PROTOCOL', None)
-        else:
-            return
+        task_name = get_task_protocol(task_name)
+        if task_name is None:
+            return
task_types = _get_task_types_json_config()
task_type = next((task_types[tt] for tt in task_types if tt in task_name), None)
@@ -225,7 +232,7 @@ def _get_pipeline_from_task_type(stype):
:param stype: session_type or task extractor type
:return:
"""
-    if 'ephys' in stype:
+    if stype in ['ephys_biased_opto', 'ephys', 'ephys_training', 'mock_ephys', 'sync_ephys']:
return 'ephys'
elif stype in ['habituation', 'training', 'biased', 'biased_opto']:
return 'training'
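
The refactor above pulls the settings lookup into a reusable get_task_protocol helper, so a session now resolves to a protocol string, an extractor type, and a pipeline name in three explicit steps. A minimal usage sketch, not part of the commit (the session path and protocol string are hypothetical):

    from ibllib.io.extractors.base import (
        get_task_protocol, get_task_extractor_type, _get_pipeline_from_task_type)

    session_path = '/mnt/s0/Data/Subjects/ZM_1150/2021-10-19/001'  # hypothetical path
    protocol = get_task_protocol(session_path)          # e.g. '_iblrig_tasks_ephysChoiceWorld6.4.2'
    task_type = get_task_extractor_type(protocol)       # e.g. 'ephys'
    pipeline = _get_pipeline_from_task_type(task_type)  # e.g. 'ephys'
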
21 changes: 17 additions & 4 deletions ibllib/io/extractors/ephys_fpga.py
@@ -544,6 +544,20 @@ def _get_all_probes_sync(session_path, bin_exists=True):
return ephys_files


+def get_wheel_positions(sync, chmap):
+    """
+    Gets the wheel position from synchronisation pulses
+    :param sync:
+    :param chmap:
+    :return: wheel: dictionary with keys 'timestamps' and 'position'
+             moves: dictionary with keys 'intervals' and 'peakAmplitude'
+    """
+    ts, pos = extract_wheel_sync(sync=sync, chmap=chmap)
+    moves = extract_wheel_moves(ts, pos)
+    wheel = {'timestamps': ts, 'position': pos}
+    return wheel, moves


def get_main_probe_sync(session_path, bin_exists=False):
"""
From 3A or 3B multiprobe session, returns the main probe (3A) or nidq sync pulses
@@ -561,7 +575,6 @@ def get_main_probe_sync(session_path, bin_exists=False):
elif version == '3B':
# the sync master is the nidq breakout box
sync_box_ind = np.argmax([1 if ef.get('nidq') else 0 for ef in ephys_files])
-
sync = ephys_files[sync_box_ind].sync
sync_chmap = ephys_files[sync_box_ind].sync_map
return sync, sync_chmap
@@ -684,16 +697,16 @@ def _extract(self, sync=None, chmap=None, **kwargs):
out.update({k: self.bpod2fpga(bpod_trials[k][ibpod]) for k in bpod_rsync_fields})
out.update({k: fpga_trials[k][ifpga] for k in sorted(fpga_trials.keys())})
# extract the wheel data
+        wheel, moves = get_wheel_positions(sync=sync, chmap=chmap)
from ibllib.io.extractors.training_wheel import extract_first_movement_times
-        ts, pos = extract_wheel_sync(sync=sync, chmap=chmap)
-        moves = extract_wheel_moves(ts, pos)
settings = raw_data_loaders.load_settings(session_path=self.session_path)
min_qt = settings.get('QUIESCENT_PERIOD', None)
first_move_onsets, *_ = extract_first_movement_times(moves, out, min_qt=min_qt)
out.update({'firstMovement_times': first_move_onsets})

assert tuple(filter(lambda x: 'wheel' not in x, self.var_names)) == tuple(out.keys())
-        return [out[k] for k in out] + [ts, pos, moves['intervals'], moves['peakAmplitude']]
+        return [out[k] for k in out] + [wheel['timestamps'], wheel['position'],
+                                        moves['intervals'], moves['peakAmplitude']]


def extract_all(session_path, save=True, bin_exists=False):
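
The new get_wheel_positions helper bundles the wheel extraction into two dictionaries. A hedged usage sketch (session_path is a placeholder; sync and chmap come from get_main_probe_sync as defined in this module):

    sync, chmap = get_main_probe_sync(session_path)
    wheel, moves = get_wheel_positions(sync=sync, chmap=chmap)
    # wheel['timestamps'], wheel['position']      -> wheel trace
    # moves['intervals'], moves['peakAmplitude']  -> detected movements
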
3 changes: 1 addition & 2 deletions ibllib/io/extractors/extractor_types.json
@@ -1,5 +1,4 @@
-{
-"ksocha_ephysOptoStimulation": "ephys_passive_opto",
+{"ksocha_ephysOptoStimulation": "ephys_passive_opto",
"ksocha_ephysOptoChoiceWorld": "ephys_biased_opto",
"passiveChoiceWorld": "ephys_replay",
"opto_ephysChoiceWorld": "ephys_biased_opto",
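
This JSON maps pybpod protocol-name substrings to extractor types: as get_task_extractor_type shows above, a protocol matches a key if the key is contained in the protocol name. A sketch of that lookup with a hypothetical protocol name:

    task_types = _get_task_types_json_config()
    task_type = next((task_types[tt] for tt in task_types if tt in 'ksocha_ephysOptoChoiceWorld'), None)
    # -> 'ephys_biased_opto'
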
8 changes: 4 additions & 4 deletions ibllib/io/extractors/opto_trials.py
@@ -11,8 +11,8 @@ class LaserBool(BaseBpodTrialsExtractor):
"""
Extracts the laser probabilities from the bpod jsonable
"""
-    save_names = ('_ibl_trials.laser_stimulation.npy', '_ibl_trials.laser_probability.npy')
-    var_names = ('laser_stimulation', 'laser_probability')
+    save_names = ('_ibl_trials.laserStimulation.npy', '_ibl_trials.laserProbability.npy')
+    var_names = ('laserStimulation', 'laserProbability')

def _extract(self, **kwargs):
_logger.info('Extracting laser datasets')
@@ -41,11 +41,11 @@ def _extract(self, **kwargs):

if np.all(np.isnan(lprob)):
# this prevents the file from being saved when no data
-            self.save_names = ('_ibl_trials.laser_stimulation.npy', None)
+            self.save_names = ('_ibl_trials.laserStimulation.npy', None)
_logger.warning('No laser probability found in bpod data')
if np.all(np.isnan(lstim)):
# this prevents the file from being saved when no data
-            self.save_names = (None, '_ibl_trials.laser_probability.npy')
+            self.save_names = (None, '_ibl_trials.laserProbability.npy')
_logger.warning('No laser stimulation found in bpod data')
return lstim, lprob

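
The datasets move from snake_case to camelCase ALF attribute names. Downstream code would load them under the new names; a hypothetical sketch, assuming a configured ONE instance and a valid eid:

    trials = one.load_object(eid, 'trials', attribute=['laserStimulation', 'laserProbability'])
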
21 changes: 14 additions & 7 deletions ibllib/pipes/ephys_preprocessing.py
@@ -58,7 +58,7 @@ class RawEphysQC(tasks.Task):
io_charge = 30 # this jobs reads raw ap files
priority = 10 # a lot of jobs depend on this one
level = 0 # this job doesn't depend on anything
-    input_files = signatures.RAWEPHYSQC
+    signature = {'input_files': signatures.RAWEPHYSQC, 'output_files': ()}

def _run(self, overwrite=False):
eid = self.one.path2eid(self.session_path)
@@ -69,19 +69,26 @@ def _run(self, overwrite=False):
pids = [p['id'] for p in create_alyx_probe_insertions(self.session_path, one=self.one)]
qc_files = []
for pid in pids:
-            eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
-            qc_files.extend(eqc.run(update=True, overwrite=overwrite))
+            try:
+                eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
+                qc_files.extend(eqc.run(update=True, overwrite=overwrite))
+            except AssertionError:
+                self.status = -1
+                continue
return qc_files


class EphysAudio(tasks.Task):
"""
-    Computes raw electrophysiology QC
+    Compresses the microphone wav file in a lossless flac file
"""

cpu = 2
priority = 10 # a lot of jobs depend on this one
level = 0 # this job doesn't depend on anything
+    signature = {'input_files': ('_iblrig_micData.raw.wav', 'raw_behavior_data', True),
+                 'output_files': ('_iblrig_micData.raw.flac', 'raw_behavior_data', True),
+                 }

def _run(self, overwrite=False):
command = "ffmpeg -i {file_in} -y -nostdin -c:a flac -nostats {file_out}"
@@ -106,7 +113,7 @@ class SpikeSorting(tasks.Task):
)
SPIKE_SORTER_NAME = 'pykilosort'
PYKILOSORT_REPO = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/pykilosort')
-    input_files = signatures.SPIKESORTING
+    signature = {'input_files': signatures.SPIKESORTING, 'output_files': ()}

@staticmethod
def _sample2v(ap_file):
@@ -285,7 +292,7 @@ def _run(self, **kwargs):
class EphysTrials(tasks.Task):
priority = 90
level = 1
-    input_files = signatures.EPHYSTRIALS
+    signature = {'input_files': signatures.EPHYSTRIALS, 'output_files': ()}

def _behaviour_criterion(self):
"""
@@ -454,7 +461,7 @@ class EphysPassive(tasks.Task):
cpu = 1
io_charge = 90
level = 1
-    input_files = signatures.EPHYSPASSIVE
+    signature = {'input_files': signatures.EPHYSPASSIVE, 'output_files': ()}

def _run(self):
"""returns a list of pathlib.Paths. """
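
Two changes run through this file: each task now declares a signature dict (replacing the bare input_files attribute), and RawEphysQC moves on to the next probe when one probe raises an AssertionError instead of failing the whole task. A hedged sketch of a task declaration under the new scheme (the class and file names are invented for illustration):

    class MyTask(tasks.Task):
        priority = 50
        level = 1
        signature = {'input_files': [('_iblrig_taskData.raw.jsonable', 'raw_behavior_data', True)],
                     'output_files': [('_ibl_trials.table.pqt', 'alf', True)]}
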
51 changes: 36 additions & 15 deletions ibllib/pipes/local_server.py
@@ -10,12 +10,28 @@

from one.api import ONE

-from ibllib.io.extractors.base import get_session_extractor_type, get_pipeline
-from ibllib.pipes import ephys_preprocessing, training_preprocessing, tasks
+from ibllib.io.extractors.base import get_pipeline, get_task_protocol, get_session_extractor_type
+from ibllib.pipes import tasks, training_preprocessing, ephys_preprocessing
from ibllib.time import date2isostr
import ibllib.oneibl.registration as registration

_logger = logging.getLogger('ibllib')
+LARGE_TASKS = ['EphysVideoCompress', 'TrainingVideoCompress', 'EphysDLC', 'TrainingDLC', 'SpikeSorting']


+def _get_pipeline_class(session_path, one):
+    pipeline = get_pipeline(session_path)
+    if pipeline == 'training':
+        PipelineClass = training_preprocessing.TrainingExtractionPipeline
+    elif pipeline == 'ephys':
+        PipelineClass = ephys_preprocessing.EphysExtractionPipeline
+    else:
+        # try and look if there is a custom extractor in the personal projects extraction class
+        import projects.base
+        task_type = get_session_extractor_type(session_path)
+        PipelineClass = projects.base.get_pipeline(task_type)
+    _logger.info(f"Using {PipelineClass} pipeline for {session_path}")
+    return PipelineClass(session_path=session_path, one=one)


def _get_lab(one):
@@ -101,16 +117,10 @@ def job_creator(root_path, one=None, dry=False, rerun=False, max_md5_size=None):
session_path, one=one, max_md5_size=max_md5_size)
if dsets is not None:
all_datasets.extend(dsets)
-        pipeline = get_pipeline(session_path)
-        if pipeline == 'training':
-            pipe = training_preprocessing.TrainingExtractionPipeline(session_path, one=one)
-        # only start extracting ephys on a raw_session.flag
-        elif pipeline == 'ephys' and flag_file.name == 'raw_session.flag':
-            pipe = ephys_preprocessing.EphysExtractionPipeline(session_path, one=one)
-        else:
-            _logger.info(f'Session type {get_session_extractor_type(session_path)}'
-                         f'as no matching pipeline pattern {session_path}')
-            continue
+        pipe = _get_pipeline_class(session_path, one)
+        if pipe is None:
+            task_protocol = get_task_protocol(session_path)
+            _logger.info(f'Session task protocol {task_protocol} has no matching pipeline pattern {session_path}')
if rerun:
rerun__status__in = '__all__'
else:
@@ -125,11 +135,12 @@ def job_creator(root_path, one=None, dry=False, rerun=False, max_md5_size=None):
return all_datasets


-def job_runner(subjects_path, lab=None, dry=False, one=None, count=5):
+def job_runner(subjects_path, mode='all', lab=None, dry=False, one=None, count=5):
"""
Function to be used as a process to run the jobs as they are created on the database
This will query waiting jobs from the specified Lab
:param subjects_path: on servers: /mnt/s0/Data/Subjects. Contains sessions
+    :param mode: Whether to run all jobs, or only small or large (video compression, DLC, spike sorting) jobs
:param lab: lab name as per Alyx
:param dry:
:param count:
@@ -141,8 +152,18 @@ def job_runner(subjects_path, lab=None, dry=False, one=None, count=5):
lab = _get_lab(one)
if lab is None:
return # if the lab is none, this will return empty tasks each time
-    tasks = one.alyx.rest('tasks', 'list', status='Waiting',
-                          django=f'session__lab__name__in,{lab}')
+    # Filter for tasks
+    if mode == 'all':
+        tasks = one.alyx.rest('tasks', 'list', status='Waiting',
+                              django=f'session__lab__name__in,{lab}', no_cache=True)
+    elif mode == 'small':
+        tasks_all = one.alyx.rest('tasks', 'list', status='Waiting',
+                                  django=f'session__lab__name__in,{lab}', no_cache=True)
+        tasks = [t for t in tasks_all if t['name'] not in LARGE_TASKS]
+    elif mode == 'large':
+        tasks = one.alyx.rest('tasks', 'list', status='Waiting',
+                              django=f'session__lab__name__in,{lab},name__in,{LARGE_TASKS}', no_cache=True)

tasks_runner(subjects_path, tasks, one=one, count=count, time_out=3600, dry=dry)


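
With the new mode argument, a server can split its workload across two runner processes instead of one. A sketch of how that might be launched (the path and counts are examples, not part of the commit):

    from ibllib.pipes.local_server import job_runner

    job_runner('/mnt/s0/Data/Subjects', mode='small', count=10)  # everything except LARGE_TASKS
    job_runner('/mnt/s0/Data/Subjects', mode='large', count=2)   # video compression, DLC, spike sorting
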
27 changes: 21 additions & 6 deletions ibllib/pipes/tasks.py
@@ -38,7 +38,7 @@ class Task(abc.ABC):
time_out_secs = None
version = version.ibllib()
log = ''
-    input_files = None
+    signature = {'input_files': (), 'output_files': ()}  # tuple (filename, collection, required_flag)

def __init__(self, session_path, parents=None, taskid=None, one=None,
machine=None, clobber=True, aws=None, location='server'):
@@ -254,12 +254,9 @@ def _getData(self):
:return:
"""
assert self.one
-
-        # This will be improved by Olivier new filters
-        session_datasets = self.one.list_datasets(self.one.path2eid(self.session_path),
-                                                  details=True)
+        session_datasets = self.one.list_datasets(self.one.path2eid(self.session_path), details=True)
df = pd.DataFrame(columns=self.one._cache.datasets.columns)
-        for file in self.input_files:
+        for file in self.signature['input_files']:
df = df.append(filter_datasets(session_datasets, filename=file[0], collection=file[1],
wildcards=True, assert_unique=False))
return df
@@ -278,6 +275,24 @@ def _cleanUp_SDSC(self):
assert SDSC_PATCH_PATH.parts[0:4] == self.session_path.parts[0:4]
shutil.rmtree(self.session_path)

+    def assert_expected_outputs(self):
+        """
+        After a run, asserts that all signature files are present at least once in the output files
+        Mainly useful for integration tests
+        :return:
+        """
+        assert self.status == 0
+        everthing_is_fine = True
+        for expected_file in self.signature['output_files']:
+            actual_files = list(self.session_path.rglob(str(Path(expected_file[1]).joinpath(expected_file[0]))))
+            if len(actual_files) == 0:
+                everthing_is_fine = False
+                _logger.error(f"Signature file expected {expected_file} not found in the output")
+        if not everthing_is_fine:
+            for out in self.outputs:
+                _logger.error(f"{out}")
+            raise FileNotFoundError("Missing outputs after task completion")


class Pipeline(abc.ABC):
"""
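
The new assert_expected_outputs checks every declared output signature against the files actually produced under the session path. A hypothetical integration-test sketch (the task choice and setup are assumptions, not part of the commit):

    task = ephys_preprocessing.EphysAudio(session_path, one=one)
    task.run()
    task.assert_expected_outputs()  # raises FileNotFoundError if a declared output is missing
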
1 change: 1 addition & 0 deletions ibllib/tests/test_pipes.py
@@ -32,6 +32,7 @@ def test_task_to_pipeline(self):
("mock_ephys", "ephys"),
("sync_ephys", "ephys"),
("ephys", "ephys"),
("ephys_passive_opto", "ephys_passive_opto")
]
for typ, exp in pipe_out:
assert ibllib.io.extractors.base._get_pipeline_from_task_type(typ) == exp
3 changes: 3 additions & 0 deletions release_notes.md
@@ -9,6 +9,9 @@
### Release Notes 2.1.2 2021-10-14
- Fix issue with RawEphysQC that was not looking in local Subjects folder for data
- Fix ensure_required_data in DlcQc
+### Release Notes 2.1.3 2021-10-19
+- Split jobs.py run function in two, one running large tasks (video compression, dlc, spike sorting), one the rest
+- Ensure RawEphysQC runs both probes if one fails

## Release Notes 2.0
### Release Notes 2.0.1 2021-08-07
2 changes: 1 addition & 1 deletion setup.py
@@ -24,7 +24,7 @@

setup(
name='ibllib',
-    version='2.1.2',
+    version='2.1.3',
python_requires='>={}.{}'.format(*REQUIRED_PYTHON),
description='IBL libraries',
license="MIT",