From 452b53092b227984a03ee3460fd2ecd86e18d0aa Mon Sep 17 00:00:00 2001 From: Rob DiCiuccio Date: Tue, 23 Mar 2021 07:23:23 -0700 Subject: [PATCH] feat: Refactor asyncEvent middleware and add websocket support (#13696) --- superset-frontend/package-lock.json | 39 ++ superset-frontend/package.json | 2 + .../javascripts/middleware/asyncEvent_spec.ts | 392 +++++++++--------- superset-frontend/src/chart/chartAction.js | 16 +- superset-frontend/src/chart/chartReducer.ts | 8 - superset-frontend/src/dashboard/index.jsx | 21 +- superset-frontend/src/explore/index.jsx | 21 +- superset-frontend/src/explore/types.ts | 1 - .../src/middleware/asyncEvent.ts | 381 ++++++++++------- 9 files changed, 460 insertions(+), 421 deletions(-) diff --git a/superset-frontend/package-lock.json b/superset-frontend/package-lock.json index af49ca4ad..4dda3bf3a 100644 --- a/superset-frontend/package-lock.json +++ b/superset-frontend/package-lock.json @@ -223,10 +223,12 @@ "jest": "^26.6.3", "jest-environment-enzyme": "^7.1.2", "jest-enzyme": "^7.1.2", + "jest-websocket-mock": "^2.2.0", "jsdom": "^16.4.0", "less": "^3.12.2", "less-loader": "^5.0.0", "mini-css-extract-plugin": "^0.4.0", + "mock-socket": "^9.0.3", "node-fetch": "^2.6.1", "optimize-css-assets-webpack-plugin": "^5.0.1", "po2json": "^0.4.5", @@ -38956,6 +38958,15 @@ "node": ">=8.0" } }, + "node_modules/jest-websocket-mock": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/jest-websocket-mock/-/jest-websocket-mock-2.2.0.tgz", + "integrity": "sha512-lc3wwXOEyNa4ZpcgJtUG3mmKMAq5FAsKYiZph0p/+PAJrAPuX4JCIfJMdJ/urRsLBG51fwm/wlVPNbR6s2nzNw==", + "dev": true, + "peerDependencies": { + "mock-socket": "^8||^9" + } + }, "node_modules/jest-worker": { "version": "26.6.2", "resolved": "https://registry.npmjs.org/jest-worker/-/jest-worker-26.6.2.tgz", @@ -41179,6 +41190,18 @@ "mkdirp": "bin/cmd.js" } }, + "node_modules/mock-socket": { + "version": "9.0.3", + "resolved": "https://registry.npmjs.org/mock-socket/-/mock-socket-9.0.3.tgz", + "integrity": "sha512-SxIiD2yE/By79p3cNAAXyLQWTvEFNEzcAO7PH+DzRqKSFaplAPFjiQLmw8ofmpCsZf+Rhfn2/xCJagpdGmYdTw==", + "dev": true, + "dependencies": { + "url-parse": "^1.4.4" + }, + "engines": { + "node": ">= 8" + } + }, "node_modules/moment": { "version": "2.29.1", "resolved": "https://registry.npmjs.org/moment/-/moment-2.29.1.tgz", @@ -91850,6 +91873,13 @@ } } }, + "jest-websocket-mock": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/jest-websocket-mock/-/jest-websocket-mock-2.2.0.tgz", + "integrity": "sha512-lc3wwXOEyNa4ZpcgJtUG3mmKMAq5FAsKYiZph0p/+PAJrAPuX4JCIfJMdJ/urRsLBG51fwm/wlVPNbR6s2nzNw==", + "dev": true, + "requires": {} + }, "jest-worker": { "version": "26.6.2", "resolved": "https://registry.npmjs.org/jest-worker/-/jest-worker-26.6.2.tgz", @@ -93473,6 +93503,15 @@ "minimist": "^1.2.5" } }, + "mock-socket": { + "version": "9.0.3", + "resolved": "https://registry.npmjs.org/mock-socket/-/mock-socket-9.0.3.tgz", + "integrity": "sha512-SxIiD2yE/By79p3cNAAXyLQWTvEFNEzcAO7PH+DzRqKSFaplAPFjiQLmw8ofmpCsZf+Rhfn2/xCJagpdGmYdTw==", + "dev": true, + "requires": { + "url-parse": "^1.4.4" + } + }, "moment": { "version": "2.29.1", "resolved": "https://registry.npmjs.org/moment/-/moment-2.29.1.tgz", diff --git a/superset-frontend/package.json b/superset-frontend/package.json index 9a3ccfb23..877ef983a 100644 --- a/superset-frontend/package.json +++ b/superset-frontend/package.json @@ -275,10 +275,12 @@ "jest": "^26.6.3", "jest-environment-enzyme": "^7.1.2", "jest-enzyme": "^7.1.2", + "jest-websocket-mock": "^2.2.0", "jsdom": "^16.4.0", "less": "^3.12.2", "less-loader": "^5.0.0", "mini-css-extract-plugin": "^0.4.0", + "mock-socket": "^9.0.3", "node-fetch": "^2.6.1", "optimize-css-assets-webpack-plugin": "^5.0.1", "po2json": "^0.4.5", diff --git a/superset-frontend/spec/javascripts/middleware/asyncEvent_spec.ts b/superset-frontend/spec/javascripts/middleware/asyncEvent_spec.ts index e42ac9152..60ea10f50 100644 --- a/superset-frontend/spec/javascripts/middleware/asyncEvent_spec.ts +++ b/superset-frontend/spec/javascripts/middleware/asyncEvent_spec.ts @@ -17,249 +17,231 @@ * under the License. */ import fetchMock from 'fetch-mock'; +import WS from 'jest-websocket-mock'; import sinon from 'sinon'; import * as featureFlags from 'src/featureFlags'; -import initAsyncEvents from 'src/middleware/asyncEvent'; - -jest.useFakeTimers(); +import { parseErrorJson } from 'src/utils/getClientErrorObject'; +import * as asyncEvent from 'src/middleware/asyncEvent'; describe('asyncEvent middleware', () => { - const next = sinon.spy(); - const state = { - charts: { - 123: { - id: 123, - status: 'loading', - asyncJobId: 'foo123', + const asyncPendingEvent = { + status: 'pending', + result_url: null, + job_id: 'foo123', + channel_id: '999', + errors: [], + }; + const asyncDoneEvent = { + id: '1518951480106-0', + status: 'done', + result_url: '/api/v1/chart/data/cache-key-1', + job_id: 'foo123', + channel_id: '999', + errors: [], + }; + const asyncErrorEvent = { + id: '1518951480107-0', + status: 'error', + result_url: null, + job_id: 'foo123', + channel_id: '999', + errors: [{ message: "Error: relation 'foo' does not exist" }], + }; + const chartData = { + result: [ + { + cache_key: '199f01f81f99c98693694821e4458111', + cached_dttm: null, + cache_timeout: 86400, + annotation_data: {}, + error: null, + is_cached: false, + query: + 'SELECT product_line AS product_line,\n sum(sales) AS "(Sales)"\nFROM cleaned_sales_data\nGROUP BY product_line\nLIMIT 50000', + status: 'success', + stacktrace: null, + rowcount: 7, + colnames: ['product_line', '(Sales)'], + coltypes: [1, 0], + data: [ + { + product_line: 'Classic Cars', + '(Sales)': 3919615.66, + }, + ], + applied_filters: [ + { + column: '__time_range', + }, + ], + rejected_filters: [], }, - 345: { - id: 345, - status: 'loading', - asyncJobId: 'foo345', - }, - }, - }; - const events = [ - { - status: 'done', - result_url: '/api/v1/chart/data/cache-key-1', - job_id: 'foo123', - channel_id: '999', - errors: [], - }, - { - status: 'done', - result_url: '/api/v1/chart/data/cache-key-2', - job_id: 'foo345', - channel_id: '999', - errors: [], - }, - ]; - const mockStore = { - getState: () => state, - dispatch: sinon.stub(), - }; - const action = { - type: 'GENERIC_ACTION', + ], }; + const EVENTS_ENDPOINT = 'glob:*/api/v1/async_event/*'; const CACHED_DATA_ENDPOINT = 'glob:*/api/v1/chart/data/*'; - const config = { - GLOBAL_ASYNC_QUERIES_TRANSPORT: 'polling', - GLOBAL_ASYNC_QUERIES_POLLING_DELAY: 500, - }; let featureEnabledStub: any; - function setup() { - const getPendingComponents = sinon.stub(); - const successAction = sinon.spy(); - const errorAction = sinon.spy(); - const testCallback = sinon.stub(); - const testCallbackPromise = sinon.stub(); - testCallbackPromise.returns( - new Promise(resolve => { - testCallback.callsFake(resolve); - }), - ); - - return { - getPendingComponents, - successAction, - errorAction, - testCallback, - testCallbackPromise, - }; - } - - beforeEach(() => { - fetchMock.get(EVENTS_ENDPOINT, { - status: 200, - body: { result: [] }, - }); - fetchMock.get(CACHED_DATA_ENDPOINT, { - status: 200, - body: { result: { some: 'data' } }, - }); + beforeEach(async () => { featureEnabledStub = sinon.stub(featureFlags, 'isFeatureEnabled'); featureEnabledStub.withArgs('GLOBAL_ASYNC_QUERIES').returns(true); }); + afterEach(() => { fetchMock.reset(); - next.resetHistory(); featureEnabledStub.restore(); }); + afterAll(fetchMock.reset); - it('should initialize and call next', () => { - const { getPendingComponents, successAction, errorAction } = setup(); - getPendingComponents.returns([]); - const asyncEventMiddleware = initAsyncEvents({ - config, - getPendingComponents, - successAction, - errorAction, - }); - asyncEventMiddleware(mockStore)(next)(action); - expect(next.callCount).toBe(1); - }); + describe('polling transport', () => { + const config = { + GLOBAL_ASYNC_QUERIES_TRANSPORT: 'polling', + GLOBAL_ASYNC_QUERIES_POLLING_DELAY: 50, + GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL: '', + }; - it('should fetch events when there are pending components', () => { - const { - getPendingComponents, - successAction, - errorAction, - testCallback, - testCallbackPromise, - } = setup(); - getPendingComponents.returns(Object.values(state.charts)); - const asyncEventMiddleware = initAsyncEvents({ - config, - getPendingComponents, - successAction, - errorAction, - processEventsCallback: testCallback, + beforeEach(async () => { + fetchMock.get(EVENTS_ENDPOINT, { + status: 200, + body: { result: [asyncDoneEvent] }, + }); + fetchMock.get(CACHED_DATA_ENDPOINT, { + status: 200, + body: { result: chartData }, + }); + asyncEvent.init(config); }); - asyncEventMiddleware(mockStore)(next)(action); + it('resolves with chart data on event done status', async () => { + await expect( + asyncEvent.waitForAsyncData(asyncPendingEvent), + ).resolves.toEqual([chartData]); - return testCallbackPromise().then(() => { expect(fetchMock.calls(EVENTS_ENDPOINT)).toHaveLength(1); - }); - }); - - it('should fetch cached when there are successful events', () => { - const { - getPendingComponents, - successAction, - errorAction, - testCallback, - testCallbackPromise, - } = setup(); - fetchMock.reset(); - fetchMock.get(EVENTS_ENDPOINT, { - status: 200, - body: { result: events }, - }); - fetchMock.get(CACHED_DATA_ENDPOINT, { - status: 200, - body: { result: { some: 'data' } }, - }); - getPendingComponents.returns(Object.values(state.charts)); - const asyncEventMiddleware = initAsyncEvents({ - config, - getPendingComponents, - successAction, - errorAction, - processEventsCallback: testCallback, + expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(1); }); - asyncEventMiddleware(mockStore)(next)(action); + it('rejects on event error status', async () => { + fetchMock.reset(); + fetchMock.get(EVENTS_ENDPOINT, { + status: 200, + body: { result: [asyncErrorEvent] }, + }); + const errorResponse = await parseErrorJson(asyncErrorEvent); + await expect( + asyncEvent.waitForAsyncData(asyncPendingEvent), + ).rejects.toEqual(errorResponse); - return testCallbackPromise().then(() => { expect(fetchMock.calls(EVENTS_ENDPOINT)).toHaveLength(1); - expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(2); - expect(successAction.callCount).toBe(2); - }); - }); - - it('should call errorAction for cache fetch error responses', () => { - const { - getPendingComponents, - successAction, - errorAction, - testCallback, - testCallbackPromise, - } = setup(); - fetchMock.reset(); - fetchMock.get(EVENTS_ENDPOINT, { - status: 200, - body: { result: events }, - }); - fetchMock.get(CACHED_DATA_ENDPOINT, { - status: 400, - body: { errors: ['error'] }, - }); - getPendingComponents.returns(Object.values(state.charts)); - const asyncEventMiddleware = initAsyncEvents({ - config, - getPendingComponents, - successAction, - errorAction, - processEventsCallback: testCallback, + expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(0); }); - asyncEventMiddleware(mockStore)(next)(action); + it('rejects on cached data fetch error', async () => { + fetchMock.reset(); + fetchMock.get(EVENTS_ENDPOINT, { + status: 200, + body: { result: [asyncDoneEvent] }, + }); + fetchMock.get(CACHED_DATA_ENDPOINT, { + status: 400, + }); + + const errorResponse = [{ error: 'Bad Request' }]; + await expect( + asyncEvent.waitForAsyncData(asyncPendingEvent), + ).rejects.toEqual(errorResponse); - return testCallbackPromise().then(() => { expect(fetchMock.calls(EVENTS_ENDPOINT)).toHaveLength(1); - expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(2); - expect(errorAction.callCount).toBe(2); + expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(1); }); }); - it('should handle event fetching error responses', () => { - const { - getPendingComponents, - successAction, - errorAction, - testCallback, - testCallbackPromise, - } = setup(); - fetchMock.reset(); - fetchMock.get(EVENTS_ENDPOINT, { - status: 400, - body: { message: 'error' }, - }); - getPendingComponents.returns(Object.values(state.charts)); - const asyncEventMiddleware = initAsyncEvents({ - config, - getPendingComponents, - successAction, - errorAction, - processEventsCallback: testCallback, + describe('ws transport', () => { + let wsServer: WS; + const config = { + GLOBAL_ASYNC_QUERIES_TRANSPORT: 'ws', + GLOBAL_ASYNC_QUERIES_POLLING_DELAY: 50, + GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL: 'ws://127.0.0.1:8080/', + }; + + beforeEach(async () => { + fetchMock.get(EVENTS_ENDPOINT, { + status: 200, + body: { result: [asyncDoneEvent] }, + }); + fetchMock.get(CACHED_DATA_ENDPOINT, { + status: 200, + body: { result: chartData }, + }); + + wsServer = new WS(config.GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL); + asyncEvent.init(config); }); - asyncEventMiddleware(mockStore)(next)(action); - - return testCallbackPromise().then(() => { - expect(fetchMock.calls(EVENTS_ENDPOINT)).toHaveLength(1); - }); - }); - - it('should not fetch events when async queries are disabled', () => { - featureEnabledStub.restore(); - featureEnabledStub = sinon.stub(featureFlags, 'isFeatureEnabled'); - featureEnabledStub.withArgs('GLOBAL_ASYNC_QUERIES').returns(false); - const { getPendingComponents, successAction, errorAction } = setup(); - getPendingComponents.returns(Object.values(state.charts)); - const asyncEventMiddleware = initAsyncEvents({ - config, - getPendingComponents, - successAction, - errorAction, + afterEach(() => { + WS.clean(); }); - asyncEventMiddleware(mockStore)(next)(action); - expect(getPendingComponents.called).toBe(false); + it('resolves with chart data on event done status', async () => { + await wsServer.connected; + + const promise = asyncEvent.waitForAsyncData(asyncPendingEvent); + + wsServer.send(JSON.stringify(asyncDoneEvent)); + + await expect(promise).resolves.toEqual([chartData]); + + expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(1); + expect(fetchMock.calls(EVENTS_ENDPOINT)).toHaveLength(0); + }); + + it('rejects on event error status', async () => { + await wsServer.connected; + + const promise = asyncEvent.waitForAsyncData(asyncPendingEvent); + + wsServer.send(JSON.stringify(asyncErrorEvent)); + + const errorResponse = await parseErrorJson(asyncErrorEvent); + + await expect(promise).rejects.toEqual(errorResponse); + + expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(0); + expect(fetchMock.calls(EVENTS_ENDPOINT)).toHaveLength(0); + }); + + it('rejects on cached data fetch error', async () => { + fetchMock.reset(); + fetchMock.get(CACHED_DATA_ENDPOINT, { + status: 400, + }); + + await wsServer.connected; + + const promise = asyncEvent.waitForAsyncData(asyncPendingEvent); + + wsServer.send(JSON.stringify(asyncDoneEvent)); + + const errorResponse = [{ error: 'Bad Request' }]; + + await expect(promise).rejects.toEqual(errorResponse); + + expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(1); + expect(fetchMock.calls(EVENTS_ENDPOINT)).toHaveLength(0); + }); + + it('resolves when events are received before listener', async () => { + await wsServer.connected; + + wsServer.send(JSON.stringify(asyncDoneEvent)); + + const promise = asyncEvent.waitForAsyncData(asyncPendingEvent); + await expect(promise).resolves.toEqual([chartData]); + + expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(1); + expect(fetchMock.calls(EVENTS_ENDPOINT)).toHaveLength(0); + }); }); }); diff --git a/superset-frontend/src/chart/chartAction.js b/superset-frontend/src/chart/chartAction.js index 7162cbc96..906a41c4f 100644 --- a/superset-frontend/src/chart/chartAction.js +++ b/superset-frontend/src/chart/chartAction.js @@ -42,6 +42,7 @@ import { Logger, LOG_ACTIONS_LOAD_CHART } from '../logger/LogUtils'; import { getClientErrorObject } from '../utils/getClientErrorObject'; import { allowCrossDomain as domainShardingEnabled } from '../utils/hostNamesConfig'; import { updateDataMask } from '../dataMask/actions'; +import { waitForAsyncData } from '../middleware/asyncEvent'; export const CHART_UPDATE_STARTED = 'CHART_UPDATE_STARTED'; export function chartUpdateStarted(queryController, latestQueryFormData, key) { @@ -68,11 +69,6 @@ export function chartUpdateFailed(queriesResponse, key) { return { type: CHART_UPDATE_FAILED, queriesResponse, key }; } -export const CHART_UPDATE_QUEUED = 'CHART_UPDATE_QUEUED'; -export function chartUpdateQueued(asyncJobMeta, key) { - return { type: CHART_UPDATE_QUEUED, asyncJobMeta, key }; -} - export const CHART_RENDERING_FAILED = 'CHART_RENDERING_FAILED'; export function chartRenderingFailed(error, key, stackTrace) { return { type: CHART_RENDERING_FAILED, error, key, stackTrace }; @@ -387,9 +383,11 @@ export function exploreJSON( if (isFeatureEnabled(FeatureFlag.GLOBAL_ASYNC_QUERIES)) { // deal with getChartDataRequest transforming the response data const result = 'result' in response ? response.result[0] : response; - return dispatch(chartUpdateQueued(result, key)); + return waitForAsyncData(result); } - + return queriesResponse; + }) + .then(queriesResponse => { queriesResponse.forEach(resultItem => dispatch( logEvent(LOG_ACTIONS_LOAD_CHART, { @@ -414,6 +412,10 @@ export function exploreJSON( return dispatch(chartUpdateSucceeded(queriesResponse, key)); }) .catch(response => { + if (isFeatureEnabled(FeatureFlag.GLOBAL_ASYNC_QUERIES)) { + return dispatch(chartUpdateFailed([response], key)); + } + const appendErrorLog = (errorDetails, isCached) => { dispatch( logEvent(LOG_ACTIONS_LOAD_CHART, { diff --git a/superset-frontend/src/chart/chartReducer.ts b/superset-frontend/src/chart/chartReducer.ts index e517db28a..3f68c0434 100644 --- a/superset-frontend/src/chart/chartReducer.ts +++ b/superset-frontend/src/chart/chartReducer.ts @@ -81,14 +81,6 @@ export default function chartReducer( chartUpdateEndTime: now(), }; }, - [actions.CHART_UPDATE_QUEUED](state) { - return { - ...state, - asyncJobId: action.asyncJobMeta.job_id, - chartStatus: 'loading', - chartUpdateEndTime: now(), - }; - }, [actions.CHART_RENDERING_SUCCEEDED](state) { return { ...state, chartStatus: 'rendered', chartUpdateEndTime: now() }; }, diff --git a/superset-frontend/src/dashboard/index.jsx b/superset-frontend/src/dashboard/index.jsx index 78ba1d2aa..5d696bd3f 100644 --- a/superset-frontend/src/dashboard/index.jsx +++ b/superset-frontend/src/dashboard/index.jsx @@ -24,36 +24,17 @@ import { initFeatureFlags } from 'src/featureFlags'; import { initEnhancer } from '../reduxUtils'; import getInitialState from './reducers/getInitialState'; import rootReducer from './reducers/index'; -import initAsyncEvents from '../middleware/asyncEvent'; import logger from '../middleware/loggerMiddleware'; -import * as actions from '../chart/chartAction'; - import App from './App'; const appContainer = document.getElementById('app'); const bootstrapData = JSON.parse(appContainer.getAttribute('data-bootstrap')); initFeatureFlags(bootstrapData.common.feature_flags); const initState = getInitialState(bootstrapData); - -const asyncEventMiddleware = initAsyncEvents({ - config: bootstrapData.common.conf, - getPendingComponents: ({ charts }) => - Object.values(charts).filter( - c => c.chartStatus === 'loading' && c.asyncJobId !== undefined, - ), - successAction: (componentId, componentData) => - actions.chartUpdateSucceeded(componentData, componentId), - errorAction: (componentId, response) => - actions.chartUpdateFailed(response, componentId), -}); - const store = createStore( rootReducer, initState, - compose( - applyMiddleware(thunk, logger, asyncEventMiddleware), - initEnhancer(false), - ), + compose(applyMiddleware(thunk, logger), initEnhancer(false)), ); ReactDOM.render(, document.getElementById('app')); diff --git a/superset-frontend/src/explore/index.jsx b/superset-frontend/src/explore/index.jsx index 7698dee01..cc99666e0 100644 --- a/superset-frontend/src/explore/index.jsx +++ b/superset-frontend/src/explore/index.jsx @@ -25,9 +25,6 @@ import { initFeatureFlags } from '../featureFlags'; import { initEnhancer } from '../reduxUtils'; import getInitialState from './reducers/getInitialState'; import rootReducer from './reducers/index'; -import initAsyncEvents from '../middleware/asyncEvent'; -import * as actions from '../chart/chartAction'; - import App from './App'; const exploreViewContainer = document.getElementById('app'); @@ -36,26 +33,10 @@ const bootstrapData = JSON.parse( ); initFeatureFlags(bootstrapData.common.feature_flags); const initState = getInitialState(bootstrapData); - -const asyncEventMiddleware = initAsyncEvents({ - config: bootstrapData.common.conf, - getPendingComponents: ({ charts }) => - Object.values(charts).filter( - c => c.chartStatus === 'loading' && c.asyncJobId !== undefined, - ), - successAction: (componentId, componentData) => - actions.chartUpdateSucceeded(componentData, componentId), - errorAction: (componentId, response) => - actions.chartUpdateFailed(response, componentId), -}); - const store = createStore( rootReducer, initState, - compose( - applyMiddleware(thunk, logger, asyncEventMiddleware), - initEnhancer(false), - ), + compose(applyMiddleware(thunk, logger), initEnhancer(false)), ); ReactDOM.render(, document.getElementById('app')); diff --git a/superset-frontend/src/explore/types.ts b/superset-frontend/src/explore/types.ts index 7c05fa64a..32ad4ebe7 100644 --- a/superset-frontend/src/explore/types.ts +++ b/superset-frontend/src/explore/types.ts @@ -49,7 +49,6 @@ export interface ChartState { queryController: AbortController | null; queriesResponse: QueryData | null; triggerQuery: boolean; - asyncJobId?: string; } export type OptionSortType = Partial< diff --git a/superset-frontend/src/middleware/asyncEvent.ts b/superset-frontend/src/middleware/asyncEvent.ts index 1beb99711..aafd7fc6e 100644 --- a/superset-frontend/src/middleware/asyncEvent.ts +++ b/superset-frontend/src/middleware/asyncEvent.ts @@ -16,8 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -import { Dispatch, Middleware, MiddlewareAPI } from 'redux'; -import { makeApi, SupersetClient } from '@superset-ui/core'; +import { ensureIsArray, makeApi, SupersetClient } from '@superset-ui/core'; import { SupersetError } from 'src/components/ErrorMessage/types'; import { FeatureFlag, isFeatureEnabled } from '../featureFlags'; import { @@ -25,178 +24,240 @@ import { parseErrorJson, } from '../utils/getClientErrorObject'; -export type AsyncEvent = { - id: string; +type AsyncEvent = { + id?: string | null; channel_id: string; job_id: string; - user_id: string; + user_id?: string; status: string; - errors: SupersetError[]; - result_url: string; -}; - -type AsyncEventOptions = { - config: { - GLOBAL_ASYNC_QUERIES_TRANSPORT: string; - GLOBAL_ASYNC_QUERIES_POLLING_DELAY: number; - }; - getPendingComponents: (state: any) => any[]; - successAction: (componentId: number, componentData: any) => { type: string }; - errorAction: (componentId: number, response: any) => { type: string }; - processEventsCallback?: (events: AsyncEvent[]) => void; // this is currently used only for tests + errors?: SupersetError[]; + result_url: string | null; }; type CachedDataResponse = { - componentId: number; status: string; data: any; }; +type AppConfig = Record; +type ListenerFn = (asyncEvent: AsyncEvent) => Promise; -const initAsyncEvents = (options: AsyncEventOptions) => { - // TODO: implement websocket support - const TRANSPORT_POLLING = 'polling'; - const { - config, - getPendingComponents, - successAction, - errorAction, - processEventsCallback, - } = options; - const transport = config.GLOBAL_ASYNC_QUERIES_TRANSPORT || TRANSPORT_POLLING; - const polling_delay = config.GLOBAL_ASYNC_QUERIES_POLLING_DELAY || 500; +const TRANSPORT_POLLING = 'polling'; +const TRANSPORT_WS = 'ws'; +const JOB_STATUS = { + PENDING: 'pending', + RUNNING: 'running', + ERROR: 'error', + DONE: 'done', +}; +const LOCALSTORAGE_KEY = 'last_async_event_id'; +const POLLING_URL = '/api/v1/async_event/'; +const MAX_RETRIES = 6; +const RETRY_DELAY = 100; - const middleware: Middleware = (store: MiddlewareAPI) => (next: Dispatch) => { - const JOB_STATUS = { - PENDING: 'pending', - RUNNING: 'running', - ERROR: 'error', - DONE: 'done', - }; - const LOCALSTORAGE_KEY = 'last_async_event_id'; - const POLLING_URL = '/api/v1/async_event/'; - let lastReceivedEventId: string | null; +let config: AppConfig; +let transport: string; +let polling_delay: number; +let listenersByJobId: Record; +let retriesByJobId: Record; +let lastReceivedEventId: string | null | undefined; - try { - lastReceivedEventId = localStorage.getItem(LOCALSTORAGE_KEY); - } catch (err) { - console.warn('Failed to fetch last event Id from localStorage'); +export const init = (appConfig?: AppConfig) => { + if (!isFeatureEnabled(FeatureFlag.GLOBAL_ASYNC_QUERIES)) return; + + listenersByJobId = {}; + retriesByJobId = {}; + lastReceivedEventId = null; + + if (appConfig) { + config = appConfig; + } else { + // load bootstrap data from DOM + const appContainer = document.getElementById('app'); + if (appContainer) { + const bootstrapData = JSON.parse( + appContainer?.getAttribute('data-bootstrap') || '{}', + ); + config = bootstrapData?.common?.conf; + } else { + config = {}; + console.warn('asyncEvent: app config data not found'); } + } + transport = config.GLOBAL_ASYNC_QUERIES_TRANSPORT || TRANSPORT_POLLING; + polling_delay = config.GLOBAL_ASYNC_QUERIES_POLLING_DELAY || 500; - const fetchEvents = makeApi< - { last_id?: string | null }, - { result: AsyncEvent[] } - >({ - method: 'GET', - endpoint: POLLING_URL, - }); + try { + lastReceivedEventId = localStorage.getItem(LOCALSTORAGE_KEY); + } catch (err) { + console.warn('Failed to fetch last event Id from localStorage'); + } - const fetchCachedData = async ( - asyncEvent: AsyncEvent, - componentId: number, - ): Promise => { - let status = 'success'; - let data; - try { - const { json } = await SupersetClient.get({ - endpoint: asyncEvent.result_url, - }); - data = 'result' in json ? json.result : json; - } catch (response) { - status = 'error'; - data = await getClientErrorObject(response); - } - - return { componentId, status, data }; - }; - - const setLastId = (asyncEvent: AsyncEvent) => { - lastReceivedEventId = asyncEvent.id; - try { - localStorage.setItem(LOCALSTORAGE_KEY, lastReceivedEventId as string); - } catch (err) { - console.warn('Error saving event Id to localStorage', err); - } - }; - - const processEvents = async () => { - let queuedComponents = getPendingComponents(store.getState()); - const eventArgs = lastReceivedEventId - ? { last_id: lastReceivedEventId } - : {}; - const events: AsyncEvent[] = []; - if (queuedComponents && queuedComponents.length) { - try { - const { result: events } = await fetchEvents(eventArgs); - // refetch queuedComponents due to race condition where results are available - // before component state is updated with asyncJobId - queuedComponents = getPendingComponents(store.getState()); - if (events && events.length) { - const componentsByJobId = queuedComponents.reduce((acc, item) => { - acc[item.asyncJobId] = item; - return acc; - }, {}); - const fetchDataEvents: Promise[] = []; - events.forEach((asyncEvent: AsyncEvent) => { - const component = componentsByJobId[asyncEvent.job_id]; - if (!component) { - console.warn( - 'Component not found for job_id', - asyncEvent.job_id, - ); - return setLastId(asyncEvent); - } - const componentId = component.id; - switch (asyncEvent.status) { - case JOB_STATUS.DONE: - fetchDataEvents.push( - fetchCachedData(asyncEvent, componentId), - ); - break; - case JOB_STATUS.ERROR: - store.dispatch( - errorAction(componentId, [parseErrorJson(asyncEvent)]), - ); - break; - default: - console.warn('Received event with status', asyncEvent.status); - } - - return setLastId(asyncEvent); - }); - - const fetchResults = await Promise.all(fetchDataEvents); - fetchResults.forEach(result => { - const data = Array.isArray(result.data) - ? result.data - : [result.data]; - if (result.status === 'success') { - store.dispatch(successAction(result.componentId, data)); - } else { - store.dispatch(errorAction(result.componentId, data)); - } - }); - } - } catch (err) { - console.warn(err); - } - } - - if (processEventsCallback) processEventsCallback(events); - - return setTimeout(processEvents, polling_delay); - }; - - if ( - isFeatureEnabled(FeatureFlag.GLOBAL_ASYNC_QUERIES) && - transport === TRANSPORT_POLLING - ) { - processEvents(); - } - - return action => next(action); - }; - - return middleware; + if (transport === TRANSPORT_POLLING) { + loadEventsFromApi(); + } + if (transport === TRANSPORT_WS) { + wsConnect(); + } }; -export default initAsyncEvents; +const addListener = (id: string, fn: any) => { + listenersByJobId[id] = fn; +}; + +const removeListener = (id: string) => { + if (!listenersByJobId[id]) return; + delete listenersByJobId[id]; +}; + +export const waitForAsyncData = async (asyncResponse: AsyncEvent) => + new Promise((resolve, reject) => { + const jobId = asyncResponse.job_id; + const listener = async (asyncEvent: AsyncEvent) => { + switch (asyncEvent.status) { + case JOB_STATUS.DONE: { + let { data, status } = await fetchCachedData(asyncEvent); // eslint-disable-line prefer-const + data = ensureIsArray(data); + if (status === 'success') { + resolve(data); + } else { + reject(data); + } + break; + } + case JOB_STATUS.ERROR: { + const err = parseErrorJson(asyncEvent); + reject(err); + break; + } + default: { + console.warn('received event with status', asyncEvent.status); + } + } + removeListener(jobId); + }; + addListener(jobId, listener); + }); + +const fetchEvents = makeApi< + { last_id?: string | null }, + { result: AsyncEvent[] } +>({ + method: 'GET', + endpoint: POLLING_URL, +}); + +const fetchCachedData = async ( + asyncEvent: AsyncEvent, +): Promise => { + let status = 'success'; + let data; + try { + const { json } = await SupersetClient.get({ + endpoint: String(asyncEvent.result_url), + }); + data = 'result' in json ? json.result : json; + } catch (response) { + status = 'error'; + data = await getClientErrorObject(response); + } + + return { status, data }; +}; + +const setLastId = (asyncEvent: AsyncEvent) => { + lastReceivedEventId = asyncEvent.id; + try { + localStorage.setItem(LOCALSTORAGE_KEY, lastReceivedEventId as string); + } catch (err) { + console.warn('Error saving event Id to localStorage', err); + } +}; + +const loadEventsFromApi = async () => { + const eventArgs = lastReceivedEventId ? { last_id: lastReceivedEventId } : {}; + if (Object.keys(listenersByJobId).length) { + try { + const { result: events } = await fetchEvents(eventArgs); + if (events && events.length) await processEvents(events); + } catch (err) { + console.warn(err); + } + } + + if (transport === TRANSPORT_POLLING) { + setTimeout(loadEventsFromApi, polling_delay); + } +}; + +export const processEvents = async (events: AsyncEvent[]) => { + events.forEach((asyncEvent: AsyncEvent) => { + const jobId = asyncEvent.job_id; + const listener = listenersByJobId[jobId]; + if (listener) { + listener(asyncEvent); + delete retriesByJobId[jobId]; + } else { + // handle race condition where event is received + // before listener is registered + if (!retriesByJobId[jobId]) retriesByJobId[jobId] = 0; + retriesByJobId[jobId] += 1; + + if (retriesByJobId[jobId] <= MAX_RETRIES) { + setTimeout(() => { + processEvents([asyncEvent]); + }, RETRY_DELAY * retriesByJobId[jobId]); + } else { + delete retriesByJobId[jobId]; + console.warn('listener not found for job_id', asyncEvent.job_id); + } + } + setLastId(asyncEvent); + }); +}; + +const wsConnectMaxRetries = 6; +const wsConnectErrorDelay = 2500; +let wsConnectRetries = 0; +let wsConnectTimeout: any; +let ws: WebSocket; + +const wsConnect = (): void => { + let url = config.GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL; + if (lastReceivedEventId) url += `?last_id=${lastReceivedEventId}`; + ws = new WebSocket(url); + + ws.addEventListener('open', event => { + console.log('WebSocket connected'); + clearTimeout(wsConnectTimeout); + wsConnectRetries = 0; + }); + + ws.addEventListener('close', event => { + wsConnectTimeout = setTimeout(() => { + wsConnectRetries += 1; + if (wsConnectRetries <= wsConnectMaxRetries) { + wsConnect(); + } else { + console.warn('WebSocket not available, falling back to async polling'); + loadEventsFromApi(); + } + }, wsConnectErrorDelay); + }); + + ws.addEventListener('error', event => { + // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState + if (ws.readyState < 2) ws.close(); + }); + + ws.addEventListener('message', async event => { + let events: AsyncEvent[] = []; + try { + events = [JSON.parse(event.data)]; + await processEvents(events); + } catch (err) { + console.warn(err); + } + }); +}; + +init();