[fix] Isolate and improve performance on tagging system (#7858)

* Fix tag perf

* Add ASF header

* Make script idempotent

* Add CLI to sync tags

* Add missing file

* Merge heads

* Fix lint

* Remove script
This commit is contained in:
Beto Dealmeida 2019-07-31 09:19:39 -07:00 committed by GitHub
parent 9b7261f101
commit 10f00cdde3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 446 additions and 9 deletions

View File

@ -24,10 +24,12 @@ from sys import stdout
import click
from colorama import Fore, Style
from flask import g
from flask_appbuilder import Model
from pathlib2 import Path
import yaml
from superset import app, appbuilder, db, examples, security_manager
from superset.common.tags import add_favorites, add_owners, add_types
from superset.utils import core as utils, dashboard_import_export, dict_import_export
config = app.config
@ -497,3 +499,13 @@ def load_test_users_run():
password="general",
)
security_manager.get_session.commit()
@app.cli.command()
def sync_tags():
"""Rebuilds special tags (owner, type, favorited by)."""
# pylint: disable=no-member
metadata = Model.metadata
add_types(db.engine, metadata)
add_owners(db.engine, metadata)
add_favorites(db.engine, metadata)

385
superset/common/tags.py Normal file
View File

@ -0,0 +1,385 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import and_, func, functions, join, literal, select
from superset.models.tags import ObjectTypes, TagTypes
def add_types(engine, metadata):
"""
Tag every object according to its type:
INSERT INTO tagged_object (tag_id, object_id, object_type)
SELECT
tag.id AS tag_id,
slices.id AS object_id,
'chart' AS object_type
FROM slices
JOIN tag
ON tag.name = 'type:chart'
LEFT OUTER JOIN tagged_object
ON tagged_object.tag_id = tag.id
AND tagged_object.object_id = slices.id
AND tagged_object.object_type = 'chart'
WHERE tagged_object.tag_id IS NULL;
INSERT INTO tagged_object (tag_id, object_id, object_type)
SELECT
tag.id AS tag_id,
dashboards.id AS object_id,
'dashboard' AS object_type
FROM dashboards
JOIN tag
ON tag.name = 'type:dashboard'
LEFT OUTER JOIN tagged_object
ON tagged_object.tag_id = tag.id
AND tagged_object.object_id = dashboards.id
AND tagged_object.object_type = 'dashboard'
WHERE tagged_object.tag_id IS NULL;
INSERT INTO tagged_object (tag_id, object_id, object_type)
SELECT
tag.id AS tag_id,
saved_query.id AS object_id,
'query' AS object_type
FROM saved_query
JOIN tag
ON tag.name = 'type:query';
LEFT OUTER JOIN tagged_object
ON tagged_object.tag_id = tag.id
AND tagged_object.object_id = saved_query.id
AND tagged_object.object_type = 'query'
WHERE tagged_object.tag_id IS NULL;
"""
tag = metadata.tables["tag"]
tagged_object = metadata.tables["tagged_object"]
slices = metadata.tables["slices"]
dashboards = metadata.tables["dashboards"]
saved_query = metadata.tables["saved_query"]
columns = ["tag_id", "object_id", "object_type"]
# add a tag for each object type
insert = tag.insert()
for type_ in ObjectTypes.__members__:
try:
engine.execute(insert, name=f"type:{type_}", type=TagTypes.type)
except IntegrityError:
pass # already exists
charts = (
select(
[
tag.c.id.label("tag_id"),
slices.c.id.label("object_id"),
literal(ObjectTypes.chart.name).label("object_type"),
]
)
.select_from(
join(
join(slices, tag, tag.c.name == "type:chart"),
tagged_object,
and_(
tagged_object.c.tag_id == tag.c.id,
tagged_object.c.object_id == slices.c.id,
tagged_object.c.object_type == "chart",
),
isouter=True,
full=False,
)
)
.where(tagged_object.c.tag_id.is_(None))
)
query = tagged_object.insert().from_select(columns, charts)
engine.execute(query)
dashboards = (
select(
[
tag.c.id.label("tag_id"),
dashboards.c.id.label("object_id"),
literal(ObjectTypes.dashboard.name).label("object_type"),
]
)
.select_from(
join(
join(dashboards, tag, tag.c.name == "type:dashboard"),
tagged_object,
and_(
tagged_object.c.tag_id == tag.c.id,
tagged_object.c.object_id == dashboards.c.id,
tagged_object.c.object_type == "dashboard",
),
isouter=True,
full=False,
)
)
.where(tagged_object.c.tag_id.is_(None))
)
query = tagged_object.insert().from_select(columns, dashboards)
engine.execute(query)
saved_queries = (
select(
[
tag.c.id.label("tag_id"),
saved_query.c.id.label("object_id"),
literal(ObjectTypes.query.name).label("object_type"),
]
)
.select_from(
join(
join(saved_query, tag, tag.c.name == "type:query"),
tagged_object,
and_(
tagged_object.c.tag_id == tag.c.id,
tagged_object.c.object_id == saved_query.c.id,
tagged_object.c.object_type == "query",
),
isouter=True,
full=False,
)
)
.where(tagged_object.c.tag_id.is_(None))
)
query = tagged_object.insert().from_select(columns, saved_queries)
engine.execute(query)
def add_owners(engine, metadata):
"""
Tag every object according to its owner:
INSERT INTO tagged_object (tag_id, object_id, object_type)
SELECT
tag.id AS tag_id,
slices.id AS object_id,
'chart' AS object_type
FROM slices
JOIN tag
ON tag.name = CONCAT('owner:', slices.created_by_fk)
LEFT OUTER JOIN tagged_object
ON tagged_object.tag_id = tag.id
AND tagged_object.object_id = slices.id
AND tagged_object.object_type = 'chart'
WHERE tagged_object.tag_id IS NULL;
SELECT
tag.id AS tag_id,
dashboards.id AS object_id,
'dashboard' AS object_type
FROM dashboards
JOIN tag
ON tag.name = CONCAT('owner:', dashboards.created_by_fk)
LEFT OUTER JOIN tagged_object
ON tagged_object.tag_id = tag.id
AND tagged_object.object_id = dashboards.id
AND tagged_object.object_type = 'dashboard'
WHERE tagged_object.tag_id IS NULL;
SELECT
tag.id AS tag_id,
saved_query.id AS object_id,
'query' AS object_type
FROM saved_query
JOIN tag
ON tag.name = CONCAT('owner:', saved_query.created_by_fk)
LEFT OUTER JOIN tagged_object
ON tagged_object.tag_id = tag.id
AND tagged_object.object_id = saved_query.id
AND tagged_object.object_type = 'query'
WHERE tagged_object.tag_id IS NULL;
"""
tag = metadata.tables["tag"]
tagged_object = metadata.tables["tagged_object"]
users = metadata.tables["ab_user"]
slices = metadata.tables["slices"]
dashboards = metadata.tables["dashboards"]
saved_query = metadata.tables["saved_query"]
columns = ["tag_id", "object_id", "object_type"]
# create a custom tag for each user
ids = select([users.c.id])
insert = tag.insert()
for (id_,) in engine.execute(ids):
try:
engine.execute(insert, name=f"owner:{id_}", type=TagTypes.owner)
except IntegrityError:
pass # already exists
charts = (
select(
[
tag.c.id.label("tag_id"),
slices.c.id.label("object_id"),
literal(ObjectTypes.chart.name).label("object_type"),
]
)
.select_from(
join(
join(
slices,
tag,
tag.c.name == functions.concat("owner:", slices.c.created_by_fk),
),
tagged_object,
and_(
tagged_object.c.tag_id == tag.c.id,
tagged_object.c.object_id == slices.c.id,
tagged_object.c.object_type == "chart",
),
isouter=True,
full=False,
)
)
.where(tagged_object.c.tag_id.is_(None))
)
query = tagged_object.insert().from_select(columns, charts)
engine.execute(query)
dashboards = (
select(
[
tag.c.id.label("tag_id"),
dashboards.c.id.label("object_id"),
literal(ObjectTypes.dashboard.name).label("object_type"),
]
)
.select_from(
join(
join(
dashboards,
tag,
tag.c.name
== functions.concat("owner:", dashboards.c.created_by_fk),
),
tagged_object,
and_(
tagged_object.c.tag_id == tag.c.id,
tagged_object.c.object_id == dashboards.c.id,
tagged_object.c.object_type == "dashboard",
),
isouter=True,
full=False,
)
)
.where(tagged_object.c.tag_id.is_(None))
)
query = tagged_object.insert().from_select(columns, dashboards)
engine.execute(query)
saved_queries = (
select(
[
tag.c.id.label("tag_id"),
saved_query.c.id.label("object_id"),
literal(ObjectTypes.query.name).label("object_type"),
]
)
.select_from(
join(
join(
saved_query,
tag,
tag.c.name
== functions.concat("owner:", saved_query.c.created_by_fk),
),
tagged_object,
and_(
tagged_object.c.tag_id == tag.c.id,
tagged_object.c.object_id == saved_query.c.id,
tagged_object.c.object_type == "query",
),
isouter=True,
full=False,
)
)
.where(tagged_object.c.tag_id.is_(None))
)
query = tagged_object.insert().from_select(columns, saved_queries)
engine.execute(query)
def add_favorites(engine, metadata):
"""
Tag every object that was favorited:
INSERT INTO tagged_object (tag_id, object_id, object_type)
SELECT
tag.id AS tag_id,
favstar.obj_id AS object_id,
LOWER(favstar.class_name) AS object_type
FROM favstar
JOIN tag
ON tag.name = CONCAT('favorited_by:', favstar.user_id)
LEFT OUTER JOIN tagged_object
ON tagged_object.tag_id = tag.id
AND tagged_object.object_id = favstar.obj_id
AND tagged_object.object_type = LOWER(favstar.class_name)
WHERE tagged_object.tag_id IS NULL;
"""
tag = metadata.tables["tag"]
tagged_object = metadata.tables["tagged_object"]
users = metadata.tables["ab_user"]
favstar = metadata.tables["favstar"]
columns = ["tag_id", "object_id", "object_type"]
# create a custom tag for each user
ids = select([users.c.id])
insert = tag.insert()
for (id_,) in engine.execute(ids):
try:
engine.execute(insert, name=f"favorited_by:{id_}", type=TagTypes.type)
except IntegrityError:
pass # already exists
favstars = (
select(
[
tag.c.id.label("tag_id"),
favstar.c.obj_id.label("object_id"),
func.lower(favstar.c.class_name).label("object_type"),
]
)
.select_from(
join(
join(
favstar,
tag,
tag.c.name == functions.concat("favorited_by:", favstar.c.user_id),
),
tagged_object,
and_(
tagged_object.c.tag_id == tag.c.id,
tagged_object.c.object_id == favstar.c.obj_id,
tagged_object.c.object_type == func.lower(favstar.c.class_name),
),
isouter=True,
full=False,
)
)
.where(tagged_object.c.tag_id.is_(None))
)
query = tagged_object.insert().from_select(columns, favstars)
engine.execute(query)

View File

@ -0,0 +1,39 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Add index to tagged_object
Revision ID: def97f26fdfb
Revises: d6ffdf31bdd4
Create Date: 2019-07-11 19:02:38.768324
"""
# revision identifiers, used by Alembic.
revision = "def97f26fdfb"
down_revision = "190188938582"
from alembic import op
def upgrade():
op.create_index(
op.f("ix_tagged_object_object_id"), "tagged_object", ["object_id"], unique=False
)
def downgrade():
op.drop_index(op.f("ix_tagged_object_object_id"), table_name="tagged_object")

View File

@ -52,7 +52,7 @@ from sqlalchemy.schema import UniqueConstraint
from sqlalchemy_utils import EncryptedType
import sqlparse
from superset import app, db, db_engine_specs, security_manager
from superset import app, db, db_engine_specs, is_feature_enabled, security_manager
from superset.connectors.connector_registry import ConnectorRegistry
from superset.legacy import update_time_range
from superset.models.helpers import AuditMixinNullable, ImportMixin
@ -1299,11 +1299,12 @@ class DatasourceAccessRequest(Model, AuditMixinNullable):
# events for updating tags
sqla.event.listen(Slice, "after_insert", ChartUpdater.after_insert)
sqla.event.listen(Slice, "after_update", ChartUpdater.after_update)
sqla.event.listen(Slice, "after_delete", ChartUpdater.after_delete)
sqla.event.listen(Dashboard, "after_insert", DashboardUpdater.after_insert)
sqla.event.listen(Dashboard, "after_update", DashboardUpdater.after_update)
sqla.event.listen(Dashboard, "after_delete", DashboardUpdater.after_delete)
sqla.event.listen(FavStar, "after_insert", FavStarUpdater.after_insert)
sqla.event.listen(FavStar, "after_delete", FavStarUpdater.after_delete)
if is_feature_enabled("TAGGING_SYSTEM"):
sqla.event.listen(Slice, "after_insert", ChartUpdater.after_insert)
sqla.event.listen(Slice, "after_update", ChartUpdater.after_update)
sqla.event.listen(Slice, "after_delete", ChartUpdater.after_delete)
sqla.event.listen(Dashboard, "after_insert", DashboardUpdater.after_insert)
sqla.event.listen(Dashboard, "after_update", DashboardUpdater.after_update)
sqla.event.listen(Dashboard, "after_delete", DashboardUpdater.after_delete)
sqla.event.listen(FavStar, "after_insert", FavStarUpdater.after_insert)
sqla.event.listen(FavStar, "after_delete", FavStarUpdater.after_delete)