chore: remove druid datasource from the config (#19770)

* remove druid datasource from the config

* remove config related references to DruidDatasource

* Update __init__.py

* Update __init__.py

* Update manager.py

* remove config related references to DruidDatasource

* raise if instance type is not valid
Elizabeth Thompson 2022-05-04 12:48:48 -07:00 committed by GitHub
parent 7d3437a98e
commit 060b5c0e17
24 changed files with 21 additions and 5987 deletions


@@ -30,6 +30,7 @@ assists people when migrating to a new version.
### Breaking Changes
- [19770](https://github.com/apache/superset/pull/19770): As per SIPs 11 and 68, the native NoSQL Druid connector is deprecated and has been removed. Druid is still supported through SQLAlchemy via pydruid (see the first sketch after this list). The config keys `DRUID_IS_ACTIVE` and `DRUID_METADATA_LINKS_ENABLED` have also been removed.
- [19274](https://github.com/apache/superset/pull/19274): The `PUBLIC_ROLE_LIKE_GAMMA` config key has been removed; set `PUBLIC_ROLE_LIKE = "Gamma"` to get the same functionality.
- [19273](https://github.com/apache/superset/pull/19273): The `SUPERSET_CELERY_WORKERS` and `SUPERSET_WORKERS` config keys have been removed. Configure Celery directly using the `CELERY_CONFIG` key in your Superset config (see the second sketch after this list).
- [19262](https://github.com/apache/superset/pull/19262): Per [SIP-11](https://github.com/apache/superset/issues/6032) and [SIP-68](https://github.com/apache/superset/issues/14909) the native NoSQL Druid connector is deprecated and will no longer be supported. Druid SQL is still [supported](https://superset.apache.org/docs/databases/druid).
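For reference, a minimal sketch of the replacement path named above: querying Druid over SQL through the SQLAlchemy dialect that pydruid registers. The broker host/port and the `wikipedia` datasource are illustrative assumptions, not anything defined in this PR.

```python
# Minimal sketch: Druid via SQLAlchemy using pydruid's "druid" dialect
# (pip install "pydruid[sqlalchemy]"). Host, port, and the "wikipedia"
# datasource below are placeholder assumptions; adjust for your cluster.
from sqlalchemy import create_engine, text

# Druid serves its SQL API under /druid/v2/sql/ on the router/broker;
# the same URI works in Superset's database connection form.
engine = create_engine("druid://localhost:8082/druid/v2/sql/")

with engine.connect() as conn:
    for row in conn.execute(text("SELECT COUNT(*) AS n FROM wikipedia")):
        print(row.n)
```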
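Likewise, a minimal sketch of the `CELERY_CONFIG` replacement for the removed worker keys, following the class-based pattern from the Superset docs; the Redis URLs and worker count are assumptions for illustration only.

```python
# Minimal sketch for superset_config.py, assuming a local Redis broker.
# The worker count formerly set via SUPERSET_CELERY_WORKERS now lives in
# Celery's own worker_concurrency setting.
class CeleryConfig:  # pylint: disable=too-few-public-methods
    broker_url = "redis://localhost:6379/0"
    imports = ("superset.sql_lab",)
    result_backend = "redis://localhost:6379/0"
    worker_concurrency = 4


CELERY_CONFIG = CeleryConfig
```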


@@ -20,36 +20,6 @@ import React from 'react';
import { t } from '@superset-ui/core';
import { sections } from '@superset-ui/chart-controls';
const appContainer = document.getElementById('app');
const bootstrapData = JSON.parse(appContainer.getAttribute('data-bootstrap'));
const druidIsActive = !!bootstrapData?.common?.conf?.DRUID_IS_ACTIVE;
const druidSection = druidIsActive
? [
[
{
name: 'show_druid_time_granularity',
config: {
type: 'CheckboxControl',
label: t('Show Druid granularity dropdown'),
default: false,
description: t('Check to include Druid granularity dropdown'),
},
},
],
[
{
name: 'show_druid_time_origin',
config: {
type: 'CheckboxControl',
label: t('Show Druid time origin'),
default: false,
description: t('Check to include time origin dropdown'),
},
},
],
]
: [];
export default {
controlPanelSections: [
sections.legacyTimeseriesTime,
@@ -96,35 +66,6 @@ export default {
},
},
],
[
{
name: 'show_sqla_time_granularity',
config: {
type: 'CheckboxControl',
label: druidIsActive
? t('Show SQL time grain dropdown')
: t('Show time grain dropdown'),
default: false,
description: druidIsActive
? t('Check to include SQL time grain dropdown')
: t('Check to include time grain dropdown'),
},
},
],
[
{
name: 'show_sqla_time_column',
config: {
type: 'CheckboxControl',
label: druidIsActive
? t('Show SQL time column')
: t('Show time column'),
default: false,
description: t('Check to include time column dropdown'),
},
},
],
...druidSection,
['adhoc_filters'],
],
},


@@ -18,7 +18,6 @@ import json
import logging
import os
import sys
from datetime import datetime
from typing import Optional
import click
@@ -53,39 +52,6 @@ def set_database_uri(database_name: str, uri: str, skip_create: bool) -> None:
database_utils.get_or_create_db(database_name, uri, not skip_create)
@click.command()
@with_appcontext
@click.option(
"--datasource",
"-d",
help="Specify which datasource name to load, if "
"omitted, all datasources will be refreshed",
)
@click.option(
"--merge",
"-m",
is_flag=True,
default=False,
help="Specify using 'merge' property during operation. " "Default value is False.",
)
def refresh_druid(datasource: str, merge: bool) -> None:
"""Refresh druid datasources"""
# pylint: disable=import-outside-toplevel
from superset.connectors.druid.models import DruidCluster
session = db.session()
for cluster in session.query(DruidCluster).all():
try:
cluster.refresh_datasources(datasource_name=datasource, merge_flag=merge)
except Exception as ex: # pylint: disable=broad-except
print("Error while processing cluster '{}'\n{}".format(cluster, str(ex)))
logger.exception(ex)
cluster.metadata_last_refreshed = datetime.now()
print("Refreshed metadata from cluster " "[" + cluster.cluster_name + "]")
session.commit()
@click.command()
@with_appcontext
def update_datasources_cache() -> None:


@@ -258,16 +258,6 @@ FAB_API_SWAGGER_UI = True
DRUID_TZ = tz.tzutc()
DRUID_ANALYSIS_TYPES = ["cardinality"]
# Legacy Druid NoSQL (native) connector
# Druid supports a SQL interface in its newer versions.
# Setting this flag to True enables the deprecated, API-based Druid
# connector. This feature may be removed at a future date.
DRUID_IS_ACTIVE = False
# If Druid is active whether to include the links to scan/refresh Druid datasources.
# This should be disabled if you are trying to wean yourself off of the Druid NoSQL
# connector.
DRUID_METADATA_LINKS_ENABLED = True
# ----------------------------------------------------
# AUTHENTICATION CONFIG
@@ -646,19 +636,12 @@ TIME_GRAIN_ADDON_EXPRESSIONS: Dict[str, Dict[str, str]] = {}
VIZ_TYPE_DENYLIST: List[str] = []
# ---------------------------------------------------
# List of data sources not to be refreshed in druid cluster
# ---------------------------------------------------
DRUID_DATA_SOURCE_DENYLIST: List[str] = []
# --------------------------------------------------
# Modules, datasources and middleware to be registered
# --------------------------------------------------
DEFAULT_MODULE_DS_MAP = OrderedDict(
[
("superset.connectors.sqla.models", ["SqlaTable"]),
("superset.connectors.druid.models", ["DruidDatasource"]),
]
)
ADDITIONAL_MODULE_DS_MAP: Dict[str, List[str]] = {}
@@ -984,8 +967,11 @@ BLUEPRINTS: List[Blueprint] = []
# Provide a callable that receives a tracking_url and returns another
# URL. This is used to translate internal Hadoop job tracker URL
# into a proxied one
TRACKING_URL_TRANSFORMER = lambda x: x
# Interval between consecutive polls when using Hive Engine
HIVE_POLL_INTERVAL = int(timedelta(seconds=5).total_seconds())
@@ -1203,8 +1189,10 @@ SSL_CERT_PATH: Optional[str] = None
# to allow mutating the object with this callback.
# This can be used to set any properties of the object based on naming
# conventions and such. You can find examples in the tests.
SQLA_TABLE_MUTATOR = lambda table: table
# Global async query config options.
# Requires GLOBAL_ASYNC_QUERIES feature flag to be enabled.
GLOBAL_ASYNC_QUERIES_REDIS_CONFIG = {


@@ -1,16 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

File diff suppressed because it is too large.


@@ -1,445 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import logging
from datetime import datetime
from flask import current_app as app, flash, Markup, redirect
from flask_appbuilder import CompactCRUDMixin, expose
from flask_appbuilder.fieldwidgets import Select2Widget
from flask_appbuilder.hooks import before_request
from flask_appbuilder.models.sqla.interface import SQLAInterface
from flask_appbuilder.security.decorators import has_access
from flask_babel import lazy_gettext as _
from werkzeug.exceptions import NotFound
from wtforms import StringField
from wtforms.ext.sqlalchemy.fields import QuerySelectField
from superset import db, security_manager
from superset.connectors.base.views import BS3TextFieldROWidget, DatasourceModelView
from superset.connectors.connector_registry import ConnectorRegistry
from superset.connectors.druid import models
from superset.constants import RouteMethod
from superset.superset_typing import FlaskResponse
from superset.utils import core as utils
from superset.views.base import (
BaseSupersetView,
DatasourceFilter,
DeleteMixin,
get_dataset_exist_error_msg,
ListWidgetWithCheckboxes,
SupersetModelView,
validate_json,
YamlExportMixin,
)
logger = logging.getLogger(__name__)
class EnsureEnabledMixin:
@staticmethod
def is_enabled() -> bool:
return bool(app.config["DRUID_IS_ACTIVE"])
@before_request
def ensure_enabled(self) -> None:
if not self.is_enabled():
raise NotFound()
class DruidColumnInlineView( # pylint: disable=too-many-ancestors
CompactCRUDMixin,
EnsureEnabledMixin,
SupersetModelView,
):
datamodel = SQLAInterface(models.DruidColumn)
include_route_methods = RouteMethod.RELATED_VIEW_SET
list_title = _("Columns")
show_title = _("Show Druid Column")
add_title = _("Add Druid Column")
edit_title = _("Edit Druid Column")
list_widget = ListWidgetWithCheckboxes
edit_columns = [
"column_name",
"verbose_name",
"description",
"dimension_spec_json",
"datasource",
"groupby",
"filterable",
]
add_columns = edit_columns
list_columns = ["column_name", "verbose_name", "type", "groupby", "filterable"]
can_delete = False
page_size = 500
label_columns = {
"column_name": _("Column"),
"type": _("Type"),
"datasource": _("Datasource"),
"groupby": _("Groupable"),
"filterable": _("Filterable"),
}
description_columns = {
"filterable": _(
"Whether this column is exposed in the `Filters` section "
"of the explore view."
),
"dimension_spec_json": utils.markdown(
"this field can be used to specify "
"a `dimensionSpec` as documented [here]"
"(http://druid.io/docs/latest/querying/dimensionspecs.html). "
"Make sure to input valid JSON and that the "
"`outputName` matches the `column_name` defined "
"above.",
True,
),
}
add_form_extra_fields = {
"datasource": QuerySelectField(
"Datasource",
query_factory=lambda: db.session.query(models.DruidDatasource),
allow_blank=True,
widget=Select2Widget(extra_classes="readonly"),
)
}
edit_form_extra_fields = add_form_extra_fields
def pre_update(self, item: "DruidColumnInlineView") -> None:
# If a dimension spec JSON is given, ensure that it is
# valid JSON and that `outputName` is specified
if item.dimension_spec_json:
try:
dimension_spec = json.loads(item.dimension_spec_json)
except ValueError as ex:
raise ValueError("Invalid Dimension Spec JSON: " + str(ex)) from ex
if not isinstance(dimension_spec, dict):
raise ValueError("Dimension Spec must be a JSON object")
if "outputName" not in dimension_spec:
raise ValueError("Dimension Spec does not contain `outputName`")
if "dimension" not in dimension_spec:
raise ValueError("Dimension Spec is missing `dimension`")
# `outputName` should be the same as the `column_name`
if dimension_spec["outputName"] != item.column_name:
raise ValueError(
"`outputName` [{}] unequal to `column_name` [{}]".format(
dimension_spec["outputName"], item.column_name
)
)
def post_update(self, item: "DruidColumnInlineView") -> None:
item.refresh_metrics()
def post_add(self, item: "DruidColumnInlineView") -> None:
self.post_update(item)
class DruidMetricInlineView( # pylint: disable=too-many-ancestors
CompactCRUDMixin,
EnsureEnabledMixin,
SupersetModelView,
):
datamodel = SQLAInterface(models.DruidMetric)
include_route_methods = RouteMethod.RELATED_VIEW_SET
list_title = _("Metrics")
show_title = _("Show Druid Metric")
add_title = _("Add Druid Metric")
edit_title = _("Edit Druid Metric")
list_columns = ["metric_name", "verbose_name", "metric_type"]
edit_columns = [
"metric_name",
"description",
"verbose_name",
"metric_type",
"json",
"datasource",
"d3format",
"warning_text",
]
add_columns = edit_columns
page_size = 500
validators_columns = {"json": [validate_json]}
description_columns = {
"metric_type": utils.markdown(
"use `postagg` as the metric type if you are defining a "
"[Druid Post Aggregation]"
"(http://druid.io/docs/latest/querying/post-aggregations.html)",
True,
)
}
label_columns = {
"metric_name": _("Metric"),
"description": _("Description"),
"verbose_name": _("Verbose Name"),
"metric_type": _("Type"),
"json": _("JSON"),
"datasource": _("Druid Datasource"),
"warning_text": _("Warning Message"),
}
add_form_extra_fields = {
"datasource": QuerySelectField(
"Datasource",
query_factory=lambda: db.session.query(models.DruidDatasource),
allow_blank=True,
widget=Select2Widget(extra_classes="readonly"),
)
}
edit_form_extra_fields = add_form_extra_fields
class DruidClusterModelView( # pylint: disable=too-many-ancestors
EnsureEnabledMixin,
SupersetModelView,
DeleteMixin,
YamlExportMixin,
):
datamodel = SQLAInterface(models.DruidCluster)
include_route_methods = RouteMethod.CRUD_SET
list_title = _("Druid Clusters")
show_title = _("Show Druid Cluster")
add_title = _("Add Druid Cluster")
edit_title = _("Edit Druid Cluster")
add_columns = [
"verbose_name",
"broker_host",
"broker_port",
"broker_user",
"broker_pass",
"broker_endpoint",
"cache_timeout",
"cluster_name",
]
edit_columns = add_columns
list_columns = ["cluster_name", "metadata_last_refreshed"]
search_columns = ("cluster_name",)
label_columns = {
"cluster_name": _("Cluster Name"),
"broker_host": _("Broker Host"),
"broker_port": _("Broker Port"),
"broker_user": _("Broker Username"),
"broker_pass": _("Broker Password"),
"broker_endpoint": _("Broker Endpoint"),
"verbose_name": _("Verbose Name"),
"cache_timeout": _("Cache Timeout"),
"metadata_last_refreshed": _("Metadata Last Refreshed"),
}
description_columns = {
"cache_timeout": _(
"Duration (in seconds) of the caching timeout for this cluster. "
"A timeout of 0 indicates that the cache never expires. "
"Note this defaults to the global timeout if undefined."
),
"broker_user": _(
"Druid supports basic authentication. See "
"[auth](http://druid.io/docs/latest/design/auth.html) and "
"druid-basic-security extension"
),
"broker_pass": _(
"Druid supports basic authentication. See "
"[auth](http://druid.io/docs/latest/design/auth.html) and "
"druid-basic-security extension"
),
}
yaml_dict_key = "databases"
def pre_add(self, item: "DruidClusterModelView") -> None:
security_manager.add_permission_view_menu("database_access", item.perm)
def pre_update(self, item: "DruidClusterModelView") -> None:
self.pre_add(item)
def _delete(self, pk: int) -> None:
DeleteMixin._delete(self, pk)
class DruidDatasourceModelView( # pylint: disable=too-many-ancestors
EnsureEnabledMixin,
DatasourceModelView,
DeleteMixin,
YamlExportMixin,
):
datamodel = SQLAInterface(models.DruidDatasource)
include_route_methods = RouteMethod.CRUD_SET
list_title = _("Druid Datasources")
show_title = _("Show Druid Datasource")
add_title = _("Add Druid Datasource")
edit_title = _("Edit Druid Datasource")
list_columns = ["datasource_link", "cluster", "changed_by_", "modified"]
order_columns = ["datasource_link", "modified"]
related_views = [DruidColumnInlineView, DruidMetricInlineView]
edit_columns = [
"datasource_name",
"cluster",
"description",
"owners",
"is_hidden",
"filter_select_enabled",
"fetch_values_from",
"default_endpoint",
"offset",
"cache_timeout",
]
search_columns = ("datasource_name", "cluster", "description", "owners")
add_columns = edit_columns
show_columns = add_columns + ["perm", "slices"]
page_size = 500
base_order = ("datasource_name", "asc")
description_columns = {
"slices": _(
"The list of charts associated with this table. By "
"altering this datasource, you may change how these associated "
"charts behave. "
"Also note that charts need to point to a datasource, so "
"this form will fail at saving if removing charts from a "
"datasource. If you want to change the datasource for a chart, "
"overwrite the chart from the 'explore view'"
),
"offset": _("Timezone offset (in hours) for this datasource"),
"description": Markup(
'Supports <a href="'
'https://daringfireball.net/projects/markdown/">markdown</a>'
),
"fetch_values_from": _(
"Time expression to use as a predicate when retrieving "
"distinct values to populate the filter component. "
"Only applies when `Enable Filter Select` is on. If "
"you enter `7 days ago`, the distinct list of values in "
"the filter will be populated based on the distinct value over "
"the past week"
),
"filter_select_enabled": _(
"Whether to populate the filter's dropdown in the explore "
"view's filter section with a list of distinct values fetched "
"from the backend on the fly"
),
"default_endpoint": _(
"Redirects to this endpoint when clicking on the datasource "
"from the datasource list"
),
"cache_timeout": _(
"Duration (in seconds) of the caching timeout for this datasource. "
"A timeout of 0 indicates that the cache never expires. "
"Note this defaults to the cluster timeout if undefined."
),
}
base_filters = [["id", DatasourceFilter, lambda: []]]
label_columns = {
"slices": _("Associated Charts"),
"datasource_link": _("Data Source"),
"cluster": _("Cluster"),
"description": _("Description"),
"owners": _("Owners"),
"is_hidden": _("Is Hidden"),
"filter_select_enabled": _("Enable Filter Select"),
"default_endpoint": _("Default Endpoint"),
"offset": _("Time Offset"),
"cache_timeout": _("Cache Timeout"),
"datasource_name": _("Datasource Name"),
"fetch_values_from": _("Fetch Values From"),
"changed_by_": _("Changed By"),
"modified": _("Modified"),
}
edit_form_extra_fields = {
"cluster": QuerySelectField(
"Cluster",
query_factory=lambda: db.session.query(models.DruidCluster),
widget=Select2Widget(extra_classes="readonly"),
),
"datasource_name": StringField(
"Datasource Name", widget=BS3TextFieldROWidget()
),
}
def pre_add(self, item: "DruidDatasourceModelView") -> None:
with db.session.no_autoflush:
query = db.session.query(models.DruidDatasource).filter(
models.DruidDatasource.datasource_name == item.datasource_name,
models.DruidDatasource.cluster_id == item.cluster_id,
)
if db.session.query(query.exists()).scalar():
raise Exception(get_dataset_exist_error_msg(item.full_name))
def post_add(self, item: "DruidDatasourceModelView") -> None:
item.refresh_metrics()
security_manager.add_permission_view_menu("datasource_access", item.get_perm())
if item.schema:
security_manager.add_permission_view_menu("schema_access", item.schema_perm)
def post_update(self, item: "DruidDatasourceModelView") -> None:
self.post_add(item)
def _delete(self, pk: int) -> None:
DeleteMixin._delete(self, pk)
class Druid(EnsureEnabledMixin, BaseSupersetView):
"""The base views for Superset!"""
@has_access
@expose("/refresh_datasources/")
def refresh_datasources( # pylint: disable=no-self-use
self, refresh_all: bool = True
) -> FlaskResponse:
"""endpoint that refreshes druid datasources metadata"""
session = db.session()
DruidCluster = ConnectorRegistry.sources[ # pylint: disable=invalid-name
"druid"
].cluster_class
for cluster in session.query(DruidCluster).all():
cluster_name = cluster.cluster_name
valid_cluster = True
try:
cluster.refresh_datasources(refresh_all=refresh_all)
except Exception as ex: # pylint: disable=broad-except
valid_cluster = False
flash(
"Error while processing cluster '{}'\n{}".format(
cluster_name, utils.error_msg_from_exception(ex)
),
"danger",
)
logger.exception(ex)
if valid_cluster:
cluster.metadata_last_refreshed = datetime.now()
flash(
_("Refreshed metadata from cluster [{}]").format(
cluster.cluster_name
),
"info",
)
session.commit()
return redirect("/druiddatasourcemodelview/list/")
@has_access
@expose("/scan_new_datasources/")
def scan_new_datasources(self) -> FlaskResponse:
"""
Calling this endpoint will cause a scan for new
datasources only and add them.
"""
return self.refresh_datasources(refresh_all=False)


@@ -269,20 +269,11 @@ def import_dashboard(
return dashboard_to_import.id # type: ignore
def decode_dashboards( # pylint: disable=too-many-return-statements
o: Dict[str, Any]
) -> Any:
def decode_dashboards(o: Dict[str, Any]) -> Any:
"""
Function to be passed into json.loads obj_hook parameter
Recreates the dashboard object from a json representation.
"""
# pylint: disable=import-outside-toplevel
from superset.connectors.druid.models import (
DruidCluster,
DruidColumn,
DruidDatasource,
DruidMetric,
)
if "__Dashboard__" in o:
return Dashboard(**o["__Dashboard__"])
@@ -294,14 +285,6 @@ def decode_dashboards( # pylint: disable=too-many-return-statements
return SqlaTable(**o["__SqlaTable__"])
if "__SqlMetric__" in o:
return SqlMetric(**o["__SqlMetric__"])
if "__DruidCluster__" in o:
return DruidCluster(**o["__DruidCluster__"])
if "__DruidColumn__" in o:
return DruidColumn(**o["__DruidColumn__"])
if "__DruidDatasource__" in o:
return DruidDatasource(**o["__DruidDatasource__"])
if "__DruidMetric__" in o:
return DruidMetric(**o["__DruidMetric__"])
if "__datetime__" in o:
return datetime.strptime(o["__datetime__"], "%Y-%m-%dT%H:%M:%S")


@@ -27,16 +27,11 @@ from superset import db
from superset.commands.base import BaseCommand
from superset.commands.importers.exceptions import IncorrectVersionError
from superset.connectors.base.models import BaseColumn, BaseDatasource, BaseMetric
from superset.connectors.druid.models import (
DruidCluster,
DruidColumn,
DruidDatasource,
DruidMetric,
)
from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn
from superset.databases.commands.exceptions import DatabaseNotFoundError
from superset.datasets.commands.exceptions import DatasetInvalidError
from superset.models.core import Database
from superset.utils.dict_import_export import DATABASES_KEY, DRUID_CLUSTERS_KEY
from superset.utils.dict_import_export import DATABASES_KEY
logger = logging.getLogger(__name__)
@@ -65,21 +60,6 @@ def lookup_sqla_database(table: SqlaTable) -> Optional[Database]:
return database
def lookup_druid_cluster(datasource: DruidDatasource) -> Optional[DruidCluster]:
return db.session.query(DruidCluster).filter_by(id=datasource.cluster_id).first()
def lookup_druid_datasource(datasource: DruidDatasource) -> Optional[DruidDatasource]:
return (
db.session.query(DruidDatasource)
.filter(
DruidDatasource.datasource_name == datasource.datasource_name,
DruidDatasource.cluster_id == datasource.cluster_id,
)
.first()
)
def import_dataset(
i_datasource: BaseDatasource,
database_id: Optional[int] = None,
@@ -97,9 +77,9 @@ def import_dataset(
if isinstance(i_datasource, SqlaTable):
lookup_database = lookup_sqla_database
lookup_datasource = lookup_sqla_table
else:
lookup_database = lookup_druid_cluster
lookup_datasource = lookup_druid_datasource
raise DatasetInvalidError
return import_datasource(
db.session,
@@ -122,22 +102,11 @@ def lookup_sqla_metric(session: Session, metric: SqlMetric) -> SqlMetric:
)
def lookup_druid_metric(session: Session, metric: DruidMetric) -> DruidMetric:
return (
session.query(DruidMetric)
.filter(
DruidMetric.datasource_id == metric.datasource_id,
DruidMetric.metric_name == metric.metric_name,
)
.first()
)
def import_metric(session: Session, metric: BaseMetric) -> BaseMetric:
if isinstance(metric, SqlMetric):
lookup_metric = lookup_sqla_metric
else:
lookup_metric = lookup_druid_metric
raise Exception(f"Invalid metric type: {metric}")
return import_simple_obj(session, metric, lookup_metric)
@@ -152,22 +121,11 @@ def lookup_sqla_column(session: Session, column: TableColumn) -> TableColumn:
)
def lookup_druid_column(session: Session, column: DruidColumn) -> DruidColumn:
return (
session.query(DruidColumn)
.filter(
DruidColumn.datasource_id == column.datasource_id,
DruidColumn.column_name == column.column_name,
)
.first()
)
def import_column(session: Session, column: BaseColumn) -> BaseColumn:
if isinstance(column, TableColumn):
lookup_column = lookup_sqla_column
else:
lookup_column = lookup_druid_column
raise Exception(f"Invalid column type: {column}")
return import_simple_obj(session, column, lookup_column)
@@ -257,19 +215,13 @@ def import_simple_obj(
def import_from_dict(
session: Session, data: Dict[str, Any], sync: Optional[List[str]] = None
) -> None:
"""Imports databases and druid clusters from dictionary"""
"""Imports databases from dictionary"""
if not sync:
sync = []
if isinstance(data, dict):
logger.info("Importing %d %s", len(data.get(DATABASES_KEY, [])), DATABASES_KEY)
for database in data.get(DATABASES_KEY, []):
Database.import_from_dict(session, database, sync=sync)
logger.info(
"Importing %d %s", len(data.get(DRUID_CLUSTERS_KEY, [])), DRUID_CLUSTERS_KEY
)
for datasource in data.get(DRUID_CLUSTERS_KEY, []):
DruidCluster.import_from_dict(session, datasource, sync=sync)
session.commit()
else:
logger.info("Supplied object is not a dictionary.")
@@ -334,7 +286,7 @@ class ImportDatasetsCommand(BaseCommand):
# CLI export
if isinstance(config, dict):
# TODO (betodealmeida): validate with Marshmallow
if DATABASES_KEY not in config and DRUID_CLUSTERS_KEY not in config:
if DATABASES_KEY not in config:
raise IncorrectVersionError(f"{file_name} has no valid keys")
# UI export


@@ -119,13 +119,6 @@ class SupersetAppInitializer: # pylint: disable=too-many-public-methods
from superset.cachekeys.api import CacheRestApi
from superset.charts.api import ChartRestApi
from superset.charts.data.api import ChartDataRestApi
from superset.connectors.druid.views import (
Druid,
DruidClusterModelView,
DruidColumnInlineView,
DruidDatasourceModelView,
DruidMetricInlineView,
)
from superset.connectors.sqla.views import (
RowLevelSecurityFiltersModelView,
SqlMetricInlineView,
@@ -152,7 +145,7 @@ class SupersetAppInitializer: # pylint: disable=too-many-public-methods
from superset.reports.logs.api import ReportExecutionLogRestApi
from superset.security.api import SecurityRestApi
from superset.views.access_requests import AccessRequestsModelView
from superset.views.alerts import AlertView, ReportView
from superset.views.alerts import AlertView
from superset.views.annotations import (
AnnotationLayerModelView,
AnnotationModelView,
@@ -405,66 +398,6 @@ class SupersetAppInitializer: # pylint: disable=too-many-public-methods
menu_cond=lambda: bool(self.config["ENABLE_ACCESS_REQUEST"]),
)
#
# Druid Views
#
appbuilder.add_separator(
"Data", cond=lambda: bool(self.config["DRUID_IS_ACTIVE"])
)
appbuilder.add_view(
DruidDatasourceModelView,
"Druid Datasources",
label=__("Druid Datasources"),
category="Data",
category_label=__("Data"),
icon="fa-cube",
menu_cond=lambda: bool(self.config["DRUID_IS_ACTIVE"]),
)
appbuilder.add_view(
DruidClusterModelView,
name="Druid Clusters",
label=__("Druid Clusters"),
icon="fa-cubes",
category="Data",
category_label=__("Data"),
category_icon="fa-database",
menu_cond=lambda: bool(self.config["DRUID_IS_ACTIVE"]),
)
appbuilder.add_view_no_menu(DruidMetricInlineView)
appbuilder.add_view_no_menu(DruidColumnInlineView)
appbuilder.add_view_no_menu(Druid)
appbuilder.add_link(
"Scan New Datasources",
label=__("Scan New Datasources"),
href="/druid/scan_new_datasources/",
category="Data",
category_label=__("Data"),
category_icon="fa-database",
icon="fa-refresh",
cond=lambda: bool(
self.config["DRUID_IS_ACTIVE"]
and self.config["DRUID_METADATA_LINKS_ENABLED"]
),
)
appbuilder.add_view_no_menu(ReportView)
appbuilder.add_link(
"Refresh Druid Metadata",
label=__("Refresh Druid Metadata"),
href="/druid/refresh_datasources/",
category="Data",
category_label=__("Data"),
category_icon="fa-database",
icon="fa-cog",
cond=lambda: bool(
self.config["DRUID_IS_ACTIVE"]
and self.config["DRUID_METADATA_LINKS_ENABLED"]
),
)
appbuilder.add_separator(
"Data", cond=lambda: bool(self.config["DRUID_IS_ACTIVE"])
)
def init_app_in_ctx(self) -> None:
"""
Runs init logic in the context of the app


@@ -49,7 +49,6 @@ from sqlalchemy.sql.elements import BinaryExpression
from superset import app, ConnectorRegistry, db, is_feature_enabled, security_manager
from superset.common.request_contexed_based import is_user_admin
from superset.connectors.base.models import BaseDatasource
from superset.connectors.druid.models import DruidColumn, DruidMetric
from superset.connectors.sqla.models import SqlMetric, TableColumn
from superset.extensions import cache_manager
from superset.models.filter_set import FilterSet
@@ -485,8 +484,6 @@ if is_feature_enabled("DASHBOARD_CACHE"):
Dashboard.clear_cache_for_datasource(datasource_id=obj.id)
elif isinstance(obj, (SqlMetric, TableColumn)):
Dashboard.clear_cache_for_datasource(datasource_id=obj.table_id)
elif isinstance(obj, (DruidMetric, DruidColumn)):
Dashboard.clear_cache_for_datasource(datasource_id=obj.datasource_id)
sqla.event.listen(Dashboard, "after_update", clear_dashboard_cache)
sqla.event.listen(
@@ -501,5 +498,3 @@ if is_feature_enabled("DASHBOARD_CACHE"):
# trigger update events for BaseDatasource.
sqla.event.listen(SqlMetric, "after_update", clear_dashboard_cache)
sqla.event.listen(TableColumn, "after_update", clear_dashboard_cache)
sqla.event.listen(DruidMetric, "after_update", clear_dashboard_cache)
sqla.event.listen(DruidColumn, "after_update", clear_dashboard_cache)


@@ -79,7 +79,6 @@ from superset.utils.urls import get_url_host
if TYPE_CHECKING:
from superset.common.query_context import QueryContext
from superset.connectors.base.models import BaseDatasource
from superset.connectors.druid.models import DruidCluster
from superset.models.core import Database
from superset.models.dashboard import Dashboard
from superset.models.sql_lab import Query
@@ -153,9 +152,6 @@ class SupersetSecurityManager( # pylint: disable=too-many-public-methods
GAMMA_READ_ONLY_MODEL_VIEWS = {
"Dataset",
"DruidColumnInlineView",
"DruidDatasourceModelView",
"DruidMetricInlineView",
"Datasource",
} | READ_ONLY_MODEL_VIEWS
@@ -325,7 +321,7 @@
return self.can_access("all_database_access", "all_database_access")
def can_access_database(self, database: Union["Database", "DruidCluster"]) -> bool:
def can_access_database(self, database: "Database") -> bool:
"""
Return True if the user can fully access the Superset database, False otherwise.


@@ -19,12 +19,10 @@ from typing import Any, Dict
from sqlalchemy.orm import Session
from superset.connectors.druid.models import DruidCluster
from superset.models.core import Database
EXPORT_VERSION = "1.0.0"
DATABASES_KEY = "databases"
DRUID_CLUSTERS_KEY = "druid_clusters"
logger = logging.getLogger(__name__)
@@ -33,14 +31,9 @@ def export_schema_to_dict(back_references: bool) -> Dict[str, Any]:
databases = [
Database.export_schema(recursive=True, include_parent_ref=back_references)
]
clusters = [
DruidCluster.export_schema(recursive=True, include_parent_ref=back_references)
]
data = {}
if databases:
data[DATABASES_KEY] = databases
if clusters:
data[DRUID_CLUSTERS_KEY] = clusters
return data
@@ -59,19 +52,7 @@ def export_to_dict(
for database in dbs
]
logger.info("Exported %d %s", len(databases), DATABASES_KEY)
cls = session.query(DruidCluster)
clusters = [
cluster.export_to_dict(
recursive=recursive,
include_parent_ref=back_references,
include_defaults=include_defaults,
)
for cluster in cls
]
logger.info("Exported %d %s", len(clusters), DRUID_CLUSTERS_KEY)
data = {}
if databases:
data[DATABASES_KEY] = databases
if clusters:
data[DRUID_CLUSTERS_KEY] = clusters
return data


@@ -19,7 +19,7 @@ import functools
import logging
import traceback
from datetime import datetime
from typing import Any, Callable, cast, Dict, List, Optional, TYPE_CHECKING, Union
from typing import Any, Callable, cast, Dict, List, Optional, Union
import simplejson as json
import yaml
@@ -79,16 +79,12 @@ from superset.utils import core as utils
from .utils import bootstrap_user_data
if TYPE_CHECKING:
from superset.connectors.druid.views import DruidClusterModelView
FRONTEND_CONF_KEYS = (
"SUPERSET_WEBSERVER_TIMEOUT",
"SUPERSET_DASHBOARD_POSITION_DATA_LIMIT",
"SUPERSET_DASHBOARD_PERIODICAL_REFRESH_LIMIT",
"SUPERSET_DASHBOARD_PERIODICAL_REFRESH_WARNING_MESSAGE",
"DISABLE_DATASET_SOURCE_EDIT",
"DRUID_IS_ACTIVE",
"ENABLE_JAVASCRIPT_CONTROLS",
"DEFAULT_SQLLAB_LIMIT",
"DEFAULT_VIZ_TYPE",


@@ -1996,61 +1996,6 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
def log(self) -> FlaskResponse: # pylint: disable=no-self-use
return Response(status=200)
@has_access
@expose("/sync_druid/", methods=["POST"])
@event_logger.log_this
def sync_druid_source(self) -> FlaskResponse: # pylint: disable=no-self-use
"""Syncs the druid datasource in main db with the provided config.
The endpoint takes 3 arguments:
user - user name to perform the operation as
cluster - name of the druid cluster
config - configuration stored in json that contains:
name: druid datasource name
dimensions: list of the dimensions, they become druid columns
with the type STRING
metrics_spec: list of metrics (dictionary). Metric consists of
2 attributes: type and name. Type can be count,
etc. `count` type is stored internally as longSum
other fields will be ignored.
Example: {
'name': 'test_click',
'metrics_spec': [{'type': 'count', 'name': 'count'}],
'dimensions': ['affiliate_id', 'campaign', 'first_seen']
}
"""
payload = request.get_json(force=True)
druid_config = payload["config"]
user_name = payload["user"]
cluster_name = payload["cluster"]
user = security_manager.find_user(username=user_name)
DruidDatasource = ConnectorRegistry.sources[ # pylint: disable=invalid-name
"druid"
]
DruidCluster = DruidDatasource.cluster_class # pylint: disable=invalid-name
if not user:
err_msg = __("Can't find user, please ask your admin to create one.")
logger.error(err_msg, exc_info=True)
return json_error_response(err_msg)
cluster = (
db.session.query(DruidCluster)
.filter_by(cluster_name=cluster_name)
.one_or_none()
)
if not cluster:
err_msg = __("Can't find DruidCluster")
logger.error(err_msg, exc_info=True)
return json_error_response(err_msg)
try:
DruidDatasource.sync_to_db_from_config(druid_config, user, cluster)
except Exception as ex: # pylint: disable=broad-except
err_msg = utils.error_msg_from_exception(ex)
logger.exception(err_msg)
return json_error_response(err_msg)
return Response(status=201)
@has_access
@expose("/get_or_create_table/", methods=["POST"])
@event_logger.log_this


@@ -38,7 +38,6 @@ from tests.integration_tests.fixtures.energy_dashboard import (
from tests.integration_tests.test_app import app # isort:skip
from superset import db, security_manager
from superset.connectors.connector_registry import ConnectorRegistry
from superset.connectors.druid.models import DruidDatasource
from superset.connectors.sqla.models import SqlaTable
from superset.models import core as models
from superset.models.datasource_access_request import DatasourceAccessRequest
@@ -114,8 +113,6 @@ class TestRequestAccess(SupersetTestCase):
@classmethod
def setUpClass(cls):
with app.app_context():
cls.create_druid_test_objects()
security_manager.add_role("override_me")
security_manager.add_role(TEST_ROLE_1)
security_manager.add_role(TEST_ROLE_2)
@@ -181,40 +178,6 @@
"datasource_access", updated_override_me.permissions[0].permission.name
)
@pytest.mark.usefixtures("load_birth_names_dashboard_with_slices")
def test_override_role_permissions_druid_and_table(self):
database = get_example_database()
engine = database.get_sqla_engine()
schema = inspect(engine).default_schema_name
perm_data = ROLE_ALL_PERM_DATA.copy()
perm_data["database"][0]["schema"][0]["name"] = schema
response = self.client.post(
"/superset/override_role_permissions/",
data=json.dumps(ROLE_ALL_PERM_DATA),
content_type="application/json",
)
self.assertEqual(201, response.status_code)
updated_role = security_manager.find_role("override_me")
perms = sorted(updated_role.permissions, key=lambda p: p.view_menu.name)
druid_ds_1 = self.get_druid_ds_by_name("druid_ds_1")
self.assertEqual(druid_ds_1.perm, perms[0].view_menu.name)
self.assertEqual("datasource_access", perms[0].permission.name)
druid_ds_2 = self.get_druid_ds_by_name("druid_ds_2")
self.assertEqual(druid_ds_2.perm, perms[1].view_menu.name)
self.assertEqual(
"datasource_access", updated_role.permissions[1].permission.name
)
birth_names = self.get_table(name="birth_names")
self.assertEqual(birth_names.perm, perms[2].view_menu.name)
self.assertEqual(
"datasource_access", updated_role.permissions[2].permission.name
)
self.assertEqual(3, len(perms))
@pytest.mark.usefixtures(
"load_energy_table_with_slice", "load_birth_names_dashboard_with_slices"
)
@@ -596,56 +559,6 @@
"<ul><li>{}</li></ul>".format(approve_link_3),
)
# Request druid access, there are no roles have this table.
druid_ds_4 = (
session.query(DruidDatasource)
.filter_by(datasource_name="druid_ds_1")
.first()
)
druid_ds_4_id = druid_ds_4.id
# request access to the table
self.get_resp(ACCESS_REQUEST.format("druid", druid_ds_4_id, "go"))
access_request4 = self.get_access_requests("gamma", "druid", druid_ds_4_id)
self.assertEqual(access_request4.roles_with_datasource, "<ul></ul>")
# Case 5. Roles exist that contains the druid datasource.
# add druid ds to the existing roles
druid_ds_5 = (
session.query(DruidDatasource)
.filter_by(datasource_name="druid_ds_2")
.first()
)
druid_ds_5_id = druid_ds_5.id
druid_ds_5_perm = druid_ds_5.perm
druid_ds_2_role = security_manager.add_role("druid_ds_2_role")
admin_role = security_manager.find_role("Admin")
security_manager.add_permission_role(
admin_role,
security_manager.find_permission_view_menu(
"datasource_access", druid_ds_5_perm
),
)
security_manager.add_permission_role(
druid_ds_2_role,
security_manager.find_permission_view_menu(
"datasource_access", druid_ds_5_perm
),
)
session.commit()
self.get_resp(ACCESS_REQUEST.format("druid", druid_ds_5_id, "go"))
access_request5 = self.get_access_requests("gamma", "druid", druid_ds_5_id)
approve_link_5 = ROLE_GRANT_LINK.format(
"druid", druid_ds_5_id, "gamma", "druid_ds_2_role", "druid_ds_2_role"
)
self.assertEqual(
access_request5.roles_with_datasource,
"<ul><li>{}</li></ul>".format(approve_link_5),
)
# cleanup
gamma_user = security_manager.find_user(username="gamma")
gamma_user.roles.remove(security_manager.find_role("dummy_role"))


@@ -38,7 +38,6 @@ from tests.integration_tests.test_app import app
from superset.sql_parse import CtasMethod
from superset import db, security_manager
from superset.connectors.base.models import BaseDatasource
from superset.connectors.druid.models import DruidCluster, DruidDatasource
from superset.connectors.sqla.models import SqlaTable
from superset.models import core as models
from superset.models.slice import Slice
@@ -153,7 +152,7 @@ class SupersetTestCase(TestCase):
user_to_create.roles = []
for chosen_user_role in roles:
if should_create_roles:
## copy role from gamma but without data permissions
# copy role from gamma but without data permissions
security_manager.copy_role("Gamma", chosen_user_role, merge=False)
user_to_create.roles.append(security_manager.find_role(chosen_user_role))
db.session.commit()
@@ -191,30 +190,6 @@
)
return user
@classmethod
def create_druid_test_objects(cls):
# create druid cluster and druid datasources
with app.app_context():
session = db.session
cluster = (
session.query(DruidCluster).filter_by(cluster_name="druid_test").first()
)
if not cluster:
cluster = DruidCluster(cluster_name="druid_test")
session.add(cluster)
session.commit()
druid_datasource1 = DruidDatasource(
datasource_name="druid_ds_1", cluster=cluster
)
session.add(druid_datasource1)
druid_datasource2 = DruidDatasource(
datasource_name="druid_ds_2", cluster=cluster
)
session.add(druid_datasource2)
session.commit()
@staticmethod
def get_table_by_id(table_id: int) -> SqlaTable:
return db.session.query(SqlaTable).filter_by(id=table_id).one()
@@ -275,10 +250,6 @@
else:
raise ValueError("Database doesn't exist")
@staticmethod
def get_druid_ds_by_name(name: str) -> DruidDatasource:
return db.session.query(DruidDatasource).filter_by(datasource_name=name).first()
@staticmethod
def get_datasource_mock() -> BaseDatasource:
datasource = Mock()


@@ -294,7 +294,6 @@ class TestCore(SupersetTestCase):
def assert_admin_permission_in(role_name, assert_func):
role = security_manager.find_role(role_name)
permissions = [p.permission.name for p in role.permissions]
assert_func("can_sync_druid_source", permissions)
assert_func("can_approve", permissions)
assert_admin_permission_in("Admin", self.assertIn)


@@ -24,12 +24,7 @@ import yaml
from tests.integration_tests.test_app import app
from superset import db
from superset.connectors.druid.models import (
DruidColumn,
DruidDatasource,
DruidMetric,
DruidCluster,
)
from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn
from superset.utils.database import get_example_database
from superset.utils.dict_import_export import export_to_dict
@@ -52,9 +47,6 @@ class TestDictImportExport(SupersetTestCase):
for table in session.query(SqlaTable):
if DBREF in table.params_dict:
session.delete(table)
for datasource in session.query(DruidDatasource):
if DBREF in datasource.params_dict:
session.delete(datasource)
session.commit()
@classmethod
@@ -96,38 +88,6 @@
table.metrics.append(SqlMetric(metric_name=metric_name, expression=""))
return table, dict_rep
def create_druid_datasource(self, name, id=0, cols_names=[], metric_names=[]):
cluster_name = "druid_test"
cluster = self.get_or_create(
DruidCluster, {"cluster_name": cluster_name}, db.session
)
name = "{0}{1}".format(NAME_PREFIX, name)
params = {DBREF: id, "database_name": cluster_name}
dict_rep = {
"cluster_id": cluster.id,
"datasource_name": name,
"id": id,
"params": json.dumps(params),
"columns": [{"column_name": c} for c in cols_names],
"metrics": [{"metric_name": c, "json": "{}"} for c in metric_names],
}
datasource = DruidDatasource(
id=id,
datasource_name=name,
cluster_id=cluster.id,
params=json.dumps(params),
)
for col_name in cols_names:
datasource.columns.append(DruidColumn(column_name=col_name))
for metric_name in metric_names:
datasource.metrics.append(DruidMetric(metric_name=metric_name))
return datasource, dict_rep
def get_datasource(self, datasource_id):
return db.session.query(DruidDatasource).filter_by(id=datasource_id).first()
def yaml_compare(self, obj_1, obj_2):
obj_1_str = yaml.safe_dump(obj_1, default_flow_style=False)
obj_2_str = yaml.safe_dump(obj_2, default_flow_style=False)
@@ -308,118 +268,6 @@
ui_export["databases"][0]["tables"], cli_export["databases"][0]["tables"]
)
def test_import_druid_no_metadata(self):
datasource, dict_datasource = self.create_druid_datasource(
"pure_druid", id=ID_PREFIX + 1
)
imported_cluster = DruidDatasource.import_from_dict(db.session, dict_datasource)
db.session.commit()
imported = self.get_datasource(imported_cluster.id)
self.assert_datasource_equals(datasource, imported)
def test_import_druid_1_col_1_met(self):
datasource, dict_datasource = self.create_druid_datasource(
"druid_1_col_1_met",
id=ID_PREFIX + 2,
cols_names=["col1"],
metric_names=["metric1"],
)
imported_cluster = DruidDatasource.import_from_dict(db.session, dict_datasource)
db.session.commit()
imported = self.get_datasource(imported_cluster.id)
self.assert_datasource_equals(datasource, imported)
self.assertEqual(
{DBREF: ID_PREFIX + 2, "database_name": "druid_test"},
json.loads(imported.params),
)
def test_import_druid_2_col_2_met(self):
datasource, dict_datasource = self.create_druid_datasource(
"druid_2_col_2_met",
id=ID_PREFIX + 3,
cols_names=["c1", "c2"],
metric_names=["m1", "m2"],
)
imported_cluster = DruidDatasource.import_from_dict(db.session, dict_datasource)
db.session.commit()
imported = self.get_datasource(imported_cluster.id)
self.assert_datasource_equals(datasource, imported)
def test_import_druid_override_append(self):
datasource, dict_datasource = self.create_druid_datasource(
"druid_override", id=ID_PREFIX + 3, cols_names=["col1"], metric_names=["m1"]
)
imported_cluster = DruidDatasource.import_from_dict(db.session, dict_datasource)
db.session.commit()
table_over, table_over_dict = self.create_druid_datasource(
"druid_override",
id=ID_PREFIX + 3,
cols_names=["new_col1", "col2", "col3"],
metric_names=["new_metric1"],
)
imported_over_cluster = DruidDatasource.import_from_dict(
db.session, table_over_dict
)
db.session.commit()
imported_over = self.get_datasource(imported_over_cluster.id)
self.assertEqual(imported_cluster.id, imported_over.id)
expected_datasource, _ = self.create_druid_datasource(
"druid_override",
id=ID_PREFIX + 3,
metric_names=["new_metric1", "m1"],
cols_names=["col1", "new_col1", "col2", "col3"],
)
self.assert_datasource_equals(expected_datasource, imported_over)
def test_import_druid_override_sync(self):
datasource, dict_datasource = self.create_druid_datasource(
"druid_override", id=ID_PREFIX + 3, cols_names=["col1"], metric_names=["m1"]
)
imported_cluster = DruidDatasource.import_from_dict(db.session, dict_datasource)
db.session.commit()
table_over, table_over_dict = self.create_druid_datasource(
"druid_override",
id=ID_PREFIX + 3,
cols_names=["new_col1", "col2", "col3"],
metric_names=["new_metric1"],
)
imported_over_cluster = DruidDatasource.import_from_dict(
session=db.session, dict_rep=table_over_dict, sync=["metrics", "columns"]
) # syncing metrics and columns
db.session.commit()
imported_over = self.get_datasource(imported_over_cluster.id)
self.assertEqual(imported_cluster.id, imported_over.id)
expected_datasource, _ = self.create_druid_datasource(
"druid_override",
id=ID_PREFIX + 3,
metric_names=["new_metric1"],
cols_names=["new_col1", "col2", "col3"],
)
self.assert_datasource_equals(expected_datasource, imported_over)
def test_import_druid_override_identical(self):
datasource, dict_datasource = self.create_druid_datasource(
"copy_cat",
id=ID_PREFIX + 4,
cols_names=["new_col1", "col2", "col3"],
metric_names=["new_metric1"],
)
imported = DruidDatasource.import_from_dict(
session=db.session, dict_rep=dict_datasource
)
db.session.commit()
copy_datasource, dict_cp_datasource = self.create_druid_datasource(
"copy_cat",
id=ID_PREFIX + 4,
cols_names=["new_col1", "col2", "col3"],
metric_names=["new_metric1"],
)
imported_copy = DruidDatasource.import_from_dict(db.session, dict_cp_datasource)
db.session.commit()
self.assertEqual(imported.id, imported_copy.id)
self.assert_datasource_equals(copy_datasource, self.get_datasource(imported.id))
if __name__ == "__main__":
unittest.main()

File diff suppressed because it is too large.

File diff suppressed because it is too large.


@@ -1,668 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# isort:skip_file
"""Unit tests for Superset"""
import json
import unittest
from datetime import datetime
from unittest.mock import Mock, patch
from tests.integration_tests.test_app import app
from superset import db, security_manager
from superset.connectors.druid.views import (
Druid,
DruidClusterModelView,
DruidColumnInlineView,
DruidDatasourceModelView,
DruidMetricInlineView,
)
from .base_tests import SupersetTestCase
try:
from superset.connectors.druid.models import (
DruidCluster,
DruidColumn,
DruidDatasource,
DruidMetric,
)
except ImportError:
pass
class PickableMock(Mock):
def __reduce__(self):
return (Mock, ())
SEGMENT_METADATA = [
{
"id": "some_id",
"intervals": ["2013-05-13T00:00:00.000Z/2013-05-14T00:00:00.000Z"],
"columns": {
"__time": {
"type": "LONG",
"hasMultipleValues": False,
"size": 407240380,
"cardinality": None,
"errorMessage": None,
},
"dim1": {
"type": "STRING",
"hasMultipleValues": False,
"size": 100000,
"cardinality": 1944,
"errorMessage": None,
},
"dim2": {
"type": "STRING",
"hasMultipleValues": True,
"size": 100000,
"cardinality": 1504,
"errorMessage": None,
},
"metric1": {
"type": "FLOAT",
"hasMultipleValues": False,
"size": 100000,
"cardinality": None,
"errorMessage": None,
},
},
"aggregators": {
"metric1": {"type": "longSum", "name": "metric1", "fieldName": "metric1"}
},
"size": 300000,
"numRows": 5000000,
}
]
GB_RESULT_SET = [
{
"version": "v1",
"timestamp": "2012-01-01T00:00:00.000Z",
"event": {"dim1": "Canada", "dim2": "boy", "count": 12345678},
},
{
"version": "v1",
"timestamp": "2012-01-01T00:00:00.000Z",
"event": {"dim1": "USA", "dim2": "girl", "count": 12345678 / 2},
},
]
DruidCluster.get_druid_version = lambda _: "0.9.1" # type: ignore
class TestDruid(SupersetTestCase):
"""Testing interactions with Druid"""
@classmethod
def setUpClass(cls):
cls.create_druid_test_objects()
def get_test_cluster_obj(self):
return DruidCluster(
cluster_name="test_cluster",
broker_host="localhost",
broker_port=7980,
broker_endpoint="druid/v2",
metadata_last_refreshed=datetime.now(),
)
def get_cluster(self, PyDruid):
instance = PyDruid.return_value
instance.time_boundary.return_value = [{"result": {"maxTime": "2016-01-01"}}]
instance.segment_metadata.return_value = SEGMENT_METADATA
cluster = (
db.session.query(DruidCluster)
.filter_by(cluster_name="test_cluster")
.first()
)
if cluster:
for datasource in (
db.session.query(DruidDatasource).filter_by(cluster_id=cluster.id).all()
):
db.session.delete(datasource)
db.session.delete(cluster)
db.session.commit()
cluster = self.get_test_cluster_obj()
db.session.add(cluster)
cluster.get_datasources = PickableMock(return_value=["test_datasource"])
return cluster
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
@patch("superset.connectors.druid.models.PyDruid")
def test_client(self, PyDruid):
self.login(username="admin")
cluster = self.get_cluster(PyDruid)
cluster.refresh_datasources()
cluster.refresh_datasources(merge_flag=True)
datasource_id = cluster.datasources[0].id
db.session.commit()
nres = [
list(v["event"].items()) + [("timestamp", v["timestamp"])]
for v in GB_RESULT_SET
]
nres = [dict(v) for v in nres]
import pandas as pd
df = pd.DataFrame(nres)
instance = PyDruid.return_value
instance.export_pandas.return_value = df
instance.query_dict = {}
instance.query_builder.last_query.query_dict = {}
resp = self.get_resp("/superset/explore/druid/{}/".format(datasource_id))
self.assertIn("test_datasource", resp)
form_data = {
"viz_type": "table",
"granularity": "one+day",
"druid_time_origin": "",
"since": "7 days ago",
"until": "now",
"row_limit": 5000,
"include_search": "false",
"metrics": ["count"],
"groupby": ["dim1"],
"force": "true",
}
# One groupby
url = "/superset/explore_json/druid/{}/".format(datasource_id)
resp = self.get_json_resp(url, {"form_data": json.dumps(form_data)})
self.assertEqual("Canada", resp["data"]["records"][0]["dim1"])
form_data = {
"viz_type": "table",
"granularity": "one+day",
"druid_time_origin": "",
"since": "7 days ago",
"until": "now",
"row_limit": 5000,
"include_search": "false",
"metrics": ["count"],
"groupby": ["dim1", "dim2"],
"force": "true",
}
# two groupby
url = "/superset/explore_json/druid/{}/".format(datasource_id)
resp = self.get_json_resp(url, {"form_data": json.dumps(form_data)})
self.assertEqual("Canada", resp["data"]["records"][0]["dim1"])
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_druid_sync_from_config(self):
CLUSTER_NAME = "new_druid"
self.login()
cluster = self.get_or_create(
DruidCluster, {"cluster_name": CLUSTER_NAME}, db.session
)
db.session.merge(cluster)
db.session.commit()
ds = (
db.session.query(DruidDatasource)
.filter_by(datasource_name="test_click")
.first()
)
if ds:
db.session.delete(ds)
db.session.commit()
cfg = {
"user": "admin",
"cluster": CLUSTER_NAME,
"config": {
"name": "test_click",
"dimensions": ["affiliate_id", "campaign", "first_seen"],
"metrics_spec": [
{"type": "count", "name": "count"},
{"type": "sum", "name": "sum"},
],
"batch_ingestion": {
"sql": "SELECT * FROM clicks WHERE d='{{ ds }}'",
"ts_column": "d",
"sources": [{"table": "clicks", "partition": "d='{{ ds }}'"}],
},
},
}
def check():
resp = self.client.post("/superset/sync_druid/", data=json.dumps(cfg))
druid_ds = (
db.session.query(DruidDatasource)
.filter_by(datasource_name="test_click")
.one()
)
col_names = set([c.column_name for c in druid_ds.columns])
assert {"affiliate_id", "campaign", "first_seen"} == col_names
metric_names = {m.metric_name for m in druid_ds.metrics}
assert {"count", "sum"} == metric_names
assert resp.status_code == 201
check()
# checking twice to make sure a second sync yields the same results
check()
# datasource exists, add new metrics and dimensions
cfg = {
"user": "admin",
"cluster": CLUSTER_NAME,
"config": {
"name": "test_click",
"dimensions": ["affiliate_id", "second_seen"],
"metrics_spec": [
{"type": "bla", "name": "sum"},
{"type": "unique", "name": "unique"},
],
},
}
resp = self.client.post("/superset/sync_druid/", data=json.dumps(cfg))
druid_ds = (
db.session.query(DruidDatasource)
.filter_by(datasource_name="test_click")
.one()
)
# columns and metrics are not deleted if config is changed as
# user could define their own dimensions / metrics and want to keep them
assert set([c.column_name for c in druid_ds.columns]) == set(
["affiliate_id", "campaign", "first_seen", "second_seen"]
)
assert set([m.metric_name for m in druid_ds.metrics]) == set(
["count", "sum", "unique"]
)
# metric type will not be overridden, sum stays instead of bla
assert set([m.metric_type for m in druid_ds.metrics]) == set(
["longSum", "sum", "unique"]
)
assert resp.status_code == 201
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
@unittest.skipUnless(app.config["DRUID_IS_ACTIVE"], "DRUID_IS_ACTIVE is false")
def test_filter_druid_datasource(self):
CLUSTER_NAME = "new_druid"
cluster = self.get_or_create(
DruidCluster, {"cluster_name": CLUSTER_NAME}, db.session
)
db.session.merge(cluster)
gamma_ds = self.get_or_create(
DruidDatasource,
{"datasource_name": "datasource_for_gamma", "cluster": cluster},
db.session,
)
gamma_ds.cluster = cluster
db.session.merge(gamma_ds)
no_gamma_ds = self.get_or_create(
DruidDatasource,
{"datasource_name": "datasource_not_for_gamma", "cluster": cluster},
db.session,
)
no_gamma_ds.cluster = cluster
db.session.merge(no_gamma_ds)
db.session.commit()
security_manager.add_permission_view_menu("datasource_access", gamma_ds.perm)
security_manager.add_permission_view_menu("datasource_access", no_gamma_ds.perm)
perm = security_manager.find_permission_view_menu(
"datasource_access", gamma_ds.get_perm()
)
security_manager.add_permission_role(security_manager.find_role("Gamma"), perm)
security_manager.get_session.commit()
self.login(username="gamma")
url = "/druiddatasourcemodelview/list/"
resp = self.get_resp(url)
self.assertIn("datasource_for_gamma", resp)
self.assertNotIn("datasource_not_for_gamma", resp)
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
@patch("superset.connectors.druid.models.PyDruid")
def test_sync_druid_perm(self, PyDruid):
self.login(username="admin")
instance = PyDruid.return_value
instance.time_boundary.return_value = [{"result": {"maxTime": "2016-01-01"}}]
instance.segment_metadata.return_value = SEGMENT_METADATA
cluster = (
db.session.query(DruidCluster)
.filter_by(cluster_name="test_cluster")
.first()
)
if cluster:
for datasource in (
db.session.query(DruidDatasource).filter_by(cluster_id=cluster.id).all()
):
db.session.delete(datasource)
db.session.delete(cluster)
db.session.commit()
cluster = DruidCluster(
cluster_name="test_cluster",
broker_host="localhost",
broker_port=7980,
metadata_last_refreshed=datetime.now(),
)
db.session.add(cluster)
cluster.get_datasources = PickableMock(return_value=["test_datasource"])
cluster.refresh_datasources()
cluster.datasources[0].merge_flag = True
metadata = cluster.datasources[0].latest_metadata()
self.assertEqual(len(metadata), 4)
db.session.commit()
view_menu_name = cluster.datasources[0].get_perm()
view_menu = security_manager.find_view_menu(view_menu_name)
permission = security_manager.find_permission("datasource_access")
pv = (
security_manager.get_session.query(security_manager.permissionview_model)
.filter_by(permission=permission, view_menu=view_menu)
.first()
)
assert pv is not None
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
@patch("superset.connectors.druid.models.PyDruid")
def test_refresh_metadata(self, PyDruid):
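"""A refresh should materialize columns and typed metrics from the segment metadata."""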
self.login(username="admin")
cluster = self.get_cluster(PyDruid)
cluster.refresh_datasources()
datasource = cluster.datasources[0]
cols = db.session.query(DruidColumn).filter(
DruidColumn.datasource_id == datasource.id
)
for col in cols:
self.assertIn(col.column_name, SEGMENT_METADATA[0]["columns"].keys())
metrics = (
db.session.query(DruidMetric)
.filter(DruidMetric.datasource_id == datasource.id)
.filter(DruidMetric.metric_name.like("%__metric1"))
)
for metric in metrics:
agg, _ = metric.metric_name.split("__")
self.assertEqual(
json.loads(metric.json)["type"], "double{}".format(agg.capitalize())
)
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
@patch("superset.connectors.druid.models.PyDruid")
def test_refresh_metadata_augment_type(self, PyDruid):
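"""A type change in the segment metadata should propagate to the stored column and metrics on refresh."""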
self.login(username="admin")
cluster = self.get_cluster(PyDruid)
cluster.refresh_datasources()
import copy
# deep-copy so the shared SEGMENT_METADATA fixture is not mutated in place
metadata = copy.deepcopy(SEGMENT_METADATA)
metadata[0]["columns"]["metric1"]["type"] = "LONG"
instance = PyDruid.return_value
instance.segment_metadata.return_value = metadata
cluster.refresh_datasources()
datasource = cluster.datasources[0]
column = (
db.session.query(DruidColumn)
.filter(DruidColumn.datasource_id == datasource.id)
.filter(DruidColumn.column_name == "metric1")
).one()
self.assertEqual(column.type, "LONG")
metrics = (
db.session.query(DruidMetric)
.filter(DruidMetric.datasource_id == datasource.id)
.filter(DruidMetric.metric_name.like("%__metric1"))
)
for metric in metrics:
agg, _ = metric.metric_name.split("__")
self.assertEqual(metric.json_obj["type"], "long{}".format(agg.capitalize()))
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
@patch("superset.connectors.druid.models.PyDruid")
def test_refresh_metadata_augment_verbose_name(self, PyDruid):
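"""User-defined verbose names should survive a metadata refresh."""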
self.login(username="admin")
cluster = self.get_cluster(PyDruid)
cluster.refresh_datasources()
datasource = cluster.datasources[0]
metrics = (
db.session.query(DruidMetric)
.filter(DruidMetric.datasource_id == datasource.id)
.filter(DruidMetric.metric_name.like("%__metric1"))
)
for metric in metrics:
metric.verbose_name = metric.metric_name
db.session.commit()
# The verbose name should not change during a refresh.
cluster.refresh_datasources()
datasource = cluster.datasources[0]
metrics = (
db.session.query(DruidMetric)
.filter(DruidMetric.datasource_id == datasource.id)
.filter(DruidMetric.metric_name.like("%__metric1"))
)
for metric in metrics:
self.assertEqual(metric.verbose_name, metric.metric_name)
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_urls(self):
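"""get_base_url should yield scheme-qualified URLs whether or not the host already includes one."""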
cluster = self.get_test_cluster_obj()
self.assertEqual(
cluster.get_base_url("localhost", "9999"), "http://localhost:9999"
)
self.assertEqual(
cluster.get_base_url("http://localhost", "9999"), "http://localhost:9999"
)
self.assertEqual(
cluster.get_base_url("https://localhost", "9999"), "https://localhost:9999"
)
self.assertEqual(
cluster.get_base_broker_url(), "http://localhost:7980/druid/v2"
)
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
@patch("superset.connectors.druid.models.PyDruid")
def test_druid_time_granularities(self, PyDruid):
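"""Each human-readable granularity should map to the matching ISO 8601 period in the Druid query."""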
self.login(username="admin")
cluster = self.get_cluster(PyDruid)
cluster.refresh_datasources()
cluster.refresh_datasources(merge_flag=True)
datasource_id = cluster.datasources[0].id
db.session.commit()
# flatten each grouped result into a single row dict that also carries its timestamp
nres = [
dict(list(v["event"].items()) + [("timestamp", v["timestamp"])])
for v in GB_RESULT_SET
]
import pandas as pd
df = pd.DataFrame(nres)
instance = PyDruid.return_value
instance.export_pandas.return_value = df
instance.query_dict = {}
instance.query_builder.last_query.query_dict = {}
form_data = {
"viz_type": "table",
"since": "7 days ago",
"until": "now",
"metrics": ["count"],
"groupby": [],
"include_time": "true",
}
granularity_map = {
"5 seconds": "PT5S",
"30 seconds": "PT30S",
"1 minute": "PT1M",
"5 minutes": "PT5M",
"1 hour": "PT1H",
"6 hour": "PT6H",
"one day": "P1D",
"1 day": "P1D",
"7 days": "P7D",
"week": "P1W",
"week_starting_sunday": "P1W",
"week_ending_saturday": "P1W",
"month": "P1M",
"quarter": "P3M",
"year": "P1Y",
}
url = "/superset/explore_json/druid/{}/".format(datasource_id)
for granularity, period in granularity_map.items():
form_data["granularity"] = granularity
self.get_json_resp(url, {"form_data": json.dumps(form_data)})
self.assertEqual(
period,
instance.timeseries.call_args[1]["granularity"]["period"],
)
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
@patch("superset.connectors.druid.models.PyDruid")
def test_external_metadata(self, PyDruid):
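"""The external metadata endpoint should return every column of the datasource."""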
self.login(username="admin")
cluster = self.get_cluster(PyDruid)
cluster.refresh_datasources()
datasource = cluster.datasources[0]
url = "/datasource/external_metadata/druid/{}/".format(datasource.id)
resp = self.get_json_resp(url)
col_names = {o.get("name") for o in resp}
self.assertEqual(col_names, {"__time", "dim1", "dim2", "metric1"})
class TestDruidViewEnabling(SupersetTestCase):
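"""Each Druid view should 404 when is_enabled() is false and respond normally otherwise."""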
def test_druid_disabled(self):
with patch.object(Druid, "is_enabled", return_value=False):
self.login("admin")
uri = "/druid/refresh_datasources/"
rv = self.client.get(uri)
self.assertEqual(rv.status_code, 404)
def test_druid_enabled(self):
with patch.object(Druid, "is_enabled", return_value=True):
self.login("admin")
uri = "/druid/refresh_datasources/"
rv = self.client.get(uri)
self.assertLess(rv.status_code, 400)
def test_druid_cluster_disabled(self):
with patch.object(DruidClusterModelView, "is_enabled", return_value=False):
self.login("admin")
uri = "/druidclustermodelview/list/"
rv = self.client.get(uri)
self.assertEqual(rv.status_code, 404)
def test_druid_cluster_enabled(self):
with patch.object(DruidClusterModelView, "is_enabled", return_value=True):
self.login("admin")
uri = "/druidclustermodelview/list/"
rv = self.client.get(uri)
self.assertLess(rv.status_code, 400)
def test_druid_column_disabled(self):
with patch.object(DruidColumnInlineView, "is_enabled", return_value=False):
self.login("admin")
uri = "/druidcolumninlineview/list/"
rv = self.client.get(uri)
self.assertEqual(rv.status_code, 404)
def test_druid_column_enabled(self):
with patch.object(DruidColumnInlineView, "is_enabled", return_value=True):
self.login("admin")
uri = "/druidcolumninlineview/list/"
rv = self.client.get(uri)
self.assertLess(rv.status_code, 400)
def test_druid_datasource_disabled(self):
with patch.object(DruidDatasourceModelView, "is_enabled", return_value=False):
self.login("admin")
uri = "/druiddatasourcemodelview/list/"
rv = self.client.get(uri)
self.assertEqual(rv.status_code, 404)
def test_druid_datasource_enabled(self):
with patch.object(DruidDatasourceModelView, "is_enabled", return_value=True):
self.login("admin")
uri = "/druiddatasourcemodelview/list/"
rv = self.client.get(uri)
self.assertLess(rv.status_code, 400)
def test_druid_metric_disabled(self):
with patch.object(DruidMetricInlineView, "is_enabled", return_value=False):
self.login("admin")
uri = "/druidmetricinlineview/list/"
rv = self.client.get(uri)
self.assertEqual(rv.status_code, 404)
def test_druid_metric_enabled(self):
with patch.object(DruidMetricInlineView, "is_enabled", return_value=True):
self.login("admin")
uri = "/druidmetricinlineview/list/"
rv = self.client.get(uri)
self.assertLess(rv.status_code, 400)
if __name__ == "__main__":
unittest.main()

View File

@ -34,12 +34,7 @@ from tests.integration_tests.fixtures.energy_dashboard import (
from tests.integration_tests.test_app import app
from superset.dashboards.commands.importers.v0 import decode_dashboards
from superset import db, security_manager
from superset.connectors.druid.models import (
DruidColumn,
DruidDatasource,
DruidMetric,
DruidCluster,
)
from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn
from superset.dashboards.commands.importers.v0 import import_chart, import_dashboard
from superset.datasets.commands.importers.v0 import import_dataset
@ -72,15 +67,11 @@ class TestImportExport(SupersetTestCase):
for table in session.query(SqlaTable):
if "remote_id" in table.params_dict:
session.delete(table)
for datasource in session.query(DruidDatasource):
if "remote_id" in datasource.params_dict:
session.delete(datasource)
session.commit()
@classmethod
def setUpClass(cls):
cls.delete_imports()
cls.create_druid_test_objects()
@classmethod
def tearDownClass(cls):
@ -141,25 +132,6 @@ class TestImportExport(SupersetTestCase):
table.metrics.append(SqlMetric(metric_name=metric_name, expression=""))
return table
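# builds an in-memory DruidDatasource (with optional columns / metrics) for import tests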
def create_druid_datasource(self, name, id=0, cols_names=None, metric_names=None):
# avoid mutable default arguments, which would be shared across calls
cols_names = cols_names or []
metric_names = metric_names or []
cluster_name = "druid_test"
cluster = self.get_or_create(
DruidCluster, {"cluster_name": cluster_name}, db.session
)
params = {"remote_id": id, "database_name": cluster_name}
datasource = DruidDatasource(
id=id,
datasource_name=name,
cluster_id=cluster.id,
params=json.dumps(params),
)
for col_name in cols_names:
datasource.columns.append(DruidColumn(column_name=col_name))
for metric_name in metric_names:
datasource.metrics.append(DruidMetric(metric_name=metric_name, json="{}"))
return datasource
def get_slice(self, slc_id):
return db.session.query(Slice).filter_by(id=slc_id).first()
@ -169,9 +141,6 @@ class TestImportExport(SupersetTestCase):
def get_dash(self, dash_id):
return db.session.query(Dashboard).filter_by(id=dash_id).first()
def get_datasource(self, datasource_id):
return db.session.query(DruidDatasource).filter_by(id=datasource_id).first()
def assert_dash_equals(
self, expected_dash, actual_dash, check_position=True, check_slugs=True
):
@ -704,78 +673,6 @@ class TestImportExport(SupersetTestCase):
self.assertEqual(imported_id, imported_id_copy)
self.assert_table_equals(copy_table, self.get_table_by_id(imported_id))
def test_import_druid_no_metadata(self):
datasource = self.create_druid_datasource("pure_druid", id=10001)
imported_id = import_dataset(datasource, import_time=1989)
imported = self.get_datasource(imported_id)
self.assert_datasource_equals(datasource, imported)
def test_import_druid_1_col_1_met(self):
datasource = self.create_druid_datasource(
"druid_1_col_1_met", id=10002, cols_names=["col1"], metric_names=["metric1"]
)
imported_id = import_dataset(datasource, import_time=1990)
imported = self.get_datasource(imported_id)
self.assert_datasource_equals(datasource, imported)
self.assertEqual(
{"remote_id": 10002, "import_time": 1990, "database_name": "druid_test"},
json.loads(imported.params),
)
def test_import_druid_2_col_2_met(self):
datasource = self.create_druid_datasource(
"druid_2_col_2_met",
id=10003,
cols_names=["c1", "c2"],
metric_names=["m1", "m2"],
)
imported_id = import_dataset(datasource, import_time=1991)
imported = self.get_datasource(imported_id)
self.assert_datasource_equals(datasource, imported)
def test_import_druid_override(self):
datasource = self.create_druid_datasource(
"druid_override", id=10004, cols_names=["col1"], metric_names=["m1"]
)
imported_id = import_dataset(datasource, import_time=1991)
table_over = self.create_druid_datasource(
"druid_override",
id=10004,
cols_names=["new_col1", "col2", "col3"],
metric_names=["new_metric1"],
)
imported_over_id = import_dataset(table_over, import_time=1992)
imported_over = self.get_datasource(imported_over_id)
self.assertEqual(imported_id, imported_over.id)
expected_datasource = self.create_druid_datasource(
"druid_override",
id=10004,
metric_names=["new_metric1", "m1"],
cols_names=["col1", "new_col1", "col2", "col3"],
)
self.assert_datasource_equals(expected_datasource, imported_over)
def test_import_druid_override_identical(self):
datasource = self.create_druid_datasource(
"copy_cat",
id=10005,
cols_names=["new_col1", "col2", "col3"],
metric_names=["new_metric1"],
)
imported_id = import_dataset(datasource, import_time=1993)
copy_datasource = self.create_druid_datasource(
"copy_cat",
id=10005,
cols_names=["new_col1", "col2", "col3"],
metric_names=["new_metric1"],
)
imported_id_copy = import_dataset(copy_datasource, import_time=1994)
self.assertEqual(imported_id, imported_id_copy)
self.assert_datasource_equals(copy_datasource, self.get_datasource(imported_id))
if __name__ == "__main__":
unittest.main()

View File

@ -32,7 +32,6 @@ from flask import current_app
from superset.models.dashboard import Dashboard
from superset import app, appbuilder, db, security_manager, viz, ConnectorRegistry
from superset.connectors.druid.models import DruidCluster, DruidDatasource
from superset.connectors.sqla.models import SqlaTable
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import SupersetSecurityException
@ -273,93 +272,6 @@ class TestRolePermission(SupersetTestCase):
session.delete(stored_table)
session.commit()
@pytest.mark.usefixtures("load_world_bank_dashboard_with_slices")
def test_set_perm_druid_datasource(self):
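"""Renaming a datasource should keep its perm and schema_perm strings in sync."""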
self.create_druid_test_objects()
session = db.session
druid_cluster = (
session.query(DruidCluster).filter_by(cluster_name="druid_test").one()
)
datasource = DruidDatasource(
datasource_name="tmp_datasource",
cluster=druid_cluster,
cluster_id=druid_cluster.id,
)
session.add(datasource)
session.commit()
# store without a schema
stored_datasource = (
session.query(DruidDatasource)
.filter_by(datasource_name="tmp_datasource")
.one()
)
self.assertEqual(
stored_datasource.perm,
f"[druid_test].[tmp_datasource](id:{stored_datasource.id})",
)
self.assertIsNotNone(
security_manager.find_permission_view_menu(
"datasource_access", stored_datasource.perm
)
)
self.assertIsNone(stored_datasource.schema_perm)
# store with a schema
stored_datasource.datasource_name = "tmp_schema.tmp_datasource"
session.commit()
self.assertEqual(
stored_datasource.perm,
f"[druid_test].[tmp_schema.tmp_datasource](id:{stored_datasource.id})",
)
self.assertIsNotNone(
security_manager.find_permission_view_menu(
"datasource_access", stored_datasource.perm
)
)
self.assertEqual(stored_datasource.schema_perm, "[druid_test].[tmp_schema]")
self.assertIsNotNone(
security_manager.find_permission_view_menu(
"schema_access", stored_datasource.schema_perm
)
)
session.delete(stored_datasource)
session.commit()
def test_set_perm_druid_cluster(self):
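"""Renaming a cluster should keep its database_access perm string in sync."""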
session = db.session
cluster = DruidCluster(cluster_name="tmp_druid_cluster")
session.add(cluster)
stored_cluster = (
session.query(DruidCluster)
.filter_by(cluster_name="tmp_druid_cluster")
.one()
)
self.assertEqual(
stored_cluster.perm, f"[tmp_druid_cluster].(id:{stored_cluster.id})"
)
self.assertIsNotNone(
security_manager.find_permission_view_menu(
"database_access", stored_cluster.perm
)
)
stored_cluster.cluster_name = "tmp_druid_cluster2"
session.commit()
self.assertEqual(
stored_cluster.perm, f"[tmp_druid_cluster2].(id:{stored_cluster.id})"
)
self.assertIsNotNone(
security_manager.find_permission_view_menu(
"database_access", stored_cluster.perm
)
)
session.delete(stored_cluster)
session.commit()
def test_set_perm_database(self):
session = db.session
database = Database(database_name="tmp_database", sqlalchemy_uri="sqlite://")
@ -390,28 +302,6 @@ class TestRolePermission(SupersetTestCase):
session.delete(stored_db)
session.commit()
def test_hybrid_perm_druid_cluster(self):
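"""The perm attribute should be filterable in queries and agree with get_perm()."""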
cluster = DruidCluster(cluster_name="tmp_druid_cluster3")
db.session.add(cluster)
id_ = (
db.session.query(DruidCluster.id)
.filter_by(cluster_name="tmp_druid_cluster3")
.scalar()
)
record = (
db.session.query(DruidCluster)
.filter_by(perm=f"[tmp_druid_cluster3].(id:{id_})")
.one()
)
self.assertEqual(record.get_perm(), record.perm)
self.assertEqual(record.id, id_)
self.assertEqual(record.cluster_name, "tmp_druid_cluster3")
db.session.delete(cluster)
db.session.commit()
def test_hybrid_perm_database(self):
database = Database(database_name="tmp_database3", sqlalchemy_uri="sqlite://")
@ -706,7 +596,6 @@ class TestRolePermission(SupersetTestCase):
self.assertIn(("all_database_access", "all_database_access"), perm_set)
self.assertIn(("can_override_role_permissions", "Superset"), perm_set)
self.assertIn(("can_sync_druid_source", "Superset"), perm_set)
self.assertIn(("can_override_role_permissions", "Superset"), perm_set)
self.assertIn(("can_approve", "Superset"), perm_set)