fix(celery): Reset DB connection pools for forked worker processes (#13350)
* Reset sqlalchemy connection pool on celery process fork * Fix race condition with async chart loading state * pylint: ignore * prettier
This commit is contained in:
parent
1697e1e3b1
commit
b4ca39ceeb
|
|
@ -38,7 +38,9 @@ const initState = getInitialState(bootstrapData);
|
|||
const asyncEventMiddleware = initAsyncEvents({
|
||||
config: bootstrapData.common.conf,
|
||||
getPendingComponents: ({ charts }) =>
|
||||
Object.values(charts).filter(c => c.chartStatus === 'loading'),
|
||||
Object.values(charts).filter(
|
||||
c => c.chartStatus === 'loading' && c.asyncJobId !== undefined,
|
||||
),
|
||||
successAction: (componentId, componentData) =>
|
||||
actions.chartUpdateSucceeded(componentData, componentId),
|
||||
errorAction: (componentId, response) =>
|
||||
|
|
|
|||
|
|
@ -40,7 +40,9 @@ const initState = getInitialState(bootstrapData);
|
|||
const asyncEventMiddleware = initAsyncEvents({
|
||||
config: bootstrapData.common.conf,
|
||||
getPendingComponents: ({ charts }) =>
|
||||
Object.values(charts).filter(c => c.chartStatus === 'loading'),
|
||||
Object.values(charts).filter(
|
||||
c => c.chartStatus === 'loading' && c.asyncJobId !== undefined,
|
||||
),
|
||||
successAction: (componentId, componentData) =>
|
||||
actions.chartUpdateSucceeded(componentData, componentId),
|
||||
errorAction: (componentId, response) =>
|
||||
|
|
|
|||
|
|
@ -119,8 +119,7 @@ const initAsyncEvents = (options: AsyncEventOptions) => {
|
|||
};
|
||||
|
||||
const processEvents = async () => {
|
||||
const state = store.getState();
|
||||
const queuedComponents = getPendingComponents(state);
|
||||
let queuedComponents = getPendingComponents(store.getState());
|
||||
const eventArgs = lastReceivedEventId
|
||||
? { last_id: lastReceivedEventId }
|
||||
: {};
|
||||
|
|
@ -128,6 +127,9 @@ const initAsyncEvents = (options: AsyncEventOptions) => {
|
|||
if (queuedComponents && queuedComponents.length) {
|
||||
try {
|
||||
const { result: events } = await fetchEvents(eventArgs);
|
||||
// refetch queuedComponents due to race condition where results are available
|
||||
// before component state is updated with asyncJobId
|
||||
queuedComponents = getPendingComponents(store.getState());
|
||||
if (events && events.length) {
|
||||
const componentsByJobId = queuedComponents.reduce((acc, item) => {
|
||||
acc[item.asyncJobId] = item;
|
||||
|
|
|
|||
|
|
@ -19,13 +19,16 @@
|
|||
This is the main entrypoint used by Celery workers. As such,
|
||||
it needs to call create_app() in order to initialize things properly
|
||||
"""
|
||||
from typing import Any
|
||||
|
||||
from celery.signals import worker_process_init
|
||||
|
||||
# Superset framework imports
|
||||
from superset import create_app
|
||||
from superset.extensions import celery_app
|
||||
from superset.extensions import celery_app, db
|
||||
|
||||
# Init the Flask app / configure everything
|
||||
create_app()
|
||||
flask_app = create_app()
|
||||
|
||||
# Need to import late, as the celery_app will have been setup by "create_app()"
|
||||
# pylint: disable=wrong-import-position, unused-import
|
||||
|
|
@ -33,3 +36,10 @@ from . import cache, schedules, scheduler # isort:skip
|
|||
|
||||
# Export the celery app globally for Celery (as run on the cmd line) to find
|
||||
app = celery_app
|
||||
|
||||
|
||||
@worker_process_init.connect
|
||||
def reset_db_connection_pool(**kwargs: Any) -> None: # pylint: disable=unused-argument
|
||||
with flask_app.app_context():
|
||||
# https://docs.sqlalchemy.org/en/14/core/connections.html#engine-disposal
|
||||
db.engine.dispose()
|
||||
|
|
|
|||
Loading…
Reference in New Issue