Fix for BigQuery connection checks and CSV uploads (#8511)
* Fix for BigQuery connection checks and CSV uploads
* Don't assume encrypted_extra will be populated
* Fix undefined method error
* Refactor to avoid circular import strangeness
This commit is contained in:
parent
3b97ae3b9d
commit
d70e0fc359
|
|
@@ -27,7 +27,6 @@ import sqlparse
|
|||
from flask import g
|
||||
from flask_babel import lazy_gettext as _
|
||||
from sqlalchemy import column, DateTime, select
|
||||
from sqlalchemy.engine import create_engine
|
||||
from sqlalchemy.engine.base import Engine
|
||||
from sqlalchemy.engine.interfaces import Compiled, Dialect
|
||||
from sqlalchemy.engine.reflection import Inspector
|
||||
|
|
@@ -52,9 +51,6 @@ class TimeGrain(NamedTuple): # pylint: disable=too-few-public-methods
|
|||
duration: Optional[str]
|
||||
|
||||
|
||||
config = app.config
|
||||
|
||||
|
||||
QueryStatus = utils.QueryStatus
|
||||
config = app.config
|
||||
|
||||
|
|
@@ -388,12 +384,13 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
|
|||
df.to_sql(**kwargs)
|
||||
|
||||
@classmethod
|
||||
def create_table_from_csv(cls, form) -> None:
|
||||
def create_table_from_csv(cls, form, database) -> None:
|
||||
"""
|
||||
Create table from contents of a csv. Note: this method does not create
|
||||
metadata for the table.
|
||||
|
||||
:param form: Parameters defining how to process data
|
||||
:param database: Database model object for the target database
|
||||
"""
|
||||
|
||||
def _allowed_file(filename: str) -> bool:
|
||||
|
|
@@ -422,10 +419,12 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
|
|||
}
|
||||
df = cls.csv_to_df(**csv_to_df_kwargs)
|
||||
|
||||
engine = cls.get_engine(database)
|
||||
|
||||
df_to_sql_kwargs = {
|
||||
"df": df,
|
||||
"name": form.name.data,
|
||||
"con": create_engine(form.con.data.sqlalchemy_uri_decrypted, echo=False),
|
||||
"con": engine,
|
||||
"schema": form.schema.data,
|
||||
"if_exists": form.if_exists.data,
|
||||
"index": form.index.data,
|
||||
|
|
|
|||
|
|
@@ -182,6 +182,7 @@ class BigQueryEngineSpec(BaseEngineSpec):
|
|||
"""
|
||||
try:
|
||||
import pandas_gbq
|
||||
from google.oauth2 import service_account
|
||||
except ImportError:
|
||||
raise Exception(
|
||||
"Could not import the library `pandas_gbq`, which is "
|
||||
|
|
@@ -191,10 +192,17 @@ class BigQueryEngineSpec(BaseEngineSpec):
|
|||
|
||||
if not ("name" in kwargs and "schema" in kwargs):
|
||||
raise Exception("name and schema need to be defined in kwargs")
|
||||
|
||||
gbq_kwargs = {}
|
||||
gbq_kwargs["project_id"] = kwargs["con"].engine.url.host
|
||||
gbq_kwargs["destination_table"] = f"{kwargs.pop('schema')}.{kwargs.pop('name')}"
|
||||
|
||||
# add credentials if they are set on the SQLAlchemy Dialect:
|
||||
creds = kwargs["con"].dialect.credentials_info
|
||||
if creds:
|
||||
credentials = service_account.Credentials.from_service_account_info(creds)
|
||||
gbq_kwargs["credentials"] = credentials
|
||||
|
||||
# Only pass through supported kwargs
|
||||
supported_kwarg_keys = {"if_exists"}
|
||||
for key in supported_kwarg_keys:
|
||||
|
|
|
|||
|
|
@@ -23,7 +23,6 @@ from typing import Any, Dict, List, Optional, Tuple
|
|||
from urllib import parse
|
||||
|
||||
from sqlalchemy import Column
|
||||
from sqlalchemy.engine import create_engine
|
||||
from sqlalchemy.engine.base import Engine
|
||||
from sqlalchemy.engine.reflection import Inspector
|
||||
from sqlalchemy.engine.url import make_url
|
||||
|
|
@@ -98,7 +97,9 @@ class HiveEngineSpec(PrestoEngineSpec):
|
|||
return []
|
||||
|
||||
@classmethod
|
||||
def create_table_from_csv(cls, form) -> None: # pylint: disable=too-many-locals
|
||||
def create_table_from_csv( # pylint: disable=too-many-locals
|
||||
cls, form, database
|
||||
) -> None:
|
||||
"""Uploads a csv file and creates a superset datasource in Hive."""
|
||||
|
||||
def convert_to_hive_type(col_type):
|
||||
|
|
@@ -174,8 +175,7 @@ class HiveEngineSpec(PrestoEngineSpec):
|
|||
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS
|
||||
TEXTFILE LOCATION '{location}'
|
||||
tblproperties ('skip.header.line.count'='1')"""
|
||||
logging.info(form.con.data)
|
||||
engine = create_engine(form.con.data.sqlalchemy_uri_decrypted)
|
||||
engine = cls.get_engine(database)
|
||||
engine.execute(sql)
|
||||
|
||||
@classmethod
|
||||
|
|
|
|||
|
|
@@ -40,6 +40,7 @@
|
|||
name: $('#database_name').val(),
|
||||
impersonate_user: $('#impersonate_user').is(':checked'),
|
||||
extras: JSON.parse($("#extra").val()),
|
||||
encrypted_extra: JSON.parse($("#encrypted_extra").val()),
|
||||
})
|
||||
} catch(parse_error){
|
||||
alert("Malformed JSON in the extras field: " + parse_error);
|
||||
|
|
|
|||
|
|
@@ -1745,6 +1745,7 @@ class Superset(BaseSupersetView):
|
|||
# extras is sent as json, but required to be a string in the Database model
|
||||
extra=json.dumps(request.json.get("extras", {})),
|
||||
impersonate_user=request.json.get("impersonate_user"),
|
||||
encrypted_extra=json.dumps(request.json.get("encrypted_extra", {})),
|
||||
)
|
||||
database.set_sqlalchemy_uri(uri)
|
||||
|
||||
|
|
|
|||
|
|
@@ -120,8 +120,12 @@ class CsvToDatabaseView(SimpleFormView):
|
|||
utils.ensure_path_exists(config["UPLOAD_FOLDER"])
|
||||
csv_file.save(path)
|
||||
table_name = form.name.data
|
||||
database = form.data.get("con")
|
||||
database.db_engine_spec.create_table_from_csv(form)
|
||||
|
||||
con = form.data.get("con")
|
||||
database = (
|
||||
db.session.query(models.Database).filter_by(id=con.data.get("id")).one()
|
||||
)
|
||||
database.db_engine_spec.create_table_from_csv(form, database)
|
||||
|
||||
table = (
|
||||
db.session.query(SqlaTable)
|
||||
|
|
|
|||
Loading…
Reference in New Issue