feat(advanced analytics): support groupby in resample (#18045)
This commit is contained in:
parent
8dea7f500b
commit
0c7f7288d8
|
|
@ -17,7 +17,11 @@
|
|||
* specific language governing permissions and limitationsxw
|
||||
* under the License.
|
||||
*/
|
||||
import { PostProcessingResample } from '@superset-ui/core';
|
||||
import {
|
||||
ensureIsArray,
|
||||
isPhysicalColumn,
|
||||
PostProcessingResample,
|
||||
} from '@superset-ui/core';
|
||||
import { PostProcessingFactory } from './types';
|
||||
import { TIME_COLUMN } from './utils';
|
||||
|
||||
|
|
@ -28,6 +32,13 @@ export const resampleOperator: PostProcessingFactory<
|
|||
const resampleMethod = resampleZeroFill ? 'asfreq' : formData.resample_method;
|
||||
const resampleRule = formData.resample_rule;
|
||||
if (resampleMethod && resampleRule) {
|
||||
const groupby_columns = ensureIsArray(queryObject.columns).map(column => {
|
||||
if (isPhysicalColumn(column)) {
|
||||
return column;
|
||||
}
|
||||
return column.label;
|
||||
});
|
||||
|
||||
return {
|
||||
operation: 'resample',
|
||||
options: {
|
||||
|
|
@ -35,6 +46,7 @@ export const resampleOperator: PostProcessingFactory<
|
|||
rule: resampleRule,
|
||||
fill_value: resampleZeroFill ? 0 : null,
|
||||
time_column: TIME_COLUMN,
|
||||
groupby_columns,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,8 +16,8 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
import { QueryObject, SqlaFormData } from '@superset-ui/core';
|
||||
import { resampleOperator } from '../../../src';
|
||||
import { AdhocColumn, QueryObject, SqlaFormData } from '@superset-ui/core';
|
||||
import { resampleOperator } from '@superset-ui/chart-controls';
|
||||
|
||||
const formData: SqlaFormData = {
|
||||
metrics: [
|
||||
|
|
@ -75,6 +75,7 @@ test('should do resample', () => {
|
|||
rule: '1D',
|
||||
fill_value: null,
|
||||
time_column: '__timestamp',
|
||||
groupby_columns: [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
|
@ -92,6 +93,80 @@ test('should do zerofill resample', () => {
|
|||
rule: '1D',
|
||||
fill_value: 0,
|
||||
time_column: '__timestamp',
|
||||
groupby_columns: [],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
test('should append physical column to resample', () => {
|
||||
expect(
|
||||
resampleOperator(
|
||||
{ ...formData, resample_method: 'zerofill', resample_rule: '1D' },
|
||||
{ ...queryObject, columns: ['column1', 'column2'] },
|
||||
),
|
||||
).toEqual({
|
||||
operation: 'resample',
|
||||
options: {
|
||||
method: 'asfreq',
|
||||
rule: '1D',
|
||||
fill_value: 0,
|
||||
time_column: '__timestamp',
|
||||
groupby_columns: ['column1', 'column2'],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
test('should append label of adhoc column and physical column to resample', () => {
|
||||
expect(
|
||||
resampleOperator(
|
||||
{ ...formData, resample_method: 'zerofill', resample_rule: '1D' },
|
||||
{
|
||||
...queryObject,
|
||||
columns: [
|
||||
{
|
||||
hasCustomLabel: true,
|
||||
label: 'concat_a_b',
|
||||
expressionType: 'SQL',
|
||||
sqlExpression: "'a' + 'b'",
|
||||
} as AdhocColumn,
|
||||
'column2',
|
||||
],
|
||||
},
|
||||
),
|
||||
).toEqual({
|
||||
operation: 'resample',
|
||||
options: {
|
||||
method: 'asfreq',
|
||||
rule: '1D',
|
||||
fill_value: 0,
|
||||
time_column: '__timestamp',
|
||||
groupby_columns: ['concat_a_b', 'column2'],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
test('should append `undefined` if adhoc non-existing label', () => {
|
||||
expect(
|
||||
resampleOperator(
|
||||
{ ...formData, resample_method: 'zerofill', resample_rule: '1D' },
|
||||
{
|
||||
...queryObject,
|
||||
columns: [
|
||||
{
|
||||
sqlExpression: "'a' + 'b'",
|
||||
} as AdhocColumn,
|
||||
'column2',
|
||||
],
|
||||
},
|
||||
),
|
||||
).toEqual({
|
||||
operation: 'resample',
|
||||
options: {
|
||||
method: 'asfreq',
|
||||
rule: '1D',
|
||||
fill_value: 0,
|
||||
time_column: '__timestamp',
|
||||
groupby_columns: [undefined, 'column2'],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -167,6 +167,9 @@ export interface PostProcessingResample {
|
|||
rule: string;
|
||||
fill_value?: number | null;
|
||||
time_column: string;
|
||||
// If AdhocColumn doesn't have a label, it will be undefined.
|
||||
// todo: we have to give an explicit label for AdhocColumn.
|
||||
groupby_columns?: Array<string | undefined>;
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -958,27 +958,41 @@ def boxplot(
|
|||
return aggregate(df, groupby=groupby, aggregates=aggregates)
|
||||
|
||||
|
||||
def resample(
|
||||
@validate_column_args("groupby_columns")
|
||||
def resample( # pylint: disable=too-many-arguments
|
||||
df: DataFrame,
|
||||
rule: str,
|
||||
method: str,
|
||||
time_column: str,
|
||||
groupby_columns: Optional[Tuple[Optional[str], ...]] = None,
|
||||
fill_value: Optional[Union[float, int]] = None,
|
||||
) -> DataFrame:
|
||||
"""
|
||||
resample a timeseries dataframe.
|
||||
support upsampling in resample
|
||||
|
||||
:param df: DataFrame to resample.
|
||||
:param rule: The offset string representing target conversion.
|
||||
:param method: How to fill the NaN value after resample.
|
||||
:param time_column: existing columns in DataFrame.
|
||||
:param groupby_columns: columns except time_column in dataframe
|
||||
:param fill_value: What values do fill missing.
|
||||
:return: DataFrame after resample
|
||||
:raises QueryObjectValidationError: If the request in incorrect
|
||||
"""
|
||||
df = df.set_index(time_column)
|
||||
if method == "asfreq" and fill_value is not None:
|
||||
df = df.resample(rule).asfreq(fill_value=fill_value)
|
||||
|
||||
def _upsampling(_df: DataFrame) -> DataFrame:
|
||||
_df = _df.set_index(time_column)
|
||||
if method == "asfreq" and fill_value is not None:
|
||||
return _df.resample(rule).asfreq(fill_value=fill_value)
|
||||
return getattr(_df.resample(rule), method)()
|
||||
|
||||
if groupby_columns:
|
||||
df = (
|
||||
df.set_index(keys=list(groupby_columns))
|
||||
.groupby(by=list(groupby_columns))
|
||||
.apply(_upsampling)
|
||||
)
|
||||
df = df.reset_index().set_index(time_column).sort_index()
|
||||
else:
|
||||
df = getattr(df.resample(rule), method)()
|
||||
df = _upsampling(df)
|
||||
return df.reset_index()
|
||||
|
|
|
|||
|
|
@ -1029,3 +1029,70 @@ class TestPostProcessing(SupersetTestCase):
|
|||
)
|
||||
self.assertListEqual(post_df["label"].tolist(), ["x", "y", 0, 0, "z", 0, "q"])
|
||||
self.assertListEqual(post_df["y"].tolist(), [1.0, 2.0, 0, 0, 3.0, 0, 4.0])
|
||||
|
||||
def test_resample_with_groupby(self):
|
||||
"""
|
||||
The Dataframe contains a timestamp column, a string column and a numeric column.
|
||||
__timestamp city val
|
||||
0 2022-01-13 Chicago 6.0
|
||||
1 2022-01-13 LA 5.0
|
||||
2 2022-01-13 NY 4.0
|
||||
3 2022-01-11 Chicago 3.0
|
||||
4 2022-01-11 LA 2.0
|
||||
5 2022-01-11 NY 1.0
|
||||
"""
|
||||
df = DataFrame(
|
||||
{
|
||||
"__timestamp": to_datetime(
|
||||
[
|
||||
"2022-01-13",
|
||||
"2022-01-13",
|
||||
"2022-01-13",
|
||||
"2022-01-11",
|
||||
"2022-01-11",
|
||||
"2022-01-11",
|
||||
]
|
||||
),
|
||||
"city": ["Chicago", "LA", "NY", "Chicago", "LA", "NY"],
|
||||
"val": [6.0, 5.0, 4.0, 3.0, 2.0, 1.0],
|
||||
}
|
||||
)
|
||||
post_df = proc.resample(
|
||||
df=df,
|
||||
rule="1D",
|
||||
method="asfreq",
|
||||
fill_value=0,
|
||||
time_column="__timestamp",
|
||||
groupby_columns=("city",),
|
||||
)
|
||||
assert list(post_df.columns) == [
|
||||
"__timestamp",
|
||||
"city",
|
||||
"val",
|
||||
]
|
||||
assert [str(dt.date()) for dt in post_df["__timestamp"]] == (
|
||||
["2022-01-11"] * 3 + ["2022-01-12"] * 3 + ["2022-01-13"] * 3
|
||||
)
|
||||
assert list(post_df["val"]) == [3.0, 2.0, 1.0, 0, 0, 0, 6.0, 5.0, 4.0]
|
||||
|
||||
# should raise error when get a non-existent column
|
||||
with pytest.raises(QueryObjectValidationError):
|
||||
proc.resample(
|
||||
df=df,
|
||||
rule="1D",
|
||||
method="asfreq",
|
||||
fill_value=0,
|
||||
time_column="__timestamp",
|
||||
groupby_columns=("city", "unkonw_column",),
|
||||
)
|
||||
|
||||
# should raise error when get a None value in groupby list
|
||||
with pytest.raises(QueryObjectValidationError):
|
||||
proc.resample(
|
||||
df=df,
|
||||
rule="1D",
|
||||
method="asfreq",
|
||||
fill_value=0,
|
||||
time_column="__timestamp",
|
||||
groupby_columns=("city", None,),
|
||||
)
|
||||
|
|
|
|||
Loading…
Reference in New Issue