feat: add renameOperator (#19776)

This commit is contained in:
Yongjie Zhao 2022-04-20 19:48:12 +08:00 committed by GitHub
parent f06db796b5
commit 3c28cd4625
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 512 additions and 71 deletions

View File

@ -17,21 +17,12 @@
* specific language governing permissions and limitations
* under the License.
*/
import { ensureIsArray, PostProcessingFlatten } from '@superset-ui/core';
import { PostProcessingFlatten } from '@superset-ui/core';
import { PostProcessingFactory } from './types';
export const flattenOperator: PostProcessingFactory<PostProcessingFlatten> = (
formData,
queryObject,
) => {
const drop_levels: number[] = [];
if (ensureIsArray(queryObject.metrics).length === 1) {
drop_levels.push(0);
}
return {
operation: 'flatten',
options: {
drop_levels,
},
};
};
) => ({
operation: 'flatten',
});

View File

@ -23,6 +23,7 @@ export { timeComparePivotOperator } from './timeComparePivotOperator';
export { sortOperator } from './sortOperator';
export { pivotOperator } from './pivotOperator';
export { resampleOperator } from './resampleOperator';
export { renameOperator } from './renameOperator';
export { contributionOperator } from './contributionOperator';
export { prophetOperator } from './prophetOperator';
export { boxplotOperator } from './boxplotOperator';

View File

@ -0,0 +1,89 @@
/* eslint-disable camelcase */
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import {
PostProcessingRename,
ensureIsArray,
getMetricLabel,
ComparisionType,
} from '@superset-ui/core';
import { PostProcessingFactory } from './types';
import { getMetricOffsetsMap, isValidTimeCompare } from './utils';
/**
 * Builds the `rename` post-processing rule that removes (or relabels) the
 * top level of the column MultiIndex, i.e. the metric name.
 *
 * A rule is only produced when all of the following hold:
 * 1) the query has exactly one metric,
 * 2) the query has at least one dimension (column),
 * 3) an x-axis is set on the form data, or the query is flagged `is_timeseries`,
 * 4) if a valid time comparison is configured, its type is not one of the
 *    derived ones (difference, ratio, percentage).
 *
 * @param formData - chart form data; `x_axis`, `comparison_type` and
 *   `time_compare` are read from it.
 * @param queryObject - query object; `metrics`, `columns` and `is_timeseries`
 *   are read from it.
 * @returns the `rename` rule, or `undefined` when the rule does not apply.
 */
export const renameOperator: PostProcessingFactory<PostProcessingRename> = (
  formData,
  queryObject,
) => {
  const metrics = ensureIsArray(queryObject.metrics);
  const columns = ensureIsArray(queryObject.columns);
  const { x_axis: xAxis } = formData;

  // conditions 1) - 3): single metric, at least one dimension, an x-axis
  if (
    metrics.length !== 1 ||
    columns.length === 0 ||
    !(xAxis || queryObject.is_timeseries)
  ) {
    return undefined;
  }

  // evaluated once and reused for both the guard and the rename mapping
  const hasTimeCompare = isValidTimeCompare(formData, queryObject);

  // todo: we should provide an approach to handle derived metrics
  if (
    hasTimeCompare &&
    [
      ComparisionType.Difference,
      ComparisionType.Ratio,
      ComparisionType.Percentage,
    ].includes(formData.comparison_type)
  ) {
    return undefined;
  }

  const renamePairs: [string, string | null][] = [];

  // "actual values" will add derived metric.
  // we will rename the "metric" from the metricWithOffset label
  // for example: "count__1 year ago" => "1 year ago"
  if (hasTimeCompare && formData.comparison_type === ComparisionType.Values) {
    const metricOffsetMap = getMetricOffsetsMap(formData, queryObject);
    const timeOffsets = ensureIsArray(formData.time_compare);
    [...metricOffsetMap.keys()].forEach(metricWithOffset => {
      const offsetLabel = timeOffsets.find(offset =>
        metricWithOffset.includes(offset),
      );
      // `find` yields undefined when no offset matches; never emit a pair
      // without a concrete target label
      if (offsetLabel !== undefined) {
        renamePairs.push([metricWithOffset, offsetLabel]);
      }
    });
  }

  // a null target removes the metric label itself from the top level
  renamePairs.push([getMetricLabel(metrics[0]), null]);

  return {
    operation: 'rename',
    options: {
      columns: Object.fromEntries(renamePairs),
      level: 0,
      inplace: true,
    },
  };
};

View File

@ -51,40 +51,9 @@ const queryObject: QueryObject = {
},
],
};
const singleMetricQueryObject: QueryObject = {
metrics: ['count(*)'],
time_range: '2015 : 2016',
granularity: 'month',
post_processing: [
{
operation: 'pivot',
options: {
index: ['__timestamp'],
columns: ['nation'],
aggregates: {
'count(*)': {
operator: 'sum',
},
},
},
},
],
};
test('should do flattenOperator', () => {
expect(flattenOperator(formData, queryObject)).toEqual({
operation: 'flatten',
options: {
drop_levels: [],
},
});
});
test('should add drop level', () => {
expect(flattenOperator(formData, singleMetricQueryObject)).toEqual({
operation: 'flatten',
options: {
drop_levels: [0],
},
});
});

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { ComparisionType, QueryObject, SqlaFormData } from '@superset-ui/core';
import { renameOperator } from '@superset-ui/chart-controls';
// Baseline form data: one metric, one dimension and an explicit x-axis.
const formData: SqlaFormData = {
  x_axis: 'dttm',
  metrics: ['count(*)'],
  groupby: ['gender'],
  time_range: '2015 : 2016',
  granularity: 'month',
  datasource: 'foo',
  viz_type: 'table',
};

// Baseline query object matching the form data above.
const queryObject: QueryObject = {
  is_timeseries: true,
  metrics: ['count(*)'],
  columns: ['gender', 'dttm'],
  time_range: '2015 : 2016',
  granularity: 'month',
  post_processing: [],
};

// Rule expected whenever only the single metric label has to be removed.
const dropMetricLabelRule = {
  operation: 'rename',
  options: { columns: { 'count(*)': null }, inplace: true, level: 0 },
};

test('should skip renameOperator if exists multiple metrics', () => {
  const multiMetricQuery = {
    ...queryObject,
    metrics: ['count(*)', 'sum(sales)'],
  };
  expect(renameOperator(formData, multiMetricQuery)).toBeUndefined();
});

test('should skip renameOperator if does not exist series', () => {
  const noSeriesQuery = { ...queryObject, columns: [] };
  expect(renameOperator(formData, noSeriesQuery)).toBeUndefined();
});

test('should skip renameOperator if does not exist x_axis and is_timeseries', () => {
  const rule = renameOperator(
    { ...formData, x_axis: null },
    { ...queryObject, is_timeseries: false },
  );
  expect(rule).toBeUndefined();
});

test('should skip renameOperator if exists derived metrics', () => {
  const derivedTypes = [
    ComparisionType.Difference,
    ComparisionType.Ratio,
    ComparisionType.Percentage,
  ];
  for (const type of derivedTypes) {
    const rule = renameOperator(
      { ...formData, comparison_type: type, time_compare: ['1 year ago'] },
      { ...queryObject, metrics: ['count(*)'] },
    );
    expect(rule).toBeUndefined();
  }
});

test('should add renameOperator', () => {
  expect(renameOperator(formData, queryObject)).toEqual(dropMetricLabelRule);
});

test('should add renameOperator if does not exist x_axis', () => {
  const rule = renameOperator({ ...formData, x_axis: null }, queryObject);
  expect(rule).toEqual(dropMetricLabelRule);
});

test('should add renameOperator if exist "actual value" time comparison', () => {
  const rule = renameOperator(
    {
      ...formData,
      comparison_type: ComparisionType.Values,
      time_compare: ['1 year ago', '1 year later'],
    },
    queryObject,
  );
  expect(rule).toEqual({
    operation: 'rename',
    options: {
      columns: {
        'count(*)': null,
        'count(*)__1 year ago': '1 year ago',
        'count(*)__1 year later': '1 year later',
      },
      inplace: true,
      level: 0,
    },
  });
});

View File

@ -201,6 +201,18 @@ export type PostProcessingResample =
| _PostProcessingResample
| DefaultPostProcessing;
/**
 * Shape of the `rename` post-processing rule.
 */
interface _PostProcessingRename {
operation: 'rename';
options: {
// Maps an existing column label to its new label; `null` is an allowed target.
columns: Record<string, string | null>;
// Optional in-place flag forwarded with the rule.
inplace?: boolean;
// Optional level to restrict the rename to.
// NOTE(review): presumably a level position (number) or level name (string) — confirm against the backend.
level?: number | string;
};
}
/**
 * A `rename` rule, or the generic `DefaultPostProcessing` shape.
 */
export type PostProcessingRename =
| _PostProcessingRename
| DefaultPostProcessing;
interface _PostProcessingFlatten {
operation: 'flatten';
options?: {
@ -228,6 +240,7 @@ export type PostProcessingRule =
| PostProcessingCompare
| PostProcessingSort
| PostProcessingResample
| PostProcessingRename
| PostProcessingFlatten;
export function isPostProcessingAggregation(

View File

@ -30,6 +30,7 @@ import {
isValidTimeCompare,
pivotOperator,
resampleOperator,
renameOperator,
contributionOperator,
prophetOperator,
timeComparePivotOperator,
@ -91,7 +92,12 @@ export default function buildQuery(formData: QueryFormData) {
rollingWindowOperator(formData, baseQueryObject),
timeCompareOperator(formData, baseQueryObject),
resampleOperator(formData, baseQueryObject),
renameOperator(formData, {
...baseQueryObject,
...{ is_timeseries },
}),
flattenOperator(formData, baseQueryObject),
// todo: move contribution and prophet before flatten
contributionOperator(formData, baseQueryObject),
prophetOperator(formData, baseQueryObject),
],

View File

@ -17,6 +17,7 @@
# pylint: disable=too-many-lines
from __future__ import annotations
import inspect
from typing import Any, Dict, Optional, TYPE_CHECKING
from flask_babel import gettext as _
@ -27,7 +28,7 @@ from marshmallow_enum import EnumField
from superset import app
from superset.common.chart_data import ChartDataResultFormat, ChartDataResultType
from superset.db_engine_specs.base import builtin_time_grains
from superset.utils import schema as utils
from superset.utils import pandas_postprocessing, schema as utils
from superset.utils.core import (
AnnotationType,
FilterOperator,
@ -770,24 +771,12 @@ class ChartDataPostProcessingOperationSchema(Schema):
description="Post processing operation type",
required=True,
validate=validate.OneOf(
choices=(
"aggregate",
"boxplot",
"contribution",
"cum",
"geodetic_parse",
"geohash_decode",
"geohash_encode",
"pivot",
"prophet",
"rolling",
"select",
"sort",
"diff",
"compare",
"resample",
"flatten",
)
choices=[
name
for name, value in inspect.getmembers(
pandas_postprocessing, inspect.isfunction
)
]
),
example="aggregate",
)

View File

@ -28,6 +28,7 @@ from superset.utils.pandas_postprocessing.geography import (
)
from superset.utils.pandas_postprocessing.pivot import pivot
from superset.utils.pandas_postprocessing.prophet import prophet
from superset.utils.pandas_postprocessing.rename import rename
from superset.utils.pandas_postprocessing.resample import resample
from superset.utils.pandas_postprocessing.rolling import rolling
from superset.utils.pandas_postprocessing.select import select
@ -46,6 +47,7 @@ __all__ = [
"geodetic_parse",
"pivot",
"prophet",
"rename",
"resample",
"rolling",
"select",

View File

@ -81,14 +81,16 @@ def flatten(
"""
if _is_multi_index_on_columns(df):
df.columns = df.columns.droplevel(drop_levels)
# every cell should be converted to string
df.columns = [
FLAT_COLUMN_SEPARATOR.join(
# pylint: disable=superfluous-parens
[str(cell) for cell in (series if is_sequence(series) else [series])]
)
for series in df.columns.to_flat_index()
]
_columns = []
for series in df.columns.to_flat_index():
_cells = []
for cell in series if is_sequence(series) else [series]:
if pd.notnull(cell):
# every cell should be converted to string
_cells.append(str(cell))
_columns.append(FLAT_COLUMN_SEPARATOR.join(_cells))
df.columns = _columns
if reset_index and not isinstance(df.index, pd.RangeIndex):
df = df.reset_index(level=0)

View File

@ -0,0 +1,58 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Dict, Optional, Union
import pandas as pd
from flask_babel import gettext as _
from pandas._typing import Level
from superset.exceptions import InvalidPostProcessingError
from superset.utils.pandas_postprocessing.utils import validate_column_args
@validate_column_args("columns")
def rename(
    df: pd.DataFrame,
    columns: Dict[str, Union[str, None]],
    inplace: bool = False,
    level: Optional[Level] = None,
) -> pd.DataFrame:
    """
    Alter column name of DataFrame

    :param df: DataFrame to rename.
    :param columns: Mapping from an existing column label to its new label.
    :param inplace: Whether to rename the columns of ``df`` itself instead of
           returning a renamed copy. The DataFrame is returned in both cases.
    :param level: In case of a MultiIndex, only rename labels in the specified level.
    :return: DataFrame after rename
    :raises InvalidPostProcessingError: If the requested level cannot be resolved
            or if any of the new labels already exists in that level
    """
    if not columns:
        # nothing to rename
        return df

    try:
        _rename_level = df.columns.get_level_values(level=level)
    except (IndexError, KeyError) as err:
        raise InvalidPostProcessingError from err

    # A single colliding target is enough to produce duplicated labels, so
    # reject the request as soon as any new name is already present.
    if any(new_name in _rename_level for new_name in columns.values()):
        raise InvalidPostProcessingError(_("Label already exists"))

    if inplace:
        # DataFrame.rename returns None when renaming in place
        df.rename(columns=columns, inplace=inplace, level=level)
        return df
    return df.rename(columns=columns, inplace=inplace, level=level)

View File

@ -0,0 +1,175 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pandas as pd
import pytest
from superset.exceptions import InvalidPostProcessingError
from superset.utils import pandas_postprocessing as pp
from tests.unit_tests.fixtures.dataframes import categories_df
def test_rename_should_not_side_effect():
    """A rename without ``inplace`` must leave the input frame untouched."""
    working_copy = categories_df.copy()
    mapping = {
        "constant": "constant_newname",
        "category": "category_namename",
    }
    pp.rename(df=working_copy, columns=mapping)
    assert working_copy.equals(categories_df)
def test_rename():
    """Renamed labels replace the old ones; the other columns are kept."""
    mapping = {
        "constant": "constant_newname",
        "category": "category_newname",
    }
    expected_labels = [
        "constant_newname",
        "category_newname",
        "dept",
        "name",
        "asc_idx",
        "desc_idx",
        "idx_nulls",
    ]
    renamed_df = pp.rename(df=categories_df, columns=mapping)
    assert list(renamed_df.columns.values) == expected_labels
    assert not renamed_df.equals(categories_df)
def test_should_inplace_rename():
    """With ``inplace=True`` the returned frame equals the frame passed in."""
    working_copy = categories_df.copy()
    mapping = {
        "constant": "constant_newname",
        "category": "category_namename",
    }
    returned_df = pp.rename(df=working_copy, columns=mapping, inplace=True)
    assert returned_df.equals(working_copy)
def test_should_rename_on_level():
    """Renaming on level 0 of a MultiIndex only changes that level's labels."""
    level_values = [["m1", "m2"], ["a", "b"], ["x", "y"]]
    multi_columns = pd.MultiIndex.from_product(
        level_values, names=[None, "level1", "level2"]
    )
    df = pd.DataFrame(index=[0, 1, 2], columns=multi_columns, data=1)
    #                m1          m2
    # level1   a     b     a     b
    # level2   x  y  x  y  x  y  x  y
    # 0        1  1  1  1  1  1  1  1
    # 1        1  1  1  1  1  1  1  1
    # 2        1  1  1  1  1  1  1  1
    post_df = pp.rename(df=df, columns={"m1": "new_m1"}, level=0)

    expected_top_level = pd.Index(["new_m1"] * 4 + ["m2"] * 4)
    assert post_df.columns.get_level_values(level=0).equals(expected_top_level)
def test_should_raise_exception_no_column():
    """Renaming a label that is not in the frame is rejected."""
    unknown_mapping = {"foobar": "foobar2"}
    with pytest.raises(InvalidPostProcessingError):
        pp.rename(df=categories_df, columns=unknown_mapping)
def test_should_raise_exception_duplication():
    """Renaming onto a label that already exists is rejected."""
    colliding_mapping = {"constant": "category"}
    with pytest.raises(InvalidPostProcessingError):
        pp.rename(df=categories_df, columns=colliding_mapping)
def test_should_raise_exception_duplication_on_multiindx():
    """
    Renaming onto an existing label of a MultiIndex level is rejected.

    Each call gets its own ``pytest.raises`` block: a second call placed
    after a raising call in the same block would never be executed.
    """
    iterables = [["m1", "m2"], ["a", "b"], ["x", "y"]]
    columns = pd.MultiIndex.from_product(iterables, names=[None, "level1", "level2"])
    df = pd.DataFrame(index=[0, 1, 2], columns=columns, data=1)
    #                m1          m2
    # level1   a     b     a     b
    # level2   x  y  x  y  x  y  x  y
    # 0        1  1  1  1  1  1  1  1
    # 1        1  1  1  1  1  1  1  1
    # 2        1  1  1  1  1  1  1  1

    with pytest.raises(InvalidPostProcessingError):
        pp.rename(
            df=df,
            columns={
                "m1": "m2",
            },
            level=0,
        )

    with pytest.raises(InvalidPostProcessingError):
        pp.rename(
            df=df,
            columns={
                "a": "b",
            },
            level=1,
        )
def test_should_raise_exception_invalid_level():
    """
    An unknown level, given by position or by name, is rejected.

    Each call gets its own ``pytest.raises`` block: a second call placed
    after a raising call in the same block would never be executed.
    """
    with pytest.raises(InvalidPostProcessingError):
        pp.rename(
            df=categories_df,
            columns={
                "constant": "new_constant",
            },
            level=100,
        )

    with pytest.raises(InvalidPostProcessingError):
        pp.rename(
            df=categories_df,
            columns={
                "constant": "new_constant",
            },
            level="xxxxx",
        )
def test_should_return_df_empty_columns():
    """An empty mapping returns a frame equal to the input."""
    untouched_df = pp.rename(df=categories_df, columns={})
    assert untouched_df.equals(categories_df)