chore: Allow auto pruning of the query table (#29936)
This commit is contained in:
parent
6e1ef193dd
commit
bac61fc24e
|
|
@ -0,0 +1,109 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import logging
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from superset import db
|
||||
from superset.commands.base import BaseCommand
|
||||
from superset.models.sql_lab import Query
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# pylint: disable=consider-using-transaction
|
||||
class QueryPruneCommand(BaseCommand):
|
||||
"""
|
||||
Command to prune the query table by deleting rows older than the specified retention period.
|
||||
|
||||
This command deletes records from the `Query` table that have not been changed within the
|
||||
specified number of days. It helps in maintaining the database by removing outdated entries
|
||||
and freeing up space.
|
||||
|
||||
Attributes:
|
||||
retention_period_days (int): The number of days for which records should be retained.
|
||||
Records older than this period will be deleted.
|
||||
"""
|
||||
|
||||
def __init__(self, retention_period_days: int):
|
||||
"""
|
||||
:param retention_period_days: Number of days to keep in the query table
|
||||
"""
|
||||
self.retention_period_days = retention_period_days
|
||||
|
||||
def run(self) -> None:
|
||||
"""
|
||||
Executes the prune command
|
||||
"""
|
||||
batch_size = 999 # SQLite has a IN clause limit of 999
|
||||
total_deleted = 0
|
||||
start_time = time.time()
|
||||
|
||||
# Select all IDs that need to be deleted
|
||||
ids_to_delete = (
|
||||
db.session.execute(
|
||||
sa.select(Query.id).where(
|
||||
Query.changed_on
|
||||
< datetime.now() - timedelta(days=self.retention_period_days)
|
||||
)
|
||||
)
|
||||
.scalars()
|
||||
.all()
|
||||
)
|
||||
|
||||
total_rows = len(ids_to_delete)
|
||||
|
||||
logger.info("Total rows to be deleted: %s", total_rows)
|
||||
|
||||
next_logging_threshold = 1
|
||||
|
||||
# Iterate over the IDs in batches
|
||||
for i in range(0, total_rows, batch_size):
|
||||
batch_ids = ids_to_delete[i : i + batch_size]
|
||||
|
||||
# Delete the selected batch using IN clause
|
||||
result = db.session.execute(sa.delete(Query).where(Query.id.in_(batch_ids)))
|
||||
|
||||
# Update the total number of deleted records
|
||||
total_deleted += result.rowcount
|
||||
|
||||
# Explicitly commit the transaction given that if an error occurs, we want to ensure that the
|
||||
# records that have been deleted so far are committed
|
||||
db.session.commit()
|
||||
|
||||
# Log the number of deleted records every 1% increase in progress
|
||||
percentage_complete = (total_deleted / total_rows) * 100
|
||||
if percentage_complete >= next_logging_threshold:
|
||||
logger.info(
|
||||
"Deleted %s rows from the query table older than %s days (%d%% complete)",
|
||||
total_deleted,
|
||||
self.retention_period_days,
|
||||
percentage_complete,
|
||||
)
|
||||
next_logging_threshold += 1
|
||||
|
||||
elapsed_time = time.time() - start_time
|
||||
minutes, seconds = divmod(elapsed_time, 60)
|
||||
formatted_time = f"{int(minutes):02}:{int(seconds):02}"
|
||||
logger.info(
|
||||
"Pruning complete: %s rows deleted in %s", total_deleted, formatted_time
|
||||
)
|
||||
|
||||
def validate(self) -> None:
|
||||
pass
|
||||
|
|
@ -1000,6 +1000,12 @@ class CeleryConfig: # pylint: disable=too-few-public-methods
|
|||
"task": "reports.prune_log",
|
||||
"schedule": crontab(minute=0, hour=0),
|
||||
},
|
||||
# Uncomment to enable pruning of the query table
|
||||
# "prune_query": {
|
||||
# "task": "prune_query",
|
||||
# "schedule": crontab(minute=0, hour=0, day_of_month=1),
|
||||
# "options": {"retention_period_days": 180},
|
||||
# },
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ from superset.commands.exceptions import CommandException
|
|||
from superset.commands.report.exceptions import ReportScheduleUnexpectedError
|
||||
from superset.commands.report.execute import AsyncExecuteReportScheduleCommand
|
||||
from superset.commands.report.log_prune import AsyncPruneReportScheduleLogCommand
|
||||
from superset.commands.sql_lab.query import QueryPruneCommand
|
||||
from superset.daos.report import ReportScheduleDAO
|
||||
from superset.extensions import celery_app
|
||||
from superset.stats_logger import BaseStatsLogger
|
||||
|
|
@ -119,3 +120,16 @@ def prune_log() -> None:
|
|||
logger.warning("A timeout occurred while pruning report schedule logs: %s", ex)
|
||||
except CommandException:
|
||||
logger.exception("An exception occurred while pruning report schedule logs")
|
||||
|
||||
|
||||
@celery_app.task(name="prune_query")
|
||||
def prune_query() -> None:
|
||||
stats_logger: BaseStatsLogger = app.config["STATS_LOGGER"]
|
||||
stats_logger.incr("prune_query")
|
||||
|
||||
try:
|
||||
QueryPruneCommand(
|
||||
prune_query.request.properties.get("retention_period_days")
|
||||
).run()
|
||||
except CommandException as ex:
|
||||
logger.exception("An error occurred while pruning queries: %s", ex)
|
||||
|
|
|
|||
Loading…
Reference in New Issue