Move metadata cache one layer up (#6153)

* Update wording

* nit update for api endpoint url

* move metadata cache one layer up

* refactor cache

* fix flake8 and DatabaseTablesAsync

* nit

* remove logging for cache

* only fetch for all tables that allows cross schema fetch

* default allow_multi_schema_metadata_fetch to False

* address comments

* remove unused defaultdict

* flake8
This commit is contained in:
Junda Yang 2018-10-31 13:23:26 -07:00 committed by Beto Dealmeida
parent af38d254f0
commit c552c125d7
8 changed files with 216 additions and 168 deletions

View File

@ -351,12 +351,15 @@ def update_datasources_cache():
"""Refresh sqllab datasources cache"""
from superset.models.core import Database
for database in db.session.query(Database).all():
print('Fetching {} datasources ...'.format(database.name))
try:
database.all_table_names(force=True)
database.all_view_names(force=True)
except Exception as e:
print('{}'.format(str(e)))
if database.allow_multi_schema_metadata_fetch:
print('Fetching {} datasources ...'.format(database.name))
try:
database.all_table_names_in_database(
force=True, cache=True, cache_timeout=24 * 60 * 60)
database.all_view_names_in_database(
force=True, cache=True, cache_timeout=24 * 60 * 60)
except Exception as e:
print('{}'.format(str(e)))
@app.cli.command()

View File

@ -187,8 +187,8 @@ IMG_UPLOAD_URL = '/static/uploads/'
# IMG_SIZE = (300, 200, True)
CACHE_DEFAULT_TIMEOUT = 60 * 60 * 24
CACHE_CONFIG = {'CACHE_TYPE': 'null'}
TABLE_NAMES_CACHE_CONFIG = {'CACHE_TYPE': 'null'}
CACHE_CONFIG = {'CACHE_TYPE': 'simple'}
TABLE_NAMES_CACHE_CONFIG = {'CACHE_TYPE': 'simple'}
# CORS Options
ENABLE_CORS = False

View File

@ -12,7 +12,7 @@ at all. The classes here will use a common interface to specify all this.
The general idea is to use static classes and an inheritance scheme.
"""
from collections import defaultdict, namedtuple
from collections import namedtuple
import inspect
import logging
import os
@ -37,7 +37,7 @@ from werkzeug.utils import secure_filename
from superset import app, conf, db, sql_parse
from superset.exceptions import SupersetTemplateException
from superset.utils import cache as cache_util, core as utils
from superset.utils import core as utils
QueryStatus = utils.QueryStatus
config = app.config
@ -228,31 +228,31 @@ class BaseEngineSpec(object):
return "'{}'".format(dttm.strftime('%Y-%m-%d %H:%M:%S'))
@classmethod
@cache_util.memoized_func(
key=lambda *args, **kwargs: 'db:{}:{}'.format(args[0].id, args[1]),
use_tables_cache=True)
def fetch_result_sets(cls, db, datasource_type, force=False):
"""Returns the dictionary {schema : [result_set_name]}.
def fetch_result_sets(cls, db, datasource_type):
"""Returns a list of tables [schema1.table1, schema2.table2, ...]
Datasource_type can be 'table' or 'view'.
Empty schema corresponds to the list of full names of the all
tables or views: <schema>.<result_set_name>.
"""
schemas = db.inspector.get_schema_names()
result_sets = {}
schemas = db.all_schema_names(cache=db.schema_cache_enabled,
cache_timeout=db.schema_cache_timeout,
force=True)
all_result_sets = []
for schema in schemas:
if datasource_type == 'table':
result_sets[schema] = sorted(
db.inspector.get_table_names(schema))
all_datasource_names = db.all_table_names_in_schema(
schema=schema, force=True,
cache=db.table_cache_enabled,
cache_timeout=db.table_cache_timeout)
elif datasource_type == 'view':
result_sets[schema] = sorted(
db.inspector.get_view_names(schema))
all_datasource_names = db.all_view_names_in_schema(
schema=schema, force=True,
cache=db.table_cache_enabled,
cache_timeout=db.table_cache_timeout)
all_result_sets += [
'{}.{}'.format(schema, t) for t in result_sets[schema]]
if all_result_sets:
result_sets[''] = all_result_sets
return result_sets
'{}.{}'.format(schema, t) for t in all_datasource_names]
return all_result_sets
@classmethod
def handle_cursor(cls, cursor, query, session):
@ -294,35 +294,15 @@ class BaseEngineSpec(object):
pass
@classmethod
@cache_util.memoized_func(
key=lambda *args, **kwargs: 'db:{}:schema_list'.format(kwargs.get('db_id')))
def get_schema_names(cls, inspector, db_id,
enable_cache, cache_timeout, force=False):
"""A function to get all schema names in this db.
:param inspector: URI string
:param db_id: database id
:param enable_cache: whether to enable cache for the function
:param cache_timeout: timeout settings for cache in second.
:param force: force to refresh
:return: a list of schema names
"""
return inspector.get_schema_names()
def get_schema_names(cls, inspector):
return sorted(inspector.get_schema_names())
@classmethod
@cache_util.memoized_func(
key=lambda *args, **kwargs: 'db:{db_id}:schema:{schema}:table_list'.format(
db_id=kwargs.get('db_id'), schema=kwargs.get('schema')))
def get_table_names(cls, inspector, db_id, schema,
enable_cache, cache_timeout, force=False):
def get_table_names(cls, inspector, schema):
return sorted(inspector.get_table_names(schema))
@classmethod
@cache_util.memoized_func(
key=lambda *args, **kwargs: 'db:{db_id}:schema:{schema}:view_list'.format(
db_id=kwargs.get('db_id'), schema=kwargs.get('schema')))
def get_view_names(cls, inspector, db_id, schema,
enable_cache, cache_timeout, force=False):
def get_view_names(cls, inspector, schema):
return sorted(inspector.get_view_names(schema))
@classmethod
@ -448,11 +428,7 @@ class PostgresEngineSpec(PostgresBaseEngineSpec):
engine = 'postgresql'
@classmethod
@cache_util.memoized_func(
key=lambda *args, **kwargs: 'db:{db_id}:schema:{schema}:table_list'.format(
db_id=kwargs.get('db_id'), schema=kwargs.get('schema')))
def get_table_names(cls, inspector, db_id, schema,
enable_cache, cache_timeout, force=False):
def get_table_names(cls, inspector, schema):
"""Need to consider foreign tables for PostgreSQL"""
tables = inspector.get_table_names(schema)
tables.extend(inspector.get_foreign_table_names(schema))
@ -583,23 +559,25 @@ class SqliteEngineSpec(BaseEngineSpec):
return "datetime({col}, 'unixepoch')"
@classmethod
@cache_util.memoized_func(
key=lambda *args, **kwargs: 'db:{}:{}'.format(args[0].id, args[1]),
use_tables_cache=True)
def fetch_result_sets(cls, db, datasource_type, force=False):
schemas = db.inspector.get_schema_names()
result_sets = {}
def fetch_result_sets(cls, db, datasource_type):
schemas = db.all_schema_names(cache=db.schema_cache_enabled,
cache_timeout=db.schema_cache_timeout,
force=True)
all_result_sets = []
schema = schemas[0]
if datasource_type == 'table':
result_sets[schema] = sorted(db.inspector.get_table_names())
all_datasource_names = db.all_table_names_in_schema(
schema=schema, force=True,
cache=db.table_cache_enabled,
cache_timeout=db.table_cache_timeout)
elif datasource_type == 'view':
result_sets[schema] = sorted(db.inspector.get_view_names())
all_datasource_names = db.all_view_names_in_schema(
schema=schema, force=True,
cache=db.table_cache_enabled,
cache_timeout=db.table_cache_timeout)
all_result_sets += [
'{}.{}'.format(schema, t) for t in result_sets[schema]]
if all_result_sets:
result_sets[''] = all_result_sets
return result_sets
'{}.{}'.format(schema, t) for t in all_datasource_names]
return all_result_sets
@classmethod
def convert_dttm(cls, target_type, dttm):
@ -609,11 +587,7 @@ class SqliteEngineSpec(BaseEngineSpec):
return "'{}'".format(iso)
@classmethod
@cache_util.memoized_func(
key=lambda *args, **kwargs: 'db:{db_id}:schema:{schema}:table_list'.format(
db_id=kwargs.get('db_id'), schema=kwargs.get('schema')))
def get_table_names(cls, inspector, db_id, schema,
enable_cache, cache_timeout, force=False):
def get_table_names(cls, inspector, schema):
"""Need to disregard the schema for Sqlite"""
return sorted(inspector.get_table_names())
@ -737,11 +711,8 @@ class PrestoEngineSpec(BaseEngineSpec):
return 'from_unixtime({col})'
@classmethod
@cache_util.memoized_func(
key=lambda *args, **kwargs: 'db:{}:{}'.format(args[0].id, args[1]),
use_tables_cache=True)
def fetch_result_sets(cls, db, datasource_type, force=False):
"""Returns the dictionary {schema : [result_set_name]}.
def fetch_result_sets(cls, db, datasource_type):
"""Returns a list of tables [schema1.table1, schema2.table2, ...]
Datasource_type can be 'table' or 'view'.
Empty schema corresponds to the list of full names of the all
@ -753,10 +724,9 @@ class PrestoEngineSpec(BaseEngineSpec):
datasource_type.upper(),
),
None)
result_sets = defaultdict(list)
result_sets = []
for unused, row in result_set_df.iterrows():
result_sets[row['table_schema']].append(row['table_name'])
result_sets[''].append('{}.{}'.format(
result_sets.append('{}.{}'.format(
row['table_schema'], row['table_name']))
return result_sets
@ -1018,12 +988,9 @@ class HiveEngineSpec(PrestoEngineSpec):
hive.Cursor.fetch_logs = patched_hive.fetch_logs
@classmethod
@cache_util.memoized_func(
key=lambda *args, **kwargs: 'db:{}:{}'.format(args[0].id, args[1]),
use_tables_cache=True)
def fetch_result_sets(cls, db, datasource_type, force=False):
def fetch_result_sets(cls, db, datasource_type):
return BaseEngineSpec.fetch_result_sets(
db, datasource_type, force=force)
db, datasource_type)
@classmethod
def fetch_data(cls, cursor, limit):
@ -1497,10 +1464,7 @@ class ImpalaEngineSpec(BaseEngineSpec):
return "'{}'".format(dttm.strftime('%Y-%m-%d %H:%M:%S'))
@classmethod
@cache_util.memoized_func(
key=lambda *args, **kwargs: 'db:{}:schema_list'.format(kwargs.get('db_id')))
def get_schema_names(cls, inspector, db_id,
enable_cache, cache_timeout, force=False):
def get_schema_names(cls, inspector):
schemas = [row[0] for row in inspector.engine.execute('SHOW SCHEMAS')
if not row[0].startswith('_')]
return schemas

View File

@ -33,7 +33,10 @@ from superset.connectors.connector_registry import ConnectorRegistry
from superset.legacy import update_time_range
from superset.models.helpers import AuditMixinNullable, ImportMixin
from superset.models.user_attributes import UserAttribute
from superset.utils import core as utils
from superset.utils import (
cache as cache_util,
core as utils,
)
from superset.viz import viz_types
from urllib import parse # noqa
@ -633,7 +636,7 @@ class Database(Model, AuditMixinNullable, ImportMixin):
allow_ctas = Column(Boolean, default=False)
allow_dml = Column(Boolean, default=False)
force_ctas_schema = Column(String(250))
allow_multi_schema_metadata_fetch = Column(Boolean, default=True)
allow_multi_schema_metadata_fetch = Column(Boolean, default=False)
extra = Column(Text, default=textwrap.dedent("""\
{
"metadata_params": {},
@ -684,6 +687,26 @@ class Database(Model, AuditMixinNullable, ImportMixin):
url = make_url(self.sqlalchemy_uri_decrypted)
return url.get_backend_name()
@property
def metadata_cache_timeout(self):
return self.get_extra().get('metadata_cache_timeout', {})
@property
def schema_cache_enabled(self):
return 'schema_cache_timeout' in self.metadata_cache_timeout
@property
def schema_cache_timeout(self):
return self.metadata_cache_timeout.get('schema_cache_timeout')
@property
def table_cache_enabled(self):
return 'table_cache_timeout' in self.metadata_cache_timeout
@property
def table_cache_timeout(self):
return self.metadata_cache_timeout.get('table_cache_timeout')
@classmethod
def get_password_masked_url_from_uri(cls, uri):
url = make_url(uri)
@ -838,61 +861,105 @@ class Database(Model, AuditMixinNullable, ImportMixin):
engine = self.get_sqla_engine()
return sqla.inspect(engine)
def all_table_names(self, schema=None, force=False):
if not schema:
if not self.allow_multi_schema_metadata_fetch:
return []
tables_dict = self.db_engine_spec.fetch_result_sets(
self, 'table', force=force)
return tables_dict.get('', [])
@cache_util.memoized_func(
key=lambda *args, **kwargs: 'db:{}:schema:None:table_list',
attribute_in_key='id')
def all_table_names_in_database(self, cache=False,
cache_timeout=None, force=False):
"""Parameters need to be passed as keyword arguments."""
if not self.allow_multi_schema_metadata_fetch:
return []
return self.db_engine_spec.fetch_result_sets(self, 'table')
extra = self.get_extra()
medatada_cache_timeout = extra.get('metadata_cache_timeout', {})
table_cache_timeout = medatada_cache_timeout.get('table_cache_timeout')
enable_cache = 'table_cache_timeout' in medatada_cache_timeout
return sorted(self.db_engine_spec.get_table_names(
inspector=self.inspector,
db_id=self.id,
schema=schema,
enable_cache=enable_cache,
cache_timeout=table_cache_timeout,
force=force))
@cache_util.memoized_func(
key=lambda *args, **kwargs: 'db:{}:schema:None:view_list',
attribute_in_key='id')
def all_view_names_in_database(self, cache=False,
cache_timeout=None, force=False):
"""Parameters need to be passed as keyword arguments."""
if not self.allow_multi_schema_metadata_fetch:
return []
return self.db_engine_spec.fetch_result_sets(self, 'view')
def all_view_names(self, schema=None, force=False):
if not schema:
if not self.allow_multi_schema_metadata_fetch:
return []
views_dict = self.db_engine_spec.fetch_result_sets(
self, 'view', force=force)
return views_dict.get('', [])
@cache_util.memoized_func(
key=lambda *args, **kwargs: 'db:{{}}:schema:{}:table_list'.format(
kwargs.get('schema')),
attribute_in_key='id')
def all_table_names_in_schema(self, schema, cache=False,
cache_timeout=None, force=False):
"""Parameters need to be passed as keyword arguments.
For unused parameters, they are referenced in
cache_util.memoized_func decorator.
:param schema: schema name
:type schema: str
:param cache: whether cache is enabled for the function
:type cache: bool
:param cache_timeout: timeout in seconds for the cache
:type cache_timeout: int
:param force: whether to force refresh the cache
:type force: bool
:return: table list
:rtype: list
"""
tables = []
try:
tables = self.db_engine_spec.get_table_names(
inspector=self.inspector, schema=schema)
except Exception as e:
logging.exception(e)
return tables
@cache_util.memoized_func(
key=lambda *args, **kwargs: 'db:{{}}:schema:{}:table_list'.format(
kwargs.get('schema')),
attribute_in_key='id')
def all_view_names_in_schema(self, schema, cache=False,
cache_timeout=None, force=False):
"""Parameters need to be passed as keyword arguments.
For unused parameters, they are referenced in
cache_util.memoized_func decorator.
:param schema: schema name
:type schema: str
:param cache: whether cache is enabled for the function
:type cache: bool
:param cache_timeout: timeout in seconds for the cache
:type cache_timeout: int
:param force: whether to force refresh the cache
:type force: bool
:return: view list
:rtype: list
"""
views = []
try:
extra = self.get_extra()
medatada_cache_timeout = extra.get('metadata_cache_timeout', {})
table_cache_timeout = medatada_cache_timeout.get('table_cache_timeout')
enable_cache = 'table_cache_timeout' in medatada_cache_timeout
views = self.db_engine_spec.get_view_names(
inspector=self.inspector,
db_id=self.id,
schema=schema,
enable_cache=enable_cache,
cache_timeout=table_cache_timeout,
force=force)
except Exception:
pass
inspector=self.inspector, schema=schema)
except Exception as e:
logging.exception(e)
return views
def all_schema_names(self, force_refresh=False):
extra = self.get_extra()
medatada_cache_timeout = extra.get('metadata_cache_timeout', {})
schema_cache_timeout = medatada_cache_timeout.get('schema_cache_timeout')
enable_cache = 'schema_cache_timeout' in medatada_cache_timeout
return sorted(self.db_engine_spec.get_schema_names(
inspector=self.inspector,
enable_cache=enable_cache,
cache_timeout=schema_cache_timeout,
db_id=self.id,
force=force_refresh))
@cache_util.memoized_func(
key=lambda *args, **kwargs: 'db:{}:schema_list',
attribute_in_key='id')
def all_schema_names(self, cache=False, cache_timeout=None, force=False):
"""Parameters need to be passed as keyword arguments.
For unused parameters, they are referenced in
cache_util.memoized_func decorator.
:param cache: whether cache is enabled for the function
:type cache: bool
:param cache_timeout: timeout in seconds for the cache
:type cache_timeout: int
:param force: whether to force refresh the cache
:type force: bool
:return: schema list
:rtype: list
"""
return self.db_engine_spec.get_schema_names(self.inspector)
@property
def db_engine_spec(self):

View File

@ -18,7 +18,7 @@
function update_schemas_allowed_for_csv_upload(db_id) {
$.ajax({
method: "GET",
url: "/superset/schema_access_for_csv_upload",
url: "/superset/schemas_access_for_csv_upload",
data: {db_id: db_id},
dataType: 'json',
contentType: "application/json; charset=utf-8"

View File

@ -1,7 +1,7 @@
# pylint: disable=C,R,W
from flask import request
from superset import cache, tables_cache
from superset import tables_cache
def view_cache_key(*unused_args, **unused_kwargs):
@ -9,7 +9,7 @@ def view_cache_key(*unused_args, **unused_kwargs):
return 'view/{}/{}'.format(request.path, args_hash)
def memoized_func(key=view_cache_key, use_tables_cache=False):
def memoized_func(key=view_cache_key, attribute_in_key=None):
"""Use this decorator to cache functions that have predefined first arg.
enable_cache is treated as True by default,
@ -26,28 +26,26 @@ def memoized_func(key=view_cache_key, use_tables_cache=False):
returns the caching key.
"""
def wrap(f):
selected_cache = None
if use_tables_cache and tables_cache:
selected_cache = tables_cache
elif cache:
selected_cache = cache
if tables_cache:
def wrapped_f(self, *args, **kwargs):
if not kwargs.get('cache', True):
return f(self, *args, **kwargs)
if selected_cache:
def wrapped_f(cls, *args, **kwargs):
if not kwargs.get('enable_cache', True):
return f(cls, *args, **kwargs)
cache_key = key(*args, **kwargs)
o = selected_cache.get(cache_key)
if attribute_in_key:
cache_key = key(*args, **kwargs).format(
getattr(self, attribute_in_key))
else:
cache_key = key(*args, **kwargs)
o = tables_cache.get(cache_key)
if not kwargs.get('force') and o is not None:
return o
o = f(cls, *args, **kwargs)
selected_cache.set(cache_key, o,
timeout=kwargs.get('cache_timeout', 600))
o = f(self, *args, **kwargs)
tables_cache.set(cache_key, o,
timeout=kwargs.get('cache_timeout'))
return o
else:
# noop
def wrapped_f(cls, *args, **kwargs):
return f(cls, *args, **kwargs)
def wrapped_f(self, *args, **kwargs):
return f(self, *args, **kwargs)
return wrapped_f
return wrap

View File

@ -212,7 +212,8 @@ class DatabaseView(SupersetModelView, DeleteMixin, YamlExportMixin): # noqa
'A timeout of 0 indicates that the cache never expires.<br/>'
'3. The ``schemas_allowed_for_csv_upload`` is a comma separated list '
'of schemas that CSVs are allowed to upload to. '
'Specify it as **"schemas_allowed": ["public", "csv_upload"]**. '
'Specify it as **"schemas_allowed_for_csv_upload": '
'["public", "csv_upload"]**. '
'If database flavor does not support schema or any schema is allowed '
'to be accessed, just leave the list empty', True),
'impersonate_user': _(
@ -257,7 +258,7 @@ class DatabaseView(SupersetModelView, DeleteMixin, YamlExportMixin): # noqa
db.set_sqlalchemy_uri(db.sqlalchemy_uri)
security_manager.merge_perm('database_access', db.perm)
# adding a new database we always want to force refresh schema list
for schema in db.all_schema_names(force_refresh=True):
for schema in db.all_schema_names():
security_manager.merge_perm(
'schema_access', security_manager.get_schema_perm(db, schema))
@ -397,7 +398,7 @@ appbuilder.add_view_no_menu(CsvToDatabaseView)
class DatabaseTablesAsync(DatabaseView):
list_columns = ['id', 'all_table_names', 'all_schema_names']
list_columns = ['id', 'all_table_names_in_database', 'all_schema_names']
appbuilder.add_view_no_menu(DatabaseTablesAsync)
@ -1551,7 +1552,9 @@ class Superset(BaseSupersetView):
.filter_by(id=db_id)
.one()
)
schemas = database.all_schema_names(force_refresh=force_refresh)
schemas = database.all_schema_names(cache=database.schema_cache_enabled,
cache_timeout=database.schema_cache_timeout,
force=force_refresh)
schemas = security_manager.schemas_accessible_by_user(database, schemas)
return Response(
json.dumps({'schemas': schemas}),
@ -1568,10 +1571,23 @@ class Superset(BaseSupersetView):
schema = utils.js_string_to_python(schema)
substr = utils.js_string_to_python(substr)
database = db.session.query(models.Database).filter_by(id=db_id).one()
table_names = security_manager.accessible_by_user(
database, database.all_table_names(schema, force_refresh), schema)
view_names = security_manager.accessible_by_user(
database, database.all_view_names(schema, force_refresh), schema)
if schema:
table_names = database.all_table_names_in_schema(
schema=schema, force=force_refresh,
cache=database.table_cache_enabled,
cache_timeout=database.table_cache_timeout)
view_names = database.all_view_names_in_schema(
schema=schema, force=force_refresh,
cache=database.table_cache_enabled,
cache_timeout=database.table_cache_timeout)
else:
table_names = database.all_table_names_in_database(
cache=True, force=False, cache_timeout=24 * 60 * 60)
view_names = database.all_view_names_in_database(
cache=True, force=False, cache_timeout=24 * 60 * 60)
table_names = security_manager.accessible_by_user(database, table_names, schema)
view_names = security_manager.accessible_by_user(database, view_names, schema)
if substr:
table_names = [tn for tn in table_names if substr in tn]
@ -2840,7 +2856,7 @@ class Superset(BaseSupersetView):
@api
@has_access_api
@expose('/schema_access_for_csv_upload')
@expose('/schemas_access_for_csv_upload')
def schemas_access_for_csv_upload(self):
"""
This method exposes an API endpoint to

View File

@ -715,7 +715,7 @@ class CoreTests(SupersetTestCase):
id=db_id,
extra=extra)
data = self.get_json_resp(
url='/superset/schema_access_for_csv_upload?db_id={db_id}'
url='/superset/schemas_access_for_csv_upload?db_id={db_id}'
.format(db_id=dbobj.id))
assert data == ['this_schema_is_allowed_too']