-
Notifications
You must be signed in to change notification settings - Fork 14.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AIP-72: Pass context keys from API Server to Workers #44899
base: main
Are you sure you want to change the base?
Conversation
0dda9cd
to
c751760
Compare
c751760
to
ef7be4c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah looks like a good approach
|
||
# TODO: `dag_id` and `run_id` are duplicated from TaskInstance | ||
# See if we can avoid sending these fields from API server and instead | ||
# use the TaskInstance data to get the DAG run information in the client (Task Execution Interface). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, or vice-versa.
class TIRunContext(BaseModel): | ||
"""Response schema for TaskInstance run context.""" | ||
|
||
dag_run: DagRun |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would have put this on the TI object, not in here, but potato potato
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, except it's not available/required in the TI start context (which is just the uuid and the 4/5-tuple). Right.
@@ -48,6 +51,108 @@ | |||
log = logging.getLogger(__name__) | |||
|
|||
|
|||
@router.patch( | |||
"/{task_instance_id}/run", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe
"/{task_instance_id}/run", | |
"/{task_instance_id}/start", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking more about this, maybe run is better, as this could be called more than once in the case of things like reschedule couldn't it? (Yes, it's just naming so it doesn't matter, but names also do matter)
ds = logical_date.strftime("%Y-%m-%d") | ||
ds_nodash = ds.replace("-", "") | ||
ts = logical_date.isoformat() | ||
ts_nodash = logical_date.strftime("%Y%m%dT%H%M%S") | ||
ts_nodash_with_tz = ts.replace("-", "").replace(":", "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit/future work: make these lazy
@@ -48,9 +48,13 @@ class RuntimeTaskInstance(TaskInstance): | |||
model_config = ConfigDict(arbitrary_types_allowed=True) | |||
|
|||
task: BaseOperator | |||
_ti_context_from_server: TIRunContext | None = None | |||
"""The Task Instance context from the API server, if any.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets exclude this from the repr I think
Part of #44481
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.