diff --git a/superset-frontend/package-lock.json b/superset-frontend/package-lock.json
index af49ca4ad..4dda3bf3a 100644
--- a/superset-frontend/package-lock.json
+++ b/superset-frontend/package-lock.json
@@ -223,10 +223,12 @@
"jest": "^26.6.3",
"jest-environment-enzyme": "^7.1.2",
"jest-enzyme": "^7.1.2",
+ "jest-websocket-mock": "^2.2.0",
"jsdom": "^16.4.0",
"less": "^3.12.2",
"less-loader": "^5.0.0",
"mini-css-extract-plugin": "^0.4.0",
+ "mock-socket": "^9.0.3",
"node-fetch": "^2.6.1",
"optimize-css-assets-webpack-plugin": "^5.0.1",
"po2json": "^0.4.5",
@@ -38956,6 +38958,15 @@
"node": ">=8.0"
}
},
+ "node_modules/jest-websocket-mock": {
+ "version": "2.2.0",
+ "resolved": "https://registry.npmjs.org/jest-websocket-mock/-/jest-websocket-mock-2.2.0.tgz",
+ "integrity": "sha512-lc3wwXOEyNa4ZpcgJtUG3mmKMAq5FAsKYiZph0p/+PAJrAPuX4JCIfJMdJ/urRsLBG51fwm/wlVPNbR6s2nzNw==",
+ "dev": true,
+ "peerDependencies": {
+ "mock-socket": "^8||^9"
+ }
+ },
"node_modules/jest-worker": {
"version": "26.6.2",
"resolved": "https://registry.npmjs.org/jest-worker/-/jest-worker-26.6.2.tgz",
@@ -41179,6 +41190,18 @@
"mkdirp": "bin/cmd.js"
}
},
+ "node_modules/mock-socket": {
+ "version": "9.0.3",
+ "resolved": "https://registry.npmjs.org/mock-socket/-/mock-socket-9.0.3.tgz",
+ "integrity": "sha512-SxIiD2yE/By79p3cNAAXyLQWTvEFNEzcAO7PH+DzRqKSFaplAPFjiQLmw8ofmpCsZf+Rhfn2/xCJagpdGmYdTw==",
+ "dev": true,
+ "dependencies": {
+ "url-parse": "^1.4.4"
+ },
+ "engines": {
+ "node": ">= 8"
+ }
+ },
"node_modules/moment": {
"version": "2.29.1",
"resolved": "https://registry.npmjs.org/moment/-/moment-2.29.1.tgz",
@@ -91850,6 +91873,13 @@
}
}
},
+ "jest-websocket-mock": {
+ "version": "2.2.0",
+ "resolved": "https://registry.npmjs.org/jest-websocket-mock/-/jest-websocket-mock-2.2.0.tgz",
+ "integrity": "sha512-lc3wwXOEyNa4ZpcgJtUG3mmKMAq5FAsKYiZph0p/+PAJrAPuX4JCIfJMdJ/urRsLBG51fwm/wlVPNbR6s2nzNw==",
+ "dev": true,
+ "requires": {}
+ },
"jest-worker": {
"version": "26.6.2",
"resolved": "https://registry.npmjs.org/jest-worker/-/jest-worker-26.6.2.tgz",
@@ -93473,6 +93503,15 @@
"minimist": "^1.2.5"
}
},
+ "mock-socket": {
+ "version": "9.0.3",
+ "resolved": "https://registry.npmjs.org/mock-socket/-/mock-socket-9.0.3.tgz",
+ "integrity": "sha512-SxIiD2yE/By79p3cNAAXyLQWTvEFNEzcAO7PH+DzRqKSFaplAPFjiQLmw8ofmpCsZf+Rhfn2/xCJagpdGmYdTw==",
+ "dev": true,
+ "requires": {
+ "url-parse": "^1.4.4"
+ }
+ },
"moment": {
"version": "2.29.1",
"resolved": "https://registry.npmjs.org/moment/-/moment-2.29.1.tgz",
diff --git a/superset-frontend/package.json b/superset-frontend/package.json
index 9a3ccfb23..877ef983a 100644
--- a/superset-frontend/package.json
+++ b/superset-frontend/package.json
@@ -275,10 +275,12 @@
"jest": "^26.6.3",
"jest-environment-enzyme": "^7.1.2",
"jest-enzyme": "^7.1.2",
+ "jest-websocket-mock": "^2.2.0",
"jsdom": "^16.4.0",
"less": "^3.12.2",
"less-loader": "^5.0.0",
"mini-css-extract-plugin": "^0.4.0",
+ "mock-socket": "^9.0.3",
"node-fetch": "^2.6.1",
"optimize-css-assets-webpack-plugin": "^5.0.1",
"po2json": "^0.4.5",
diff --git a/superset-frontend/spec/javascripts/middleware/asyncEvent_spec.ts b/superset-frontend/spec/javascripts/middleware/asyncEvent_spec.ts
index e42ac9152..60ea10f50 100644
--- a/superset-frontend/spec/javascripts/middleware/asyncEvent_spec.ts
+++ b/superset-frontend/spec/javascripts/middleware/asyncEvent_spec.ts
@@ -17,249 +17,231 @@
* under the License.
*/
import fetchMock from 'fetch-mock';
+import WS from 'jest-websocket-mock';
import sinon from 'sinon';
import * as featureFlags from 'src/featureFlags';
-import initAsyncEvents from 'src/middleware/asyncEvent';
-
-jest.useFakeTimers();
+import { parseErrorJson } from 'src/utils/getClientErrorObject';
+import * as asyncEvent from 'src/middleware/asyncEvent';
describe('asyncEvent middleware', () => {
- const next = sinon.spy();
- const state = {
- charts: {
- 123: {
- id: 123,
- status: 'loading',
- asyncJobId: 'foo123',
+ const asyncPendingEvent = {
+ status: 'pending',
+ result_url: null,
+ job_id: 'foo123',
+ channel_id: '999',
+ errors: [],
+ };
+ const asyncDoneEvent = {
+ id: '1518951480106-0',
+ status: 'done',
+ result_url: '/api/v1/chart/data/cache-key-1',
+ job_id: 'foo123',
+ channel_id: '999',
+ errors: [],
+ };
+ const asyncErrorEvent = {
+ id: '1518951480107-0',
+ status: 'error',
+ result_url: null,
+ job_id: 'foo123',
+ channel_id: '999',
+ errors: [{ message: "Error: relation 'foo' does not exist" }],
+ };
+ const chartData = {
+ result: [
+ {
+ cache_key: '199f01f81f99c98693694821e4458111',
+ cached_dttm: null,
+ cache_timeout: 86400,
+ annotation_data: {},
+ error: null,
+ is_cached: false,
+ query:
+ 'SELECT product_line AS product_line,\n sum(sales) AS "(Sales)"\nFROM cleaned_sales_data\nGROUP BY product_line\nLIMIT 50000',
+ status: 'success',
+ stacktrace: null,
+ rowcount: 7,
+ colnames: ['product_line', '(Sales)'],
+ coltypes: [1, 0],
+ data: [
+ {
+ product_line: 'Classic Cars',
+ '(Sales)': 3919615.66,
+ },
+ ],
+ applied_filters: [
+ {
+ column: '__time_range',
+ },
+ ],
+ rejected_filters: [],
},
- 345: {
- id: 345,
- status: 'loading',
- asyncJobId: 'foo345',
- },
- },
- };
- const events = [
- {
- status: 'done',
- result_url: '/api/v1/chart/data/cache-key-1',
- job_id: 'foo123',
- channel_id: '999',
- errors: [],
- },
- {
- status: 'done',
- result_url: '/api/v1/chart/data/cache-key-2',
- job_id: 'foo345',
- channel_id: '999',
- errors: [],
- },
- ];
- const mockStore = {
- getState: () => state,
- dispatch: sinon.stub(),
- };
- const action = {
- type: 'GENERIC_ACTION',
+ ],
};
+
const EVENTS_ENDPOINT = 'glob:*/api/v1/async_event/*';
const CACHED_DATA_ENDPOINT = 'glob:*/api/v1/chart/data/*';
- const config = {
- GLOBAL_ASYNC_QUERIES_TRANSPORT: 'polling',
- GLOBAL_ASYNC_QUERIES_POLLING_DELAY: 500,
- };
let featureEnabledStub: any;
- function setup() {
- const getPendingComponents = sinon.stub();
- const successAction = sinon.spy();
- const errorAction = sinon.spy();
- const testCallback = sinon.stub();
- const testCallbackPromise = sinon.stub();
- testCallbackPromise.returns(
- new Promise(resolve => {
- testCallback.callsFake(resolve);
- }),
- );
-
- return {
- getPendingComponents,
- successAction,
- errorAction,
- testCallback,
- testCallbackPromise,
- };
- }
-
- beforeEach(() => {
- fetchMock.get(EVENTS_ENDPOINT, {
- status: 200,
- body: { result: [] },
- });
- fetchMock.get(CACHED_DATA_ENDPOINT, {
- status: 200,
- body: { result: { some: 'data' } },
- });
+ beforeEach(async () => {
featureEnabledStub = sinon.stub(featureFlags, 'isFeatureEnabled');
featureEnabledStub.withArgs('GLOBAL_ASYNC_QUERIES').returns(true);
});
+
afterEach(() => {
fetchMock.reset();
- next.resetHistory();
featureEnabledStub.restore();
});
+
afterAll(fetchMock.reset);
- it('should initialize and call next', () => {
- const { getPendingComponents, successAction, errorAction } = setup();
- getPendingComponents.returns([]);
- const asyncEventMiddleware = initAsyncEvents({
- config,
- getPendingComponents,
- successAction,
- errorAction,
- });
- asyncEventMiddleware(mockStore)(next)(action);
- expect(next.callCount).toBe(1);
- });
+ describe('polling transport', () => {
+ const config = {
+ GLOBAL_ASYNC_QUERIES_TRANSPORT: 'polling',
+ GLOBAL_ASYNC_QUERIES_POLLING_DELAY: 50,
+ GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL: '',
+ };
- it('should fetch events when there are pending components', () => {
- const {
- getPendingComponents,
- successAction,
- errorAction,
- testCallback,
- testCallbackPromise,
- } = setup();
- getPendingComponents.returns(Object.values(state.charts));
- const asyncEventMiddleware = initAsyncEvents({
- config,
- getPendingComponents,
- successAction,
- errorAction,
- processEventsCallback: testCallback,
+ beforeEach(async () => {
+ fetchMock.get(EVENTS_ENDPOINT, {
+ status: 200,
+ body: { result: [asyncDoneEvent] },
+ });
+ fetchMock.get(CACHED_DATA_ENDPOINT, {
+ status: 200,
+ body: { result: chartData },
+ });
+ asyncEvent.init(config);
});
- asyncEventMiddleware(mockStore)(next)(action);
+ it('resolves with chart data on event done status', async () => {
+ await expect(
+ asyncEvent.waitForAsyncData(asyncPendingEvent),
+ ).resolves.toEqual([chartData]);
- return testCallbackPromise().then(() => {
expect(fetchMock.calls(EVENTS_ENDPOINT)).toHaveLength(1);
- });
- });
-
- it('should fetch cached when there are successful events', () => {
- const {
- getPendingComponents,
- successAction,
- errorAction,
- testCallback,
- testCallbackPromise,
- } = setup();
- fetchMock.reset();
- fetchMock.get(EVENTS_ENDPOINT, {
- status: 200,
- body: { result: events },
- });
- fetchMock.get(CACHED_DATA_ENDPOINT, {
- status: 200,
- body: { result: { some: 'data' } },
- });
- getPendingComponents.returns(Object.values(state.charts));
- const asyncEventMiddleware = initAsyncEvents({
- config,
- getPendingComponents,
- successAction,
- errorAction,
- processEventsCallback: testCallback,
+ expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(1);
});
- asyncEventMiddleware(mockStore)(next)(action);
+ it('rejects on event error status', async () => {
+ fetchMock.reset();
+ fetchMock.get(EVENTS_ENDPOINT, {
+ status: 200,
+ body: { result: [asyncErrorEvent] },
+ });
+ const errorResponse = await parseErrorJson(asyncErrorEvent);
+ await expect(
+ asyncEvent.waitForAsyncData(asyncPendingEvent),
+ ).rejects.toEqual(errorResponse);
- return testCallbackPromise().then(() => {
expect(fetchMock.calls(EVENTS_ENDPOINT)).toHaveLength(1);
- expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(2);
- expect(successAction.callCount).toBe(2);
- });
- });
-
- it('should call errorAction for cache fetch error responses', () => {
- const {
- getPendingComponents,
- successAction,
- errorAction,
- testCallback,
- testCallbackPromise,
- } = setup();
- fetchMock.reset();
- fetchMock.get(EVENTS_ENDPOINT, {
- status: 200,
- body: { result: events },
- });
- fetchMock.get(CACHED_DATA_ENDPOINT, {
- status: 400,
- body: { errors: ['error'] },
- });
- getPendingComponents.returns(Object.values(state.charts));
- const asyncEventMiddleware = initAsyncEvents({
- config,
- getPendingComponents,
- successAction,
- errorAction,
- processEventsCallback: testCallback,
+ expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(0);
});
- asyncEventMiddleware(mockStore)(next)(action);
+ it('rejects on cached data fetch error', async () => {
+ fetchMock.reset();
+ fetchMock.get(EVENTS_ENDPOINT, {
+ status: 200,
+ body: { result: [asyncDoneEvent] },
+ });
+ fetchMock.get(CACHED_DATA_ENDPOINT, {
+ status: 400,
+ });
+
+ const errorResponse = [{ error: 'Bad Request' }];
+ await expect(
+ asyncEvent.waitForAsyncData(asyncPendingEvent),
+ ).rejects.toEqual(errorResponse);
- return testCallbackPromise().then(() => {
expect(fetchMock.calls(EVENTS_ENDPOINT)).toHaveLength(1);
- expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(2);
- expect(errorAction.callCount).toBe(2);
+ expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(1);
});
});
- it('should handle event fetching error responses', () => {
- const {
- getPendingComponents,
- successAction,
- errorAction,
- testCallback,
- testCallbackPromise,
- } = setup();
- fetchMock.reset();
- fetchMock.get(EVENTS_ENDPOINT, {
- status: 400,
- body: { message: 'error' },
- });
- getPendingComponents.returns(Object.values(state.charts));
- const asyncEventMiddleware = initAsyncEvents({
- config,
- getPendingComponents,
- successAction,
- errorAction,
- processEventsCallback: testCallback,
+ describe('ws transport', () => {
+ let wsServer: WS;
+ const config = {
+ GLOBAL_ASYNC_QUERIES_TRANSPORT: 'ws',
+ GLOBAL_ASYNC_QUERIES_POLLING_DELAY: 50,
+ GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL: 'ws://127.0.0.1:8080/',
+ };
+
+ beforeEach(async () => {
+ fetchMock.get(EVENTS_ENDPOINT, {
+ status: 200,
+ body: { result: [asyncDoneEvent] },
+ });
+ fetchMock.get(CACHED_DATA_ENDPOINT, {
+ status: 200,
+ body: { result: chartData },
+ });
+
+ wsServer = new WS(config.GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL);
+ asyncEvent.init(config);
});
- asyncEventMiddleware(mockStore)(next)(action);
-
- return testCallbackPromise().then(() => {
- expect(fetchMock.calls(EVENTS_ENDPOINT)).toHaveLength(1);
- });
- });
-
- it('should not fetch events when async queries are disabled', () => {
- featureEnabledStub.restore();
- featureEnabledStub = sinon.stub(featureFlags, 'isFeatureEnabled');
- featureEnabledStub.withArgs('GLOBAL_ASYNC_QUERIES').returns(false);
- const { getPendingComponents, successAction, errorAction } = setup();
- getPendingComponents.returns(Object.values(state.charts));
- const asyncEventMiddleware = initAsyncEvents({
- config,
- getPendingComponents,
- successAction,
- errorAction,
+ afterEach(() => {
+ WS.clean();
});
- asyncEventMiddleware(mockStore)(next)(action);
- expect(getPendingComponents.called).toBe(false);
+ it('resolves with chart data on event done status', async () => {
+ await wsServer.connected;
+
+ const promise = asyncEvent.waitForAsyncData(asyncPendingEvent);
+
+ wsServer.send(JSON.stringify(asyncDoneEvent));
+
+ await expect(promise).resolves.toEqual([chartData]);
+
+ expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(1);
+ expect(fetchMock.calls(EVENTS_ENDPOINT)).toHaveLength(0);
+ });
+
+ it('rejects on event error status', async () => {
+ await wsServer.connected;
+
+ const promise = asyncEvent.waitForAsyncData(asyncPendingEvent);
+
+ wsServer.send(JSON.stringify(asyncErrorEvent));
+
+ const errorResponse = await parseErrorJson(asyncErrorEvent);
+
+ await expect(promise).rejects.toEqual(errorResponse);
+
+ expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(0);
+ expect(fetchMock.calls(EVENTS_ENDPOINT)).toHaveLength(0);
+ });
+
+ it('rejects on cached data fetch error', async () => {
+ fetchMock.reset();
+ fetchMock.get(CACHED_DATA_ENDPOINT, {
+ status: 400,
+ });
+
+ await wsServer.connected;
+
+ const promise = asyncEvent.waitForAsyncData(asyncPendingEvent);
+
+ wsServer.send(JSON.stringify(asyncDoneEvent));
+
+ const errorResponse = [{ error: 'Bad Request' }];
+
+ await expect(promise).rejects.toEqual(errorResponse);
+
+ expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(1);
+ expect(fetchMock.calls(EVENTS_ENDPOINT)).toHaveLength(0);
+ });
+
+ it('resolves when events are received before listener', async () => {
+ await wsServer.connected;
+
+ wsServer.send(JSON.stringify(asyncDoneEvent));
+
+ const promise = asyncEvent.waitForAsyncData(asyncPendingEvent);
+ await expect(promise).resolves.toEqual([chartData]);
+
+ expect(fetchMock.calls(CACHED_DATA_ENDPOINT)).toHaveLength(1);
+ expect(fetchMock.calls(EVENTS_ENDPOINT)).toHaveLength(0);
+ });
});
});
diff --git a/superset-frontend/src/chart/chartAction.js b/superset-frontend/src/chart/chartAction.js
index 7162cbc96..906a41c4f 100644
--- a/superset-frontend/src/chart/chartAction.js
+++ b/superset-frontend/src/chart/chartAction.js
@@ -42,6 +42,7 @@ import { Logger, LOG_ACTIONS_LOAD_CHART } from '../logger/LogUtils';
import { getClientErrorObject } from '../utils/getClientErrorObject';
import { allowCrossDomain as domainShardingEnabled } from '../utils/hostNamesConfig';
import { updateDataMask } from '../dataMask/actions';
+import { waitForAsyncData } from '../middleware/asyncEvent';
export const CHART_UPDATE_STARTED = 'CHART_UPDATE_STARTED';
export function chartUpdateStarted(queryController, latestQueryFormData, key) {
@@ -68,11 +69,6 @@ export function chartUpdateFailed(queriesResponse, key) {
return { type: CHART_UPDATE_FAILED, queriesResponse, key };
}
-export const CHART_UPDATE_QUEUED = 'CHART_UPDATE_QUEUED';
-export function chartUpdateQueued(asyncJobMeta, key) {
- return { type: CHART_UPDATE_QUEUED, asyncJobMeta, key };
-}
-
export const CHART_RENDERING_FAILED = 'CHART_RENDERING_FAILED';
export function chartRenderingFailed(error, key, stackTrace) {
return { type: CHART_RENDERING_FAILED, error, key, stackTrace };
@@ -387,9 +383,11 @@ export function exploreJSON(
if (isFeatureEnabled(FeatureFlag.GLOBAL_ASYNC_QUERIES)) {
// deal with getChartDataRequest transforming the response data
const result = 'result' in response ? response.result[0] : response;
- return dispatch(chartUpdateQueued(result, key));
+ return waitForAsyncData(result);
}
-
+ return queriesResponse;
+ })
+ .then(queriesResponse => {
queriesResponse.forEach(resultItem =>
dispatch(
logEvent(LOG_ACTIONS_LOAD_CHART, {
@@ -414,6 +412,10 @@ export function exploreJSON(
return dispatch(chartUpdateSucceeded(queriesResponse, key));
})
.catch(response => {
+ if (isFeatureEnabled(FeatureFlag.GLOBAL_ASYNC_QUERIES)) {
+ return dispatch(chartUpdateFailed([response], key));
+ }
+
const appendErrorLog = (errorDetails, isCached) => {
dispatch(
logEvent(LOG_ACTIONS_LOAD_CHART, {
diff --git a/superset-frontend/src/chart/chartReducer.ts b/superset-frontend/src/chart/chartReducer.ts
index e517db28a..3f68c0434 100644
--- a/superset-frontend/src/chart/chartReducer.ts
+++ b/superset-frontend/src/chart/chartReducer.ts
@@ -81,14 +81,6 @@ export default function chartReducer(
chartUpdateEndTime: now(),
};
},
- [actions.CHART_UPDATE_QUEUED](state) {
- return {
- ...state,
- asyncJobId: action.asyncJobMeta.job_id,
- chartStatus: 'loading',
- chartUpdateEndTime: now(),
- };
- },
[actions.CHART_RENDERING_SUCCEEDED](state) {
return { ...state, chartStatus: 'rendered', chartUpdateEndTime: now() };
},
diff --git a/superset-frontend/src/dashboard/index.jsx b/superset-frontend/src/dashboard/index.jsx
index 78ba1d2aa..5d696bd3f 100644
--- a/superset-frontend/src/dashboard/index.jsx
+++ b/superset-frontend/src/dashboard/index.jsx
@@ -24,36 +24,17 @@ import { initFeatureFlags } from 'src/featureFlags';
import { initEnhancer } from '../reduxUtils';
import getInitialState from './reducers/getInitialState';
import rootReducer from './reducers/index';
-import initAsyncEvents from '../middleware/asyncEvent';
import logger from '../middleware/loggerMiddleware';
-import * as actions from '../chart/chartAction';
-
import App from './App';
const appContainer = document.getElementById('app');
const bootstrapData = JSON.parse(appContainer.getAttribute('data-bootstrap'));
initFeatureFlags(bootstrapData.common.feature_flags);
const initState = getInitialState(bootstrapData);
-
-const asyncEventMiddleware = initAsyncEvents({
- config: bootstrapData.common.conf,
- getPendingComponents: ({ charts }) =>
- Object.values(charts).filter(
- c => c.chartStatus === 'loading' && c.asyncJobId !== undefined,
- ),
- successAction: (componentId, componentData) =>
- actions.chartUpdateSucceeded(componentData, componentId),
- errorAction: (componentId, response) =>
- actions.chartUpdateFailed(response, componentId),
-});
-
const store = createStore(
rootReducer,
initState,
- compose(
- applyMiddleware(thunk, logger, asyncEventMiddleware),
- initEnhancer(false),
- ),
+ compose(applyMiddleware(thunk, logger), initEnhancer(false)),
);
ReactDOM.render(, document.getElementById('app'));
diff --git a/superset-frontend/src/explore/index.jsx b/superset-frontend/src/explore/index.jsx
index 7698dee01..cc99666e0 100644
--- a/superset-frontend/src/explore/index.jsx
+++ b/superset-frontend/src/explore/index.jsx
@@ -25,9 +25,6 @@ import { initFeatureFlags } from '../featureFlags';
import { initEnhancer } from '../reduxUtils';
import getInitialState from './reducers/getInitialState';
import rootReducer from './reducers/index';
-import initAsyncEvents from '../middleware/asyncEvent';
-import * as actions from '../chart/chartAction';
-
import App from './App';
const exploreViewContainer = document.getElementById('app');
@@ -36,26 +33,10 @@ const bootstrapData = JSON.parse(
);
initFeatureFlags(bootstrapData.common.feature_flags);
const initState = getInitialState(bootstrapData);
-
-const asyncEventMiddleware = initAsyncEvents({
- config: bootstrapData.common.conf,
- getPendingComponents: ({ charts }) =>
- Object.values(charts).filter(
- c => c.chartStatus === 'loading' && c.asyncJobId !== undefined,
- ),
- successAction: (componentId, componentData) =>
- actions.chartUpdateSucceeded(componentData, componentId),
- errorAction: (componentId, response) =>
- actions.chartUpdateFailed(response, componentId),
-});
-
const store = createStore(
rootReducer,
initState,
- compose(
- applyMiddleware(thunk, logger, asyncEventMiddleware),
- initEnhancer(false),
- ),
+ compose(applyMiddleware(thunk, logger), initEnhancer(false)),
);
ReactDOM.render(, document.getElementById('app'));
diff --git a/superset-frontend/src/explore/types.ts b/superset-frontend/src/explore/types.ts
index 7c05fa64a..32ad4ebe7 100644
--- a/superset-frontend/src/explore/types.ts
+++ b/superset-frontend/src/explore/types.ts
@@ -49,7 +49,6 @@ export interface ChartState {
queryController: AbortController | null;
queriesResponse: QueryData | null;
triggerQuery: boolean;
- asyncJobId?: string;
}
export type OptionSortType = Partial<
diff --git a/superset-frontend/src/middleware/asyncEvent.ts b/superset-frontend/src/middleware/asyncEvent.ts
index 1beb99711..aafd7fc6e 100644
--- a/superset-frontend/src/middleware/asyncEvent.ts
+++ b/superset-frontend/src/middleware/asyncEvent.ts
@@ -16,8 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { Dispatch, Middleware, MiddlewareAPI } from 'redux';
-import { makeApi, SupersetClient } from '@superset-ui/core';
+import { ensureIsArray, makeApi, SupersetClient } from '@superset-ui/core';
import { SupersetError } from 'src/components/ErrorMessage/types';
import { FeatureFlag, isFeatureEnabled } from '../featureFlags';
import {
@@ -25,178 +24,240 @@ import {
parseErrorJson,
} from '../utils/getClientErrorObject';
-export type AsyncEvent = {
- id: string;
+type AsyncEvent = {
+ id?: string | null;
channel_id: string;
job_id: string;
- user_id: string;
+ user_id?: string;
status: string;
- errors: SupersetError[];
- result_url: string;
-};
-
-type AsyncEventOptions = {
- config: {
- GLOBAL_ASYNC_QUERIES_TRANSPORT: string;
- GLOBAL_ASYNC_QUERIES_POLLING_DELAY: number;
- };
- getPendingComponents: (state: any) => any[];
- successAction: (componentId: number, componentData: any) => { type: string };
- errorAction: (componentId: number, response: any) => { type: string };
- processEventsCallback?: (events: AsyncEvent[]) => void; // this is currently used only for tests
+ errors?: SupersetError[];
+ result_url: string | null;
};
type CachedDataResponse = {
- componentId: number;
status: string;
data: any;
};
+type AppConfig = Record;
+type ListenerFn = (asyncEvent: AsyncEvent) => Promise;
-const initAsyncEvents = (options: AsyncEventOptions) => {
- // TODO: implement websocket support
- const TRANSPORT_POLLING = 'polling';
- const {
- config,
- getPendingComponents,
- successAction,
- errorAction,
- processEventsCallback,
- } = options;
- const transport = config.GLOBAL_ASYNC_QUERIES_TRANSPORT || TRANSPORT_POLLING;
- const polling_delay = config.GLOBAL_ASYNC_QUERIES_POLLING_DELAY || 500;
+const TRANSPORT_POLLING = 'polling';
+const TRANSPORT_WS = 'ws';
+const JOB_STATUS = {
+ PENDING: 'pending',
+ RUNNING: 'running',
+ ERROR: 'error',
+ DONE: 'done',
+};
+const LOCALSTORAGE_KEY = 'last_async_event_id';
+const POLLING_URL = '/api/v1/async_event/';
+const MAX_RETRIES = 6;
+const RETRY_DELAY = 100;
- const middleware: Middleware = (store: MiddlewareAPI) => (next: Dispatch) => {
- const JOB_STATUS = {
- PENDING: 'pending',
- RUNNING: 'running',
- ERROR: 'error',
- DONE: 'done',
- };
- const LOCALSTORAGE_KEY = 'last_async_event_id';
- const POLLING_URL = '/api/v1/async_event/';
- let lastReceivedEventId: string | null;
+let config: AppConfig;
+let transport: string;
+let polling_delay: number;
+let listenersByJobId: Record;
+let retriesByJobId: Record;
+let lastReceivedEventId: string | null | undefined;
- try {
- lastReceivedEventId = localStorage.getItem(LOCALSTORAGE_KEY);
- } catch (err) {
- console.warn('Failed to fetch last event Id from localStorage');
+export const init = (appConfig?: AppConfig) => {
+ if (!isFeatureEnabled(FeatureFlag.GLOBAL_ASYNC_QUERIES)) return;
+
+ listenersByJobId = {};
+ retriesByJobId = {};
+ lastReceivedEventId = null;
+
+ if (appConfig) {
+ config = appConfig;
+ } else {
+ // load bootstrap data from DOM
+ const appContainer = document.getElementById('app');
+ if (appContainer) {
+ const bootstrapData = JSON.parse(
+ appContainer?.getAttribute('data-bootstrap') || '{}',
+ );
+ config = bootstrapData?.common?.conf;
+ } else {
+ config = {};
+ console.warn('asyncEvent: app config data not found');
}
+ }
+ transport = config.GLOBAL_ASYNC_QUERIES_TRANSPORT || TRANSPORT_POLLING;
+ polling_delay = config.GLOBAL_ASYNC_QUERIES_POLLING_DELAY || 500;
- const fetchEvents = makeApi<
- { last_id?: string | null },
- { result: AsyncEvent[] }
- >({
- method: 'GET',
- endpoint: POLLING_URL,
- });
+ try {
+ lastReceivedEventId = localStorage.getItem(LOCALSTORAGE_KEY);
+ } catch (err) {
+ console.warn('Failed to fetch last event Id from localStorage');
+ }
- const fetchCachedData = async (
- asyncEvent: AsyncEvent,
- componentId: number,
- ): Promise => {
- let status = 'success';
- let data;
- try {
- const { json } = await SupersetClient.get({
- endpoint: asyncEvent.result_url,
- });
- data = 'result' in json ? json.result : json;
- } catch (response) {
- status = 'error';
- data = await getClientErrorObject(response);
- }
-
- return { componentId, status, data };
- };
-
- const setLastId = (asyncEvent: AsyncEvent) => {
- lastReceivedEventId = asyncEvent.id;
- try {
- localStorage.setItem(LOCALSTORAGE_KEY, lastReceivedEventId as string);
- } catch (err) {
- console.warn('Error saving event Id to localStorage', err);
- }
- };
-
- const processEvents = async () => {
- let queuedComponents = getPendingComponents(store.getState());
- const eventArgs = lastReceivedEventId
- ? { last_id: lastReceivedEventId }
- : {};
- const events: AsyncEvent[] = [];
- if (queuedComponents && queuedComponents.length) {
- try {
- const { result: events } = await fetchEvents(eventArgs);
- // refetch queuedComponents due to race condition where results are available
- // before component state is updated with asyncJobId
- queuedComponents = getPendingComponents(store.getState());
- if (events && events.length) {
- const componentsByJobId = queuedComponents.reduce((acc, item) => {
- acc[item.asyncJobId] = item;
- return acc;
- }, {});
- const fetchDataEvents: Promise[] = [];
- events.forEach((asyncEvent: AsyncEvent) => {
- const component = componentsByJobId[asyncEvent.job_id];
- if (!component) {
- console.warn(
- 'Component not found for job_id',
- asyncEvent.job_id,
- );
- return setLastId(asyncEvent);
- }
- const componentId = component.id;
- switch (asyncEvent.status) {
- case JOB_STATUS.DONE:
- fetchDataEvents.push(
- fetchCachedData(asyncEvent, componentId),
- );
- break;
- case JOB_STATUS.ERROR:
- store.dispatch(
- errorAction(componentId, [parseErrorJson(asyncEvent)]),
- );
- break;
- default:
- console.warn('Received event with status', asyncEvent.status);
- }
-
- return setLastId(asyncEvent);
- });
-
- const fetchResults = await Promise.all(fetchDataEvents);
- fetchResults.forEach(result => {
- const data = Array.isArray(result.data)
- ? result.data
- : [result.data];
- if (result.status === 'success') {
- store.dispatch(successAction(result.componentId, data));
- } else {
- store.dispatch(errorAction(result.componentId, data));
- }
- });
- }
- } catch (err) {
- console.warn(err);
- }
- }
-
- if (processEventsCallback) processEventsCallback(events);
-
- return setTimeout(processEvents, polling_delay);
- };
-
- if (
- isFeatureEnabled(FeatureFlag.GLOBAL_ASYNC_QUERIES) &&
- transport === TRANSPORT_POLLING
- ) {
- processEvents();
- }
-
- return action => next(action);
- };
-
- return middleware;
+ if (transport === TRANSPORT_POLLING) {
+ loadEventsFromApi();
+ }
+ if (transport === TRANSPORT_WS) {
+ wsConnect();
+ }
};
-export default initAsyncEvents;
+const addListener = (id: string, fn: any) => {
+ listenersByJobId[id] = fn;
+};
+
+const removeListener = (id: string) => {
+ if (!listenersByJobId[id]) return;
+ delete listenersByJobId[id];
+};
+
+export const waitForAsyncData = async (asyncResponse: AsyncEvent) =>
+ new Promise((resolve, reject) => {
+ const jobId = asyncResponse.job_id;
+ const listener = async (asyncEvent: AsyncEvent) => {
+ switch (asyncEvent.status) {
+ case JOB_STATUS.DONE: {
+ let { data, status } = await fetchCachedData(asyncEvent); // eslint-disable-line prefer-const
+ data = ensureIsArray(data);
+ if (status === 'success') {
+ resolve(data);
+ } else {
+ reject(data);
+ }
+ break;
+ }
+ case JOB_STATUS.ERROR: {
+ const err = parseErrorJson(asyncEvent);
+ reject(err);
+ break;
+ }
+ default: {
+ console.warn('received event with status', asyncEvent.status);
+ }
+ }
+ removeListener(jobId);
+ };
+ addListener(jobId, listener);
+ });
+
+const fetchEvents = makeApi<
+ { last_id?: string | null },
+ { result: AsyncEvent[] }
+>({
+ method: 'GET',
+ endpoint: POLLING_URL,
+});
+
+const fetchCachedData = async (
+ asyncEvent: AsyncEvent,
+): Promise => {
+ let status = 'success';
+ let data;
+ try {
+ const { json } = await SupersetClient.get({
+ endpoint: String(asyncEvent.result_url),
+ });
+ data = 'result' in json ? json.result : json;
+ } catch (response) {
+ status = 'error';
+ data = await getClientErrorObject(response);
+ }
+
+ return { status, data };
+};
+
+const setLastId = (asyncEvent: AsyncEvent) => {
+ lastReceivedEventId = asyncEvent.id;
+ try {
+ localStorage.setItem(LOCALSTORAGE_KEY, lastReceivedEventId as string);
+ } catch (err) {
+ console.warn('Error saving event Id to localStorage', err);
+ }
+};
+
+const loadEventsFromApi = async () => {
+ const eventArgs = lastReceivedEventId ? { last_id: lastReceivedEventId } : {};
+ if (Object.keys(listenersByJobId).length) {
+ try {
+ const { result: events } = await fetchEvents(eventArgs);
+ if (events && events.length) await processEvents(events);
+ } catch (err) {
+ console.warn(err);
+ }
+ }
+
+ if (transport === TRANSPORT_POLLING) {
+ setTimeout(loadEventsFromApi, polling_delay);
+ }
+};
+
+export const processEvents = async (events: AsyncEvent[]) => {
+ events.forEach((asyncEvent: AsyncEvent) => {
+ const jobId = asyncEvent.job_id;
+ const listener = listenersByJobId[jobId];
+ if (listener) {
+ listener(asyncEvent);
+ delete retriesByJobId[jobId];
+ } else {
+ // handle race condition where event is received
+ // before listener is registered
+ if (!retriesByJobId[jobId]) retriesByJobId[jobId] = 0;
+ retriesByJobId[jobId] += 1;
+
+ if (retriesByJobId[jobId] <= MAX_RETRIES) {
+ setTimeout(() => {
+ processEvents([asyncEvent]);
+ }, RETRY_DELAY * retriesByJobId[jobId]);
+ } else {
+ delete retriesByJobId[jobId];
+ console.warn('listener not found for job_id', asyncEvent.job_id);
+ }
+ }
+ setLastId(asyncEvent);
+ });
+};
+
+const wsConnectMaxRetries = 6;
+const wsConnectErrorDelay = 2500;
+let wsConnectRetries = 0;
+let wsConnectTimeout: any;
+let ws: WebSocket;
+
+const wsConnect = (): void => {
+ let url = config.GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL;
+ if (lastReceivedEventId) url += `?last_id=${lastReceivedEventId}`;
+ ws = new WebSocket(url);
+
+ ws.addEventListener('open', event => {
+ console.log('WebSocket connected');
+ clearTimeout(wsConnectTimeout);
+ wsConnectRetries = 0;
+ });
+
+ ws.addEventListener('close', event => {
+ wsConnectTimeout = setTimeout(() => {
+ wsConnectRetries += 1;
+ if (wsConnectRetries <= wsConnectMaxRetries) {
+ wsConnect();
+ } else {
+ console.warn('WebSocket not available, falling back to async polling');
+ loadEventsFromApi();
+ }
+ }, wsConnectErrorDelay);
+ });
+
+ ws.addEventListener('error', event => {
+ // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState
+ if (ws.readyState < 2) ws.close();
+ });
+
+ ws.addEventListener('message', async event => {
+ let events: AsyncEvent[] = [];
+ try {
+ events = [JSON.parse(event.data)];
+ await processEvents(events);
+ } catch (err) {
+ console.warn(err);
+ }
+ });
+};
+
+init();