【emergency!!!】occur error(TypeError: cannot serialize '_io.TextIOWrapper' object) in KubernetesExecutor #12979
Unanswered
zhangxiao696
asked this question in
Q&A
Replies: 0 comments
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
when set dag use KubernetesExecutor with:
dag = DAG(
"example_using_k8s_executor_new",
schedule_interval="0 1 * * *",
catchup=False,
default_args={
"owner": "zhangxiao",
"depends_on_past": False,
"start_date": datetime(2020, 12, 5),
"email_on_failure": False,
"email_on_retry": False,
"retries": 2,
"retry_delay": timedelta(seconds=30),
"sla": timedelta(hours=23),
'executor_config': {
'KubernetesExecutor': {
'request_cpu': "200m",
'limit_cpu': "200m",
'request_memory': "500Mi",
'limit_memory': "500Mi"
}
}
},
)
the program raise exception:
[2020-12-10 10:02:08,142] {scheduler_job.py:1384} ERROR - Exception when executing execute_helper
Traceback (most recent call last):
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1382, in _execute
self._execute_helper()
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1453, in _execute_helper
if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1515, in _validate_and_run_task_instances
self.executor.heartbeat()
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 130, in heartbeat
self.trigger_tasks(open_slots)
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 155, in trigger_tasks
executor_config=simple_ti.executor_config)
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 803, in execute_async
self.task_queue.put((key, command, kube_executor_config))
File "", line 2, in put
File "/home/aicv/.jumbo/lib/python3.6/multiprocessing/managers.py", line 756, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/home/aicv/.jumbo/lib/python3.6/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/home/aicv/.jumbo/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object
But, if I remove executor_config in default_args of Dag, the program is working!!!
dag = DAG(
"example_using_k8s_executor_new",
schedule_interval="0 1 * * *",
catchup=False,
default_args={
"owner": "zhangxiao",
"depends_on_past": False,
"start_date": datetime(2020, 12, 5),
"email_on_failure": False,
"email_on_retry": False,
"retries": 2,
"retry_delay": timedelta(seconds=30),
"sla": timedelta(hours=23),
)
why???????
please help me!!!!
Beta Was this translation helpful? Give feedback.
All reactions