From 10f00cdde329916e94897f2d32d04ca2d7745372 Mon Sep 17 00:00:00 2001 From: Beto Dealmeida Date: Wed, 31 Jul 2019 09:19:39 -0700 Subject: [PATCH] [fix] Isolate and improve performance on tagging system (#7858) * Fix tag perf * Add ASF header * Make script idempotent * Add CLI to sync tags * Add missing file * Merge heads * Fix lint * Remove script --- superset/cli.py | 12 + superset/common/tags.py | 385 ++++++++++++++++++ ...def97f26fdfb_add_index_to_tagged_object.py | 39 ++ superset/models/core.py | 19 +- 4 files changed, 446 insertions(+), 9 deletions(-) create mode 100644 superset/common/tags.py create mode 100644 superset/migrations/versions/def97f26fdfb_add_index_to_tagged_object.py diff --git a/superset/cli.py b/superset/cli.py index 65b08bfef..572122086 100755 --- a/superset/cli.py +++ b/superset/cli.py @@ -24,10 +24,12 @@ from sys import stdout import click from colorama import Fore, Style from flask import g +from flask_appbuilder import Model from pathlib2 import Path import yaml from superset import app, appbuilder, db, examples, security_manager +from superset.common.tags import add_favorites, add_owners, add_types from superset.utils import core as utils, dashboard_import_export, dict_import_export config = app.config @@ -497,3 +499,13 @@ def load_test_users_run(): password="general", ) security_manager.get_session.commit() + + +@app.cli.command() +def sync_tags(): + """Rebuilds special tags (owner, type, favorited by).""" + # pylint: disable=no-member + metadata = Model.metadata + add_types(db.engine, metadata) + add_owners(db.engine, metadata) + add_favorites(db.engine, metadata) diff --git a/superset/common/tags.py b/superset/common/tags.py new file mode 100644 index 000000000..657611c60 --- /dev/null +++ b/superset/common/tags.py @@ -0,0 +1,385 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from sqlalchemy.exc import IntegrityError +from sqlalchemy.sql import and_, func, functions, join, literal, select + +from superset.models.tags import ObjectTypes, TagTypes + + +def add_types(engine, metadata): + """ + Tag every object according to its type: + + INSERT INTO tagged_object (tag_id, object_id, object_type) + SELECT + tag.id AS tag_id, + slices.id AS object_id, + 'chart' AS object_type + FROM slices + JOIN tag + ON tag.name = 'type:chart' + LEFT OUTER JOIN tagged_object + ON tagged_object.tag_id = tag.id + AND tagged_object.object_id = slices.id + AND tagged_object.object_type = 'chart' + WHERE tagged_object.tag_id IS NULL; + + INSERT INTO tagged_object (tag_id, object_id, object_type) + SELECT + tag.id AS tag_id, + dashboards.id AS object_id, + 'dashboard' AS object_type + FROM dashboards + JOIN tag + ON tag.name = 'type:dashboard' + LEFT OUTER JOIN tagged_object + ON tagged_object.tag_id = tag.id + AND tagged_object.object_id = dashboards.id + AND tagged_object.object_type = 'dashboard' + WHERE tagged_object.tag_id IS NULL; + + INSERT INTO tagged_object (tag_id, object_id, object_type) + SELECT + tag.id AS tag_id, + saved_query.id AS object_id, + 'query' AS object_type + FROM saved_query + JOIN tag + ON tag.name = 'type:query'; + LEFT OUTER JOIN tagged_object + ON tagged_object.tag_id = tag.id + AND tagged_object.object_id = saved_query.id + AND tagged_object.object_type = 'query' + WHERE tagged_object.tag_id IS NULL; + + """ + + tag = metadata.tables["tag"] + tagged_object = metadata.tables["tagged_object"] + slices = metadata.tables["slices"] + dashboards = metadata.tables["dashboards"] + saved_query = metadata.tables["saved_query"] + columns = ["tag_id", "object_id", "object_type"] + + # add a tag for each object type + insert = tag.insert() + for type_ in ObjectTypes.__members__: + try: + engine.execute(insert, name=f"type:{type_}", type=TagTypes.type) + except IntegrityError: + pass # already exists + + charts = ( + select( + [ + tag.c.id.label("tag_id"), + slices.c.id.label("object_id"), + literal(ObjectTypes.chart.name).label("object_type"), + ] + ) + .select_from( + join( + join(slices, tag, tag.c.name == "type:chart"), + tagged_object, + and_( + tagged_object.c.tag_id == tag.c.id, + tagged_object.c.object_id == slices.c.id, + tagged_object.c.object_type == "chart", + ), + isouter=True, + full=False, + ) + ) + .where(tagged_object.c.tag_id.is_(None)) + ) + query = tagged_object.insert().from_select(columns, charts) + engine.execute(query) + + dashboards = ( + select( + [ + tag.c.id.label("tag_id"), + dashboards.c.id.label("object_id"), + literal(ObjectTypes.dashboard.name).label("object_type"), + ] + ) + .select_from( + join( + join(dashboards, tag, tag.c.name == "type:dashboard"), + tagged_object, + and_( + tagged_object.c.tag_id == tag.c.id, + tagged_object.c.object_id == dashboards.c.id, + tagged_object.c.object_type == "dashboard", + ), + isouter=True, + full=False, + ) + ) + .where(tagged_object.c.tag_id.is_(None)) + ) + query = tagged_object.insert().from_select(columns, dashboards) + engine.execute(query) + + saved_queries = ( + select( + [ + tag.c.id.label("tag_id"), + saved_query.c.id.label("object_id"), + literal(ObjectTypes.query.name).label("object_type"), + ] + ) + .select_from( + join( + join(saved_query, tag, tag.c.name == "type:query"), + tagged_object, + and_( + tagged_object.c.tag_id == tag.c.id, + tagged_object.c.object_id == saved_query.c.id, + tagged_object.c.object_type == "query", + ), + isouter=True, + full=False, + ) + ) + .where(tagged_object.c.tag_id.is_(None)) + ) + query = tagged_object.insert().from_select(columns, saved_queries) + engine.execute(query) + + +def add_owners(engine, metadata): + """ + Tag every object according to its owner: + + INSERT INTO tagged_object (tag_id, object_id, object_type) + SELECT + tag.id AS tag_id, + slices.id AS object_id, + 'chart' AS object_type + FROM slices + JOIN tag + ON tag.name = CONCAT('owner:', slices.created_by_fk) + LEFT OUTER JOIN tagged_object + ON tagged_object.tag_id = tag.id + AND tagged_object.object_id = slices.id + AND tagged_object.object_type = 'chart' + WHERE tagged_object.tag_id IS NULL; + + SELECT + tag.id AS tag_id, + dashboards.id AS object_id, + 'dashboard' AS object_type + FROM dashboards + JOIN tag + ON tag.name = CONCAT('owner:', dashboards.created_by_fk) + LEFT OUTER JOIN tagged_object + ON tagged_object.tag_id = tag.id + AND tagged_object.object_id = dashboards.id + AND tagged_object.object_type = 'dashboard' + WHERE tagged_object.tag_id IS NULL; + + SELECT + tag.id AS tag_id, + saved_query.id AS object_id, + 'query' AS object_type + FROM saved_query + JOIN tag + ON tag.name = CONCAT('owner:', saved_query.created_by_fk) + LEFT OUTER JOIN tagged_object + ON tagged_object.tag_id = tag.id + AND tagged_object.object_id = saved_query.id + AND tagged_object.object_type = 'query' + WHERE tagged_object.tag_id IS NULL; + + """ + + tag = metadata.tables["tag"] + tagged_object = metadata.tables["tagged_object"] + users = metadata.tables["ab_user"] + slices = metadata.tables["slices"] + dashboards = metadata.tables["dashboards"] + saved_query = metadata.tables["saved_query"] + columns = ["tag_id", "object_id", "object_type"] + + # create a custom tag for each user + ids = select([users.c.id]) + insert = tag.insert() + for (id_,) in engine.execute(ids): + try: + engine.execute(insert, name=f"owner:{id_}", type=TagTypes.owner) + except IntegrityError: + pass # already exists + + charts = ( + select( + [ + tag.c.id.label("tag_id"), + slices.c.id.label("object_id"), + literal(ObjectTypes.chart.name).label("object_type"), + ] + ) + .select_from( + join( + join( + slices, + tag, + tag.c.name == functions.concat("owner:", slices.c.created_by_fk), + ), + tagged_object, + and_( + tagged_object.c.tag_id == tag.c.id, + tagged_object.c.object_id == slices.c.id, + tagged_object.c.object_type == "chart", + ), + isouter=True, + full=False, + ) + ) + .where(tagged_object.c.tag_id.is_(None)) + ) + query = tagged_object.insert().from_select(columns, charts) + engine.execute(query) + + dashboards = ( + select( + [ + tag.c.id.label("tag_id"), + dashboards.c.id.label("object_id"), + literal(ObjectTypes.dashboard.name).label("object_type"), + ] + ) + .select_from( + join( + join( + dashboards, + tag, + tag.c.name + == functions.concat("owner:", dashboards.c.created_by_fk), + ), + tagged_object, + and_( + tagged_object.c.tag_id == tag.c.id, + tagged_object.c.object_id == dashboards.c.id, + tagged_object.c.object_type == "dashboard", + ), + isouter=True, + full=False, + ) + ) + .where(tagged_object.c.tag_id.is_(None)) + ) + query = tagged_object.insert().from_select(columns, dashboards) + engine.execute(query) + + saved_queries = ( + select( + [ + tag.c.id.label("tag_id"), + saved_query.c.id.label("object_id"), + literal(ObjectTypes.query.name).label("object_type"), + ] + ) + .select_from( + join( + join( + saved_query, + tag, + tag.c.name + == functions.concat("owner:", saved_query.c.created_by_fk), + ), + tagged_object, + and_( + tagged_object.c.tag_id == tag.c.id, + tagged_object.c.object_id == saved_query.c.id, + tagged_object.c.object_type == "query", + ), + isouter=True, + full=False, + ) + ) + .where(tagged_object.c.tag_id.is_(None)) + ) + query = tagged_object.insert().from_select(columns, saved_queries) + engine.execute(query) + + +def add_favorites(engine, metadata): + """ + Tag every object that was favorited: + + INSERT INTO tagged_object (tag_id, object_id, object_type) + SELECT + tag.id AS tag_id, + favstar.obj_id AS object_id, + LOWER(favstar.class_name) AS object_type + FROM favstar + JOIN tag + ON tag.name = CONCAT('favorited_by:', favstar.user_id) + LEFT OUTER JOIN tagged_object + ON tagged_object.tag_id = tag.id + AND tagged_object.object_id = favstar.obj_id + AND tagged_object.object_type = LOWER(favstar.class_name) + WHERE tagged_object.tag_id IS NULL; + + """ + + tag = metadata.tables["tag"] + tagged_object = metadata.tables["tagged_object"] + users = metadata.tables["ab_user"] + favstar = metadata.tables["favstar"] + columns = ["tag_id", "object_id", "object_type"] + + # create a custom tag for each user + ids = select([users.c.id]) + insert = tag.insert() + for (id_,) in engine.execute(ids): + try: + engine.execute(insert, name=f"favorited_by:{id_}", type=TagTypes.type) + except IntegrityError: + pass # already exists + + favstars = ( + select( + [ + tag.c.id.label("tag_id"), + favstar.c.obj_id.label("object_id"), + func.lower(favstar.c.class_name).label("object_type"), + ] + ) + .select_from( + join( + join( + favstar, + tag, + tag.c.name == functions.concat("favorited_by:", favstar.c.user_id), + ), + tagged_object, + and_( + tagged_object.c.tag_id == tag.c.id, + tagged_object.c.object_id == favstar.c.obj_id, + tagged_object.c.object_type == func.lower(favstar.c.class_name), + ), + isouter=True, + full=False, + ) + ) + .where(tagged_object.c.tag_id.is_(None)) + ) + query = tagged_object.insert().from_select(columns, favstars) + engine.execute(query) diff --git a/superset/migrations/versions/def97f26fdfb_add_index_to_tagged_object.py b/superset/migrations/versions/def97f26fdfb_add_index_to_tagged_object.py new file mode 100644 index 000000000..b3af2edba --- /dev/null +++ b/superset/migrations/versions/def97f26fdfb_add_index_to_tagged_object.py @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Add index to tagged_object + +Revision ID: def97f26fdfb +Revises: d6ffdf31bdd4 +Create Date: 2019-07-11 19:02:38.768324 + +""" + +# revision identifiers, used by Alembic. +revision = "def97f26fdfb" +down_revision = "190188938582" + +from alembic import op + + +def upgrade(): + op.create_index( + op.f("ix_tagged_object_object_id"), "tagged_object", ["object_id"], unique=False + ) + + +def downgrade(): + op.drop_index(op.f("ix_tagged_object_object_id"), table_name="tagged_object") diff --git a/superset/models/core.py b/superset/models/core.py index 5a98d5efa..e3dcafa0a 100755 --- a/superset/models/core.py +++ b/superset/models/core.py @@ -52,7 +52,7 @@ from sqlalchemy.schema import UniqueConstraint from sqlalchemy_utils import EncryptedType import sqlparse -from superset import app, db, db_engine_specs, security_manager +from superset import app, db, db_engine_specs, is_feature_enabled, security_manager from superset.connectors.connector_registry import ConnectorRegistry from superset.legacy import update_time_range from superset.models.helpers import AuditMixinNullable, ImportMixin @@ -1299,11 +1299,12 @@ class DatasourceAccessRequest(Model, AuditMixinNullable): # events for updating tags -sqla.event.listen(Slice, "after_insert", ChartUpdater.after_insert) -sqla.event.listen(Slice, "after_update", ChartUpdater.after_update) -sqla.event.listen(Slice, "after_delete", ChartUpdater.after_delete) -sqla.event.listen(Dashboard, "after_insert", DashboardUpdater.after_insert) -sqla.event.listen(Dashboard, "after_update", DashboardUpdater.after_update) -sqla.event.listen(Dashboard, "after_delete", DashboardUpdater.after_delete) -sqla.event.listen(FavStar, "after_insert", FavStarUpdater.after_insert) -sqla.event.listen(FavStar, "after_delete", FavStarUpdater.after_delete) +if is_feature_enabled("TAGGING_SYSTEM"): + sqla.event.listen(Slice, "after_insert", ChartUpdater.after_insert) + sqla.event.listen(Slice, "after_update", ChartUpdater.after_update) + sqla.event.listen(Slice, "after_delete", ChartUpdater.after_delete) + sqla.event.listen(Dashboard, "after_insert", DashboardUpdater.after_insert) + sqla.event.listen(Dashboard, "after_update", DashboardUpdater.after_update) + sqla.event.listen(Dashboard, "after_delete", DashboardUpdater.after_delete) + sqla.event.listen(FavStar, "after_insert", FavStarUpdater.after_insert) + sqla.event.listen(FavStar, "after_delete", FavStarUpdater.after_delete)