fix(trino): db session error in handle cursor (#31265)

This commit is contained in:
JUST.in DO IT 2024-12-03 11:57:37 -08:00 committed by GitHub
parent 56b973f3cc
commit 1e0c04fc15
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 7 additions and 3 deletions

View File

@ -14,7 +14,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=consider-using-transaction
from __future__ import annotations
import contextlib
@ -216,6 +215,8 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
if tracking_url := cls.get_tracking_url(cursor):
query.tracking_url = tracking_url
db.session.commit() # pylint: disable=consider-using-transaction
# if query cancelation was requested prior to the handle_cursor call, but
# the query was still executed, trigger the actual query cancelation now
if query.extra.get(QUERY_EARLY_CANCEL_KEY):
@ -244,6 +245,7 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
# Fetch the query ID beforehand, since it might fail inside the thread due to
# how the SQLAlchemy session is handled.
query_id = query.id
query_database = query.database
execute_result: dict[str, Any] = {}
execute_event = threading.Event()
@ -266,7 +268,7 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
with app.app_context():
for key, value in g_copy.__dict__.items():
setattr(g, key, value)
cls.execute(cursor, sql, query.database)
cls.execute(cursor, sql, query_database)
except Exception as ex: # pylint: disable=broad-except
results["error"] = ex
finally:
@ -283,6 +285,8 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
)
execute_thread.start()
# Wait for the thread to start before continuing
time.sleep(0.1)
# Wait for a query ID to be available before handling the cursor, as
# it's required by that method; it may never become available on error.
while not cursor.query_id and not execute_event.is_set():
@ -304,7 +308,7 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
def prepare_cancel_query(cls, query: Query) -> None:
if QUERY_CANCEL_KEY not in query.extra:
query.set_extra_json_key(QUERY_EARLY_CANCEL_KEY, True)
db.session.commit()
db.session.commit() # pylint: disable=consider-using-transaction
@classmethod
def cancel_query(cls, cursor: Cursor, query: Query, cancel_query_id: str) -> bool: