Skip to content

Commit

Permalink
Merge pull request #275 from JaerongA/dev
Browse files Browse the repository at this point in the history
Fix PyRat worker
  • Loading branch information
Thinh Nguyen authored Nov 6, 2023
2 parents 1914ae6 + 80849e0 commit 6f98e68
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 18 deletions.
34 changes: 17 additions & 17 deletions aeon/dj_pipeline/subject.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import json
import os
import time
from datetime import datetime, timedelta

import requests
import json
import datajoint as dj
from datetime import datetime, timedelta
import requests

from . import get_schema_name, lab

Expand Down Expand Up @@ -65,7 +65,7 @@ def make(self, key):
"l": 10,
"eartag": eartag_or_id,
}
animal_resp = get_pyrat_data(endpoint=f"animals", params=params)
animal_resp = get_pyrat_data(endpoint="animals", params=params)
if len(animal_resp) == 0:
if self & key:
self.update1(
Expand Down Expand Up @@ -203,7 +203,7 @@ def get_reference_weight(cls, subject_name):

@schema
class PyratIngestionTask(dj.Manual):
"""Task to sync new animals from PyRAT"""
"""Task to sync new animals from PyRAT."""

definition = """
pyrat_task_scheduled_time: datetime # (UTC) scheduled time for task execution
Expand All @@ -212,14 +212,14 @@ class PyratIngestionTask(dj.Manual):

@schema
class PyratIngestion(dj.Imported):
"""Ingestion of new animals from PyRAT"""
"""Ingestion of new animals from PyRAT."""

definition = """
-> PyratIngestionTask
---
execution_time: datetime # (UTC) time of task execution
execution_duration: float # (s) duration of task execution
new_pyrat_entry_count: int # number of new PyRAT subject ingested in this round of ingestion
new_pyrat_entry_count: int # number of new PyRAT subject ingested in this round of ingestion
"""

key_source = (
Expand Down Expand Up @@ -250,7 +250,7 @@ def make(self, key):
new_eartags = []
for responsible_id in lab.User.fetch("responsible_id"):
# 1 - retrieve all animals from this user
animal_resp = get_pyrat_data(endpoint="animals", params=dict(responsible_id=responsible_id))
animal_resp = get_pyrat_data(endpoint="animals", params={"responsible_id": responsible_id})
for animal_entry in animal_resp:
# 2 - find animal with comment - Project Aeon
eartag_or_id = animal_entry["eartag_or_id"]
Expand Down Expand Up @@ -295,7 +295,7 @@ def make(self, key):

@schema
class PyratCommentWeightProcedure(dj.Imported):
"""Ingestion of new animals from PyRAT"""
"""Ingestion of new animals from PyRAT."""

definition = """
-> PyratIngestion
Expand All @@ -309,7 +309,7 @@ class PyratCommentWeightProcedure(dj.Imported):

def make(self, key):
execution_time = datetime.utcnow()
logger.info(f"Extracting weights/comments/procedures")
logger.info("Extracting weights/comments/procedures")

eartag_or_id = key["subject"]
comment_resp = get_pyrat_data(endpoint=f"animals/{eartag_or_id}/comments")
Expand Down Expand Up @@ -356,14 +356,12 @@ def make(self, key):

@schema
class CreatePyratIngestionTask(dj.Computed):
definition = """
definition = """
-> lab.User
"""

def make(self, key):
"""
Create one new PyratIngestionTask for every newly added user
"""
"""Create one new PyratIngestionTask for every newly added users."""
PyratIngestionTask.insert1({"pyrat_task_scheduled_time": datetime.utcnow()})
time.sleep(1)
self.insert1(key)
Expand Down Expand Up @@ -432,15 +430,14 @@ def make(self, key):
"import_order_request_id",
]


def get_pyrat_data(endpoint: str, params: dict = None, **kwargs):
base_url = "https://swc.pyrat.cloud/api/v3/"
pyrat_system_token = os.getenv("PYRAT_SYSTEM_TOKEN")
pyrat_user_token = os.getenv("PYRAT_USER_TOKEN")

if pyrat_system_token is None or pyrat_user_token is None:
raise ValueError(
f"The PYRAT tokens must be defined as an environment variable named 'PYRAT_SYSTEM_TOKEN' and 'PYRAT_USER_TOKEN'"
"The PYRAT tokens must be defined as an environment variable named 'PYRAT_SYSTEM_TOKEN' and 'PYRAT_USER_TOKEN'"
)

session = requests.Session()
Expand All @@ -460,4 +457,7 @@ def get_pyrat_data(endpoint: str, params: dict = None, **kwargs):

response = session.get(base_url + endpoint + params_str, **kwargs)

return response.json() if response.status_code == 200 else {"reponse code": response.status_code}
if response.status_code != 200:
raise requests.exceptions.HTTPError(f'PyRat API errored out with response code: {response.status_code}')

return response.json()
3 changes: 2 additions & 1 deletion docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ services:
pyrat_worker:
<<: *aeon-ingest-common
command: [ "aeon_ingest", "pyrat_worker" ]

env_file: ./.env

streams_worker:
<<: *aeon-ingest-common
depends_on:
Expand Down

0 comments on commit 6f98e68

Please sign in to comment.