fix: remove explicit cursor closing for BQ (#12836)
This commit is contained in:
parent
61e56b5027
commit
dcef5e1381
|
|
@ -647,14 +647,14 @@ class SqlaTable( # pylint: disable=too-many-public-methods,too-many-instance-at
|
|||
# TODO(villebro): refactor to use same code that's used by
|
||||
# sql_lab.py:execute_sql_statements
|
||||
with closing(engine.raw_connection()) as conn:
|
||||
with closing(conn.cursor()) as cursor:
|
||||
query = self.database.apply_limit_to_sql(statements[0])
|
||||
db_engine_spec.execute(cursor, query)
|
||||
result = db_engine_spec.fetch_data(cursor, limit=1)
|
||||
result_set = SupersetResultSet(
|
||||
result, cursor.description, db_engine_spec
|
||||
)
|
||||
cols = result_set.columns
|
||||
cursor = conn.cursor()
|
||||
query = self.database.apply_limit_to_sql(statements[0])
|
||||
db_engine_spec.execute(cursor, query)
|
||||
result = db_engine_spec.fetch_data(cursor, limit=1)
|
||||
result_set = SupersetResultSet(
|
||||
result, cursor.description, db_engine_spec
|
||||
)
|
||||
cols = result_set.columns
|
||||
else:
|
||||
db_dialect = self.database.get_dialect()
|
||||
cols = self.database.get_columns(
|
||||
|
|
|
|||
|
|
@ -854,14 +854,12 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
|
|||
engine = cls.get_engine(database, schema=schema, source=source)
|
||||
costs = []
|
||||
with closing(engine.raw_connection()) as conn:
|
||||
with closing(conn.cursor()) as cursor:
|
||||
for statement in statements:
|
||||
processed_statement = cls.process_statement(
|
||||
statement, database, user_name
|
||||
)
|
||||
costs.append(
|
||||
cls.estimate_statement_cost(processed_statement, cursor)
|
||||
)
|
||||
cursor = conn.cursor()
|
||||
for statement in statements:
|
||||
processed_statement = cls.process_statement(
|
||||
statement, database, user_name
|
||||
)
|
||||
costs.append(cls.estimate_statement_cost(processed_statement, cursor))
|
||||
return costs
|
||||
|
||||
@classmethod
|
||||
|
|
|
|||
|
|
@ -173,9 +173,9 @@ class PrestoEngineSpec(BaseEngineSpec): # pylint: disable=too-many-public-metho
|
|||
|
||||
engine = cls.get_engine(database, schema=schema)
|
||||
with closing(engine.raw_connection()) as conn:
|
||||
with closing(conn.cursor()) as cursor:
|
||||
cursor.execute(sql, params)
|
||||
results = cursor.fetchall()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(sql, params)
|
||||
results = cursor.fetchall()
|
||||
|
||||
return [row[0] for row in results]
|
||||
|
||||
|
|
@ -758,18 +758,18 @@ class PrestoEngineSpec(BaseEngineSpec): # pylint: disable=too-many-public-metho
|
|||
|
||||
engine = cls.get_engine(database, schema)
|
||||
with closing(engine.raw_connection()) as conn:
|
||||
with closing(conn.cursor()) as cursor:
|
||||
sql = f"SHOW CREATE VIEW {schema}.{table}"
|
||||
try:
|
||||
cls.execute(cursor, sql)
|
||||
polled = cursor.poll()
|
||||
cursor = conn.cursor()
|
||||
sql = f"SHOW CREATE VIEW {schema}.{table}"
|
||||
try:
|
||||
cls.execute(cursor, sql)
|
||||
polled = cursor.poll()
|
||||
|
||||
while polled:
|
||||
time.sleep(0.2)
|
||||
polled = cursor.poll()
|
||||
except DatabaseError: # not a VIEW
|
||||
return None
|
||||
rows = cls.fetch_data(cursor, 1)
|
||||
while polled:
|
||||
time.sleep(0.2)
|
||||
polled = cursor.poll()
|
||||
except DatabaseError: # not a VIEW
|
||||
return None
|
||||
rows = cls.fetch_data(cursor, 1)
|
||||
return rows[0][0]
|
||||
|
||||
@classmethod
|
||||
|
|
|
|||
|
|
@ -384,28 +384,28 @@ class Database(
|
|||
log_query(engine.url, sql, schema, username, __name__, security_manager)
|
||||
|
||||
with closing(engine.raw_connection()) as conn:
|
||||
with closing(conn.cursor()) as cursor:
|
||||
for sql_ in sqls[:-1]:
|
||||
_log_query(sql_)
|
||||
self.db_engine_spec.execute(cursor, sql_)
|
||||
cursor.fetchall()
|
||||
cursor = conn.cursor()
|
||||
for sql_ in sqls[:-1]:
|
||||
_log_query(sql_)
|
||||
self.db_engine_spec.execute(cursor, sql_)
|
||||
cursor.fetchall()
|
||||
|
||||
_log_query(sqls[-1])
|
||||
self.db_engine_spec.execute(cursor, sqls[-1])
|
||||
_log_query(sqls[-1])
|
||||
self.db_engine_spec.execute(cursor, sqls[-1])
|
||||
|
||||
data = self.db_engine_spec.fetch_data(cursor)
|
||||
result_set = SupersetResultSet(
|
||||
data, cursor.description, self.db_engine_spec
|
||||
)
|
||||
df = result_set.to_pandas_df()
|
||||
if mutator:
|
||||
mutator(df)
|
||||
data = self.db_engine_spec.fetch_data(cursor)
|
||||
result_set = SupersetResultSet(
|
||||
data, cursor.description, self.db_engine_spec
|
||||
)
|
||||
df = result_set.to_pandas_df()
|
||||
if mutator:
|
||||
mutator(df)
|
||||
|
||||
for col, coltype in df.dtypes.to_dict().items():
|
||||
if coltype == numpy.object_ and needs_conversion(df[col]):
|
||||
df[col] = df[col].apply(utils.json_dumps_w_dates)
|
||||
for col, coltype in df.dtypes.to_dict().items():
|
||||
if coltype == numpy.object_ and needs_conversion(df[col]):
|
||||
df[col] = df[col].apply(utils.json_dumps_w_dates)
|
||||
|
||||
return df
|
||||
return df
|
||||
|
||||
def compile_sqla_query(self, qry: Select, schema: Optional[str] = None) -> str:
|
||||
engine = self.get_sqla_engine(schema=schema)
|
||||
|
|
|
|||
|
|
@ -166,13 +166,13 @@ class PrestoDBSQLValidator(BaseSQLValidator):
|
|||
# execution of all statements (if many)
|
||||
annotations: List[SQLValidationAnnotation] = []
|
||||
with closing(engine.raw_connection()) as conn:
|
||||
with closing(conn.cursor()) as cursor:
|
||||
for statement in parsed_query.get_statements():
|
||||
annotation = cls.validate_statement(
|
||||
statement, database, cursor, user_name
|
||||
)
|
||||
if annotation:
|
||||
annotations.append(annotation)
|
||||
cursor = conn.cursor()
|
||||
for statement in parsed_query.get_statements():
|
||||
annotation = cls.validate_statement(
|
||||
statement, database, cursor, user_name
|
||||
)
|
||||
if annotation:
|
||||
annotations.append(annotation)
|
||||
logger.debug("Validation found %i error(s)", len(annotations))
|
||||
|
||||
return annotations
|
||||
|
|
|
|||
Loading…
Reference in New Issue