refactor(pinot): The `python_date_format` for a temporal column was not being passed to `get_timestamp_expr` (#24942)
This commit is contained in:
parent
69fb309ec3
commit
c2a21d2da0
|
|
@ -14,15 +14,15 @@
|
|||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy.sql.expression import ColumnClause
|
||||
from sqlalchemy import types
|
||||
from sqlalchemy.engine.interfaces import Dialect
|
||||
from sqlalchemy.types import TypeEngine
|
||||
|
||||
from superset.constants import TimeGrain
|
||||
from superset.db_engine_specs.base import BaseEngineSpec, TimestampExpression
|
||||
from superset.db_engine_specs.base import BaseEngineSpec
|
||||
|
||||
|
||||
class PinotEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
|
||||
class PinotEngineSpec(BaseEngineSpec):
|
||||
engine = "pinot"
|
||||
engine_name = "Apache Pinot"
|
||||
allows_subqueries = False
|
||||
|
|
@ -30,93 +30,51 @@ class PinotEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
|
|||
allows_alias_in_select = False
|
||||
allows_alias_in_orderby = False
|
||||
|
||||
# Pinot does its own conversion below
|
||||
# https://docs.pinot.apache.org/users/user-guide-query/supported-transformations#datetime-functions
|
||||
_time_grain_expressions = {
|
||||
TimeGrain.SECOND: "1:SECONDS",
|
||||
TimeGrain.MINUTE: "1:MINUTES",
|
||||
TimeGrain.FIVE_MINUTES: "5:MINUTES",
|
||||
TimeGrain.TEN_MINUTES: "10:MINUTES",
|
||||
TimeGrain.FIFTEEN_MINUTES: "15:MINUTES",
|
||||
TimeGrain.THIRTY_MINUTES: "30:MINUTES",
|
||||
TimeGrain.HOUR: "1:HOURS",
|
||||
TimeGrain.DAY: "1:DAYS",
|
||||
TimeGrain.WEEK: "week",
|
||||
TimeGrain.MONTH: "month",
|
||||
TimeGrain.QUARTER: "quarter",
|
||||
TimeGrain.YEAR: "year",
|
||||
}
|
||||
|
||||
_python_to_java_time_patterns: dict[str, str] = {
|
||||
"%Y": "yyyy",
|
||||
"%m": "MM",
|
||||
"%d": "dd",
|
||||
"%H": "HH",
|
||||
"%M": "mm",
|
||||
"%S": "ss",
|
||||
}
|
||||
|
||||
_use_date_trunc_function: dict[str, bool] = {
|
||||
TimeGrain.SECOND: False,
|
||||
TimeGrain.MINUTE: False,
|
||||
TimeGrain.FIVE_MINUTES: False,
|
||||
TimeGrain.TEN_MINUTES: False,
|
||||
TimeGrain.FIFTEEN_MINUTES: False,
|
||||
TimeGrain.THIRTY_MINUTES: False,
|
||||
TimeGrain.HOUR: False,
|
||||
TimeGrain.DAY: False,
|
||||
TimeGrain.WEEK: True,
|
||||
TimeGrain.MONTH: True,
|
||||
TimeGrain.QUARTER: True,
|
||||
TimeGrain.YEAR: True,
|
||||
None: "{col}",
|
||||
TimeGrain.SECOND: "CAST(DATE_TRUNC('second', "
|
||||
+ "CAST({col} AS TIMESTAMP)) AS TIMESTAMP)",
|
||||
TimeGrain.MINUTE: "CAST(DATE_TRUNC('minute', "
|
||||
+ "CAST({col} AS TIMESTAMP)) AS TIMESTAMP)",
|
||||
TimeGrain.FIVE_MINUTES: "CAST(ROUND(DATE_TRUNC('minute', "
|
||||
+ "CAST({col} AS TIMESTAMP)), 300000) AS TIMESTAMP)",
|
||||
TimeGrain.TEN_MINUTES: "CAST(ROUND(DATE_TRUNC('minute', "
|
||||
+ "CAST({col} AS TIMESTAMP)), 600000) AS TIMESTAMP)",
|
||||
TimeGrain.FIFTEEN_MINUTES: "CAST(ROUND(DATE_TRUNC('minute', "
|
||||
+ "CAST({col} AS TIMESTAMP)), 900000) AS TIMESTAMP)",
|
||||
TimeGrain.THIRTY_MINUTES: "CAST(ROUND(DATE_TRUNC('minute', "
|
||||
+ "CAST({col} AS TIMESTAMP)), 1800000) AS TIMESTAMP)",
|
||||
TimeGrain.HOUR: "CAST(DATE_TRUNC('hour', CAST({col} AS TIMESTAMP)) AS TIMESTAMP)",
|
||||
TimeGrain.DAY: "CAST(DATE_TRUNC('day', CAST({col} AS TIMESTAMP)) AS TIMESTAMP)",
|
||||
TimeGrain.WEEK: "CAST(DATE_TRUNC('week', CAST({col} AS TIMESTAMP)) AS TIMESTAMP)",
|
||||
TimeGrain.MONTH: "CAST(DATE_TRUNC('month', "
|
||||
+ "CAST({col} AS TIMESTAMP)) AS TIMESTAMP)",
|
||||
TimeGrain.QUARTER: "CAST(DATE_TRUNC('quarter', "
|
||||
+ "CAST({col} AS TIMESTAMP)) AS TIMESTAMP)",
|
||||
TimeGrain.YEAR: "CAST(DATE_TRUNC('year', CAST({col} AS TIMESTAMP)) AS TIMESTAMP)",
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def get_timestamp_expr(
|
||||
cls,
|
||||
col: ColumnClause,
|
||||
pdf: Optional[str],
|
||||
time_grain: Optional[str],
|
||||
) -> TimestampExpression:
|
||||
if not pdf:
|
||||
raise NotImplementedError(f"Empty date format for '{col}'")
|
||||
is_epoch = pdf in ("epoch_s", "epoch_ms")
|
||||
def epoch_to_dttm(cls) -> str:
|
||||
return (
|
||||
"DATETIMECONVERT({col}, '1:SECONDS:EPOCH', '1:SECONDS:EPOCH', '1:SECONDS')"
|
||||
)
|
||||
|
||||
# The DATETIMECONVERT pinot udf is documented at
|
||||
# https://github.com/apache/incubator-pinot/wiki/dateTimeConvert-UDF
|
||||
# We are not really converting any time units, just bucketing them.
|
||||
tf = ""
|
||||
java_date_format = ""
|
||||
if not is_epoch:
|
||||
java_date_format = pdf
|
||||
for (
|
||||
python_pattern,
|
||||
java_pattern,
|
||||
) in cls._python_to_java_time_patterns.items():
|
||||
java_date_format = java_date_format.replace(
|
||||
python_pattern, java_pattern
|
||||
)
|
||||
tf = f"1:SECONDS:SIMPLE_DATE_FORMAT:{java_date_format}"
|
||||
else:
|
||||
seconds_or_ms = "MILLISECONDS" if pdf == "epoch_ms" else "SECONDS"
|
||||
tf = f"1:{seconds_or_ms}:EPOCH"
|
||||
if time_grain:
|
||||
granularity = cls.get_time_grain_expressions().get(time_grain)
|
||||
if not granularity:
|
||||
raise NotImplementedError(f"No pinot grain spec for '{time_grain}'")
|
||||
else:
|
||||
return TimestampExpression("{{col}}", col)
|
||||
@classmethod
|
||||
def epoch_ms_to_dttm_(cls) -> str:
|
||||
return (
|
||||
"DATETIMECONVERT({col}, '1:MILLISECONDS:EPOCH', "
|
||||
+ "'1:MILLISECONDS:EPOCH', '1:MILLISECONDS')"
|
||||
)
|
||||
|
||||
# In Pinot the output is a string, since there is no timestamp column type like in PostgreSQL
|
||||
if cls._use_date_trunc_function.get(time_grain):
|
||||
if is_epoch:
|
||||
time_expr = f"DATETRUNC('{granularity}', {{col}}, '{seconds_or_ms}')"
|
||||
else:
|
||||
time_expr = (
|
||||
f"ToDateTime(DATETRUNC('{granularity}', "
|
||||
+ f"FromDateTime({{col}}, '{java_date_format}'), "
|
||||
+ f"'MILLISECONDS'), '{java_date_format}')"
|
||||
)
|
||||
else:
|
||||
time_expr = f"DATETIMECONVERT({{col}}, '{tf}', '{tf}', '{granularity}')"
|
||||
@classmethod
|
||||
def column_datatype_to_string(
|
||||
cls, sqla_column_type: TypeEngine, dialect: Dialect
|
||||
) -> str:
|
||||
# The Pinot driver infers TIMESTAMP columns as LONG, so apply a quick fix here.
|
||||
# Once the Pinot driver fixes this bug, this method can be removed.
|
||||
if isinstance(sqla_column_type, types.TIMESTAMP):
|
||||
return sqla_column_type.compile().upper()
|
||||
|
||||
return TimestampExpression(time_expr, col)
|
||||
return super().column_datatype_to_string(sqla_column_type, dialect)
|
||||
|
|
|
|||
|
|
@ -27,61 +27,66 @@ class TestPinotDbEngineSpec(TestDbEngineSpec):
|
|||
col = column("tstamp")
|
||||
expr = PinotEngineSpec.get_timestamp_expr(col, "epoch_s", "P1D")
|
||||
result = str(expr.compile())
|
||||
expected = (
|
||||
"CAST(DATE_TRUNC('day', CAST("
|
||||
+ "DATETIMECONVERT(tstamp, '1:SECONDS:EPOCH', "
|
||||
+ "'1:SECONDS:EPOCH', '1:SECONDS') AS TIMESTAMP)) AS TIMESTAMP)"
|
||||
)
|
||||
self.assertEqual(
|
||||
result,
|
||||
"DATETIMECONVERT(tstamp, '1:SECONDS:EPOCH', '1:SECONDS:EPOCH', '1:DAYS')",
|
||||
expected,
|
||||
)
|
||||
|
||||
def test_pinot_time_expression_simple_date_format_1d_grain(self):
|
||||
col = column("tstamp")
|
||||
expr = PinotEngineSpec.get_timestamp_expr(col, "%Y-%m-%d %H:%M:%S", "P1D")
|
||||
result = str(expr.compile())
|
||||
expected = "CAST(DATE_TRUNC('day', CAST(tstamp AS TIMESTAMP)) AS TIMESTAMP)"
|
||||
self.assertEqual(
|
||||
result,
|
||||
(
|
||||
"DATETIMECONVERT(tstamp, "
|
||||
+ "'1:SECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss', "
|
||||
+ "'1:SECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss', '1:DAYS')"
|
||||
),
|
||||
expected,
|
||||
)
|
||||
|
||||
def test_pinot_time_expression_simple_date_format_10m_grain(self):
|
||||
col = column("tstamp")
|
||||
expr = PinotEngineSpec.get_timestamp_expr(col, "%Y-%m-%d %H:%M:%S", "PT10M")
|
||||
result = str(expr.compile())
|
||||
expected = (
|
||||
"CAST(ROUND(DATE_TRUNC('minute', CAST(tstamp AS "
|
||||
+ "TIMESTAMP)), 600000) AS TIMESTAMP)"
|
||||
)
|
||||
self.assertEqual(
|
||||
result,
|
||||
(
|
||||
"DATETIMECONVERT(tstamp, "
|
||||
+ "'1:SECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss', "
|
||||
+ "'1:SECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss', '10:MINUTES')"
|
||||
),
|
||||
expected,
|
||||
)
|
||||
|
||||
def test_pinot_time_expression_simple_date_format_1w_grain(self):
|
||||
col = column("tstamp")
|
||||
expr = PinotEngineSpec.get_timestamp_expr(col, "%Y-%m-%d %H:%M:%S", "P1W")
|
||||
result = str(expr.compile())
|
||||
expected = "CAST(DATE_TRUNC('week', CAST(tstamp AS TIMESTAMP)) AS TIMESTAMP)"
|
||||
self.assertEqual(
|
||||
result,
|
||||
(
|
||||
"ToDateTime(DATETRUNC('week', FromDateTime(tstamp, "
|
||||
+ "'yyyy-MM-dd HH:mm:ss'), 'MILLISECONDS'), 'yyyy-MM-dd HH:mm:ss')"
|
||||
),
|
||||
expected,
|
||||
)
|
||||
|
||||
def test_pinot_time_expression_sec_one_1m_grain(self):
|
||||
col = column("tstamp")
|
||||
expr = PinotEngineSpec.get_timestamp_expr(col, "epoch_s", "P1M")
|
||||
result = str(expr.compile())
|
||||
expected = (
|
||||
"CAST(DATE_TRUNC('month', CAST("
|
||||
+ "DATETIMECONVERT(tstamp, '1:SECONDS:EPOCH', "
|
||||
+ "'1:SECONDS:EPOCH', '1:SECONDS') AS TIMESTAMP)) AS TIMESTAMP)"
|
||||
)
|
||||
self.assertEqual(
|
||||
result,
|
||||
"DATETRUNC('month', tstamp, 'SECONDS')",
|
||||
expected,
|
||||
)
|
||||
|
||||
def test_invalid_get_time_expression_arguments(self):
|
||||
with self.assertRaises(NotImplementedError):
|
||||
PinotEngineSpec.get_timestamp_expr(column("tstamp"), None, "P1M")
|
||||
PinotEngineSpec.get_timestamp_expr(column("tstamp"), None, "P0.25Y")
|
||||
|
||||
with self.assertRaises(NotImplementedError):
|
||||
PinotEngineSpec.get_timestamp_expr(
|
||||
|
|
|
|||
|
|
@ -0,0 +1,57 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import column
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"time_grain,expected_result",
|
||||
[
|
||||
("PT1S", "CAST(DATE_TRUNC('second', CAST(col AS TIMESTAMP)) AS TIMESTAMP)"),
|
||||
(
|
||||
"PT5M",
|
||||
"CAST(ROUND(DATE_TRUNC('minute', CAST(col AS TIMESTAMP)), 300000) AS TIMESTAMP)",
|
||||
),
|
||||
("P1W", "CAST(DATE_TRUNC('week', CAST(col AS TIMESTAMP)) AS TIMESTAMP)"),
|
||||
("P1M", "CAST(DATE_TRUNC('month', CAST(col AS TIMESTAMP)) AS TIMESTAMP)"),
|
||||
("P3M", "CAST(DATE_TRUNC('quarter', CAST(col AS TIMESTAMP)) AS TIMESTAMP)"),
|
||||
("P1Y", "CAST(DATE_TRUNC('year', CAST(col AS TIMESTAMP)) AS TIMESTAMP)"),
|
||||
],
|
||||
)
|
||||
def test_timegrain_expressions(time_grain: str, expected_result: str) -> None:
|
||||
"""
|
||||
DB Eng Specs (pinot): Test time grain expressions
|
||||
"""
|
||||
from superset.db_engine_specs.pinot import PinotEngineSpec as spec
|
||||
|
||||
actual = str(
|
||||
spec.get_timestamp_expr(col=column("col"), pdf=None, time_grain=time_grain)
|
||||
)
|
||||
assert actual == expected_result
|
||||
|
||||
|
||||
def test_extras_without_ssl() -> None:
|
||||
from superset.db_engine_specs.pinot import PinotEngineSpec as spec
|
||||
from tests.integration_tests.fixtures.database import default_db_extra
|
||||
|
||||
db = mock.Mock()
|
||||
db.extra = default_db_extra
|
||||
db.server_cert = None
|
||||
extras = spec.get_extra_params(db)
|
||||
assert "connect_args" not in extras["engine_params"]
|
||||
Loading…
Reference in New Issue