Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix short circuit in mapped tasks #44925

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

shahar1
Copy link
Contributor

@shahar1 shahar1 commented Dec 13, 2024

closes: #43883


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@@ -49,46 +49,45 @@ def _get_dep_statuses(self, ti, session, dep_context):
finished_task_ids = {t.task_id for t in finished_tis}

for parent in upstream:
if isinstance(parent, SkipMixin):
Copy link
Contributor Author

@shahar1 shahar1 Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially I though of making it:
if isinstance(parent, (SkipMixin, MappedOpeartor))

But then I saw that tests pass without this if - so I decided going with Occam's razor and simply remove it.

Comment on lines +815 to +816
op.downstream_task_ids = self.downstream_task_ids
op.upstream_task_ids = self.upstream_task_ids
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@uranusjr following up #43883 (comment) - it seems that these attributes were missing here, so that's why PythonOperator printed misleading logs

Comment on lines +125 to +128
# The following could be applied only for non-mapped tasks
if map_index == -1:
SkipMixin._set_state_to_skipped(dag_run, task_ids_list, session)
session.commit()
Copy link
Contributor Author

@shahar1 shahar1 Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block could theoretically be removed, and then setting the skipped states will be done exclusively by NotPreviouslySkippedDep for both mapped and non-mapped.
Not sure regarding effects on performance, so I left it as-is for now (if we decide to remove it - some other tests will have to be adjusted).

@shahar1 shahar1 requested a review from eladkal December 13, 2024 23:03
@shahar1 shahar1 force-pushed the fix-dtm-short-circuit-main branch from b13c1dc to 9d0aee7 Compare December 13, 2024 23:06
@shahar1 shahar1 force-pushed the fix-dtm-short-circuit-main branch 4 times, most recently from dd54028 to f0ad8dc Compare December 14, 2024 10:13
@shahar1 shahar1 force-pushed the fix-dtm-short-circuit-main branch from f0ad8dc to a2e62b3 Compare December 14, 2024 20:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

MappedTasks don't short-circuit
1 participant