feat: Adds helper functions for migrations (#31303)

This commit is contained in:
Luiz Otavio 2024-12-11 10:50:56 -03:00 committed by GitHub
parent fd57fce977
commit 423a0fefa5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 234 additions and 70 deletions

View File

@ -19,10 +19,8 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from alembic import op from alembic import op
from sqlalchemy.dialects.sqlite.base import SQLiteDialect # noqa: E402
from sqlalchemy.engine.reflection import Inspector from sqlalchemy.engine.reflection import Inspector
from superset.migrations.shared.utils import has_table
from superset.utils.core import generic_find_fk_constraint_name from superset.utils.core import generic_find_fk_constraint_name
@ -73,23 +71,3 @@ def redefine(
ondelete=on_delete, ondelete=on_delete,
onupdate=on_update, onupdate=on_update,
) )
def drop_fks_for_table(table_name: str) -> None:
"""
Drop all foreign key constraints for a table if it exist and the database
is not sqlite.
:param table_name: The table name to drop foreign key constraints for
"""
connection = op.get_bind()
inspector = Inspector.from_engine(connection)
if isinstance(connection.dialect, SQLiteDialect):
return # sqlite doesn't like constraints
if has_table(table_name):
foreign_keys = inspector.get_foreign_keys(table_name)
for fk in foreign_keys:
op.drop_constraint(fk["name"], table_name, type_="foreignkey")

View File

@ -21,17 +21,25 @@ from collections.abc import Iterator
from typing import Any, Callable, Optional, Union from typing import Any, Callable, Optional, Union
from uuid import uuid4 from uuid import uuid4
import sqlalchemy as sa
from alembic import op from alembic import op
from sqlalchemy import inspect from sqlalchemy import Column, inspect
from sqlalchemy.dialects.mysql.base import MySQLDialect from sqlalchemy.dialects.mysql.base import MySQLDialect
from sqlalchemy.dialects.postgresql.base import PGDialect from sqlalchemy.dialects.postgresql.base import PGDialect
from sqlalchemy.dialects.sqlite.base import SQLiteDialect # noqa: E402
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.exc import NoSuchTableError from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.orm import Query, Session from sqlalchemy.orm import Query, Session
from sqlalchemy.sql.schema import SchemaItem
from superset.utils import json from superset.utils import json
logger = logging.getLogger(__name__) GREEN = "\033[32m"
RESET = "\033[0m"
YELLOW = "\033[33m"
RED = "\033[31m"
LRED = "\033[91m"
logger = logging.getLogger("alembic")
DEFAULT_BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 1000)) DEFAULT_BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 1000))
@ -185,15 +193,208 @@ def has_table(table_name: str) -> bool:
return table_exists return table_exists
def add_column_if_not_exists(table_name: str, column: sa.Column) -> None: def drop_fks_for_table(table_name: str) -> None:
""" """
Adds a column to a table if it does not already exist. Drop all foreign key constraints for a table if it exist and the database
is not sqlite.
:param table_name: Name of the table. :param table_name: The table name to drop foreign key constraints for
:param column: SQLAlchemy Column object.
""" """
if not table_has_column(table_name, column.name): connection = op.get_bind()
print(f"Adding column '{column.name}' to table '{table_name}'.\n") inspector = Inspector.from_engine(connection)
op.add_column(table_name, column)
else: if isinstance(connection.dialect, SQLiteDialect):
print(f"Column '{column.name}' already exists in table '{table_name}'.\n") return # sqlite doesn't like constraints
if has_table(table_name):
foreign_keys = inspector.get_foreign_keys(table_name)
for fk in foreign_keys:
logger.info(
f"Dropping foreign key {GREEN}{fk['name']}{RESET} from table {GREEN}{table_name}{RESET}..."
)
op.drop_constraint(fk["name"], table_name, type_="foreignkey")
def create_table(table_name: str, *columns: SchemaItem) -> None:
"""
Creates a database table with the specified name and columns.
This function checks if a table with the given name already exists in the database.
If the table already exists, it logs an informational.
Otherwise, it proceeds to create a new table using the provided name and schema columns.
:param table_name: The name of the table to be created.
:param columns: A variable number of arguments representing the schema just like when calling alembic's method create_table()
"""
if has_table(table_name=table_name):
logger.info(f"Table {LRED}{table_name}{RESET} already exists. Skipping...")
return
logger.info(f"Creating table {GREEN}{table_name}{RESET}...")
op.create_table(table_name, *columns)
logger.info(f"Table {GREEN}{table_name}{RESET} created.")
def drop_table(table_name: str) -> None:
"""
Drops a database table with the specified name.
This function checks if a table with the given name exists in the database.
If the table does not exist, it logs an informational message and skips the dropping process.
If the table exists, it first attempts to drop all foreign key constraints associated with the table
(handled by `drop_fks_for_table`) and then proceeds to drop the table.
:param table_name: The name of the table to be dropped.
"""
if not has_table(table_name=table_name):
logger.info(f"Table {GREEN}{table_name}{RESET} doesn't exist. Skipping...")
return
logger.info(f"Dropping table {GREEN}{table_name}{RESET}...")
drop_fks_for_table(table_name)
op.drop_table(table_name=table_name)
logger.info(f"Table {GREEN}{table_name}{RESET} dropped.")
def batch_operation(
callable: Callable[[int, int], None], count: int, batch_size: int
) -> None:
"""
Executes an operation by dividing a task into smaller batches and tracking progress.
This function is designed to process a large number of items in smaller batches. It takes a callable
that performs the operation on each batch. The function logs the progress of the operation as it processes
through the batches.
If count is set to 0 or lower, it logs an informational message and skips the batch process.
:param callable: A callable function that takes two integer arguments:
the start index and the end index of the current batch.
:param count: The total number of items to process.
:param batch_size: The number of items to process in each batch.
"""
if count <= 0:
logger.info(
f"No records to process in batch {LRED}(count <= 0){RESET} for callable {LRED}other_callable_example{RESET}. Skipping..."
)
return
for offset in range(0, count, batch_size):
percentage = (offset / count) * 100 if count else 0
logger.info(f"Progress: {offset:,}/{count:,} ({percentage:.2f}%)")
callable(offset, min(offset + batch_size, count))
logger.info(f"Progress: {count:,}/{count:,} (100%)")
logger.info(
f"End: {GREEN}{callable.__name__}{RESET} batch operation {GREEN}succesfully{RESET} executed."
)
def add_columns(table_name: str, *columns: Column) -> None:
"""
Adds new columns to an existing database table.
If a column already exists, it logs an informational message and skips the adding process.
Otherwise, it proceeds to add the new column to the table.
The operation is performed using Alembic's batch_alter_table.
:param table_name: The name of the table to which the columns will be added.
:param columns: A list of SQLAlchemy Column objects that define the name, type, and other attributes of the columns to be added.
"""
cols_to_add = []
for col in columns:
if table_has_column(table_name=table_name, column_name=col.name):
logger.info(
f"Column {LRED}{col.name}{RESET} already present on table {LRED}{table_name}{RESET}. Skipping..."
)
else:
cols_to_add.append(col)
with op.batch_alter_table(table_name) as batch_op:
for col in cols_to_add:
logger.info(
f"Adding column {GREEN}{col.name}{RESET} to table {GREEN}{table_name}{RESET}..."
)
batch_op.add_column(col)
def drop_columns(table_name: str, *columns: str) -> None:
"""
Drops specified columns from an existing database table.
If a column does not exist, it logs an informational message and skips the dropping process.
Otherwise, it proceeds to remove the column from the table.
The operation is performed using Alembic's batch_alter_table.
:param table_name: The name of the table from which the columns will be removed.
:param columns: A list of column names to be dropped.
"""
cols_to_drop = []
for col in columns:
if not table_has_column(table_name=table_name, column_name=col):
logger.info(
f"Column {LRED}{col}{RESET} is not present on table {LRED}{table_name}{RESET}. Skipping..."
)
else:
cols_to_drop.append(col)
with op.batch_alter_table(table_name) as batch_op:
for col in cols_to_drop:
logger.info(
f"Dropping column {GREEN}{col}{RESET} from table {GREEN}{table_name}{RESET}..."
)
batch_op.drop_column(col)
def create_index(table_name: str, index_name: str, *columns: str) -> None:
"""
Creates an index on specified columns of an existing database table.
If the index already exists, it logs an informational message and skips the creation process.
Otherwise, it proceeds to create a new index with the specified name on the given columns of the table.
:param table_name: The name of the table on which the index will be created.
:param index_name: The name of the index to be created.
:param columns: A list column names where the index will be created
"""
if table_has_index(table=table_name, index=index_name):
logger.info(
f"Table {LRED}{table_name}{RESET} already has index {LRED}{index_name}{RESET}. Skipping..."
)
return
logger.info(
f"Creating index {GREEN}{index_name}{RESET} on table {GREEN}{table_name}{RESET}"
)
op.create_index(table_name=table_name, index_name=index_name, columns=columns)
def drop_index(table_name: str, index_name: str) -> None:
"""
Drops an index from an existing database table.
If the index does not exists, it logs an informational message and skips the dropping process.
Otherwise, it proceeds with the removal operation.
:param table_name: The name of the table from which the index will be dropped.
:param index_name: The name of the index to be dropped.
"""
if not table_has_index(table=table_name, index=index_name):
logger.info(
f"Table {LRED}{table_name}{RESET} doesn't have index {LRED}{index_name}{RESET}. Skipping..."
)
return
logger.info(
f"Dropping index {GREEN}{index_name}{RESET} from table {GREEN}{table_name}{RESET}..."
)
op.drop_index(table_name=table_name, index_name=index_name)

View File

@ -30,7 +30,7 @@ import sqlalchemy as sa # noqa: E402
from alembic import op # noqa: E402 from alembic import op # noqa: E402
from sqlalchemy_utils import EncryptedType # noqa: E402 from sqlalchemy_utils import EncryptedType # noqa: E402
from superset.migrations.shared.constraints import drop_fks_for_table # noqa: E402 from superset.migrations.shared.utils import drop_fks_for_table # noqa: E402
def upgrade(): def upgrade():

View File

@ -22,9 +22,8 @@ Create Date: 2024-04-01 22:44:40.386543
""" """
import sqlalchemy as sa import sqlalchemy as sa
from alembic import op
from superset.migrations.shared.utils import add_column_if_not_exists from superset.migrations.shared.utils import add_columns, drop_columns
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = "c22cb5c2e546" revision = "c22cb5c2e546"
@ -32,11 +31,11 @@ down_revision = "678eefb4ab44"
def upgrade(): def upgrade():
add_column_if_not_exists( add_columns(
"user_attribute", "user_attribute",
sa.Column("avatar_url", sa.String(length=100), nullable=True), sa.Column("avatar_url", sa.String(length=100), nullable=True),
) )
def downgrade(): def downgrade():
op.drop_column("user_attribute", "avatar_url") drop_columns("user_attribute", "avatar_url")

View File

@ -23,9 +23,8 @@ Create Date: 2024-04-11 15:41:34.663989
""" """
import sqlalchemy as sa import sqlalchemy as sa
from alembic import op
from superset.migrations.shared.utils import add_column_if_not_exists from superset.migrations.shared.utils import add_columns, drop_columns
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = "5f57af97bc3f" revision = "5f57af97bc3f"
@ -36,12 +35,9 @@ tables = ["tables", "query", "saved_query", "tab_state", "table_schema"]
def upgrade(): def upgrade():
for table in tables: for table in tables:
add_column_if_not_exists( add_columns(table, sa.Column("catalog", sa.String(length=256), nullable=True))
table,
sa.Column("catalog", sa.String(length=256), nullable=True),
)
def downgrade(): def downgrade():
for table in reversed(tables): for table in reversed(tables):
op.drop_column(table, "catalog") drop_columns(table, "catalog")

View File

@ -23,13 +23,12 @@ Create Date: 2024-05-01 10:52:31.458433
""" """
import sqlalchemy as sa import sqlalchemy as sa
from alembic import op
from superset.migrations.shared.catalogs import ( from superset.migrations.shared.catalogs import (
downgrade_catalog_perms, downgrade_catalog_perms,
upgrade_catalog_perms, upgrade_catalog_perms,
) )
from superset.migrations.shared.utils import add_column_if_not_exists from superset.migrations.shared.utils import add_columns, drop_columns
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = "58d051681a3b" revision = "58d051681a3b"
@ -37,18 +36,16 @@ down_revision = "4a33124c18ad"
def upgrade(): def upgrade():
add_column_if_not_exists( add_columns(
"tables", "tables", sa.Column("catalog_perm", sa.String(length=1000), nullable=True)
sa.Column("catalog_perm", sa.String(length=1000), nullable=True),
) )
add_column_if_not_exists( add_columns(
"slices", "slices", sa.Column("catalog_perm", sa.String(length=1000), nullable=True)
sa.Column("catalog_perm", sa.String(length=1000), nullable=True),
) )
upgrade_catalog_perms(engines={"postgresql"}) upgrade_catalog_perms(engines={"postgresql"})
def downgrade(): def downgrade():
downgrade_catalog_perms(engines={"postgresql"}) downgrade_catalog_perms(engines={"postgresql"})
op.drop_column("slices", "catalog_perm") drop_columns("slices", "catalog_perm")
op.drop_column("tables", "catalog_perm") drop_columns("tables", "catalog_perm")

View File

@ -25,8 +25,7 @@ Create Date: 2024-05-24 11:31:57.115586
import sqlalchemy as sa import sqlalchemy as sa
from alembic import op from alembic import op
from superset.migrations.shared.constraints import drop_fks_for_table from superset.migrations.shared.utils import drop_fks_for_table, has_table
from superset.migrations.shared.utils import has_table
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = "02f4f7811799" revision = "02f4f7811799"

View File

@ -25,8 +25,7 @@ Create Date: 2024-08-13 15:17:23.273168
import sqlalchemy as sa import sqlalchemy as sa
from alembic import op from alembic import op
from superset.migrations.shared.constraints import drop_fks_for_table from superset.migrations.shared.utils import drop_fks_for_table, has_table
from superset.migrations.shared.utils import has_table
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = "39549add7bfc" revision = "39549add7bfc"

View File

@ -25,8 +25,7 @@ Create Date: 2024-08-13 15:23:28.768963
import sqlalchemy as sa import sqlalchemy as sa
from alembic import op from alembic import op
from superset.migrations.shared.constraints import drop_fks_for_table from superset.migrations.shared.utils import drop_fks_for_table, has_table
from superset.migrations.shared.utils import has_table
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = "38f4144e8558" revision = "38f4144e8558"

View File

@ -25,8 +25,7 @@ Create Date: 2024-08-13 15:27:11.589886
import sqlalchemy as sa import sqlalchemy as sa
from alembic import op from alembic import op
from superset.migrations.shared.constraints import drop_fks_for_table from superset.migrations.shared.utils import drop_fks_for_table, has_table
from superset.migrations.shared.utils import has_table
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = "e53fd48cc078" revision = "e53fd48cc078"

View File

@ -25,8 +25,7 @@ Create Date: 2024-08-13 15:29:33.135672
import sqlalchemy as sa import sqlalchemy as sa
from alembic import op from alembic import op
from superset.migrations.shared.constraints import drop_fks_for_table from superset.migrations.shared.utils import drop_fks_for_table, has_table
from superset.migrations.shared.utils import has_table
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = "a6b32d2d07b1" revision = "a6b32d2d07b1"

View File

@ -25,8 +25,7 @@ Create Date: 2024-08-13 15:31:31.478017
import sqlalchemy as sa import sqlalchemy as sa
from alembic import op from alembic import op
from superset.migrations.shared.constraints import drop_fks_for_table from superset.migrations.shared.utils import drop_fks_for_table, has_table
from superset.migrations.shared.utils import has_table
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = "007a1abffe7e" revision = "007a1abffe7e"

View File

@ -25,8 +25,7 @@ Create Date: 2024-08-13 15:33:14.551012
import sqlalchemy as sa import sqlalchemy as sa
from alembic import op from alembic import op
from superset.migrations.shared.constraints import drop_fks_for_table from superset.migrations.shared.utils import drop_fks_for_table, has_table
from superset.migrations.shared.utils import has_table
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = "48cbb571fa3a" revision = "48cbb571fa3a"