fix: Fixing pinot query generation for date format conversion from python datetime format to java simple date format (#13163)
* Fixing pinot query generation for date format conversion from python datetime format to java simple date format * Address comments * fix test Co-authored-by: Ville Brofeldt <ville.v.brofeldt@gmail.com>
This commit is contained in:
parent
eea5fe3b60
commit
786c12d52d
|
|
@ -14,7 +14,6 @@
|
|||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import datetime
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from sqlalchemy.sql.expression import ColumnClause, ColumnElement
|
||||
|
|
@ -69,24 +68,24 @@ class PinotEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
|
|||
time_grain: Optional[str],
|
||||
type_: Optional[str] = None,
|
||||
) -> TimestampExpression:
|
||||
if not pdf:
|
||||
raise NotImplementedError(f"Empty date format for '{col}'")
|
||||
is_epoch = pdf in ("epoch_s", "epoch_ms")
|
||||
|
||||
# The DATETIMECONVERT pinot udf is documented at
|
||||
# Per https://github.com/apache/incubator-pinot/wiki/dateTimeConvert-UDF
|
||||
# We are not really converting any time units, just bucketing them.
|
||||
tf = ""
|
||||
java_date_format = ""
|
||||
if not is_epoch:
|
||||
try:
|
||||
today = datetime.datetime.today()
|
||||
today.strftime(str(pdf))
|
||||
except ValueError:
|
||||
raise ValueError(f"Invalid column datetime format:{str(pdf)}")
|
||||
java_date_format = str(pdf)
|
||||
java_date_format = pdf
|
||||
for (
|
||||
python_pattern,
|
||||
java_pattern,
|
||||
) in cls._python_to_java_time_patterns.items():
|
||||
java_date_format.replace(python_pattern, java_pattern)
|
||||
java_date_format = java_date_format.replace(
|
||||
python_pattern, java_pattern
|
||||
)
|
||||
tf = f"1:SECONDS:SIMPLE_DATE_FORMAT:{java_date_format}"
|
||||
else:
|
||||
seconds_or_ms = "MILLISECONDS" if pdf == "epoch_ms" else "SECONDS"
|
||||
|
|
@ -94,13 +93,20 @@ class PinotEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
|
|||
if time_grain:
|
||||
granularity = cls.get_time_grain_expressions().get(time_grain)
|
||||
if not granularity:
|
||||
raise NotImplementedError("No pinot grain spec for " + str(time_grain))
|
||||
raise NotImplementedError(f"No pinot grain spec for '{time_grain}'")
|
||||
else:
|
||||
return TimestampExpression("{{col}}", col)
|
||||
|
||||
# In pinot the output is a string since there is no timestamp column like pg
|
||||
if cls._use_date_trunc_function.get(time_grain):
|
||||
time_expr = f"DATETRUNC('{granularity}', {{col}}, '{seconds_or_ms}')"
|
||||
if is_epoch:
|
||||
time_expr = f"DATETRUNC('{granularity}', {{col}}, '{seconds_or_ms}')"
|
||||
else:
|
||||
time_expr = (
|
||||
f"ToDateTime(DATETRUNC('{granularity}', "
|
||||
+ f"FromDateTime({{col}}, '{java_date_format}'), "
|
||||
+ f"'MILLISECONDS'), '{java_date_format}')"
|
||||
)
|
||||
else:
|
||||
time_expr = f"DATETIMECONVERT({{col}}, '{tf}', '{tf}', '{granularity}')"
|
||||
|
||||
|
|
|
|||
|
|
@ -30,7 +30,32 @@ class TestPinotDbEngineSpec(TestDbEngineSpec):
|
|||
self.assertEqual(
|
||||
result,
|
||||
"DATETIMECONVERT(tstamp, '1:SECONDS:EPOCH', '1:SECONDS:EPOCH', '1:DAYS')",
|
||||
) # noqa
|
||||
)
|
||||
|
||||
def test_pinot_time_expression_simple_date_format_1d_grain(self):
|
||||
col = column("tstamp")
|
||||
expr = PinotEngineSpec.get_timestamp_expr(col, "%Y-%m-%d %H:%M:%S", "P1D")
|
||||
result = str(expr.compile())
|
||||
self.assertEqual(
|
||||
result,
|
||||
(
|
||||
"DATETIMECONVERT(tstamp, "
|
||||
+ "'1:SECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss', "
|
||||
+ "'1:SECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss', '1:DAYS')"
|
||||
),
|
||||
)
|
||||
|
||||
def test_pinot_time_expression_simple_date_format_1w_grain(self):
|
||||
col = column("tstamp")
|
||||
expr = PinotEngineSpec.get_timestamp_expr(col, "%Y-%m-%d %H:%M:%S", "P1W")
|
||||
result = str(expr.compile())
|
||||
self.assertEqual(
|
||||
result,
|
||||
(
|
||||
"ToDateTime(DATETRUNC('week', FromDateTime(tstamp, "
|
||||
+ "'yyyy-MM-dd HH:mm:ss'), 'MILLISECONDS'), 'yyyy-MM-dd HH:mm:ss')"
|
||||
),
|
||||
)
|
||||
|
||||
def test_pinot_time_expression_sec_one_1m_grain(self):
|
||||
col = column("tstamp")
|
||||
|
|
@ -38,4 +63,13 @@ class TestPinotDbEngineSpec(TestDbEngineSpec):
|
|||
result = str(expr.compile())
|
||||
self.assertEqual(
|
||||
result, "DATETRUNC('month', tstamp, 'SECONDS')",
|
||||
) # noqa
|
||||
)
|
||||
|
||||
def test_invalid_get_time_expression_arguments(self):
|
||||
with self.assertRaises(NotImplementedError):
|
||||
PinotEngineSpec.get_timestamp_expr(column("tstamp"), None, "P1M")
|
||||
|
||||
with self.assertRaises(NotImplementedError):
|
||||
PinotEngineSpec.get_timestamp_expr(
|
||||
column("tstamp"), "epoch_s", "invalid_grain"
|
||||
)
|
||||
|
|
|
|||
Loading…
Reference in New Issue