AIP-72: Pass context keys from API Server to Workers #44899

Draft
kaxil wants to merge 1 commit into main from task-context-api-server

@kaxil kaxil commented Dec 12, 2024

Part of #44481


@kaxil kaxil force-pushed the task-context-api-server branch from 0dda9cd to c751760 Compare December 13, 2024 11:32
@kaxil kaxil force-pushed the task-context-api-server branch from c751760 to ef7be4c Compare December 13, 2024 12:00

@ashb ashb left a comment

Yeah, looks like a good approach.


# TODO: `dag_id` and `run_id` are duplicated from TaskInstance
# See if we can avoid sending these fields from API server and instead
# use the TaskInstance data to get the DAG run information in the client (Task Execution Interface).

Yes, or vice-versa.

class TIRunContext(BaseModel):
    """Response schema for TaskInstance run context."""

    dag_run: DagRun

I would have put this on the TI object, not in here, but potato potato

Oh, except it's not available/required in the TI start context (which is just the uuid and the 4/5-tuple). Right.

@@ -48,6 +51,108 @@
log = logging.getLogger(__name__)


@router.patch(
    "/{task_instance_id}/run",

Maybe

Suggested change
- "/{task_instance_id}/run",
+ "/{task_instance_id}/start",


Thinking more about this, maybe run is better, as this could be called more than once in cases like reschedule, couldn't it? (Yes, it's just naming so it doesn't matter, but names also do matter.)

Comment on lines +88 to +92
ds = logical_date.strftime("%Y-%m-%d")
ds_nodash = ds.replace("-", "")
ts = logical_date.isoformat()
ts_nodash = logical_date.strftime("%Y%m%dT%H%M%S")
ts_nodash_with_tz = ts.replace("-", "").replace(":", "")

Nit/future work: make these lazy
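
One way the "lazy" idea could look is sketched below. This is only an illustration, not the PR's implementation: `LazyDateContext` is a hypothetical class name, and `functools.cached_property` is just one possible mechanism. It derives the same five strings as the snippet above, but each one is computed on first access rather than up front.

```python
from datetime import datetime, timezone
from functools import cached_property


class LazyDateContext:
    """Hypothetical holder that derives date strings only on first access."""

    def __init__(self, logical_date: datetime) -> None:
        self.logical_date = logical_date

    @cached_property
    def ds(self) -> str:
        # Computed once, then cached in the instance __dict__.
        return self.logical_date.strftime("%Y-%m-%d")

    @cached_property
    def ds_nodash(self) -> str:
        return self.ds.replace("-", "")

    @cached_property
    def ts(self) -> str:
        return self.logical_date.isoformat()

    @cached_property
    def ts_nodash(self) -> str:
        return self.logical_date.strftime("%Y%m%dT%H%M%S")

    @cached_property
    def ts_nodash_with_tz(self) -> str:
        return self.ts.replace("-", "").replace(":", "")


ctx = LazyDateContext(datetime(2024, 12, 13, 11, 32, tzinfo=timezone.utc))
print(ctx.ds_nodash)  # only ds and ds_nodash have been computed at this point
```

A task that never reads `ts_nodash_with_tz` would then never pay for formatting it; whether that saving matters in practice is a separate question.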

@@ -48,9 +48,13 @@ class RuntimeTaskInstance(TaskInstance):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    task: BaseOperator
    _ti_context_from_server: TIRunContext | None = None
    """The Task Instance context from the API server, if any."""

Let's exclude this from the repr, I think.
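
To illustrate what excluding a field from the repr means, here is a minimal sketch using stdlib dataclasses as a stand-in. The class in the PR is a pydantic model, so this is an analogy rather than the actual change; `RunContextSketch` and `RuntimeTaskInstanceSketch` are hypothetical names, and the field is given a public name here for simplicity. pydantic's `Field` accepts a comparable `repr=False` argument.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class RunContextSketch:
    """Hypothetical stand-in for the context returned by the API server."""

    dag_id: str
    run_id: str


@dataclass
class RuntimeTaskInstanceSketch:
    """Hypothetical stand-in for the runtime task instance."""

    task_id: str
    # repr=False keeps the server-provided context out of repr() output,
    # while the attribute itself stays fully accessible.
    ti_context_from_server: RunContextSketch | None = field(default=None, repr=False)


ti = RuntimeTaskInstanceSketch(
    "extract", RunContextSketch("example_dag", "manual__1")
)
print(repr(ti))  # RuntimeTaskInstanceSketch(task_id='extract')
```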
