Skip to content

Commit

Permalink
Fix offline SQL generation of migrations (#44790)
Browse files Browse the repository at this point in the history
* Fix offline SQL generation of migrations

Offline SQL mode is broken, and this provides a fix for it, including
a test to run the offline migration through the CI.

Changes:
 - refactored the migrations to remove dependency on inspectors and sessions
 - Updated the minimum version of the offline migration to 2.7.0, which
 is the oldest migration file we have in AF 3

* fixup! Fix offline SQL generation of migrations

* fixup! fixup! Fix offline SQL generation of migrations

* properly handle index dropping and fix raw sql issue

* sqlite lite is not supported for offline mode

* remove sa.text in op.execute

* check foreign key doesn't exist before creating it

* Apply suggestions from code review

Co-authored-by: Kaxil Naik <[email protected]>

* use procedure for mysql

---------

Co-authored-by: Kaxil Naik <[email protected]>
  • Loading branch information
ephraimbuddy and kaxil authored Dec 13, 2024
1 parent 957bd24 commit d28caa7
Show file tree
Hide file tree
Showing 11 changed files with 2,156 additions and 1,983 deletions.
14 changes: 14 additions & 0 deletions .github/actions/migration_tests/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ runs:
COMPOSE_PROJECT_NAME: "docker-compose"
env:
COMPOSE_PROJECT_NAME: "docker-compose"
- name: "Bring compose down again"
shell: bash
run: breeze down
env:
COMPOSE_PROJECT_NAME: "docker-compose"
- name: "Test offline migration ${{env.BACKEND}}"
shell: bash
run: >
breeze shell "airflow db reset -y &&
airflow db downgrade -n 2.7.0 -y &&
airflow db migrate -s"
env:
COMPOSE_PROJECT_NAME: "docker-compose"
if: env.BACKEND != 'sqlite'
- name: "Bring any containers left down"
shell: bash
run: breeze down
Expand Down
45 changes: 45 additions & 0 deletions airflow/migrations/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,48 @@ def disable_sqlite_fkeys(op):
op.execute("PRAGMA foreign_keys=on")
else:
yield op


def mysql_drop_foreignkey_if_exists(constraint_name, table_name, op):
"""Older Mysql versions do not support DROP FOREIGN KEY IF EXISTS."""
op.execute(f"""
CREATE PROCEDURE DropForeignKeyIfExists()
BEGIN
IF EXISTS (
SELECT 1
FROM information_schema.TABLE_CONSTRAINTS
WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = '{table_name}' AND
CONSTRAINT_NAME = '{constraint_name}' AND
CONSTRAINT_TYPE = 'FOREIGN KEY'
) THEN
ALTER TABLE {table_name}
DROP CONSTRAINT {constraint_name};
ELSE
SELECT 1;
END IF;
END;
CALL DropForeignKeyIfExists();
DROP PROCEDURE DropForeignKeyIfExists;
""")


def mysql_drop_index_if_exists(index_name, table_name, op):
"""Older Mysql versions do not support DROP INDEX IF EXISTS."""
op.execute(f"""
IF EXISTS (
SELECT 1
FROM information_schema.TABLE_CONSTRAINTS
WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = '{table_name}' AND
CONSTRAINT_NAME = '{index_name}' AND
CONSTRAINT_TYPE = 'INDEX'
) THEN
ALTER TABLE {table_name}
DROP INDEX {index_name};
ELSE
SELECT 1;
END IF;
""")
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@

from __future__ import annotations

from typing import TYPE_CHECKING

import sqlalchemy as sa
from alembic import op
from sqlalchemy import inspect

from airflow.migrations.utils import mysql_drop_foreignkey_if_exists
from airflow.models import ID_LEN
from airflow.utils.sqlalchemy import UtcDateTime

# revision identifiers, used by Alembic.
revision = "5f2621c13b39"
Expand All @@ -39,91 +41,198 @@
depends_on = None
airflow_version = "2.10.3"

if TYPE_CHECKING:
from alembic.operations.base import BatchOperations
from sqlalchemy.sql.elements import conv


def _rename_fk_constraint(
*,
batch_op: BatchOperations,
original_name: str | conv,
new_name: str | conv,
referent_table: str,
local_cols: list[str],
remote_cols: list[str],
ondelete: str,
) -> None:
batch_op.drop_constraint(original_name, type_="foreignkey")
batch_op.create_foreign_key(
constraint_name=new_name,
referent_table=referent_table,
local_cols=local_cols,
remote_cols=remote_cols,
ondelete=ondelete,
)
def mysql_create_foreignkey_if_not_exists(
constraint_name, table_name, column_name, ref_table, ref_column, op
):
op.execute(f"""
CREATE PROCEDURE create_foreign_key_if_not_exists()
BEGIN
IF EXISTS (
SELECT 1
FROM information_schema.TABLE_CONSTRAINTS
WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = '{table_name}' AND
CONSTRAINT_NAME = '{constraint_name}' AND
CONSTRAINT_TYPE = 'FOREIGN KEY'
) THEN
SELECT 1;
ELSE
ALTER TABLE {table_name}
ADD CONSTRAINT {constraint_name} FOREIGN KEY ({column_name})
REFERENCES {ref_table}({ref_column})
ON DELETE CASCADE;
END IF;
END;
CALL create_foreign_key_if_not_exists();
DROP PROCEDURE create_foreign_key_if_not_exists;
""")


def postgres_create_foreignkey_if_not_exists(
constraint_name, table_name, column_name, ref_table, ref_column, op
):
op.execute(f"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM information_schema.table_constraints
WHERE constraint_type = 'FOREIGN KEY'
AND constraint_name = '{constraint_name}'
) THEN
ALTER TABLE {table_name}
ADD CONSTRAINT {constraint_name}
FOREIGN KEY ({column_name})
REFERENCES {ref_table} ({ref_column})
ON DELETE CASCADE;
END IF;
END $$;
""")


def upgrade():
"""Rename dag_schedule_dataset_alias_reference constraint."""
with op.batch_alter_table("dag_schedule_dataset_alias_reference", schema=None) as batch_op:
bind = op.get_context().bind
insp = inspect(bind)
fk_constraints = [fk["name"] for fk in insp.get_foreign_keys("dag_schedule_dataset_alias_reference")]

# "dsdar_dataset_alias_fkey" was the constraint name defined in the model while "dsdar_dataset_fkey" is the one
# defined in the previous migration.
# Rename this constraint name if user is using the name "dsdar_dataset_fkey".
if "dsdar_dataset_fkey" in fk_constraints:
_rename_fk_constraint(
batch_op=batch_op,
original_name="dsdar_dataset_fkey",
new_name="dsdar_dataset_alias_fkey",
referent_table="dataset_alias",
local_cols=["alias_id"],
remote_cols=["id"],
dialect = op.get_context().dialect.name
if dialect == "sqlite":
op.create_table(
"new_table",
sa.Column("alias_id", sa.Integer(), primary_key=True, nullable=False),
sa.Column("dag_id", sa.String(ID_LEN), primary_key=True, nullable=False),
sa.Column("created_at", UtcDateTime(timezone=True), nullable=False),
sa.Column("updated_at", UtcDateTime(timezone=True), nullable=False),
sa.ForeignKeyConstraint(
("alias_id",),
["dataset_alias.id"],
name="dsdar_dataset_alias_fkey",
ondelete="CASCADE",
)

# "dsdar_dag_fkey" was the constraint name defined in the model while "dsdar_dag_id_fkey" is the one
# defined in the previous migration.
# Rename this constraint name if user is using the name "dsdar_dag_fkey".
if "dsdar_dag_fkey" in fk_constraints:
_rename_fk_constraint(
batch_op=batch_op,
original_name="dsdar_dag_fkey",
new_name="dsdar_dag_id_fkey",
referent_table="dataset_alias",
local_cols=["alias_id"],
remote_cols=["id"],
),
sa.ForeignKeyConstraint(
columns=("dag_id",),
refcolumns=["dag.dag_id"],
name="dsdar_dag_id_fkey",
ondelete="CASCADE",
)
),
sa.PrimaryKeyConstraint("alias_id", "dag_id", name="dsdar_pkey"),
)
op.execute("INSERT INTO new_table SELECT * FROM dag_schedule_dataset_alias_reference")
op.drop_table("dag_schedule_dataset_alias_reference")
op.rename_table("new_table", "dag_schedule_dataset_alias_reference")
op.create_index(
"idx_dag_schedule_dataset_alias_reference_dag_id",
"dag_schedule_dataset_alias_reference",
["dag_id"],
unique=False,
)
if dialect == "postgresql":
op.execute(
"ALTER TABLE dag_schedule_dataset_alias_reference DROP CONSTRAINT IF EXISTS dsdar_dataset_fkey"
)
op.execute(
"ALTER TABLE dag_schedule_dataset_alias_reference DROP CONSTRAINT IF EXISTS dsdar_dag_fkey"
)
postgres_create_foreignkey_if_not_exists(
"dsdar_dataset_alias_fkey",
"dag_schedule_dataset_alias_reference",
"alias_id",
"dataset_alias",
"id",
op,
)
postgres_create_foreignkey_if_not_exists(
"dsdar_dag_id_fkey", "dag_schedule_dataset_alias_reference", "alias_id", "dataset_alias", "id", op
)
if dialect == "mysql":
mysql_drop_foreignkey_if_exists("dsdar_dataset_fkey", "dag_schedule_dataset_alias_reference", op)
mysql_drop_foreignkey_if_exists("dsdar_dag_fkey", "dag_schedule_dataset_alias_reference", op)
mysql_create_foreignkey_if_not_exists(
"dsdar_dataset_alias_fkey",
"dag_schedule_dataset_alias_reference",
"alias_id",
"dataset_alias",
"id",
op,
)
mysql_create_foreignkey_if_not_exists(
"dsdar_dag_id_fkey", "dag_schedule_dataset_alias_reference", "alias_id", "dataset_alias", "id", op
)


def downgrade():
"""Undo dag_schedule_dataset_alias_reference constraint rename."""
with op.batch_alter_table("dag_schedule_dataset_alias_reference", schema=None) as batch_op:
bind = op.get_context().bind
insp = inspect(bind)
fk_constraints = [fk["name"] for fk in insp.get_foreign_keys("dag_schedule_dataset_alias_reference")]
if "dsdar_dataset_alias_fkey" in fk_constraints:
_rename_fk_constraint(
batch_op=batch_op,
original_name="dsdar_dataset_alias_fkey",
new_name="dsdar_dataset_fkey",
referent_table="dataset_alias",
local_cols=["alias_id"],
remote_cols=["id"],
dialect = op.get_context().dialect.name
if dialect == "postgresql":
op.execute(
"ALTER TABLE dag_schedule_dataset_alias_reference DROP CONSTRAINT IF EXISTS dsdar_dataset_alias_fkey"
)
op.execute(
"ALTER TABLE dag_schedule_dataset_alias_reference DROP CONSTRAINT IF EXISTS dsdar_dag_id_fkey"
)
postgres_create_foreignkey_if_not_exists(
"dsdar_dataset_fkey",
"dag_schedule_dataset_alias_reference",
"alias_id",
"dataset_alias",
"id",
op,
)
postgres_create_foreignkey_if_not_exists(
"dsdar_dag_fkey",
"dag_schedule_dataset_alias_reference",
"alias_id",
"dataset_alias",
"id",
op,
)
if dialect == "mysql":
mysql_drop_foreignkey_if_exists(
"dsdar_dataset_alias_fkey", "dag_schedule_dataset_alias_reference", op
)
mysql_drop_foreignkey_if_exists("dsdar_dag_id_fkey", "dag_schedule_dataset_alias_reference", op)
mysql_create_foreignkey_if_not_exists(
"dsdar_dataset_fkey",
"dag_schedule_dataset_alias_reference",
"alias_id",
"dataset_alias",
"id",
op,
)
mysql_create_foreignkey_if_not_exists(
"dsdar_dag_fkey",
"dag_schedule_dataset_alias_reference",
"alias_id",
"dataset_alias",
"id",
op,
)
if dialect == "sqlite":
op.create_table(
"new_table",
sa.Column("alias_id", sa.Integer(), primary_key=True, nullable=False),
sa.Column("dag_id", sa.String(ID_LEN), primary_key=True, nullable=False),
sa.Column("created_at", UtcDateTime(timezone=True), nullable=False),
sa.Column("updated_at", UtcDateTime(timezone=True), nullable=False),
sa.ForeignKeyConstraint(
("alias_id",),
["dataset_alias.id"],
name="dsdar_dataset_fkey",
ondelete="CASCADE",
)

if "dsdar_dag_id_fkey" in fk_constraints:
_rename_fk_constraint(
batch_op=batch_op,
original_name="dsdar_dag_id_fkey",
new_name="dsdar_dag_fkey",
referent_table="dataset_alias",
local_cols=["alias_id"],
remote_cols=["id"],
),
sa.ForeignKeyConstraint(
columns=("dag_id",),
refcolumns=["dag.dag_id"],
name="dsdar_dag_fkey",
ondelete="CASCADE",
)
),
sa.PrimaryKeyConstraint("alias_id", "dag_id", name="dsdar_pkey"),
)
op.execute("INSERT INTO new_table SELECT * FROM dag_schedule_dataset_alias_reference")
op.drop_table("dag_schedule_dataset_alias_reference")
op.rename_table("new_table", "dag_schedule_dataset_alias_reference")
op.create_index(
"idx_dag_schedule_dataset_alias_reference_dag_id",
"dag_schedule_dataset_alias_reference",
["dag_id"],
unique=False,
)
Loading

0 comments on commit d28caa7

Please sign in to comment.