diff --git a/aeon/dj_pipeline/analysis/block_analysis.py b/aeon/dj_pipeline/analysis/block_analysis.py index 2880b050..db3934a8 100644 --- a/aeon/dj_pipeline/analysis/block_analysis.py +++ b/aeon/dj_pipeline/analysis/block_analysis.py @@ -42,10 +42,18 @@ class Block(dj.Manual): @schema class BlockDetection(dj.Computed): - definition = """ + definition = """ # Detecting new block(s) for each new Chunk -> acquisition.Environment + --- + execution_time=null: datetime """ + class IdentifiedBlock(dj.Part): + definition = """ # the block(s) identified in this BlockDetection + -> master + -> Block + """ + key_source = acquisition.Environment - {"experiment_name": "social0.1-aeon3"} def make(self, key): @@ -70,12 +78,9 @@ def make(self, key): block_state_query = acquisition.Environment.BlockState & exp_key & chunk_restriction block_state_df = fetch_stream(block_state_query) if block_state_df.empty: - self.insert1(key) + # self.insert1(key) return - block_state_df.index = block_state_df.index.round( - "us" - ) # timestamp precision in DJ is only at microseconds block_state_df = block_state_df.loc[ (block_state_df.index > chunk_start) & (block_state_df.index <= chunk_end) ] @@ -103,7 +108,10 @@ def make(self, key): ) Block.insert(block_entries, skip_duplicates=True) - self.insert1(key) + # self.insert1({**key, "execution_time": datetime.now(UTC)}) + self.IdentifiedBlock.insert( + {**key, "block_start": entry["block_start"]} for entry in block_entries + ) # ---- Block Analysis and Visualization ---- @@ -316,6 +324,15 @@ def make(self, key): if _df.type.iloc[-1] != "Exit": subject_names.append(subject_name) + # Check for ExperimentTimeline to validate subjects in this block + timeline_query = (acquisition.ExperimentTimeline + & acquisition.ExperimentTimeline.Subject + & key + & f"start <= '{block_start}' AND end >= '{block_end}'") + timeline_subjects = (acquisition.ExperimentTimeline.Subject & timeline_query).fetch("subject") + if len(timeline_subjects): + subject_names = [s for s in subject_names if s in timeline_subjects] + if use_blob_position and len(subject_names) > 1: raise ValueError( f"Without SLEAPTracking, BlobPosition can only handle a single-subject block. "