feat: Add parquet upload (#14449)

* allow csv upload to accept parquet file

* fix mypy

* fix if statement

* add test for specifying columns in CSV upload

* clean up test

* change order in test

* fix failures

* upload parquet to separate table in test

* fix error message

* fix mypy again

* rename other extensions to columnar

* add new form for columnar upload

* add support for zip files

* undo csv form changes except usecols

* add more tests for zip

* isort & black

* pylint

* fix trailing space

* address more review comments

* pylint

* black

* resolve remaining issues
Shiva Raisinghani 2021-08-31 00:20:25 -07:00 committed by GitHub
parent ad8336a5b4
commit d25b0967a1
10 changed files with 493 additions and 10 deletions

View File

@@ -556,7 +556,8 @@ SUPERSET_WEBSERVER_DOMAINS = None
# Allowed format types for upload on Database view
EXCEL_EXTENSIONS = {"xlsx", "xls"}
CSV_EXTENSIONS = {"csv", "tsv", "txt"}
ALLOWED_EXTENSIONS = {*EXCEL_EXTENSIONS, *CSV_EXTENSIONS}
COLUMNAR_EXTENSIONS = {"parquet", "zip"}
ALLOWED_EXTENSIONS = {*EXCEL_EXTENSIONS, *CSV_EXTENSIONS, *COLUMNAR_EXTENSIONS}
# CSV Options: key/value pairs that will be passed as argument to DataFrame.to_csv
# method.
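Note: the new COLUMNAR_EXTENSIONS set feeds into ALLOWED_EXTENSIONS, and the columnar form (see the forms diff below) only accepts the intersection of the two sets, so a deployment can narrow what the upload form takes from its superset_config.py. A minimal sketch of such an override (the set names come from the hunk above; restricting to parquet-only is purely illustrative):

# superset_config.py -- hypothetical operator override
# Keep the stock Excel/CSV sets but drop zip from columnar uploads.
EXCEL_EXTENSIONS = {"xlsx", "xls"}
CSV_EXTENSIONS = {"csv", "tsv", "txt"}
COLUMNAR_EXTENSIONS = {"parquet"}
ALLOWED_EXTENSIONS = {*EXCEL_EXTENSIONS, *CSV_EXTENSIONS, *COLUMNAR_EXTENSIONS}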

View File

@@ -167,6 +167,7 @@ class SupersetAppInitializer:
DashboardModelViewAsync,
)
from superset.views.database.views import (
ColumnarToDatabaseView,
CsvToDatabaseView,
DatabaseView,
ExcelToDatabaseView,
@@ -281,6 +282,7 @@ class SupersetAppInitializer:
appbuilder.add_view_no_menu(CssTemplateAsyncModelView)
appbuilder.add_view_no_menu(CsvToDatabaseView)
appbuilder.add_view_no_menu(ExcelToDatabaseView)
appbuilder.add_view_no_menu(ColumnarToDatabaseView)
appbuilder.add_view_no_menu(Dashboard)
appbuilder.add_view_no_menu(DashboardModelViewAsync)
appbuilder.add_view_no_menu(Datasource)
@@ -371,7 +373,20 @@ class SupersetAppInitializer:
)
),
)
appbuilder.add_link(
"Upload a Columnar file",
label=__("Upload a Columnar file"),
href="/columnartodatabaseview/form",
icon="fa-upload",
category="Data",
category_label=__("Data"),
category_icon="fa-wrench",
cond=lambda: bool(
self.config["COLUMNAR_EXTENSIONS"].intersection(
self.config["ALLOWED_EXTENSIONS"]
)
),
)
try:
import xlrd # pylint: disable=unused-import

View File

@@ -0,0 +1,64 @@
{#
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
#}
{% extends 'appbuilder/general/model/edit.html' %}
{% block tail_js %}
{{ super() }}
<script>
var db = $("#con");
var schema = $("#schema");
// this element is a text input
// copy it here so it can be reused later
var any_schema_is_allowed = schema.clone();
update_schemas_allowed_for_columnar_upload(db.val());
db.change(function(){
update_schemas_allowed_for_columnar_upload(db.val());
});
function update_schemas_allowed_for_columnar_upload(db_id) {
$.ajax({
method: "GET",
url: "/superset/schemas_access_for_file_upload",
data: {db_id: db_id},
dataType: 'json',
contentType: "application/json; charset=utf-8"
}).done(function(data) {
change_schema_field_in_formview(data)
}).fail(function(error) {
var errorMsg = error.responseJSON.error;
alert("ERROR: " + errorMsg);
});
}
function change_schema_field_in_formview(schemas_allowed){
if (schemas_allowed && schemas_allowed.length > 0) {
var dropdown_schema_lists = '<select id="schema" name="schema" required>';
schemas_allowed.forEach(function(schema_allowed) {
dropdown_schema_lists += ('<option value="' + schema_allowed + '">' + schema_allowed + '</option>');
});
dropdown_schema_lists += '</select>';
$("#schema").replaceWith(dropdown_schema_lists);
} else {
$("#schema").replaceWith(any_schema_is_allowed)
}
}
</script>
{% endblock %}

View File

@@ -36,7 +36,7 @@
function update_schemas_allowed_for_csv_upload(db_id) {
$.ajax({
method: "GET",
url: "/superset/schemas_access_for_csv_upload",
url: "/superset/schemas_access_for_file_upload",
data: {db_id: db_id},
dataType: 'json',
contentType: "application/json; charset=utf-8"

View File

@@ -36,7 +36,7 @@
function update_schemas_allowed_for_excel_upload(db_id) {
$.ajax({
method: "GET",
url: "/superset/schemas_access_for_excel_upload",
url: "/superset/schemas_access_for_file_upload",
data: {db_id: db_id},
dataType: 'json',
contentType: "application/json; charset=utf-8"

View File

@@ -3072,11 +3072,11 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
@api
@has_access_api
@event_logger.log_this
@expose("/schemas_access_for_csv_upload")
def schemas_access_for_csv_upload(self) -> FlaskResponse:
@expose("/schemas_access_for_file_upload")
def schemas_access_for_file_upload(self) -> FlaskResponse:
"""
This method exposes an API endpoint to
get the schema access control settings for csv upload in this database
get the schema access control settings for file upload in this database
"""
if not request.args.get("db_id"):
return json_error_response("No database is allowed for your csv upload")
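Note: only the route name changes here; the response contract is the same as the old per-format endpoints. A hedged example of calling the renamed endpoint (host, db_id, and the session cookie are placeholders; an authenticated Superset session is assumed):

import requests

# The endpoint returns a JSON list of schema names the current user may
# upload files into for the given database; the upload form templates use
# it to build the schema dropdown.
session_cookies = {"session": "<authenticated session cookie>"}
resp = requests.get(
    "http://localhost:8088/superset/schemas_access_for_file_upload",
    params={"db_id": 1},
    cookies=session_cookies,
)
print(resp.json())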

View File

@@ -21,7 +21,13 @@ from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_appbuilder.forms import DynamicForm
from flask_babel import lazy_gettext as _
from flask_wtf.file import FileAllowed, FileField, FileRequired
from wtforms import BooleanField, IntegerField, SelectField, StringField
from wtforms import (
BooleanField,
IntegerField,
MultipleFileField,
SelectField,
StringField,
)
from wtforms.ext.sqlalchemy.fields import QuerySelectField
from wtforms.validators import DataRequired, Length, NumberRange, Optional
@@ -163,6 +169,15 @@ class CsvToDatabaseForm(DynamicForm):
_("Mangle Duplicate Columns"),
description=_('Specify duplicate columns as "X.0, X.1".'),
)
usecols = JsonListField(
_("Use Columns"),
default=None,
description=_(
"Json list of the column names that should be read. "
"If not None, only these columns will be read from the file."
),
validators=[Optional()],
)
skipinitialspace = BooleanField(
_("Skip Initial Space"), description=_("Skip spaces after delimiter.")
)
@@ -402,3 +417,130 @@ class ExcelToDatabaseForm(DynamicForm):
'Use [""] for empty string.'
),
)
class ColumnarToDatabaseForm(DynamicForm):
# pylint: disable=E0211
def columnar_allowed_dbs() -> List[Database]: # type: ignore
# TODO: change allow_csv_upload to allow_file_upload
columnar_enabled_dbs = (
db.session.query(Database).filter_by(allow_csv_upload=True).all()
)
return [
columnar_enabled_db
for columnar_enabled_db in columnar_enabled_dbs
if ColumnarToDatabaseForm.at_least_one_schema_is_allowed(
columnar_enabled_db
)
]
@staticmethod
def at_least_one_schema_is_allowed(database: Database) -> bool:
"""
If the user has access to the database or all datasource
1. if schemas_allowed_for_csv_upload is empty
a) if database does not support schema
user is able to upload columnar without specifying schema name
b) if database supports schema
user is able to upload columnar to any schema
2. if schemas_allowed_for_csv_upload is not empty
a) if database does not support schema
This situation is impossible and upload will fail
b) if database supports schema
user is able to upload to schema in schemas_allowed_for_csv_upload
elif the user does not access to the database or all datasource
1. if schemas_allowed_for_csv_upload is empty
a) if database does not support schema
user is unable to upload columnar
b) if database supports schema
user is unable to upload columnar
2. if schemas_allowed_for_csv_upload is not empty
a) if database does not support schema
This situation is impossible and user is unable to upload columnar
b) if database supports schema
user is able to upload to schema in schemas_allowed_for_csv_upload
"""
if security_manager.can_access_database(database):
return True
schemas = database.get_schema_access_for_csv_upload()
if schemas and security_manager.schemas_accessible_by_user(
database, schemas, False
):
return True
return False
name = StringField(
_("Table Name"),
description=_("Name of table to be created from columnar data."),
validators=[DataRequired()],
widget=BS3TextFieldWidget(),
)
columnar_file = MultipleFileField(
_("Columnar File"),
description=_("Select a Columnar file to be uploaded to a database."),
validators=[
DataRequired(),
FileAllowed(
config["ALLOWED_EXTENSIONS"].intersection(
config["COLUMNAR_EXTENSIONS"]
),
_(
"Only the following file extensions are allowed: "
"%(allowed_extensions)s",
allowed_extensions=", ".join(
config["ALLOWED_EXTENSIONS"].intersection(
config["COLUMNAR_EXTENSIONS"]
)
),
),
),
],
)
con = QuerySelectField(
_("Database"),
query_factory=columnar_allowed_dbs,
get_pk=lambda a: a.id,
get_label=lambda a: a.database_name,
)
schema = StringField(
_("Schema"),
description=_("Specify a schema (if database flavor supports this)."),
validators=[Optional()],
widget=BS3TextFieldWidget(),
)
if_exists = SelectField(
_("Table Exists"),
description=_(
"If table exists do one of the following: "
"Fail (do nothing), Replace (drop and recreate table) "
"or Append (insert data)."
),
choices=[
("fail", _("Fail")),
("replace", _("Replace")),
("append", _("Append")),
],
validators=[DataRequired()],
)
usecols = JsonListField(
_("Use Columns"),
default=None,
description=_(
"Json list of the column names that should be read. "
"If not None, only these columns will be read from the file."
),
validators=[Optional()],
)
index = BooleanField(
_("Dataframe Index"), description=_("Write dataframe index as a column.")
)
index_label = StringField(
_("Column Label(s)"),
description=_(
"Column label for index column(s). If None is given "
"and Dataframe Index is True, Index Names are used."
),
validators=[Optional()],
widget=BS3TextFieldWidget(),
)
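Note: usecols on both the CSV and columnar forms is a JsonListField, so the submitted value is a JSON array of column names that is parsed into a Python list before it reaches pandas. A small sketch of the expected format (column names are illustrative):

import json

# The form receives a JSON-encoded list such as '["a", "b"]'.
# The parsed list is handed to pandas: usecols= for read_csv,
# columns= for read_parquet.
raw_form_value = '["a", "b"]'
usecols = json.loads(raw_form_value)
assert usecols == ["a", "b"]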

View File

@@ -14,8 +14,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import io
import os
import tempfile
import zipfile
from typing import TYPE_CHECKING
import pandas as pd
@@ -38,7 +40,7 @@ from superset.typing import FlaskResponse
from superset.utils import core as utils
from superset.views.base import DeleteMixin, SupersetModelView, YamlExportMixin
from .forms import CsvToDatabaseForm, ExcelToDatabaseForm
from .forms import ColumnarToDatabaseForm, CsvToDatabaseForm, ExcelToDatabaseForm
from .mixins import DatabaseMixin
from .validators import schema_allows_csv_upload, sqlalchemy_uri_validator
@@ -162,6 +164,7 @@ class CsvToDatabaseView(SimpleFormView):
iterator=True,
keep_default_na=not form.null_values.data,
mangle_dupe_cols=form.mangle_dupe_cols.data,
usecols=form.usecols.data if form.usecols.data else None,
na_values=form.null_values.data if form.null_values.data else None,
nrows=form.nrows.data,
parse_dates=form.parse_dates.data,
@@ -392,3 +395,150 @@ class ExcelToDatabaseView(SimpleFormView):
flash(message, "info")
stats_logger.incr("successful_excel_upload")
return redirect("/tablemodelview/list/")
class ColumnarToDatabaseView(SimpleFormView):
form = ColumnarToDatabaseForm
form_template = "superset/form_view/columnar_to_database_view/edit.html"
form_title = _("Columnar to Database configuration")
add_columns = ["database", "schema", "table_name"]
def form_get(self, form: ColumnarToDatabaseForm) -> None:
form.if_exists.data = "fail"
def form_post( # pylint: disable=too-many-locals
self, form: ColumnarToDatabaseForm
) -> Response:
database = form.con.data
columnar_table = Table(table=form.name.data, schema=form.schema.data)
files = form.columnar_file.data
file_type = {file.filename.split(".")[-1] for file in files}
if file_type == {"zip"}:
zipfile_ob = zipfile.ZipFile( # pylint: disable=consider-using-with
form.columnar_file.data[0]
) # pylint: disable=consider-using-with
file_type = {filename.split(".")[-1] for filename in zipfile_ob.namelist()}
files = [
io.BytesIO((zipfile_ob.open(filename).read(), filename)[0])
for filename in zipfile_ob.namelist()
]
if len(file_type) > 1:
message = _(
"Multiple file extensions are not allowed for columnar uploads."
" Please make sure all files are of the same extension.",
)
flash(message, "danger")
return redirect("/columnartodatabaseview/form")
read = pd.read_parquet
kwargs = {
"columns": form.usecols.data if form.usecols.data else None,
}
if not schema_allows_csv_upload(database, columnar_table.schema):
message = _(
'Database "%(database_name)s" schema "%(schema_name)s" '
"is not allowed for columnar uploads. "
"Please contact your Superset Admin.",
database_name=database.database_name,
schema_name=columnar_table.schema,
)
flash(message, "danger")
return redirect("/columnartodatabaseview/form")
if "." in columnar_table.table and columnar_table.schema:
message = _(
"You cannot specify a namespace both in the name of the table: "
'"%(columnar_table.table)s" and in the schema field: '
'"%(columnar_table.schema)s". Please remove one',
table=columnar_table.table,
schema=columnar_table.schema,
)
flash(message, "danger")
return redirect("/columnartodatabaseview/form")
try:
chunks = [read(file, **kwargs) for file in files]
df = pd.concat(chunks)
database = (
db.session.query(models.Database)
.filter_by(id=form.data.get("con").data.get("id"))
.one()
)
database.db_engine_spec.df_to_sql(
database,
columnar_table,
df,
to_sql_kwargs={
"chunksize": 1000,
"if_exists": form.if_exists.data,
"index": form.index.data,
"index_label": form.index_label.data,
},
)
# Connect table to the database that should be used for exploration.
# E.g. if hive was used to upload a csv, presto will be a better option
# to explore the table.
expore_database = database
explore_database_id = database.explore_database_id
if explore_database_id:
expore_database = (
db.session.query(models.Database)
.filter_by(id=explore_database_id)
.one_or_none()
or database
)
sqla_table = (
db.session.query(SqlaTable)
.filter_by(
table_name=columnar_table.table,
schema=columnar_table.schema,
database_id=expore_database.id,
)
.one_or_none()
)
if sqla_table:
sqla_table.fetch_metadata()
if not sqla_table:
sqla_table = SqlaTable(table_name=columnar_table.table)
sqla_table.database = expore_database
sqla_table.database_id = database.id
sqla_table.user_id = g.user.get_id()
sqla_table.schema = columnar_table.schema
sqla_table.fetch_metadata()
db.session.add(sqla_table)
db.session.commit()
except Exception as ex: # pylint: disable=broad-except
db.session.rollback()
message = _(
'Unable to upload Columnar file "%(filename)s" to table '
'"%(table_name)s" in database "%(db_name)s". '
"Error message: %(error_msg)s",
filename=[file.filename for file in form.columnar_file.data],
table_name=form.name.data,
db_name=database.database_name,
error_msg=str(ex),
)
flash(message, "danger")
stats_logger.incr("failed_columnar_upload")
return redirect("/columnartodatabaseview/form")
# Go back to welcome page / splash screen
message = _(
'Columnar file "%(columnar_filename)s" uploaded to table "%(table_name)s" '
'in database "%(db_name)s"',
columnar_filename=[file.filename for file in form.columnar_file.data],
table_name=str(columnar_table),
db_name=sqla_table.database.database_name,
)
flash(message, "info")
stats_logger.incr("successful_columnar_upload")
return redirect("/tablemodelview/list/")
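Note: the core of form_post is the zip expansion plus the pandas read/concat step. A minimal standalone sketch of that flow, assuming Werkzeug FileStorage-like objects with a filename attribute and ignoring the permission and table-naming checks the view performs:

import io
import zipfile

import pandas as pd

def read_columnar(files, columns=None):
    # A single uploaded .zip is expanded into in-memory parquet members.
    extensions = {f.filename.rsplit(".", 1)[-1] for f in files}
    if extensions == {"zip"}:
        archive = zipfile.ZipFile(files[0])
        files = [io.BytesIO(archive.read(name)) for name in archive.namelist()]
    # Each parquet file becomes a chunk; the chunks are concatenated into a
    # single DataFrame before df_to_sql writes it to the target table.
    chunks = [pd.read_parquet(f, columns=columns) for f in files]
    return pd.concat(chunks)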

View File

@@ -1184,7 +1184,7 @@ class TestCore(SupersetTestCase):
mock_can_access_database.return_value = False
mock_schemas_accessible.return_value = ["this_schema_is_allowed_too"]
data = self.get_json_resp(
url="/superset/schemas_access_for_csv_upload?db_id={db_id}".format(
url="/superset/schemas_access_for_file_upload?db_id={db_id}".format(
db_id=dbobj.id
)
)

View File

@@ -19,6 +19,7 @@
import json
import logging
import os
import shutil
from typing import Dict, Optional
from unittest import mock
@@ -43,9 +44,14 @@ CSV_UPLOAD_DATABASE = "csv_explore_db"
CSV_FILENAME1 = "testCSV1.csv"
CSV_FILENAME2 = "testCSV2.csv"
EXCEL_FILENAME = "testExcel.xlsx"
PARQUET_FILENAME1 = "testZip/testParquet1.parquet"
PARQUET_FILENAME2 = "testZip/testParquet2.parquet"
ZIP_DIRNAME = "testZip"
ZIP_FILENAME = "testZip.zip"
EXCEL_UPLOAD_TABLE = "excel_upload"
CSV_UPLOAD_TABLE = "csv_upload"
PARQUET_UPLOAD_TABLE = "parquet_upload"
CSV_UPLOAD_TABLE_W_SCHEMA = "csv_upload_w_schema"
CSV_UPLOAD_TABLE_W_EXPLORE = "csv_upload_w_explore"
@@ -70,6 +76,7 @@ def setup_csv_upload():
engine = upload_db.get_sqla_engine()
engine.execute(f"DROP TABLE IF EXISTS {EXCEL_UPLOAD_TABLE}")
engine.execute(f"DROP TABLE IF EXISTS {CSV_UPLOAD_TABLE}")
engine.execute(f"DROP TABLE IF EXISTS {PARQUET_UPLOAD_TABLE}")
engine.execute(f"DROP TABLE IF EXISTS {CSV_UPLOAD_TABLE_W_SCHEMA}")
engine.execute(f"DROP TABLE IF EXISTS {CSV_UPLOAD_TABLE_W_EXPLORE}")
db.session.delete(upload_db)
@@ -97,6 +104,17 @@ def create_excel_files():
os.remove(EXCEL_FILENAME)
@pytest.fixture()
def create_columnar_files():
os.mkdir(ZIP_DIRNAME)
pd.DataFrame({"a": ["john", "paul"], "b": [1, 2]}).to_parquet(PARQUET_FILENAME1)
pd.DataFrame({"a": ["max", "bob"], "b": [3, 4]}).to_parquet(PARQUET_FILENAME2)
shutil.make_archive(ZIP_DIRNAME, "zip", ZIP_DIRNAME)
yield
os.remove(ZIP_FILENAME)
shutil.rmtree(ZIP_DIRNAME)
def get_upload_db():
return db.session.query(Database).filter_by(database_name=CSV_UPLOAD_DATABASE).one()
@@ -134,6 +152,22 @@ def upload_excel(
return get_resp(test_client, "/exceltodatabaseview/form", data=form_data)
def upload_columnar(
filename: str, table_name: str, extra: Optional[Dict[str, str]] = None
):
columnar_upload_db_id = get_upload_db().id
form_data = {
"columnar_file": open(filename, "rb"),
"name": table_name,
"con": columnar_upload_db_id,
"if_exists": "fail",
"index_label": "test_label",
}
if extra:
form_data.update(extra)
return get_resp(test_client, "/columnartodatabaseview/form", data=form_data)
def mock_upload_to_s3(filename: str, upload_prefix: str, table: Table) -> str:
"""
HDFS is used instead of S3 for the unit tests.integration_tests.
@@ -249,6 +283,18 @@ def test_import_csv(setup_csv_upload, create_csv_files):
)
assert success_msg_f1 in resp
# upload again with replace mode and specific columns
resp = upload_csv(
CSV_FILENAME1,
CSV_UPLOAD_TABLE,
extra={"if_exists": "replace", "usecols": '["a"]'},
)
assert success_msg_f1 in resp
# make sure only specified column name was read
table = SupersetTestCase.get_table(name=CSV_UPLOAD_TABLE)
assert "b" not in table.column_names
# upload again with replace mode
resp = upload_csv(CSV_FILENAME1, CSV_UPLOAD_TABLE, extra={"if_exists": "replace"})
assert success_msg_f1 in resp
@@ -328,3 +374,68 @@ def test_import_excel(setup_csv_upload, create_excel_files):
.fetchall()
)
assert data == [(0, "john", 1), (1, "paul", 2)]
@mock.patch("superset.db_engine_specs.hive.upload_to_s3", mock_upload_to_s3)
def test_import_parquet(setup_csv_upload, create_columnar_files):
if utils.backend() == "hive":
pytest.skip("Hive doesn't allow parquet upload.")
success_msg_f1 = f'Columnar file "[\'{PARQUET_FILENAME1}\']" uploaded to table "{PARQUET_UPLOAD_TABLE}"'
# initial upload with fail mode
resp = upload_columnar(PARQUET_FILENAME1, PARQUET_UPLOAD_TABLE)
assert success_msg_f1 in resp
# upload again with fail mode; should fail
fail_msg = f'Unable to upload Columnar file "[\'{PARQUET_FILENAME1}\']" to table "{PARQUET_UPLOAD_TABLE}"'
resp = upload_columnar(PARQUET_FILENAME1, PARQUET_UPLOAD_TABLE)
assert fail_msg in resp
if utils.backend() != "hive":
# upload again with append mode
resp = upload_columnar(
PARQUET_FILENAME1, PARQUET_UPLOAD_TABLE, extra={"if_exists": "append"}
)
assert success_msg_f1 in resp
# upload again with replace mode and specific columns
resp = upload_columnar(
PARQUET_FILENAME1,
PARQUET_UPLOAD_TABLE,
extra={"if_exists": "replace", "usecols": '["a"]'},
)
assert success_msg_f1 in resp
# make sure only specified column name was read
table = SupersetTestCase.get_table(name=PARQUET_UPLOAD_TABLE)
assert "b" not in table.column_names
# upload again with replace mode
resp = upload_columnar(
PARQUET_FILENAME1, PARQUET_UPLOAD_TABLE, extra={"if_exists": "replace"}
)
assert success_msg_f1 in resp
data = (
get_upload_db()
.get_sqla_engine()
.execute(f"SELECT * from {PARQUET_UPLOAD_TABLE}")
.fetchall()
)
assert data == [("john", 1), ("paul", 2)]
# replace table with zip file
resp = upload_columnar(
ZIP_FILENAME, PARQUET_UPLOAD_TABLE, extra={"if_exists": "replace"}
)
success_msg_f2 = f'Columnar file "[\'{ZIP_FILENAME}\']" uploaded to table "{PARQUET_UPLOAD_TABLE}"'
assert success_msg_f2 in resp
data = (
get_upload_db()
.get_sqla_engine()
.execute(f"SELECT * from {PARQUET_UPLOAD_TABLE}")
.fetchall()
)
assert data == [("john", 1), ("paul", 2), ("max", 3), ("bob", 4)]
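Note: the create_columnar_files fixture builds the zip with shutil.make_archive, which places both parquet members at the top level of the archive, matching what the view expects when it walks zipfile namelist(). A small standalone sketch of that setup (file and directory names mirror the fixture; writing parquet requires pyarrow or fastparquet):

import os
import shutil
import zipfile

import pandas as pd

os.makedirs("testZip", exist_ok=True)
pd.DataFrame({"a": ["john", "paul"], "b": [1, 2]}).to_parquet("testZip/testParquet1.parquet")
pd.DataFrame({"a": ["max", "bob"], "b": [3, 4]}).to_parquet("testZip/testParquet2.parquet")
shutil.make_archive("testZip", "zip", "testZip")

with zipfile.ZipFile("testZip.zip") as archive:
    # Member order is not guaranteed; both parquet files sit at the archive root.
    print(sorted(archive.namelist()))  # ['testParquet1.parquet', 'testParquet2.parquet']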