Merge pull request #271 from ttngu207/datajoint_pipeline_pyrat
Added `lab_id` and `available` to pyrat subjects
JaerongA authored Nov 2, 2023
2 parents af44531 + 6b16fe7 commit 6a52fe2
Showing 4 changed files with 49 additions and 16 deletions.
aeon/dj_pipeline/acquisition.py (3 additions, 0 deletions)
```diff
@@ -1046,6 +1046,9 @@ def _get_all_chunks(experiment_name, device_name):
         if data_dir
     }
 
+    if not raw_data_dirs:
+        raise ValueError(f"No raw data directory found for experiment: {experiment_name}")
+
     chunkdata = io_api.load(
         root=raw_data_dirs.values(),
         reader=io_reader.Chunk(pattern=device_name + "*", extension="csv"),
```
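A minimal sketch of how the new guard surfaces (assumed usage; the experiment and device names below are hypothetical): an experiment with no registered raw-data directories now fails fast with a descriptive ValueError instead of a less obvious failure inside `io_api.load`.

```python
# Hypothetical illustration of the new guard; experiment/device names are made up.
from aeon.dj_pipeline import acquisition

try:
    acquisition._get_all_chunks("exp_nonexistent", "Metadata")
except ValueError as err:
    print(err)  # No raw data directory found for experiment: exp_nonexistent
```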
aeon/dj_pipeline/populate/worker.py (4 additions, 5 deletions)
```diff
@@ -3,7 +3,7 @@
 from datajoint_utilities.dj_worker.worker_schema import is_djtable
 
 from aeon.dj_pipeline import subject, acquisition, analysis, db_prefix, qc, report, tracking
-from aeon.dj_pipeline.utils import load_metadata, streams_maker
+from aeon.dj_pipeline.utils import streams_maker
 
 streams = streams_maker.main()
 
@@ -34,9 +34,8 @@ class AutomatedExperimentIngestion(dj.Manual):
     """
 
 
-def ingest_colony_epochs_chunks():
-    """Load and insert subjects from colony.csv. Ingest epochs and chunks for experiments specified in AutomatedExperimentIngestion."""
-    load_metadata.ingest_subject()
+def ingest_epochs_chunks():
+    """Ingest epochs and chunks for experiments specified in AutomatedExperimentIngestion."""
     experiment_names = AutomatedExperimentIngestion.fetch("experiment_name")
     for experiment_name in experiment_names:
         acquisition.Epoch.ingest_epochs(experiment_name)
@@ -58,7 +57,7 @@ def ingest_environment_visits():
     run_duration=-1,
     sleep_duration=1200,
 )
-acquisition_worker(ingest_colony_epochs_chunks)
+acquisition_worker(ingest_epochs_chunks)
 acquisition_worker(acquisition.ExperimentLog)
 acquisition_worker(acquisition.SubjectEnterExit)
 acquisition_worker(acquisition.SubjectWeight)
```
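A minimal sketch of how the renamed worker task is driven (assumed usage; the experiment name is hypothetical): experiments are registered in `AutomatedExperimentIngestion`, and `ingest_epochs_chunks` iterates over them, without performing colony.csv subject ingestion anymore.

```python
# Sketch only: assumes the experiment already exists in acquisition.Experiment.
from aeon.dj_pipeline.populate import worker

worker.AutomatedExperimentIngestion.insert1(
    {"experiment_name": "exp0.2-r0"},  # hypothetical experiment name
    skip_duplicates=True,
)
worker.ingest_epochs_chunks()  # or let acquisition_worker run it on its schedule
```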
aeon/dj_pipeline/subject.py (39 additions, 11 deletions)
```diff
@@ -47,15 +47,17 @@ class SubjectDetail(dj.Imported):
     definition = """
     -> Subject
     ---
-    -> lab.User.proj(responsible_user="user")
+    lab_id='': varchar(128)  # pyrat 'labid'
+    responsible_fullname='': varchar(128)
     -> [nullable] GeneticBackground
-    -> Strain
-    cage_number: varchar(32)
+    -> [nullable] Strain
+    cage_number='': varchar(32)
+    available=1: bool  # is this animal available on pyrat
     """
 
     def make(self, key):
         eartag_or_id = key["subject"]
-        # cage id, sex, line/strain, genetic background, dob, weight history
+        # cage id, sex, line/strain, genetic background, dob, lab id
         params = {
             "k": _pyrat_animal_attributes,
             "s": "eartag_or_id:asc",
@@ -64,27 +66,45 @@ def make(self, key):
             "eartag": eartag_or_id,
         }
         animal_resp = get_pyrat_data(endpoint=f"animals", params=params)
-        assert len(animal_resp) == 1, f"Found {len(animal_resp)} with eartag {eartag_or_id}, expect one"
-        animal_resp = animal_resp[0]
+        if len(animal_resp) == 0:
+            if self & key:
+                self.update1(
+                    {
+                        **key,
+                        "available": False,
+                    }
+                )
+            else:
+                self.insert1(
+                    {
+                        **key,
+                        "available": False,
+                    }
+                )
+            return
+        elif len(animal_resp) > 1:
+            raise ValueError(f"Found {len(animal_resp)} with eartag {eartag_or_id}, expect one")
+        else:
+            animal_resp = animal_resp[0]
+
         # Insert new subject
-        subj_key = {"subject": eartag_or_id}
         Subject.update1(
             {
-                **subj_key,
+                **key,
                 "sex": {"f": "F", "m": "M", "?": "U"}[animal_resp["sex"]],
                 "subject_birth_date": animal_resp["dateborn"],
             }
         )
-        user = (lab.User & {"responsible_id": animal_resp["responsible_id"]}).fetch1("user")
         Strain.insert1(
             {"strain_id": animal_resp["strain_id"], "strain_name": animal_resp["strain_id"]},
             skip_duplicates=True,
         )
         entry = {
             **key,
-            "responsible_user": user,
+            "responsible_fullname": animal_resp["responsible_fullname"],
             "strain_id": animal_resp["strain_id"],
             "cage_number": animal_resp["cagenumber"],
+            "lab_id": animal_resp["labid"],
         }
         if animal_resp["gen_bg_id"] is not None:
             GeneticBackground.insert1(
@@ -249,14 +269,22 @@ class PyratCommentWeightProcedure(dj.Imported):
     execution_duration: float  # (s) duration of task execution
     """
 
+    key_source = (PyratIngestion * SubjectDetail) & "available = 1"
+
     def make(self, key):
         execution_time = datetime.utcnow()
         logger.info(f"Extracting weights/comments/procedures")
 
         eartag_or_id = key["subject"]
         comment_resp = get_pyrat_data(endpoint=f"animals/{eartag_or_id}/comments")
         if comment_resp == {"reponse code": 404}:
-            raise ValueError(f"{eartag_or_id} could not be found in Pyrat")
+            SubjectDetail.update1(
+                {
+                    **key,
+                    "available": False,
+                }
+            )
+            return
 
         for cmt in comment_resp:
             cmt["subject"] = eartag_or_id
```
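A minimal sketch of querying the new fields (assumed usage): downstream code can restrict to animals still listed on PyRAT, mirroring the `key_source` restriction added to `PyratCommentWeightProcedure`.

```python
# Sketch only: standard DataJoint restriction and fetch on the new attributes.
from aeon.dj_pipeline import subject

active = (subject.SubjectDetail & "available = 1").fetch(
    "subject", "lab_id", as_dict=True
)
```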
aeon/dj_pipeline/utils/load_metadata.py (3 additions, 0 deletions)
```diff
@@ -14,13 +14,16 @@
 from aeon.dj_pipeline.utils import streams_maker
 from aeon.io import api as io_api
 
+logger = dj.logger
 _weight_scale_rate = 100
 _weight_scale_nest = 1
 _colony_csv_path = pathlib.Path("/ceph/aeon/aeon/colony/colony.csv")
 
 
 def ingest_subject(colony_csv_path: pathlib.Path = _colony_csv_path) -> None:
     """Ingest subject information from the colony.csv file."""
+    logger.warning("The use of 'colony.csv' is deprecated starting Nov 2023", DeprecationWarning)
+
     colony_df = pd.read_csv(colony_csv_path, skiprows=[1, 2])
     colony_df.rename(columns={"Id": "subject"}, inplace=True)
     colony_df["sex"] = "U"
```
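For completeness, a minimal sketch of the deprecated path (assumed usage; the default colony.csv path must exist on the host): `ingest_subject` remains callable, but the new module-level logger marks colony.csv ingestion as deprecated in favour of the PyRAT-backed tables.

```python
# Sketch only: colony.csv ingestion is now flagged as deprecated; PyRAT-backed
# tables (subject.SubjectDetail etc.) are the intended source going forward.
from aeon.dj_pipeline.utils import load_metadata

load_metadata.ingest_subject()  # reads the default /ceph/aeon/aeon/colony/colony.csv
```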
