chore: Add Apache Spark Jinja template processor (#28335)

This commit is contained in:
John Bodley 2024-05-06 09:24:25 -07:00 committed by GitHub
parent bf5ff55074
commit 27c93f438a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 31 additions and 0 deletions

View File

@ -641,6 +641,19 @@ class HiveTemplateProcessor(PrestoTemplateProcessor):
engine = "hive"
class SparkTemplateProcessor(HiveTemplateProcessor):
engine = "spark"
def process_template(self, sql: str, **kwargs: Any) -> str:
template = self.env.from_string(sql)
kwargs.update(self._context)
# Backwards compatibility if migrating from Hive.
context = validate_template_context(self.engine, kwargs)
context["hive"] = context["spark"]
return template.render(context)
class TrinoTemplateProcessor(PrestoTemplateProcessor):
engine = "trino"
@ -657,6 +670,7 @@ class TrinoTemplateProcessor(PrestoTemplateProcessor):
DEFAULT_PROCESSORS = {
"presto": PrestoTemplateProcessor,
"hive": HiveTemplateProcessor,
"spark": SparkTemplateProcessor,
"trino": TrinoTemplateProcessor,
}

View File

@ -121,6 +121,23 @@ def test_template_hive(app_context: AppContext, mocker: MockFixture) -> None:
assert tp.process_template(template) == "the_latest"
def test_template_spark(app_context: AppContext, mocker: MockFixture) -> None:
lp_mock = mocker.patch(
"superset.jinja_context.SparkTemplateProcessor.latest_partition"
)
lp_mock.return_value = "the_latest"
database = mock.Mock()
database.backend = "spark"
template = "{{ spark.latest_partition('my_table') }}"
tp = get_template_processor(database=database)
assert tp.process_template(template) == "the_latest"
# Backwards compatibility if migrating from Hive.
template = "{{ hive.latest_partition('my_table') }}"
tp = get_template_processor(database=database)
assert tp.process_template(template) == "the_latest"
def test_template_trino(app_context: AppContext, mocker: MockFixture) -> None:
lp_mock = mocker.patch(
"superset.jinja_context.TrinoTemplateProcessor.latest_partition"