
Commit

Merge pull request #7 from AllenNeuralDynamics/refactor
Simplify factory logic
bruno-f-cruz authored Nov 10, 2024
2 parents 2cd5581 + 4294739 commit 55a3b6b
Showing 3 changed files with 192 additions and 283 deletions.
README.md (48 changes: 48 additions & 0 deletions)
@@ -7,6 +7,54 @@ A repository with core primitives for analysis shared across all `Aind.Behavior`

This repository is part of a bigger infrastructure that is summarized [here](https://github.com/AllenNeuralDynamics/Aind.Behavior.Services).

## Getting started and API usage

The current goal of the API is to give users a way to instantiate "data contracts" and the corresponding data-ingestion logic. For instance, loading data from different streams and converting it into a common format (e.g. a `pandas.DataFrame`) can be done as follows:

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Union

from aind_behavior_core_analysis.io._core import DataStreamCollection
from aind_behavior_core_analysis.io.data_stream import (
    CsvStream,
    DataStreamCollectionFromFilePattern,
    HarpDataStreamCollectionFactory,
    SoftwareEventStream,
)

NodeType = Union[Dict[str, DataStreamCollection], DataStreamCollection]


@dataclass
class DataContract:
    behavior: Dict[str, NodeType]


root_path = Path(r"test_2024-11-05T190325Z")

dataset = DataContract(
    behavior={
        "Behavior": HarpDataStreamCollectionFactory(
            path=root_path / "behavior" / "Behavior.harp", default_inference_mode="register_0"
        ).build(),
        "LoadCells": HarpDataStreamCollectionFactory(
            path=root_path / "behavior" / "LoadCells.harp", default_inference_mode="register_0"
        ).build(),
        "RendererSynchState": DataStreamCollectionFromFilePattern(
            path=root_path / "behavior" / "Renderer", pattern="RendererSynchState.csv", stream_type=CsvStream
        ).build(),
        "SoftwareEvents": DataStreamCollectionFromFilePattern(
            path=root_path / "behavior" / "SoftwareEvents", pattern="*.json", stream_type=SoftwareEventStream
        ).build(),
    },
)

load_cell_data = dataset.behavior["LoadCells"]["LoadCellData"].load()
load_cell_data.plot()
```
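
Building the contract wires up paths and parsers; data is typically read only when `load()` is called on a stream. Collections also expose bulk helpers; a minimal sketch, reusing the `dataset` object from the example above:

```python
load_cells = dataset.behavior["LoadCells"]

load_cells.load_streams()    # calls .load() on every stream in the collection
load_cells.reload_streams()  # calls .reload() on every stream, re-reading from disk

print(load_cells)  # tabular summary of stream names, types, and load state
```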

## Installing and Upgrading

If you choose to clone the repository, you can install the package by running the following command from the root directory of the repository:
src/aind_behavior_core_analysis/io/_core.py (257 changes: 44 additions & 213 deletions)
@@ -7,23 +7,20 @@
from typing import (
    Any,
    Generic,
    List,
    NamedTuple,
    Optional,
    Protocol,
    Sequence,
    Type,
    Self,
    TypeVar,
    overload,
    runtime_checkable,
)

from aind_behavior_core_analysis.io._utils import StrPattern

TData = TypeVar("TData", bound=Any)


class DataStream(abc.ABC, Generic[TData]):
    _data: Optional[TData]
    _path: Optional[Path]
    _name: str
    _auto_load: bool

    def __init__(
        self,
        /,
@@ -62,18 +59,13 @@ def name(self) -> str:
    def path(self) -> Optional[Path]:
        return self._path

    @path.setter
    def path(self, value: PathLike) -> None:
        self._path = Path(value)

    @classmethod
    @abc.abstractmethod
    def _file_reader(cls, path: PathLike, *args, **kwargs) -> Any:
        pass

    @classmethod
    @abc.abstractmethod
    def _reader(cls, value, *args, **kwargs) -> TData:
    def _parser(self, *args, **kwargs) -> TData:
        pass

    @property
@@ -87,14 +79,23 @@ def data(self) -> TData:
            )
        return self._data

    def load(self, /, path: Optional[PathLike] = None, *, force_reload: bool = False, **kwargs) -> TData:
    def load(self) -> TData:
        return self._load()

    def reload(self) -> TData:
        return self._load(force_reload=True)

    def from_file(self, path: PathLike) -> Any:
        return self._parser(self._file_reader(path))

    def _load(self, /, path: Optional[PathLike] = None, *, force_reload: bool = False, **kwargs) -> TData:
        if force_reload is False and self._data is not None:
            pass
        else:
            path = Path(path) if path is not None else self.path
            if path:
                self.path = path
                self._data = self._reader(self._file_reader(path))
                self._path = path
                self._data = self._parser(self._file_reader(path))
            else:
                raise ValueError("Path attribute is not defined. Cannot load data.")
        return self._data
@@ -103,203 +104,14 @@ def __str__(self) -> str:
        return f"{self.__class__.__name__} stream with data{'' if self._data is not None else ' not'} loaded."


@runtime_checkable
class _DataStreamSourceBuilder(Protocol):
    def build(self, /, source: Optional[DataStreamSource] = None, **kwargs) -> StreamCollection:
        ...


class DataStreamBuilderPattern(NamedTuple):
    stream_type: Type[DataStream]
    pattern: StrPattern


class DataStreamSource:
    """Represents a data stream source, usually comprised of various files from a single folder.
    These folders usually result from a single data acquisition logger"""

    @overload
    def __init__(
        self,
        /,
        path: PathLike,
        builder: Type[DataStream],
        *,
        name: Optional[str] = None,
        auto_load: bool = False,
        **kwargs,
    ) -> None:
        ...

    @overload
    def __init__(
        self,
        /,
        path: PathLike,
        builder: DataStreamBuilderPattern,
        *,
        name: Optional[str] = None,
        auto_load: bool = False,
        **kwargs,
    ) -> None:
        ...

    @overload
    def __init__(
        self,
        /,
        path: PathLike,
        builder: Sequence[DataStreamBuilderPattern],
        *,
        name: Optional[str] = None,
        auto_load: bool = False,
        **kwargs,
    ) -> None:
        ...

    @overload
    def __init__(
        self,
        /,
        path: PathLike,
        builder: _DataStreamSourceBuilder,
        *,
        name: Optional[str] = None,
        auto_load: bool = False,
        **kwargs,
    ) -> None:
        ...

    @overload
    def __init__(
        self,
        /,
        path: PathLike,
        builder: None,
        *,
        name: Optional[str] = None,
        auto_load: bool = False,
        **kwargs,
    ) -> None:
        ...

    def __init__(
        self,
        /,
        path: PathLike,
        builder: None
        | Type[DataStream]
        | DataStreamBuilderPattern
        | Sequence[DataStreamBuilderPattern]
        | _DataStreamSourceBuilder = None,
        *,
        name: Optional[str] = None,
        auto_load: bool = False,
        **kwargs,
    ) -> None:
        self._streams: StreamCollection
        self._path = Path(path)

        if not self._path.is_dir():
            raise FileExistsError(f"Path {self._path} is not a directory")
        self._name = name if name else self._path.name

        # Build the StreamCollection object
        self._builder = builder

        if self._builder is None:
            raise NotImplementedError(
                "builder must not be provided. Support for automatic inference is not yet implemented."
            )
        if isinstance(self._builder, _DataStreamSourceBuilder):
            self._streams = self._builder.build(self)

        elif isinstance(self._builder, (type(DataStream), Sequence)):
            self._builder = self._normalize_builder_from_data_stream(self._builder)
            self._streams = self._build_from_data_stream(self.path, self._builder)

        else:
            raise TypeError("Builder type is not supported.")

        if auto_load is True:
            self.reload_streams()

    @staticmethod
    def _normalize_builder_from_data_stream(
        builder: Type[DataStream] | DataStreamBuilderPattern | Sequence
    ) -> Sequence[DataStreamBuilderPattern]:
        _builder: Sequence
        if isinstance(builder, type(DataStream)):  # If only a single data stream class is provided
            _builder = (DataStreamBuilderPattern(stream_type=builder, pattern="*"),)
        if isinstance(builder, DataStreamBuilderPattern):  # If only a single data stream class is provided
            _builder = (builder,)

        for _tuple in _builder:
            if not isinstance(_tuple.stream_type, type(DataStream)):
                raise ValueError("builder must be a DataStream type")
        return _builder

    @property
    def name(self) -> str:
        return self._name

    @property
    def path(self) -> Path:
        if self._path is None:
            raise ValueError("Path is not defined")
        return Path(self._path)

    @property
    def streams(self) -> StreamCollection:
        return self._streams

    @staticmethod
    def _get_data_streams_helper(
        path: PathLike, stream_type: Type[DataStream], pattern: StrPattern
    ) -> List[DataStream]:
        _path = Path(path)
        if isinstance(pattern, str):
            pattern = [pattern]
        files: List[Path] = []
        for pat in pattern:
            files.extend(_path.glob(pat))
        files = list(set(files))
        streams: List[DataStream] = [stream_type(file) for file in files]
        return streams

    @classmethod
    def _build_from_data_stream(cls, path: PathLike, builder: Sequence[DataStreamBuilderPattern]) -> StreamCollection:
        streams = StreamCollection()
        for stream_builder in builder:
            _this_type_stream = cls._get_data_streams_helper(path, stream_builder.stream_type, stream_builder.pattern)
            for stream in _this_type_stream:
                if stream.name is None:
                    raise ValueError(f"Stream {stream} does not have a name")
                else:
                    streams.try_append(stream.name, stream)
        return streams

    def reload_streams(self, force_reload: bool = False) -> None:
        for stream in self._streams.values():
            stream.load(force_reload=force_reload)

    def __str__(self) -> str:
        return f"DataStreamSource from {self._path}" + f"\n{str(self._streams)}"

    def __repr__(self) -> str:
        return f"DataStreamSource from {self._path}"

    def __getitem__(self, key: str) -> DataStream:
        return self._streams[key]

    def __iter__(self):
        return self._streams.__iter__()
class DataStreamCollectionFactory(abc.ABC):
    @abc.abstractmethod
    def build(self) -> DataStreamCollection: ...

    def __next__(self):
        return self._streams.__next__()

class DataStreamCollection(UserDict[str, DataStream]):
    """Represents a collection of data streams."""

class StreamCollection(UserDict[str, DataStream]):
    def __str__(self):
        table = []
        table.append(["Stream Name", "Stream Type", "Is Loaded"])
@@ -320,7 +132,7 @@ def __str__(self):

        return table_str

    def try_append(self, key: str, value: DataStream) -> None:
    def try_append(self, key: str, value: DataStream) -> Self:
        """
        Tries to append a key-value pair to the dictionary.
@@ -335,3 +147,22 @@
            raise KeyError(f"Key {key} already exists in dictionary")
        else:
            self[key] = value
        return self

    def merge(self, *others: DataStreamCollection) -> Self:
        for other in others:
            for key, value in other.items():
                _ = self.try_append(key, value)
        return self

    def reload_streams(self) -> None:
        for stream in self.values():
            stream.reload()

    def load_streams(self) -> None:
        for stream in self.values():
            stream.load()

    @classmethod
    def from_merge(cls, *others: DataStreamCollection) -> DataStreamCollection:
        return cls().merge(*others)
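
To show the new pieces working together, here is a hypothetical factory built on the `DataStreamCollectionFactory` base and combined via `from_merge`. The `GlobFactory` name and its fields are assumptions, as is constructing `CsvStream` directly from a path (the pre-refactor helper instantiated streams as `stream_type(file)`); only the collection and factory APIs come from this commit.

```python
from dataclasses import dataclass
from pathlib import Path

from aind_behavior_core_analysis.io.data_stream import CsvStream


@dataclass
class GlobFactory(DataStreamCollectionFactory):
    """Hypothetical factory: one stream per file matching `pattern` under `path`."""

    path: Path
    pattern: str = "*"

    def build(self) -> DataStreamCollection:
        collection = DataStreamCollection()
        for file in sorted(self.path.glob(self.pattern)):
            # try_append raises KeyError on duplicate keys and returns the
            # collection, so calls can be chained.
            collection.try_append(file.stem, CsvStream(file))
        return collection


# Collections built from different folders can be combined; from_merge
# raises KeyError if two collections share a stream name.
merged = DataStreamCollection.from_merge(
    GlobFactory(path=Path("behavior/Renderer"), pattern="*.csv").build(),
    GlobFactory(path=Path("behavior/Other"), pattern="*.csv").build(),
)
```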