MappedTasks don't short-circuit #43883
Comments
I was able to get the desired behaviour by creating a task group, pulling the
from random import random
from typing import List

from airflow.decorators import dag, task, task_group
from airflow.models import XCom
from pendulum import now


@dag(
    "test_foo",
    schedule=None,
    start_date=now(),
    dag_display_name="Test random things",
)
def test_foo():
    @task.python
    def start() -> List[int]:
        return [i for i in range(1, 11)]

    @task_group()
    def tg():
        @task.short_circuit
        def step_one(i: int) -> bool:
            print(f"Hello from step {i}")
            return random() >= 0.5

        @task.python(trigger_rule="one_done")
        def step_two(do: bool):
            if not do:
                print("Should've been skipped.")
            print("Doing stuff")

        for i in XCom.get_one(key="return_value", task_id="start", run_id=XCom.run_id):
            step_two(step_one(i))

    start() >> tg()


test_foo()

if __name__ == "__main__":
    test_foo().test()
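The semantics this workaround is after can be modelled in plain Python, without Airflow. The sketch below is a toy model only, not Airflow's scheduler logic: each element gets its own step_one/step_two pair, and step_two runs only where step_one returned True. The name `run_pairs` and its parameters are hypothetical and exist only for this illustration.

```python
from random import Random
from typing import Callable, List, Tuple


def run_pairs(
    items: List[int],
    condition: Callable[[int], bool],
) -> Tuple[List[int], List[int]]:
    """Toy model: run step_two only for items whose step_one returned True."""
    ran: List[int] = []
    skipped: List[int] = []
    for item in items:
        # step_one: evaluate the short-circuit condition for this one element.
        if condition(item):
            # step_two runs for this element.
            ran.append(item)
        else:
            # step_two is skipped for this element only; others are unaffected.
            skipped.append(item)
    return ran, skipped


rng = Random(0)  # fixed seed so repeated runs give the same split
ran, skipped = run_pairs(list(range(1, 11)), lambda _: rng.random() >= 0.5)
print(f"ran: {ran}")
print(f"skipped: {skipped}")
```

With a fair coin as the condition, roughly half of the ten elements end up in `skipped` on average, which is the per-element skipping the mapped version was expected to show.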
I'm seeing this on 2.9.3, too.
@eladkal we have upgraded from Airflow 2.9.3 to 2.10.3 and have a similar issue: we don't see logs on any task. We are getting the error "No task logs found. Try the Event Log tab for more context." May I know whether this is resolved?
Hey @Chais, I've managed to reproduce the issue using the code snippets that you provided.
airflow/airflow/operators/python.py Line 315 in b5f033a
I'll try to work on a fix. Edit: See my response below.
Shouldn’t
After spending some more time, I've figured out that you are probably correct, and the problem actually seems to be here (see the TODO comment just above it):
airflow/airflow/models/taskinstance.py Line 1331 in caa90a1
Until this point, The hacky solution seems to be pushing the
Why does not recording TaskMap affect this? The function only does one thing: pushing a row to the TaskMap table. This table is only used for task mapping.
You're correct, I think that now I understand better what's going on :)
Apache Airflow version
2.10.3
If "Other Airflow 2 version" selected, which one?
No response
What happened?
It seems like tasks created via .expand() do not inherit the downstream tasks of their parent/original. This becomes an issue when we expand a task.short_circuit because short-circuiting no longer works.
What you think should happen instead?
My understanding from the documentation is that step_two.expand(do=step_one.expand(i=start())) is supposed to be equivalent to:
How to reproduce
On average, about 5 of the 10 tasks will return False and short-circuit. If they do, the log will read something like this:
But checking the task_dict variable in the module, we see that step_one does have step_two set as a downstream task and, vice versa, step_two has step_one set as an upstream task.
Operating System
24.04.1 LTS (Noble Numbat)
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
I'm testing this on Standalone
Anything else?
I can get closer to the desired behaviour by using a list comprehension in the dependency, but that can't be done dynamically: I can't iterate over PlainXComArgs, and it also doesn't produce exactly the desired behaviour.
Are you willing to submit PR?