fix: Issue 13956 (#13980)
Co-authored-by: John Bodley <john.bodley@airbnb.com>
This commit is contained in:
parent
667eb83e33
commit
a3b41e2bac
|
|
@ -25,6 +25,8 @@ assists people when migrating to a new version.
|
|||
## Next
|
||||
- [13772](https://github.com/apache/superset/pull/13772): Row level security (RLS) is now enabled by default. To activate the feature, please run `superset init` to expose the RLS menus to Admin users.
|
||||
|
||||
- [13980](https://github.com/apache/superset/pull/13980): Data health checks no longer use the metadata database as an interim cache. Though non-breaking, deployments which implement complex logic should likely memoize the callback function. Refer to documentation in the confg.py file for more detail.
|
||||
|
||||
### Breaking Changes
|
||||
### Potential Downtime
|
||||
### Deprecations
|
||||
|
|
|
|||
|
|
@ -53,6 +53,9 @@ logger = logging.getLogger(__name__)
|
|||
if TYPE_CHECKING:
|
||||
from flask_appbuilder.security.sqla import models # pylint: disable=unused-import
|
||||
|
||||
from superset.connectors.sqla.models import ( # pylint: disable=unused-import
|
||||
SqlaTable,
|
||||
)
|
||||
from superset.models.core import Database # pylint: disable=unused-import
|
||||
|
||||
# Realtime stats logger, a StatsD implementation exists
|
||||
|
|
@ -1146,9 +1149,36 @@ GLOBAL_ASYNC_QUERIES_TRANSPORT = "polling"
|
|||
GLOBAL_ASYNC_QUERIES_POLLING_DELAY = 500
|
||||
GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL = "ws://127.0.0.1:8080/"
|
||||
|
||||
# It's possible to add a dataset health check logic which is specific to your system.
|
||||
# It will get executed each time when user open a chart's explore view.
|
||||
DATASET_HEALTH_CHECK = None
|
||||
# A SQL dataset health check. Note if enabled it is strongly advised that the callable
|
||||
# be memoized to aid with performance, i.e.,
|
||||
#
|
||||
# @cache_manager.cache.memoize(timeout=0)
|
||||
# def DATASET_HEALTH_CHECK(datasource: SqlaTable) -> Optional[str]:
|
||||
# if (
|
||||
# datasource.sql and
|
||||
# len(sql_parse.ParsedQuery(datasource.sql, strip_comments=True).tables) == 1
|
||||
# ):
|
||||
# return (
|
||||
# "This virtual dataset queries only one table and therefore could be "
|
||||
# "replaced by querying the table directly."
|
||||
# )
|
||||
#
|
||||
# return None
|
||||
#
|
||||
# Within the FLASK_APP_MUTATOR callable, i.e., once the application and thus cache have
|
||||
# been initialized it is also necessary to add the following logic to blow the cache for
|
||||
# all datasources if the callback function changed.
|
||||
#
|
||||
# def FLASK_APP_MUTATOR(app: Flask) -> None:
|
||||
# name = "DATASET_HEALTH_CHECK"
|
||||
# func = app.config[name]
|
||||
# code = func.uncached.__code__.co_code
|
||||
#
|
||||
# if cache_manager.cache.get(name) != code:
|
||||
# cache_manager.cache.delete_memoized(func)
|
||||
# cache_manager.cache.set(name, code, timeout=0)
|
||||
#
|
||||
DATASET_HEALTH_CHECK: Optional[Callable[["SqlaTable"], str]] = None
|
||||
|
||||
# SQLalchemy link doc reference
|
||||
SQLALCHEMY_DOCS_URL = "https://docs.sqlalchemy.org/en/13/core/engines.html"
|
||||
|
|
|
|||
|
|
@ -60,7 +60,6 @@ from superset.connectors.base.models import BaseColumn, BaseDatasource, BaseMetr
|
|||
from superset.db_engine_specs.base import TimestampExpression
|
||||
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
|
||||
from superset.exceptions import QueryObjectValidationError, SupersetSecurityException
|
||||
from superset.extensions import event_logger
|
||||
from superset.jinja_context import (
|
||||
BaseTemplateProcessor,
|
||||
ExtraCache,
|
||||
|
|
@ -687,9 +686,10 @@ class SqlaTable( # pylint: disable=too-many-public-methods,too-many-instance-at
|
|||
self.table_name, schema=self.schema, show_cols=False, latest_partition=False
|
||||
)
|
||||
|
||||
@property
|
||||
@property # type: ignore
|
||||
def health_check_message(self) -> Optional[str]:
|
||||
return self.extra_dict.get("health_check", {}).get("message")
|
||||
check = config["DATASET_HEALTH_CHECK"]
|
||||
return check(self) if check else None
|
||||
|
||||
@property
|
||||
def data(self) -> Dict[str, Any]:
|
||||
|
|
@ -703,13 +703,7 @@ class SqlaTable( # pylint: disable=too-many-public-methods,too-many-instance-at
|
|||
data_["fetch_values_predicate"] = self.fetch_values_predicate
|
||||
data_["template_params"] = self.template_params
|
||||
data_["is_sqllab_view"] = self.is_sqllab_view
|
||||
# Don't return previously populated health check message in case
|
||||
# the health check feature is turned off
|
||||
data_["health_check_message"] = (
|
||||
self.health_check_message
|
||||
if config.get("DATASET_HEALTH_CHECK")
|
||||
else None
|
||||
)
|
||||
data_["health_check_message"] = self.health_check_message
|
||||
data_["extra"] = self.extra
|
||||
return data_
|
||||
|
||||
|
|
@ -1608,26 +1602,6 @@ class SqlaTable( # pylint: disable=too-many-public-methods,too-many-instance-at
|
|||
extra_cache_keys += sqla_query.extra_cache_keys
|
||||
return extra_cache_keys
|
||||
|
||||
def health_check(self, commit: bool = False, force: bool = False) -> None:
|
||||
check = config.get("DATASET_HEALTH_CHECK")
|
||||
if check is None:
|
||||
return
|
||||
|
||||
extra = self.extra_dict
|
||||
# force re-run health check, or health check is updated
|
||||
if force or extra.get("health_check", {}).get("version") != check.version:
|
||||
with event_logger.log_context(action="dataset_health_check"):
|
||||
message = check(self)
|
||||
extra["health_check"] = {
|
||||
"version": check.version,
|
||||
"message": message,
|
||||
}
|
||||
self.extra = json.dumps(extra)
|
||||
|
||||
db.session.merge(self)
|
||||
if commit:
|
||||
db.session.commit()
|
||||
|
||||
|
||||
sa.event.listen(SqlaTable, "after_insert", security_manager.set_perm)
|
||||
sa.event.listen(SqlaTable, "after_update", security_manager.set_perm)
|
||||
|
|
|
|||
|
|
@ -169,9 +169,7 @@ class DatasetDAO(BaseDAO): # pylint: disable=too-many-public-methods
|
|||
super().update(model, properties, commit=commit)
|
||||
properties["columns"] = original_properties
|
||||
|
||||
updated_model = super().update(model, properties, commit=False)
|
||||
model.health_check(force=True, commit=False)
|
||||
return updated_model
|
||||
return super().update(model, properties, commit=False)
|
||||
|
||||
@classmethod
|
||||
def update_columns(
|
||||
|
|
|
|||
|
|
@ -0,0 +1,68 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""remove dataset health check message
|
||||
|
||||
Revision ID: 134cea61c5e7
|
||||
Revises: 301362411006
|
||||
Create Date: 2021-04-07 07:21:27.324983
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "134cea61c5e7"
|
||||
down_revision = "301362411006"
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
from alembic import op
|
||||
from sqlalchemy import Column, Integer, Text
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
|
||||
from superset import db
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
class SqlaTable(Base):
|
||||
__tablename__ = "tables"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
extra = Column(Text)
|
||||
|
||||
|
||||
def upgrade():
|
||||
bind = op.get_bind()
|
||||
session = db.Session(bind=bind)
|
||||
|
||||
for datasource in session.query(SqlaTable):
|
||||
if datasource.extra:
|
||||
try:
|
||||
extra = json.loads(datasource.extra)
|
||||
|
||||
if extra and "health_check" in extra:
|
||||
del extra["health_check"]
|
||||
datasource.extra = json.dumps(extra) if extra else None
|
||||
except Exception as ex:
|
||||
logging.exception(ex)
|
||||
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
|
||||
def downgrade():
|
||||
pass
|
||||
|
|
@ -732,10 +732,6 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
|
|||
f"datasource_id={datasource_id}&"
|
||||
)
|
||||
|
||||
# if feature enabled, run some health check rules for sqla datasource
|
||||
if hasattr(datasource, "health_check"):
|
||||
datasource.health_check()
|
||||
|
||||
viz_type = form_data.get("viz_type")
|
||||
if not viz_type and datasource and datasource.default_endpoint:
|
||||
return redirect(datasource.default_endpoint)
|
||||
|
|
|
|||
|
|
@ -83,8 +83,6 @@ class Datasource(BaseSupersetView):
|
|||
status=409,
|
||||
)
|
||||
orm_datasource.update_from_object(datasource_dict)
|
||||
if hasattr(orm_datasource, "health_check"):
|
||||
orm_datasource.health_check(force=True, commit=False)
|
||||
data = orm_datasource.data
|
||||
db.session.commit()
|
||||
|
||||
|
|
|
|||
|
|
@ -221,21 +221,11 @@ class TestDatasource(SupersetTestCase):
|
|||
return "Warning message!"
|
||||
|
||||
app.config["DATASET_HEALTH_CHECK"] = my_check
|
||||
my_check.version = 0.1
|
||||
|
||||
self.login(username="admin")
|
||||
tbl = self.get_table_by_name("birth_names")
|
||||
self.datasource = ConnectorRegistry.get_datasource("table", tbl.id, db.session)
|
||||
|
||||
for key in self.datasource.export_fields:
|
||||
self.original_attrs[key] = getattr(self.datasource, key)
|
||||
|
||||
url = f"/datasource/get/{tbl.type}/{tbl.id}/"
|
||||
tbl.health_check(commit=True, force=True)
|
||||
resp = self.get_json_resp(url)
|
||||
self.assertEqual(resp["health_check_message"], "Warning message!")
|
||||
|
||||
del app.config["DATASET_HEALTH_CHECK"]
|
||||
datasource = ConnectorRegistry.get_datasource("table", tbl.id, db.session)
|
||||
assert datasource.health_check_message == "Warning message!"
|
||||
app.config["DATASET_HEALTH_CHECK"] = None
|
||||
|
||||
def test_get_datasource_failed(self):
|
||||
pytest.raises(
|
||||
|
|
|
|||
Loading…
Reference in New Issue