Skip to content

Commit

Permalink
Merge pull request #321 from ttngu207/datajoint_pipeline
Browse files Browse the repository at this point in the history
social0.2 ingestion and worker setup for automation
  • Loading branch information
JaerongA authored Feb 2, 2024
2 parents 27259a2 + 85b4c97 commit 804c77f
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 39 deletions.
18 changes: 17 additions & 1 deletion aeon/dj_pipeline/analysis/block_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from aeon.dj_pipeline.analysis.visit import filter_out_maintenance_periods, get_maintenance_periods

schema = dj.schema(get_schema_name("block_analysis"))
logger = dj.logger


@schema
Expand Down Expand Up @@ -74,6 +75,21 @@ def make(self, key):
key["experiment_name"], block_start, block_end
)

# Ensure the relevant streams ingestion are caught up to this block
chunk_keys = (acquisition.Chunk & key & chunk_restriction).fetch("KEY")
streams_tables = (
streams.UndergroundFeederDepletionState,
streams.UndergroundFeederBeamBreak,
streams.UndergroundFeederEncoder,
tracking.SLEAPTracking,
)
for streams_table in streams_tables:
if len(streams_table & chunk_keys) < len(streams_table.key_source & chunk_keys):
logger.info(
f"{streams_table.__name__} not yet fully ingested for block: {key}. Skip BlockAnalysis (to retry later)..."
)
return

self.insert1({**key, "block_duration": (block_end - block_start).total_seconds() / 3600})

# Patch data - TriggerPellet, DepletionState, Encoder (distancetravelled)
Expand Down Expand Up @@ -117,7 +133,7 @@ def make(self, key):
encoder_df, maintenance_period, block_end, dropna=True
)

encoder_df["distance_travelled"] = analysis_utils.distancetravelled(encoder_df.angle)
encoder_df["distance_travelled"] = -1 * analysis_utils.distancetravelled(encoder_df.angle)

patch_rate = depletion_state_df.rate.unique()
assert len(patch_rate) == 1 # expects a single rate for this block
Expand Down
4 changes: 2 additions & 2 deletions aeon/dj_pipeline/docs/notebooks/diagram.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@
"name": "stdout",
"output_type": "stream",
"text": [
"datajoint_analysis_diagram.svg datajoint_overview_diagram.svg \u001b[0m\u001b[01;34mnotebooks\u001b[0m/\r\n"
"datajoint_analysis_diagram.svg datajoint_overview_diagram.svg \u001B[0m\u001B[01;34mnotebooks\u001B[0m/\r\n"
]
}
],
Expand Down Expand Up @@ -898,4 +898,4 @@
},
"nbformat": 4,
"nbformat_minor": 5
}
}
10 changes: 8 additions & 2 deletions aeon/dj_pipeline/populate/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,19 @@
import datajoint as dj
from datajoint_utilities.dj_worker import parse_args

from aeon.dj_pipeline.populate.worker import acquisition_worker, logger, mid_priority, streams_worker, pyrat_worker
from aeon.dj_pipeline.populate.worker import (
acquisition_worker,
logger,
analysis_worker,
streams_worker,
pyrat_worker,
)

# ---- some wrappers to support execution as script or CLI

configured_workers = {
"acquisition_worker": acquisition_worker,
"mid_priority": mid_priority,
"analysis_worker": analysis_worker,
"streams_worker": streams_worker,
"pyrat_worker": pyrat_worker,
}
Expand Down
18 changes: 12 additions & 6 deletions aeon/dj_pipeline/populate/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
"WorkerLog",
"ErrorLog",
"logger",
"AutomatedExperimentIngestion",
]

# ---- Some constants ----
logger = dj.logger
worker_schema_name = db_prefix + "worker"


# ---- Manage experiments for automated ingestion ----

schema = dj.Schema(worker_schema_name)
Expand Down Expand Up @@ -58,11 +58,13 @@ def ingest_environment_visits():
worker_schema_name=worker_schema_name,
db_prefix=db_prefix,
run_duration=-1,
max_idled_cycle=6,
sleep_duration=1200,
)
acquisition_worker(ingest_epochs_chunks)
acquisition_worker(acquisition.Environment)
acquisition_worker(ingest_environment_visits)
acquisition_worker(acquisition.EpochActiveRegion)
# acquisition_worker(ingest_environment_visits)
acquisition_worker(block_analysis.BlockDetection)

# configure a worker to handle pyrat sync
Expand All @@ -71,7 +73,8 @@ def ingest_environment_visits():
worker_schema_name=worker_schema_name,
db_prefix=db_prefix,
run_duration=-1,
sleep_duration=10,
max_idled_cycle=400,
sleep_duration=30,
)

pyrat_worker(subject.CreatePyratIngestionTask)
Expand All @@ -85,7 +88,8 @@ def ingest_environment_visits():
worker_schema_name=worker_schema_name,
db_prefix=db_prefix,
run_duration=-1,
sleep_duration=1200,
max_idled_cycle=3,
sleep_duration=10,
)

for attr in vars(streams).values():
Expand All @@ -101,7 +105,9 @@ def ingest_environment_visits():
worker_schema_name=worker_schema_name,
db_prefix=db_prefix,
run_duration=-1,
sleep_duration=3600,
max_idled_cycle=6,
sleep_duration=1200,
)

analysis_worker(block_analysis.BlockAnalysis)
analysis_worker(block_analysis.BlockAnalysis, max_calls=6)
analysis_worker(block_analysis.BlockPlots, max_calls=6)
2 changes: 1 addition & 1 deletion aeon/dj_pipeline/utils/load_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
logger = dj.logger
_weight_scale_rate = 100
_weight_scale_nest = 1
_aeon_schemas = ["social01"]
_aeon_schemas = ["social01", "social02"]


def insert_stream_types():
Expand Down
69 changes: 46 additions & 23 deletions aeon/dj_pipeline/webapps/sciviz/specsheet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ SciViz:
route: /exp_form
x: 0
y: 0
height: 0.5
height: 0.4
width: 1
type: form
tables:
Expand Down Expand Up @@ -139,7 +139,7 @@ SciViz:
New Experiment Subject:
route: /exp_subject_form
x: 0
y: 0.5
y: 0.4
height: 0.3
width: 1
type: form
Expand All @@ -156,7 +156,7 @@ SciViz:
New Experiment Note:
route: /exp_note_form
x: 0
y: 0.8
y: 0.7
height: 0.3
width: 1
type: form
Expand All @@ -176,10 +176,33 @@ SciViz:
input: Note
destination: note

New Experiment Directory:
route: /exp_note_form
x: 0
y: 1.0
height: 0.3
width: 1
type: form
tables:
- aeon_acquisition.Experiment.Directory
map:
- type: table
input: Experiment Name
destination: aeon_acquisition.Experiment
- type: table
input: Directory Type
destination: aeon_acquisition.DirectoryType
- type: table
input: Pipeline Repository
destination: aeon_acquisition.PipelineRepository
- type: attribute
input: Directory Path
destination: directory_path

New Experiment Type:
route: /exp_type_form
x: 0
y: 1.1
y: 1.3
height: 0.3
width: 1
type: form
Expand Down Expand Up @@ -538,8 +561,8 @@ SciViz:
def restriction(**kwargs):
return dict(**kwargs)
dj_query: >
def dj_query(aeon_test_analysis):
aeon_analysis = aeon_test_analysis
def dj_query(aeon_block_analysis):
aeon_analysis = aeon_block_analysis
return {'query': aeon_analysis.Block * aeon_analysis.BlockAnalysis, 'fetch_args': {'order_by': 'block_end DESC'}}
PerBlockReport:
Expand All @@ -563,8 +586,8 @@ SciViz:
def restriction(**kwargs):
return dict(**kwargs)
dj_query: >
def dj_query(aeon_test_analysis):
aeon_analysis = aeon_test_analysis
def dj_query(aeon_block_analysis):
aeon_analysis = aeon_block_analysis
query = aeon_analysis.Block * aeon_analysis.BlockAnalysis
return dict(query=query, fetch_args=[])
comp2:
Expand All @@ -578,9 +601,9 @@ SciViz:
def restriction(**kwargs):
return dict(**kwargs)
dj_query: >
def dj_query(aeon_test_analysis):
aeon_analysis = aeon_test_analysis
return {'query': aeon_test_analysis.BlockPlots(), 'fetch_args': ['subject_positions_plot']}
def dj_query(aeon_block_analysis):
aeon_analysis = aeon_block_analysis
return {'query': aeon_block_analysis.BlockPlots(), 'fetch_args': ['subject_positions_plot']}
comp3:
route: /subject_weights_plot
x: 0
Expand All @@ -592,9 +615,9 @@ SciViz:
def restriction(**kwargs):
return dict(**kwargs)
dj_query: >
def dj_query(aeon_test_analysis):
aeon_analysis = aeon_test_analysis
return {'query': aeon_test_analysis.BlockPlots(), 'fetch_args': ['subject_weights_plot']}
def dj_query(aeon_block_analysis):
aeon_analysis = aeon_block_analysis
return {'query': aeon_block_analysis.BlockPlots(), 'fetch_args': ['subject_weights_plot']}
comp4:
route: /patch_distance_travelled_plot
Expand All @@ -607,9 +630,9 @@ SciViz:
def restriction(**kwargs):
return dict(**kwargs)
dj_query: >
def dj_query(aeon_test_analysis):
aeon_analysis = aeon_test_analysis
return {'query': aeon_test_analysis.BlockPlots(), 'fetch_args': ['patch_distance_travelled_plot']}
def dj_query(aeon_block_analysis):
aeon_analysis = aeon_block_analysis
return {'query': aeon_block_analysis.BlockPlots(), 'fetch_args': ['patch_distance_travelled_plot']}
comp5:
route: /patch_rate_plot
Expand All @@ -622,9 +645,9 @@ SciViz:
def restriction(**kwargs):
return dict(**kwargs)
dj_query: >
def dj_query(aeon_test_analysis):
aeon_analysis = aeon_test_analysis
return {'query': aeon_test_analysis.BlockPlots(), 'fetch_args': ['patch_rate_plot']}
def dj_query(aeon_block_analysis):
aeon_analysis = aeon_block_analysis
return {'query': aeon_block_analysis.BlockPlots(), 'fetch_args': ['patch_rate_plot']}
comp6:
route: /cumulative_pellet_plot
Expand All @@ -637,9 +660,9 @@ SciViz:
def restriction(**kwargs):
return dict(**kwargs)
dj_query: >
def dj_query(aeon_test_analysis):
aeon_analysis = aeon_test_analysis
return {'query': aeon_test_analysis.BlockPlots(), 'fetch_args': ['cumulative_pellet_plot']}
def dj_query(aeon_block_analysis):
aeon_analysis = aeon_block_analysis
return {'query': aeon_block_analysis.BlockPlots(), 'fetch_args': ['cumulative_pellet_plot']}
VideoStream:
route: /videostream
Expand Down
8 changes: 4 additions & 4 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ services:
condition: service_started
deploy:
mode: replicated
replicas: 3
replicas: 2
command: [ "aeon_ingest", "streams_worker" ]

ingest_mid:
analysis_worker:
<<: *aeon-ingest-common
depends_on:
acquisition_worker:
condition: service_started
deploy:
mode: replicated
replicas: 2
command: [ "aeon_ingest", "mid_priority" ]
replicas: 1
command: [ "aeon_ingest", "analysis_worker" ]

0 comments on commit 804c77f

Please sign in to comment.