Issue while connecting to GCP using Secrets / Impersonation Chain in DataprocCreateClusterOperator #13042
abhishekshenoy asked this question in Q&A (unanswered).
We are using Airflow 2.0.0b2, and our Airflow deployment runs on GKE clusters.
The executor pods run with a service account that has permission to access secrets in Google Secret Manager.
When I pass the secret name (with the prefix and everything correctly set) to gcp_conn_id, I see that although the secret is fetched correctly, it is not used to create the cluster in Dataproc. Instead, the pod's default service account, which does not have the Dataproc Admin role, is used to create the cluster.
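For reference, the secrets backend is configured along these lines in airflow.cfg (a sketch; the prefix and separator match the secret id that appears in the logs below):

```ini
[secrets]
backend = airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend
backend_kwargs = {"connections_prefix": "airflow-connections", "sep": "-"}
```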
Task definition is below:
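Roughly the following; the project, region, and cluster_config values are placeholders, and "google_cloud_secret" stands in for whatever connection id the secret resolves to:

```python
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
)

# Placeholder cluster config; the real one follows the Dataproc ClusterConfig schema.
CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id="my-gcp-project",        # placeholder
    region="us-central1",               # placeholder
    cluster_name="first-cluster-setup",
    cluster_config=CLUSTER_CONFIG,
    # Connection id resolved by the Secret Manager backend from the secret
    # airflow-connections-google_cloud_secret (name is a placeholder).
    gcp_conn_id="google_cloud_secret",
)
```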
I get the below exception:
I know I am missing something here. I tried retrieving the connection and passing its URI string to gcp_conn_id via templates, but since gcp_conn_id is not a templated field, the variable was not resolved.
I am still unable to establish a connection when I point gcp_conn_id at a connection retrieved from Secret Manager.
I moved from using secrets to using impersonation_chain, granting the service account that my pods run with the serviceAccountTokenCreator role on the target service account, along the lines below.
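A sketch of that grant; both service account emails are placeholders:

```bash
# Allow the pod's service account to mint tokens for the Dataproc admin account.
gcloud iam service-accounts add-iam-policy-binding \
    dataproc-admin@my-gcp-project.iam.gserviceaccount.com \
    --member="serviceAccount:pod-sa@my-gcp-project.iam.gserviceaccount.com" \
    --role="roles/iam.serviceAccountTokenCreator"
```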
The task definition below works fine in my local Docker setup, where my tasks run under the LocalExecutor, and I am able to create a cluster:
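Roughly the following; the impersonation target and other ids are placeholders:

```python
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
)

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id="my-gcp-project",        # placeholder
    region="us-central1",               # placeholder
    cluster_name="first-cluster-setup",
    cluster_config=CLUSTER_CONFIG,      # same placeholder config as above
    # Service account to impersonate; the credentials the worker runs with
    # need serviceAccountTokenCreator on this account.
    impersonation_chain="dataproc-admin@my-gcp-project.iam.gserviceaccount.com",
)
```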
The same task, when run on my dev setup, throws the exception below. I do not understand why it is looking in Secret Manager for the google_cloud_default connection. Any help would unblock me in setting up a Dataproc flow with Airflow. I have attached the file with the whole exception stack trace.
impersonation_chain_exception.log
```
[2020-12-13 07:31:00,542] {taskinstance.py:1018} INFO - Starting attempt 1 of 2
[2020-12-13 07:31:00,542] {taskinstance.py:1019} INFO -
[2020-12-13 07:31:00,640] {taskinstance.py:1038} INFO - Executing <Task(DataprocCreateClusterOperator): create_cluster> on 2020-12-13T07:30:40.273638+00:00
[2020-12-13 07:31:00,645] {standard_task_runner.py:50} INFO - Started process 14 to run task
[2020-12-13 07:31:00,650] {standard_task_runner.py:74} INFO - Running: ['airflow', 'tasks', 'run', '4_spark_submit_dataproc', 'create_cluster', '2020-12-13T07:30:40.273638+00:00', '--job-id', '203', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/templates/4_spark_submit_dataproc.py', '--cfg-path', '/tmp/tmpeqcdvbvb']
[2020-12-13 07:31:00,651] {standard_task_runner.py:75} INFO - Job 203: Subtask create_cluster
[2020-12-13 07:31:01,189] {logging_mixin.py:103} INFO - Running <TaskInstance: 4_spark_submit_dataproc.create_cluster 2020-12-13T07:30:40.273638+00:00 [running]> on host 4sparksubmitdataproccreatecluster-8d0cc477352847c6a93d4b2568051
[2020-12-13 07:31:01,550] {taskinstance.py:1230} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=[email protected]
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=4_spark_submit_dataproc
AIRFLOW_CTX_TASK_ID=create_cluster
AIRFLOW_CTX_EXECUTION_DATE=2020-12-13T07:30:40.273638+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2020-12-13T07:30:40.273638+00:00
[2020-12-13 07:31:01,550] {dataproc.py:603} INFO - Creating cluster: first-cluster-setup
[2020-12-13 07:31:02,030] {secret_manager_client.py:89} ERROR - Google Cloud API Call Error (PermissionDenied): No access for Secret ID airflow-connections-google_cloud_default.
Did you add 'secretmanager.versions.access' permission?
[2020-12-13 07:31:02,103] {taskinstance.py:1396} ERROR - (psycopg2.errors.UndefinedColumn) column connection.description does not exist
LINE 1: ...id, connection.conn_type AS connection_conn_type, connection...
^
[SQL: SELECT connection.password AS connection_password, connection.extra AS connection_extra, connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.description AS connection_description, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.port AS connection_port, connection.is_encrypted AS connection_is_encrypted, connection.is_extra_encrypted AS connection_is_extra_encrypted
FROM connection
WHERE connection.conn_id = %(conn_id_1)s
LIMIT %(param_1)s]
[parameters: {'conn_id_1': 'google_cloud_default', 'param_1': 1}]
(Background on this error at: http://sqlalche.me/e/13/f405)
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.UndefinedColumn: column connection.description does not exist
LINE 1: ...id, connection.conn_type AS connection_conn_type, connection...
^
```