Remove unused methods from Presto (#8350)
* Remove unused methods from Presto
* Remove tests
* Fix bug in PRESTO_SPLIT_VIEWS_FROM_TABLES
* Fix lint
This commit is contained in:
parent
ec86d9de17
commit
04c3d1f58d
|
|
@ -14,7 +14,7 @@
|
|||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from collections import defaultdict, deque, OrderedDict
|
||||
from collections import defaultdict, deque
|
||||
from contextlib import closing
|
||||
from datetime import datetime
|
||||
from distutils.version import StrictVersion
|
||||
|
|
@ -22,7 +22,7 @@ import logging
|
|||
import re
|
||||
import textwrap
|
||||
import time
|
||||
from typing import Any, cast, Dict, List, Optional, Set, Tuple, TYPE_CHECKING
|
||||
from typing import Any, cast, Dict, List, Optional, Tuple, TYPE_CHECKING
|
||||
from urllib import parse
|
||||
|
||||
import simplejson as json
|
||||
|
|
@ -157,7 +157,7 @@ class PrestoEngineSpec(BaseEngineSpec):
|
|||
|
||||
if schema:
|
||||
sql = (
|
||||
"SELECT table_name FROM information_schema.views"
|
||||
"SELECT table_name FROM information_schema.views "
|
||||
"WHERE table_schema=%(schema)s"
|
||||
)
|
||||
params = {"schema": schema}
|
||||
|
|
@ -403,43 +403,6 @@ class PrestoEngineSpec(BaseEngineSpec):
|
|||
column_clauses.append(column_clause)
|
||||
return column_clauses
|
||||
|
||||
@classmethod
|
||||
def _filter_out_array_nested_cols(
|
||||
cls, cols: List[dict]
|
||||
) -> Tuple[List[dict], List[dict]]:
|
||||
"""
|
||||
Filter out columns that correspond to array content. We know which columns to
|
||||
skip because cols is a list provided to us in a specific order where a
|
||||
structural column is positioned right before its content.
|
||||
|
||||
Example: Column Name: ColA, Column Data Type: array(row(nest_obj int))
|
||||
cols = [ ..., ColA, ColA.nest_obj, ... ]
|
||||
|
||||
When we run across an array, check if subsequent column names start with the
|
||||
array name and skip them.
|
||||
:param cols: columns
|
||||
:return: filtered list of columns and list of array columns and its nested
|
||||
fields
|
||||
"""
|
||||
filtered_cols = []
|
||||
array_cols = []
|
||||
curr_array_col_name = None
|
||||
for col in cols:
|
||||
# col corresponds to an array's content and should be skipped
|
||||
if curr_array_col_name and col["name"].startswith(curr_array_col_name):
|
||||
array_cols.append(col)
|
||||
continue
|
||||
# col is an array so we need to check if subsequent
|
||||
# columns correspond to the array's contents
|
||||
elif str(col["type"]) == "ARRAY":
|
||||
curr_array_col_name = col["name"]
|
||||
array_cols.append(col)
|
||||
filtered_cols.append(col)
|
||||
else:
|
||||
curr_array_col_name = None
|
||||
filtered_cols.append(col)
|
||||
return filtered_cols, array_cols
|
||||
|
||||
@classmethod
|
||||
def select_star( # pylint: disable=too-many-arguments
|
||||
cls,
|
||||
|
|
@ -589,276 +552,6 @@ class PrestoEngineSpec(BaseEngineSpec):
|
|||
)
|
||||
return datasource_names
|
||||
|
||||
@classmethod
|
||||
def _build_column_hierarchy(
|
||||
cls, columns: List[dict], parent_column_types: List[str], column_hierarchy: dict
|
||||
) -> None:
|
||||
"""
|
||||
Build a graph where the root node represents a column whose data type is in
|
||||
parent_column_types. A node's children represent that column's nested fields
|
||||
:param columns: list of columns
|
||||
:param parent_column_types: list of data types that decide what columns can
|
||||
be root nodes
|
||||
:param column_hierarchy: dictionary representing the graph
|
||||
"""
|
||||
if not columns:
|
||||
return
|
||||
root = columns.pop(0)
|
||||
root_info = {"type": root["type"], "children": []}
|
||||
column_hierarchy[root["name"]] = root_info
|
||||
while columns:
|
||||
column = columns[0]
|
||||
# If the column name does not start with the root's name,
|
||||
# then this column is not a nested field
|
||||
if not column["name"].startswith(f"{root['name']}."):
|
||||
break
|
||||
# If the column's data type is one of the parent types,
|
||||
# then this column may have nested fields
|
||||
if str(column["type"]) in parent_column_types:
|
||||
cls._build_column_hierarchy(
|
||||
columns, parent_column_types, column_hierarchy
|
||||
)
|
||||
root_info["children"].append(column["name"])
|
||||
continue
|
||||
else: # The column is a nested field
|
||||
root_info["children"].append(column["name"])
|
||||
columns.pop(0)
|
||||
|
||||
@classmethod
def _create_row_and_array_hierarchy(
    cls, selected_columns: List[dict]
) -> Tuple[dict, dict, List[dict]]:
    """
    Build graphs whose root nodes represent ROW or ARRAY columns and whose
    children are those columns' nested fields.

    :param selected_columns: columns selected in a query
    :return: (graph of row columns, graph of array columns,
             list of all nested fields discovered)
    """
    row_hierarchy: OrderedDict = OrderedDict()
    array_hierarchy: OrderedDict = OrderedDict()
    expanded: List[dict] = []
    for column in selected_columns:
        col_type = column["type"]
        if col_type.startswith("ROW"):
            parsed: List[dict] = []
            cls._parse_structural_column(column["name"], col_type.lower(), parsed)
            # parsed[0] is the row column itself; the rest are nested fields.
            expanded = expanded + parsed[1:]
            row_cols, nested_arrays = cls._filter_out_array_nested_cols(parsed)
            cls._build_column_hierarchy(row_cols, ["ROW"], row_hierarchy)
            cls._build_column_hierarchy(
                nested_arrays, ["ROW", "ARRAY"], array_hierarchy
            )
        elif col_type.startswith("ARRAY"):
            parsed_arrays: List[dict] = []
            cls._parse_structural_column(
                column["name"], col_type.lower(), parsed_arrays
            )
            expanded = expanded + parsed_arrays[1:]
            cls._build_column_hierarchy(
                parsed_arrays, ["ROW", "ARRAY"], array_hierarchy
            )
    return row_hierarchy, array_hierarchy, expanded
|
||||
|
||||
@classmethod
|
||||
def _create_empty_row_of_data(cls, columns: List[dict]) -> dict:
|
||||
"""
|
||||
Create an empty row of data
|
||||
:param columns: list of columns
|
||||
:return: dictionary representing an empty row of data
|
||||
"""
|
||||
return {column["name"]: "" for column in columns}
|
||||
|
||||
@classmethod
|
||||
def _expand_row_data(cls, datum: dict, column: str, column_hierarchy: dict) -> None:
|
||||
"""
|
||||
Separate out nested fields and its value in a row of data
|
||||
:param datum: row of data
|
||||
:param column: row column name
|
||||
:param column_hierarchy: dictionary tracking structural columns and its
|
||||
nested fields
|
||||
"""
|
||||
if column in datum:
|
||||
row_data = datum[column]
|
||||
row_children = column_hierarchy[column]["children"]
|
||||
if row_data and len(row_data) != len(row_children):
|
||||
raise Exception(
|
||||
"The number of data values and number of nested"
|
||||
"fields are not equal"
|
||||
)
|
||||
elif row_data:
|
||||
for index, data_value in enumerate(row_data):
|
||||
datum[row_children[index]] = data_value
|
||||
else:
|
||||
for row_child in row_children:
|
||||
datum[row_child] = ""
|
||||
|
||||
@classmethod
|
||||
def _split_ary_cols_by_proc_state(
|
||||
cls, array_columns: List[str], array_column_hierarchy: dict, datum: dict
|
||||
) -> Tuple[List[str], Set[str]]:
|
||||
"""
|
||||
Take a list of array columns and split them according to whether or not we are
|
||||
ready to process them from a data set
|
||||
:param array_columns: list of array columns
|
||||
:param array_column_hierarchy: graph representing array columns
|
||||
:param datum: row of data
|
||||
:return: list of array columns ready to be processed and set of array columns
|
||||
not ready to be processed
|
||||
"""
|
||||
array_columns_to_process = []
|
||||
unprocessed_array_columns = set()
|
||||
child_array = None
|
||||
for array_column in array_columns:
|
||||
if array_column in datum:
|
||||
array_columns_to_process.append(array_column)
|
||||
elif str(array_column_hierarchy[array_column]["type"]) == "ARRAY":
|
||||
child_array = array_column
|
||||
unprocessed_array_columns.add(child_array)
|
||||
elif child_array and array_column.startswith(child_array):
|
||||
unprocessed_array_columns.add(array_column)
|
||||
else:
|
||||
# array without any data
|
||||
array_columns_to_process.append(array_column)
|
||||
datum[array_column] = []
|
||||
return array_columns_to_process, unprocessed_array_columns
|
||||
|
||||
@classmethod
|
||||
def _convert_data_lst_to_ary_dict(
|
||||
cls, data: List[dict], array_columns_to_process: List[str]
|
||||
) -> dict:
|
||||
"""
|
||||
Pull out array data from rows of data into a dictionary where the key represents
|
||||
the index in the data list and the value is the array data values
|
||||
Example:
|
||||
data = [
|
||||
{'ColumnA': [1, 2], 'ColumnB': 3},
|
||||
{'ColumnA': [11, 22], 'ColumnB': 3}
|
||||
]
|
||||
data dictionary = {
|
||||
0: [{'ColumnA': [1, 2]],
|
||||
1: [{'ColumnA': [11, 22]]
|
||||
}
|
||||
:param data: rows of data
|
||||
:param array_columns_to_process: array columns we want to pull out
|
||||
:return: data dictionary
|
||||
"""
|
||||
array_data_dict = {}
|
||||
for data_index, datum in enumerate(data):
|
||||
all_array_datum = {}
|
||||
for array_column in array_columns_to_process:
|
||||
all_array_datum[array_column] = datum[array_column]
|
||||
array_data_dict[data_index] = [all_array_datum]
|
||||
return array_data_dict
|
||||
|
||||
@classmethod
def _process_array_data(  # pylint: disable=too-many-locals,too-many-branches
    cls, data: List[dict], all_columns: List[dict], array_column_hierarchy: dict
) -> dict:
    """
    Pull out array data that is ready to be processed into a dictionary.

    The key is the index in the original data set; the value is a list of
    rows. Initially that list holds one row (the original row at that
    index); as arrays are processed, each array element beyond the first is
    expanded into an additional row appended to the list.

    Example::

        Original data set = [
            {'ColumnA': [1, 2], 'ColumnB': [3]},
            {'ColumnA': [11, 22], 'ColumnB': [33]},
        ]
        all_array_data (after processing) = {
            0: [{'ColumnA': 1, 'ColumnB': 3},
                {'ColumnA': 2, 'ColumnB': ''}],
            1: [{'ColumnA': 11, 'ColumnB': 33},
                {'ColumnA': 22, 'ColumnB': ''}],
        }

    :param data: rows of data
    :param all_columns: list of columns
    :param array_column_hierarchy: graph representing array columns
    :return: dictionary representing processed array data
    """
    array_columns = list(array_column_hierarchy.keys())
    # Determine which columns are ready to be processed. Arrays nested
    # inside rows must wait until the outer arrays have been expanded.
    (
        array_columns_to_process,
        unprocessed_array_columns,
    ) = cls._split_ary_cols_by_proc_state(
        array_columns, array_column_hierarchy, data[0]
    )

    # Pull out array data that is ready to be processed into a dictionary.
    all_array_data = cls._convert_data_lst_to_ary_dict(
        data, array_columns_to_process
    )

    for expanded_array_data in all_array_data.values():
        for array_column in array_columns:
            if array_column in unprocessed_array_columns:
                continue
            # Expand array values that are rows.
            if str(array_column_hierarchy[array_column]["type"]) == "ROW":
                for array_value in expanded_array_data:
                    cls._expand_row_data(
                        array_value, array_column, array_column_hierarchy
                    )
                continue
            array_data = expanded_array_data[0][array_column]
            array_children = array_column_hierarchy[array_column]
            # Empty array of a primitive data type: nothing to expand.
            if not array_data and not array_children["children"]:
                continue
            if array_data and array_children["children"]:
                # Pull out complex array values into their own rows of data.
                for array_index, data_value in enumerate(array_data):
                    if array_index >= len(expanded_array_data):
                        empty_data = cls._create_empty_row_of_data(all_columns)
                        expanded_array_data.append(empty_data)
                    for child_index, datum_value in enumerate(data_value):
                        array_child = array_children["children"][child_index]
                        expanded_array_data[array_index][array_child] = datum_value
            elif array_data:
                # Pull out primitive array values into their own rows of data.
                for array_index, data_value in enumerate(array_data):
                    if array_index >= len(expanded_array_data):
                        empty_data = cls._create_empty_row_of_data(all_columns)
                        expanded_array_data.append(empty_data)
                    expanded_array_data[array_index][array_column] = data_value
            else:
                # Empty array with nested fields: blank out every nested
                # field. (Fix: dropped the unused `enumerate` index that the
                # original bound and never read.)
                for array_child in array_children["children"]:
                    for array_value in expanded_array_data:
                        array_value[array_child] = ""
    return all_array_data
|
||||
|
||||
@classmethod
|
||||
def _remove_processed_array_columns(
|
||||
cls, unprocessed_array_columns: Set[str], array_column_hierarchy: dict
|
||||
) -> None:
|
||||
"""
|
||||
Remove keys representing array columns that have already been processed
|
||||
:param unprocessed_array_columns: list of unprocessed array columns
|
||||
:param array_column_hierarchy: graph representing array columns
|
||||
"""
|
||||
array_columns = list(array_column_hierarchy.keys())
|
||||
for array_column in array_columns:
|
||||
if array_column in unprocessed_array_columns:
|
||||
continue
|
||||
else:
|
||||
del array_column_hierarchy[array_column]
|
||||
|
||||
@classmethod
|
||||
def expand_data( # pylint: disable=too-many-locals
|
||||
cls, columns: List[dict], data: List[dict]
|
||||
|
|
|
|||
|
|
@ -454,167 +454,6 @@ class DbEngineSpecsTestCase(SupersetTestCase):
|
|||
self.assertEqual(actual_result.element.name, expected_result["name"])
|
||||
self.assertEqual(actual_result.name, expected_result["label"])
|
||||
|
||||
def test_presto_filter_out_array_nested_cols(self):
    """The array column itself is kept, while both it and its nested
    content are routed into the array-column list."""
    columns = [
        {"name": "column", "type": "ARRAY"},
        {"name": "column.nested_obj", "type": "FLOAT"},
    ]
    filtered, arrays = PrestoEngineSpec._filter_out_array_nested_cols(columns)
    self.assertEqual(filtered, [{"name": "column", "type": "ARRAY"}])
    self.assertEqual(arrays, columns)
|
||||
|
||||
def test_presto_create_row_and_array_hierarchy(self):
    """ROW and ARRAY columns are parsed into separate hierarchies plus a
    flat list of all discovered nested fields."""
    expected_rows = {
        "row_column": {
            "type": "ROW",
            "children": ["row_column.nested_obj1", "row_column.nested_row"],
        },
        "row_column.nested_row": {
            "type": "ROW",
            "children": ["row_column.nested_row.nested_obj2"],
        },
    }
    expected_arrays = {
        "array_column": {
            "type": "ARRAY",
            "children": ["array_column.nested_array"],
        },
        "array_column.nested_array": {
            "type": "ARRAY",
            "children": ["array_column.nested_array.nested_obj"],
        },
    }
    expected_expanded = [
        {"name": "row_column.nested_obj1", "type": "VARCHAR"},
        {"name": "row_column.nested_row", "type": "ROW"},
        {"name": "row_column.nested_row.nested_obj2", "type": "VARCHAR"},
        {"name": "array_column.nested_array", "type": "ARRAY"},
        {"name": "array_column.nested_array.nested_obj", "type": "VARCHAR"},
    ]
    selected = [
        {
            "name": "row_column",
            "type": "ROW(NESTED_OBJ1 VARCHAR, NESTED_ROW ROW(NESTED_OBJ2 VARCHAR)",
        },
        {
            "name": "array_column",
            "type": "ARRAY(ROW(NESTED_ARRAY ARRAY(ROW(NESTED_OBJ VARCHAR))))",
        },
    ]
    rows, arrays, expanded = PrestoEngineSpec._create_row_and_array_hierarchy(
        selected
    )
    self.assertEqual(rows, expected_rows)
    self.assertEqual(arrays, expected_arrays)
    self.assertEqual(expanded, expected_expanded)
|
||||
|
||||
def test_presto_expand_row_data(self):
    """A ROW value's elements are unpacked into per-field keys in place."""
    hierarchy = {
        "row_col": {
            "type": "ROW",
            "children": ["row_col.nested_int", "row_col.nested_str"],
        }
    }
    datum = {"row_col": [1, "a"]}
    PrestoEngineSpec._expand_row_data(datum, "row_col", hierarchy)
    self.assertEqual(
        datum,
        {
            "row_col": [1, "a"],
            "row_col.nested_int": 1,
            "row_col.nested_str": "a",
        },
    )
|
||||
|
||||
def test_presto_split_ary_cols_by_proc_state(self):
    """An array present in the datum is ready; an array nested inside it
    is deferred until the outer one has been expanded."""
    hierarchy = {
        "array_column": {
            "type": "ARRAY",
            "children": ["array_column.nested_array"],
        },
        "array_column.nested_array": {
            "type": "ARRAY",
            "children": ["array_column.nested_array.nested_obj"],
        },
    }
    ready, deferred = PrestoEngineSpec._split_ary_cols_by_proc_state(
        ["array_column", "array_column.nested_array"],
        hierarchy,
        {"array_column": [[[1], [2]]]},
    )
    self.assertEqual(ready, ["array_column"])
    self.assertEqual(deferred, {"array_column.nested_array"})
|
||||
|
||||
def test_presto_convert_data_lst_to_ary_dict(self):
    """Array columns are pulled out of each row, keyed by row index."""
    rows = [
        {"array_column": [1, 2], "int_column": 3},
        {"array_column": [11, 22], "int_column": 33},
    ]
    result = PrestoEngineSpec._convert_data_lst_to_ary_dict(
        rows, ["array_column"]
    )
    self.assertEqual(
        result,
        {
            0: [{"array_column": [1, 2]}],
            1: [{"array_column": [11, 22]}],
        },
    )
|
||||
|
||||
def test_presto_process_array_data(self):
    """Each array element beyond the first becomes an extra expanded row,
    padded with empty strings for the remaining columns."""
    expected = {
        0: [
            {"array_column": [[1], [2]], "array_column.nested_row": 1},
            {"array_column": "", "array_column.nested_row": 2, "int_column": ""},
        ],
        1: [
            {"array_column": [[11], [22]], "array_column.nested_row": 11},
            {"array_column": "", "array_column.nested_row": 22, "int_column": ""},
        ],
    }
    rows = [
        {"array_column": [[1], [2]], "int_column": 3},
        {"array_column": [[11], [22]], "int_column": 33},
    ]
    columns = [
        {"name": "array_column", "type": "ARRAY"},
        {"name": "array_column.nested_row", "type": "BIGINT"},
        {"name": "int_column", "type": "BIGINT"},
    ]
    hierarchy = {
        "array_column": {"type": "ARRAY", "children": ["array_column.nested_row"]}
    }
    actual = PrestoEngineSpec._process_array_data(rows, columns, hierarchy)
    self.assertEqual(actual, expected)
|
||||
|
||||
def test_presto_remove_processed_array_columns(self):
    """Only columns listed as unprocessed survive in the hierarchy."""
    hierarchy = {
        "array_column": {
            "type": "ARRAY",
            "children": ["array_column.nested_array"],
        },
        "array_column.nested_array": {
            "type": "ARRAY",
            "children": ["array_column.nested_array.nested_obj"],
        },
    }
    PrestoEngineSpec._remove_processed_array_columns(
        {"array_column.nested_array"}, hierarchy
    )
    self.assertEqual(
        hierarchy,
        {
            "array_column.nested_array": {
                "type": "ARRAY",
                "children": ["array_column.nested_array.nested_obj"],
            }
        },
    )
|
||||
|
||||
@mock.patch.dict(
|
||||
"superset._feature_flags", {"PRESTO_EXPAND_DATA": True}, clear=True
|
||||
)
|
||||
|
|
|
|||
Loading…
Reference in New Issue