feat: Add support for Azure Data Explorer (Kusto) db engine spec (#17898)

* Add two Kusto engine specs: KQL and SQL. Some minor changes in core code to support Kusto engine specs.

* Remove redundant imports and logging.

* docs: Kusto sqlalchemy docs

* fix: Fix mypy and linting errors

* fix: Handle Black vs Pylint checks

* fix: isort problem

* refactor: Merge kustosql and kustokql in the single kusto module

* test: Add tests for Kusto db spec

* feat: Schema override does not require in KQL anymore

* Removed redundant imports.

* Added ".show" queries to readonly query determination.

* Fixed some bugs.
Added tests for convert_dttm.

* Fixed major sqlalchemy-kusto version.

* Fixed by isort.

Co-authored-by: Eugene Bikkinin <xnegxneg@gmail.com>
Co-authored-by: k.tomak <k.tomak@dodopizza.com>
Co-authored-by: Eugene Bikkinin <e.bikkinin@dodopizza.com>
This commit is contained in:
Mikhail Kumachev 2022-01-10 15:42:20 +03:00 committed by GitHub
parent c0a769581f
commit d2d4f8eb44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 390 additions and 3 deletions

View File

@ -789,6 +789,23 @@ If you are using JDBC to connect to Drill, the connection string looks like this
For a complete tutorial about how to use Apache Drill with Superset, see this tutorial:
`Visualize Anything with Superset and Drill <http://thedataist.com/visualize-anything-with-superset-and-drill/>`_
Kusto
---------
The recommended connector library for Kusto is
[sqlalchemy-kusto](https://pypi.org/project/sqlalchemy-kusto/1.0.1/)>=1.0.1.
The connection string for Kusto looks like this:
```
kustosql+https://{cluster_url}/{database}?azure_ad_client_id={azure_ad_client_id}&azure_ad_client_secret={azure_ad_client_secret}&azure_ad_tenant_id={azure_ad_tenant_id}&msi=False
```
Make sure the user has privileges to access and use all required
databases/tables/views.
See `Azure Data Explorer (Kusto) dialect for SQLAlchemy <https://github.com/dodopizza/sqlalchemy-kusto/>`_.
Deeper SQLAlchemy integration
-----------------------------

View File

@ -46,6 +46,7 @@ A list of some of the recommended packages.
|[Hologres](/docs/databases/hologres)|```pip install psycopg2```|```postgresql+psycopg2://<UserName>:<DBPassword>@<Database Host>/<Database Name>```|
|[IBM Db2](/docs/databases/ibm-db2)|```pip install ibm_db_sa```|```db2+ibm_db://```|
|[IBM Netezza Performance Server](/docs/databases/netezza)|```pip install nzalchemy```|```netezza+nzpy://<UserName>:<DBPassword>@<Database Host>/<Database Name>```|
|[Kusto](/docs/databases/kusto)|```pip install sqlalchemy-kusto```|```kustosql+https://{cluster_url}/{database}?azure_ad_client_id={azure_ad_client_id}&azure_ad_client_secret={azure_ad_client_secret}&azure_ad_tenant_id={azure_ad_tenant_id}&msi=False```|
|[MySQL](/docs/databases/mysql)|```pip install mysqlclient```|```mysql://<UserName>:<DBPassword>@<Database Host>/<Database Name>```|
|[Oracle](/docs/databases/oracle)|```pip install cx_Oracle```|```oracle://```|
|[PostgreSQL](/docs/databases/postgres)|```pip install psycopg2```|```postgresql://<UserName>:<DBPassword>@<Database Host>/<Database Name>```|

View File

@ -0,0 +1,21 @@
---
name: Kusto
menu: Connecting to Databases
route: /docs/databases/kusto
index: 32
version: 1
---
## Kusto
The recommended connector library for Kusto is
[sqlalchemy-kusto](https://pypi.org/project/sqlalchemy-kusto/1.0.1/)>=1.0.1.
The connection string for Kusto looks like this:
```
kustosql+https://{cluster_url}/{database}?azure_ad_client_id={azure_ad_client_id}&azure_ad_client_secret={azure_ad_client_secret}&azure_ad_tenant_id={azure_ad_tenant_id}&msi=False
```
Make sure the user has privileges to access and use all required
databases/tables/views.

View File

@ -140,6 +140,7 @@ setup(
"hana": ["hdbcli==2.4.162", "sqlalchemy_hana==0.4.0"],
"hive": ["pyhive[hive]>=0.6.1", "tableschema", "thrift>=0.11.0, <1.0.0"],
"impala": ["impyla>0.16.2, <0.17"],
"kusto": ["sqlalchemy-kusto>=1.0.1, <2"],
"kylin": ["kylinpy>=2.8.1, <2.9"],
"mmsql": ["pymssql>=2.1.4, <2.2"],
"mysql": ["mysqlclient>=2.1.0, <3"],

View File

@ -1381,6 +1381,10 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
return False
@classmethod
def parse_sql(cls, sql: str) -> List[str]:
return [str(s).strip(" ;") for s in sqlparse.parse(sql)]
# schema for adding a database by providing parameters instead of the
# full SQLAlchemy URI

View File

@ -0,0 +1,164 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Any, Dict, List, Optional, Type
from superset.db_engine_specs.base import BaseEngineSpec, LimitMethod
from superset.db_engine_specs.exceptions import (
SupersetDBAPIDatabaseError,
SupersetDBAPIOperationalError,
SupersetDBAPIProgrammingError,
)
from superset.sql_parse import ParsedQuery
from superset.utils import core as utils
class KustoSqlEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
limit_method = LimitMethod.WRAP_SQL
engine = "kustosql"
engine_name = "KustoSQL"
time_groupby_inline = True
time_secondary_columns = True
allows_joins = True
allows_subqueries = True
allows_sql_comments = False
_time_grain_expressions = {
None: "{col}",
"PT1S": "DATEADD(second, DATEDIFF(second, '2000-01-01', {col}), '2000-01-01')",
"PT1M": "DATEADD(minute, DATEDIFF(minute, 0, {col}), 0)",
"PT5M": "DATEADD(minute, DATEDIFF(minute, 0, {col}) / 5 * 5, 0)",
"PT10M": "DATEADD(minute, DATEDIFF(minute, 0, {col}) / 10 * 10, 0)",
"PT15M": "DATEADD(minute, DATEDIFF(minute, 0, {col}) / 15 * 15, 0)",
"PT0.5H": "DATEADD(minute, DATEDIFF(minute, 0, {col}) / 30 * 30, 0)",
"PT1H": "DATEADD(hour, DATEDIFF(hour, 0, {col}), 0)",
"P1D": "DATEADD(day, DATEDIFF(day, 0, {col}), 0)",
"P1W": "DATEADD(day, -1, DATEADD(week, DATEDIFF(week, 0, {col}), 0))",
"P1M": "DATEADD(month, DATEDIFF(month, 0, {col}), 0)",
"P0.25Y": "DATEADD(quarter, DATEDIFF(quarter, 0, {col}), 0)",
"P1Y": "DATEADD(year, DATEDIFF(year, 0, {col}), 0)",
"1969-12-28T00:00:00Z/P1W": "DATEADD(day, -1,"
" DATEADD(week, DATEDIFF(week, 0, {col}), 0))",
"1969-12-29T00:00:00Z/P1W": "DATEADD(week,"
" DATEDIFF(week, 0, DATEADD(day, -1, {col})), 0)",
}
type_code_map: Dict[int, str] = {} # loaded from get_datatype only if needed
@classmethod
def get_dbapi_exception_mapping(cls) -> Dict[Type[Exception], Type[Exception]]:
# pylint: disable=import-outside-toplevel,import-error
import sqlalchemy_kusto.errors as kusto_exceptions
return {
kusto_exceptions.DatabaseError: SupersetDBAPIDatabaseError,
kusto_exceptions.OperationalError: SupersetDBAPIOperationalError,
kusto_exceptions.ProgrammingError: SupersetDBAPIProgrammingError,
}
@classmethod
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"CONVERT(DATE, '{dttm.date().isoformat()}', 23)"
if tt == utils.TemporalType.DATETIME:
datetime_formatted = dttm.isoformat(timespec="milliseconds")
return f"""CONVERT(DATETIME, '{datetime_formatted}', 126)"""
if tt == utils.TemporalType.SMALLDATETIME:
datetime_formatted = dttm.isoformat(sep=" ", timespec="seconds")
return f"""CONVERT(SMALLDATETIME, '{datetime_formatted}', 20)"""
if tt == utils.TemporalType.TIMESTAMP:
datetime_formatted = dttm.isoformat(sep=" ", timespec="seconds")
return f"""CONVERT(TIMESTAMP, '{datetime_formatted}', 20)"""
return None
@classmethod
def is_readonly_query(cls, parsed_query: ParsedQuery) -> bool:
"""Pessimistic readonly, 100% sure statement won't mutate anything"""
return parsed_query.sql.lower().startswith("select")
class KustoKqlEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
limit_method = LimitMethod.WRAP_SQL
engine = "kustokql"
engine_name = "KustoKQL"
time_groupby_inline = True
time_secondary_columns = True
allows_joins = True
allows_subqueries = True
allows_sql_comments = False
run_multiple_statements_as_one = True
_time_grain_expressions = {
None: "{col}",
"PT1S": "{col}/ time(1s)",
"PT1M": "{col}/ time(1min)",
"PT1H": "{col}/ time(1h)",
"P1D": "{col}/ time(1d)",
"P1M": "datetime_diff('month',CreateDate, datetime(0001-01-01 00:00:00))+1",
"P1Y": "datetime_diff('year',CreateDate, datetime(0001-01-01 00:00:00))+1",
}
type_code_map: Dict[int, str] = {} # loaded from get_datatype only if needed
@classmethod
def get_dbapi_exception_mapping(cls) -> Dict[Type[Exception], Type[Exception]]:
# pylint: disable=import-outside-toplevel,import-error
import sqlalchemy_kusto.errors as kusto_exceptions
return {
kusto_exceptions.DatabaseError: SupersetDBAPIDatabaseError,
kusto_exceptions.OperationalError: SupersetDBAPIOperationalError,
kusto_exceptions.ProgrammingError: SupersetDBAPIProgrammingError,
}
@classmethod
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
if target_type.upper() in [
utils.TemporalType.DATETIME,
utils.TemporalType.TIMESTAMP,
]:
return f"""datetime({dttm.isoformat(timespec="microseconds")})"""
if target_type.upper() == utils.TemporalType.DATE:
return f"""datetime({dttm.date().isoformat()})"""
return None
@classmethod
def is_readonly_query(cls, parsed_query: ParsedQuery) -> bool:
"""
Pessimistic readonly, 100% sure statement won't mutate anything.
"""
return KustoKqlEngineSpec.is_select_query(
parsed_query
) or parsed_query.sql.startswith(".show")
@classmethod
def is_select_query(cls, parsed_query: ParsedQuery) -> bool:
return not parsed_query.sql.startswith(".")
@classmethod
def parse_sql(cls, sql: str) -> List[str]:
"""
Kusto supports a single query statement, but it could include sub queries
and variables declared via let keyword.
"""
return [sql]

View File

@ -29,7 +29,6 @@ from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type
import numpy
import pandas as pd
import sqlalchemy as sqla
import sqlparse
from flask import g, request
from flask_appbuilder import Model
from sqlalchemy import (
@ -399,7 +398,7 @@ class Database(
mutator: Optional[Callable[[pd.DataFrame], None]] = None,
username: Optional[str] = None,
) -> pd.DataFrame:
sqls = [str(s).strip(" ;") for s in sqlparse.parse(sql)]
sqls = self.db_engine_spec.parse_sql(sql)
engine = self.get_sqla_engine(schema=schema, user_name=username)
username = utils.get_username() or username

View File

@ -15,7 +15,6 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=unused-argument, import-outside-toplevel, protected-access
import re
from flask.ctx import AppContext
@ -31,3 +30,29 @@ def test_get_text_clause_with_colon(app_context: AppContext) -> None:
"SELECT foo FROM tbl WHERE foo = '123:456')"
)
assert text_clause.text == "SELECT foo FROM tbl WHERE foo = '123\\:456')"
def test_parse_sql_single_statement(app_context: AppContext) -> None:
"""
`parse_sql` should properly strip leading and trailing spaces and semicolons
"""
from superset.db_engine_specs.base import BaseEngineSpec
queries = BaseEngineSpec.parse_sql(" SELECT foo FROM tbl ; ")
assert queries == ["SELECT foo FROM tbl"]
def test_parse_sql_multi_statement(app_context: AppContext) -> None:
"""
For string with multiple SQL-statements `parse_sql` method should return list
where each element represents the single SQL-statement
"""
from superset.db_engine_specs.base import BaseEngineSpec
queries = BaseEngineSpec.parse_sql("SELECT foo FROM tbl1; SELECT bar FROM tbl2;")
assert queries == [
"SELECT foo FROM tbl1",
"SELECT bar FROM tbl2",
]

View File

@ -0,0 +1,155 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=unused-argument, import-outside-toplevel, protected-access
from datetime import datetime
import pytest
from flask.ctx import AppContext
@pytest.mark.parametrize(
"sql,expected",
[
("SELECT foo FROM tbl", True),
("SHOW TABLES", False),
("EXPLAIN SELECT foo FROM tbl", False),
("INSERT INTO tbl (foo) VALUES (1)", False),
],
)
def test_sql_is_readonly_query(
app_context: AppContext, sql: str, expected: bool
) -> None:
"""
Make sure that SQL dialect consider only SELECT statements as read-only
"""
from superset.db_engine_specs.kusto import KustoSqlEngineSpec
from superset.sql_parse import ParsedQuery
parsed_query = ParsedQuery(sql)
is_readonly = KustoSqlEngineSpec.is_readonly_query(parsed_query)
assert expected == is_readonly
@pytest.mark.parametrize(
"kql,expected",
[
("tbl | limit 100", True),
("let foo = 1; tbl | where bar == foo", True),
(".show tables", False),
],
)
def test_kql_is_select_query(app_context: AppContext, kql: str, expected: bool) -> None:
"""
Make sure that KQL dialect consider only statements that do not start with "." (dot)
as a SELECT statements
"""
from superset.db_engine_specs.kusto import KustoKqlEngineSpec
from superset.sql_parse import ParsedQuery
parsed_query = ParsedQuery(kql)
is_select = KustoKqlEngineSpec.is_select_query(parsed_query)
assert expected == is_select
@pytest.mark.parametrize(
"kql,expected",
[
("tbl | limit 100", True),
("let foo = 1; tbl | where bar == foo", True),
(".show tables", True),
("print 1", True),
("set querytrace; Events | take 100", True),
(".drop table foo", False),
(".set-or-append table foo <| bar", False),
],
)
def test_kql_is_readonly_query(
app_context: AppContext, kql: str, expected: bool
) -> None:
"""
Make sure that KQL dialect consider only SELECT statements as read-only
"""
from superset.db_engine_specs.kusto import KustoKqlEngineSpec
from superset.sql_parse import ParsedQuery
parsed_query = ParsedQuery(kql)
is_readonly = KustoKqlEngineSpec.is_readonly_query(parsed_query)
assert expected == is_readonly
def test_kql_parse_sql(app_context: AppContext) -> None:
"""
parse_sql method should always return a list with a single element
which is an original query
"""
from superset.db_engine_specs.kusto import KustoKqlEngineSpec
queries = KustoKqlEngineSpec.parse_sql("let foo = 1; tbl | where bar == foo")
assert queries == ["let foo = 1; tbl | where bar == foo"]
@pytest.mark.parametrize(
"target_type,expected_dttm",
[
("DATETIME", "datetime(2019-01-02T03:04:05.678900)"),
("TIMESTAMP", "datetime(2019-01-02T03:04:05.678900)"),
("DATE", "datetime(2019-01-02)"),
],
)
def test_kql_convert_dttm(
app_context: AppContext, target_type: str, expected_dttm: str
) -> None:
"""
Test that date objects are converted correctly.
"""
from superset.db_engine_specs.kusto import KustoKqlEngineSpec
dttm = datetime.strptime("2019-01-02 03:04:05.678900", "%Y-%m-%d %H:%M:%S.%f")
print(dttm)
assert expected_dttm == KustoKqlEngineSpec.convert_dttm(target_type, dttm)
@pytest.mark.parametrize(
"target_type,expected_dttm",
[
("DATETIME", "CONVERT(DATETIME, '2019-01-02T03:04:05.678', 126)"),
("DATE", "CONVERT(DATE, '2019-01-02', 23)"),
("SMALLDATETIME", "CONVERT(SMALLDATETIME, '2019-01-02 03:04:05', 20)"),
("TIMESTAMP", "CONVERT(TIMESTAMP, '2019-01-02 03:04:05', 20)"),
],
)
def test_sql_convert_dttm(
app_context: AppContext, target_type: str, expected_dttm: str
) -> None:
"""
Test that date objects are converted correctly.
"""
from superset.db_engine_specs.kusto import KustoSqlEngineSpec
dttm = datetime.strptime("2019-01-02 03:04:05.678900", "%Y-%m-%d %H:%M:%S.%f")
print(dttm)
assert expected_dttm == KustoSqlEngineSpec.convert_dttm(target_type, dttm)