diff --git a/docs/installation.rst b/docs/installation.rst index 7bc9f61f3..a06da876d 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -479,6 +479,26 @@ into your global default defined in ``CACHE_CONFIG``. 'CACHE_REDIS_URL': 'redis://localhost:6379/0', } +Superset has a Celery task that will periodically warm up the cache based on +different strategies. To use it, add the following to the `CELERYBEAT_SCHEDULE` +section in `config.py`: + +.. code-block:: python + + CELERYBEAT_SCHEDULE = { + 'cache-warmup-hourly': { + 'task': 'cache-warmup', + 'schedule': crontab(minute=0, hour='*'), # hourly + 'kwargs': { + 'strategy_name': 'top_n_dashboards', + 'top_n': 5, + 'since': '7 days ago', + }, + }, + } + +This will cache all the charts in the top 5 most popular dashboards every hour. +For other strategies, check the `superset/tasks/cache.py` file. Deeper SQLAlchemy integration diff --git a/superset/assets/package-lock.json b/superset/assets/package-lock.json index 8eeca9bde..110e53ac3 100644 --- a/superset/assets/package-lock.json +++ b/superset/assets/package-lock.json @@ -8684,8 +8684,7 @@ "ansi-regex": { "version": "2.1.1", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "aproba": { "version": "1.2.0", @@ -8708,14 +8707,12 @@ "balanced-match": { "version": "1.0.0", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "brace-expansion": { "version": "1.1.11", "bundled": true, "dev": true, - "optional": true, "requires": { "balanced-match": "^1.0.0", "concat-map": "0.0.1" @@ -8731,20 +8728,17 @@ "code-point-at": { "version": "1.1.0", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "concat-map": { "version": "0.0.1", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "console-control-strings": { "version": "1.1.0", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "core-util-is": { "version": "1.0.2", @@ -8874,8 +8868,7 @@ "inherits": { "version": "2.0.3", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "ini": { "version": "1.3.5", @@ -8888,7 +8881,6 @@ "version": "1.0.0", "bundled": true, "dev": true, - "optional": true, "requires": { "number-is-nan": "^1.0.0" } @@ -8904,7 +8896,6 @@ "version": "3.0.4", "bundled": true, "dev": true, - "optional": true, "requires": { "brace-expansion": "^1.1.7" } @@ -8912,14 +8903,12 @@ "minimist": { "version": "0.0.8", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "minipass": { "version": "2.2.4", "bundled": true, "dev": true, - "optional": true, "requires": { "safe-buffer": "^5.1.1", "yallist": "^3.0.0" @@ -8939,7 +8928,6 @@ "version": "0.5.1", "bundled": true, "dev": true, - "optional": true, "requires": { "minimist": "0.0.8" } @@ -9027,8 +9015,7 @@ "number-is-nan": { "version": "1.0.1", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "object-assign": { "version": "4.1.1", @@ -9041,7 +9028,6 @@ "version": "1.4.0", "bundled": true, "dev": true, - "optional": true, "requires": { "wrappy": "1" } @@ -9136,8 +9122,7 @@ "safe-buffer": { "version": "5.1.1", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "safer-buffer": { "version": "2.1.2", @@ -9178,7 +9163,6 @@ "version": "1.0.2", "bundled": true, "dev": true, - "optional": true, "requires": { "code-point-at": "^1.0.0", "is-fullwidth-code-point": "^1.0.0", @@ -9199,7 +9183,6 @@ "version": "3.0.1", "bundled": true, "dev": true, - "optional": true, "requires": { "ansi-regex": "^2.0.0" } @@ -9247,14 +9230,12 @@ "wrappy": { "version": "1.0.2", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "yallist": { "version": "3.0.2", "bundled": true, - "dev": true, - "optional": true + "dev": true } } }, diff --git a/superset/tasks/__init__.py b/superset/tasks/__init__.py index 58702ac4c..c2be5085f 100644 --- a/superset/tasks/__init__.py +++ b/superset/tasks/__init__.py @@ -16,3 +16,4 @@ # specific language governing permissions and limitations # under the License. from . import schedules # noqa +from . import cache # noqa diff --git a/superset/tasks/cache.py b/superset/tasks/cache.py new file mode 100644 index 000000000..831bb6642 --- /dev/null +++ b/superset/tasks/cache.py @@ -0,0 +1,316 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint: disable=too-few-public-methods + +import json +import logging +import urllib.parse + +from celery.utils.log import get_task_logger +from flask import url_for +import requests +from requests.exceptions import RequestException +from sqlalchemy import and_, func + +from superset import app, db +from superset.models.core import Dashboard, Log, Slice +from superset.models.tags import Tag, TaggedObject +from superset.tasks.celery_app import app as celery_app +from superset.utils.core import parse_human_datetime + + +logger = get_task_logger(__name__) +logger.setLevel(logging.INFO) + + +def get_form_data(chart_id, dashboard=None): + """ + Build `form_data` for chart GET request from dashboard's `default_filters`. + + When a dashboard has `default_filters` they need to be added as extra + filters in the GET request for charts. + + """ + form_data = {'slice_id': chart_id} + + if dashboard is None or not dashboard.json_metadata: + return form_data + + json_metadata = json.loads(dashboard.json_metadata) + + # do not apply filters if chart is immune to them + if chart_id in json_metadata.get('filter_immune_slices', []): + return form_data + + default_filters = json.loads(json_metadata.get('default_filters', 'null')) + if not default_filters: + return form_data + + # are some of the fields in the chart immune to filters? + filter_immune_slice_fields = json_metadata.get('filter_immune_slice_fields', {}) + immune_fields = filter_immune_slice_fields.get(str(chart_id), []) + + extra_filters = [] + for filters in default_filters.values(): + for col, val in filters.items(): + if col not in immune_fields: + extra_filters.append({'col': col, 'op': 'in', 'val': val}) + if extra_filters: + form_data['extra_filters'] = extra_filters + + return form_data + + +def get_url(params): + """Return external URL for warming up a given chart/table cache.""" + baseurl = 'http://{SUPERSET_WEBSERVER_ADDRESS}:{SUPERSET_WEBSERVER_PORT}/'.format( + **app.config) + with app.test_request_context(): + return urllib.parse.urljoin( + baseurl, + url_for('Superset.explore_json', **params), + ) + + +class Strategy: + """ + A cache warm up strategy. + + Each strategy defines a `get_urls` method that returns a list of URLs to + be fetched from the `/superset/warm_up_cache/` endpoint. + + Strategies can be configured in `superset/config.py`: + + CELERYBEAT_SCHEDULE = { + 'cache-warmup-hourly': { + 'task': 'cache-warmup', + 'schedule': crontab(minute=1, hour='*'), # @hourly + 'kwargs': { + 'strategy_name': 'top_n_dashboards', + 'top_n': 10, + 'since': '7 days ago', + }, + }, + } + + """ + def __init__(self): + pass + + def get_urls(self): + raise NotImplementedError('Subclasses must implement get_urls!') + + +class DummyStrategy(Strategy): + """ + Warm up all charts. + + This is a dummy strategy that will fetch all charts. Can be configured by: + + CELERYBEAT_SCHEDULE = { + 'cache-warmup-hourly': { + 'task': 'cache-warmup', + 'schedule': crontab(minute=1, hour='*'), # @hourly + 'kwargs': {'strategy_name': 'dummy'}, + }, + } + + """ + + name = 'dummy' + + def get_urls(self): + session = db.create_scoped_session() + charts = session.query(Slice).all() + + return [get_url({'form_data': get_form_data(chart.id)}) for chart in charts] + + +class TopNDashboardsStrategy(Strategy): + """ + Warm up charts in the top-n dashboards. + + CELERYBEAT_SCHEDULE = { + 'cache-warmup-hourly': { + 'task': 'cache-warmup', + 'schedule': crontab(minute=1, hour='*'), # @hourly + 'kwargs': { + 'strategy_name': 'top_n_dashboards', + 'top_n': 5, + 'since': '7 days ago', + }, + }, + } + + """ + + name = 'top_n_dashboards' + + def __init__(self, top_n=5, since='7 days ago'): + super(TopNDashboardsStrategy, self).__init__() + self.top_n = top_n + self.since = parse_human_datetime(since) + + def get_urls(self): + urls = [] + session = db.create_scoped_session() + + records = ( + session + .query(Log.dashboard_id, func.count(Log.dashboard_id)) + .filter(and_( + Log.dashboard_id.isnot(None), + Log.dttm >= self.since, + )) + .group_by(Log.dashboard_id) + .order_by(func.count(Log.dashboard_id).desc()) + .limit(self.top_n) + .all() + ) + dash_ids = [record.dashboard_id for record in records] + dashboards = ( + session + .query(Dashboard) + .filter(Dashboard.id.in_(dash_ids)) + .all() + ) + for dashboard in dashboards: + for chart in dashboard.slices: + urls.append( + get_url({'form_data': get_form_data(chart.id, dashboard)})) + + return urls + + +class DashboardTagsStrategy(Strategy): + """ + Warm up charts in dashboards with custom tags. + + CELERYBEAT_SCHEDULE = { + 'cache-warmup-hourly': { + 'task': 'cache-warmup', + 'schedule': crontab(minute=1, hour='*'), # @hourly + 'kwargs': { + 'strategy_name': 'dashboard_tags', + 'tags': ['core', 'warmup'], + }, + }, + } + """ + + name = 'dashboard_tags' + + def __init__(self, tags=None): + super(DashboardTagsStrategy, self).__init__() + self.tags = tags or [] + + def get_urls(self): + urls = [] + session = db.create_scoped_session() + + tags = ( + session + .query(Tag) + .filter(Tag.name.in_(self.tags)) + .all() + ) + tag_ids = [tag.id for tag in tags] + + # add dashboards that are tagged + tagged_objects = ( + session + .query(TaggedObject) + .filter(and_( + TaggedObject.object_type == 'dashboard', + TaggedObject.tag_id.in_(tag_ids), + )) + .all() + ) + dash_ids = [tagged_object.object_id for tagged_object in tagged_objects] + tagged_dashboards = ( + session + .query(Dashboard) + .filter(Dashboard.id.in_(dash_ids)) + ) + for dashboard in tagged_dashboards: + for chart in dashboard.slices: + urls.append( + get_url({'form_data': get_form_data(chart.id, dashboard)})) + + # add charts that are tagged + tagged_objects = ( + session + .query(TaggedObject) + .filter(and_( + TaggedObject.object_type == 'chart', + TaggedObject.tag_id.in_(tag_ids), + )) + .all() + ) + chart_ids = [tagged_object.object_id for tagged_object in tagged_objects] + tagged_charts = ( + session + .query(Slice) + .filter(Slice.id.in_(chart_ids)) + ) + for chart in tagged_charts: + urls.append(get_url({'form_data': get_form_data(chart.id)})) + + return urls + + +strategies = [DummyStrategy, TopNDashboardsStrategy, DashboardTagsStrategy] + + +@celery_app.task(name='cache-warmup') +def cache_warmup(strategy_name, *args, **kwargs): + """ + Warm up cache. + + This task periodically hits charts to warm up the cache. + + """ + logger.info('Loading strategy') + class_ = None + for class_ in strategies: + if class_.name == strategy_name: + break + else: + message = f'No strategy {strategy_name} found!' + logger.error(message) + return message + + logger.info(f'Loading {class_.__name__}') + try: + strategy = class_(*args, **kwargs) + logger.info('Success!') + except TypeError: + message = 'Error loading strategy!' + logger.exception(message) + return message + + results = {'success': [], 'errors': []} + for url in strategy.get_urls(): + try: + logger.info(f'Fetching {url}') + requests.get(url) + results['success'].append(url) + except RequestException: + logger.exception('Error warming up cache!') + results['errors'].append(url) + + return results diff --git a/tests/strategy_tests.py b/tests/strategy_tests.py new file mode 100644 index 000000000..204f9547c --- /dev/null +++ b/tests/strategy_tests.py @@ -0,0 +1,236 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Unit tests for Superset cache warmup""" +import json +from unittest.mock import MagicMock + +from superset import db +from superset.models.core import Log +from superset.models.tags import ( + get_tag, + ObjectTypes, + TaggedObject, + TagTypes, +) +from superset.tasks.cache import ( + DashboardTagsStrategy, + DummyStrategy, + get_form_data, + TopNDashboardsStrategy, +) +from .base_tests import SupersetTestCase + + +TEST_URL = 'http://0.0.0.0:8081/superset/explore_json' + + +class CacheWarmUpTests(SupersetTestCase): + + def __init__(self, *args, **kwargs): + super(CacheWarmUpTests, self).__init__(*args, **kwargs) + + def test_get_form_data_chart_only(self): + chart_id = 1 + result = get_form_data(chart_id, None) + expected = {'slice_id': chart_id} + self.assertEqual(result, expected) + + def test_get_form_data_no_dashboard_metadata(self): + chart_id = 1 + dashboard = MagicMock() + dashboard.json_metadata = None + result = get_form_data(chart_id, dashboard) + expected = {'slice_id': chart_id} + self.assertEqual(result, expected) + + def test_get_form_data_immune_slice(self): + chart_id = 1 + filter_box_id = 2 + dashboard = MagicMock() + dashboard.json_metadata = json.dumps({ + 'filter_immune_slices': [chart_id], + 'default_filters': json.dumps({ + str(filter_box_id): {'name': ['Alice', 'Bob']}, + }), + }) + result = get_form_data(chart_id, dashboard) + expected = {'slice_id': chart_id} + self.assertEqual(result, expected) + + def test_get_form_data_no_default_filters(self): + chart_id = 1 + dashboard = MagicMock() + dashboard.json_metadata = json.dumps({}) + result = get_form_data(chart_id, dashboard) + expected = {'slice_id': chart_id} + self.assertEqual(result, expected) + + def test_get_form_data_immune_fields(self): + chart_id = 1 + filter_box_id = 2 + dashboard = MagicMock() + dashboard.json_metadata = json.dumps({ + 'default_filters': json.dumps({ + str(filter_box_id): { + 'name': ['Alice', 'Bob'], + '__time_range': '100 years ago : today', + }, + }), + 'filter_immune_slice_fields': {chart_id: ['__time_range']}, + }) + result = get_form_data(chart_id, dashboard) + expected = { + 'slice_id': chart_id, + 'extra_filters': [ + { + 'col': 'name', + 'op': 'in', + 'val': ['Alice', 'Bob'], + }, + ], + } + self.assertEqual(result, expected) + + def test_get_form_data_no_extra_filters(self): + chart_id = 1 + filter_box_id = 2 + dashboard = MagicMock() + dashboard.json_metadata = json.dumps({ + 'default_filters': json.dumps({ + str(filter_box_id): { + '__time_range': '100 years ago : today', + }, + }), + 'filter_immune_slice_fields': {chart_id: ['__time_range']}, + }) + result = get_form_data(chart_id, dashboard) + expected = {'slice_id': chart_id} + self.assertEqual(result, expected) + + def test_get_form_data(self): + chart_id = 1 + filter_box_id = 2 + dashboard = MagicMock() + dashboard.json_metadata = json.dumps({ + 'default_filters': json.dumps({ + str(filter_box_id): { + 'name': ['Alice', 'Bob'], + '__time_range': '100 years ago : today', + }, + }), + }) + result = get_form_data(chart_id, dashboard) + expected = { + 'slice_id': chart_id, + 'extra_filters': [ + { + 'col': 'name', + 'op': 'in', + 'val': ['Alice', 'Bob'], + }, + { + 'col': '__time_range', + 'op': 'in', + 'val': '100 years ago : today', + }, + ], + } + self.assertEqual(result, expected) + + def test_dummy_strategy(self): + strategy = DummyStrategy() + result = sorted(strategy.get_urls()) + expected = [ + f'{TEST_URL}/?form_data=%7B%27slice_id%27%3A+1%7D', + f'{TEST_URL}/?form_data=%7B%27slice_id%27%3A+17%7D', + f'{TEST_URL}/?form_data=%7B%27slice_id%27%3A+18%7D', + f'{TEST_URL}/?form_data=%7B%27slice_id%27%3A+19%7D', + f'{TEST_URL}/?form_data=%7B%27slice_id%27%3A+30%7D', + f'{TEST_URL}/?form_data=%7B%27slice_id%27%3A+31%7D', + f'{TEST_URL}/?form_data=%7B%27slice_id%27%3A+8%7D', + ] + self.assertEqual(result, expected) + + def test_top_n_dashboards_strategy(self): + # create a top visited dashboard + db.session.query(Log).delete() + self.login(username='admin') + for _ in range(10): + self.client.get('/superset/dashboard/3/') + + strategy = TopNDashboardsStrategy(1) + result = sorted(strategy.get_urls()) + expected = [ + f'{TEST_URL}/?form_data=%7B%27slice_id%27%3A+31%7D', + ] + self.assertEqual(result, expected) + + def test_dashboard_tags(self): + strategy = DashboardTagsStrategy(['tag1']) + + result = sorted(strategy.get_urls()) + expected = [] + self.assertEqual(result, expected) + + # tag dashboard 3 with `tag1` + tag1 = get_tag('tag1', db.session, TagTypes.custom) + object_id = 3 + tagged_object = TaggedObject( + tag_id=tag1.id, + object_id=object_id, + object_type=ObjectTypes.dashboard, + ) + db.session.add(tagged_object) + db.session.commit() + + result = sorted(strategy.get_urls()) + expected = [ + f'{TEST_URL}/?form_data=%7B%27slice_id%27%3A+31%7D', + ] + self.assertEqual(result, expected) + + strategy = DashboardTagsStrategy(['tag2']) + + result = sorted(strategy.get_urls()) + expected = [] + self.assertEqual(result, expected) + + # tag chart 30 with `tag2` + tag2 = get_tag('tag2', db.session, TagTypes.custom) + object_id = 30 + tagged_object = TaggedObject( + tag_id=tag2.id, + object_id=object_id, + object_type=ObjectTypes.chart, + ) + db.session.add(tagged_object) + db.session.commit() + + result = sorted(strategy.get_urls()) + expected = [ + f'{TEST_URL}/?form_data=%7B%27slice_id%27%3A+30%7D', + ] + self.assertEqual(result, expected) + + strategy = DashboardTagsStrategy(['tag1', 'tag2']) + + result = sorted(strategy.get_urls()) + expected = [ + f'{TEST_URL}/?form_data=%7B%27slice_id%27%3A+30%7D', + f'{TEST_URL}/?form_data=%7B%27slice_id%27%3A+31%7D', + ] + self.assertEqual(result, expected)