fix: Contribution percentages for ECharts plugins (#28368)

This commit is contained in:
Michael S. Molina 2024-05-08 15:54:21 -03:00 committed by GitHub
parent 9e4ba6762f
commit 55f3b46f41
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 164 additions and 40 deletions

View File

@ -22,12 +22,13 @@ import { PostProcessingFactory } from './types';
/* eslint-disable @typescript-eslint/no-unused-vars */ /* eslint-disable @typescript-eslint/no-unused-vars */
export const contributionOperator: PostProcessingFactory< export const contributionOperator: PostProcessingFactory<
PostProcessingContribution PostProcessingContribution
> = (formData, queryObject) => { > = (formData, queryObject, time_shifts) => {
if (formData.contributionMode) { if (formData.contributionMode) {
return { return {
operation: 'contribution', operation: 'contribution',
options: { options: {
orientation: formData.contributionMode, orientation: formData.contributionMode,
time_shifts,
}, },
}; };
} }

View File

@ -78,6 +78,10 @@ export default function buildQuery(formData: QueryFormData) {
...ensureIsArray(groupby), ...ensureIsArray(groupby),
]; ];
const time_offsets = isTimeComparison(formData, baseQueryObject)
? formData.time_compare
: [];
return [ return [
{ {
...baseQueryObject, ...baseQueryObject,
@ -87,9 +91,7 @@ export default function buildQuery(formData: QueryFormData) {
...(isXAxisSet(formData) ? {} : { is_timeseries: true }), ...(isXAxisSet(formData) ? {} : { is_timeseries: true }),
// todo: move `normalizeOrderBy to extractQueryFields` // todo: move `normalizeOrderBy to extractQueryFields`
orderby: normalizeOrderBy(baseQueryObject).orderby, orderby: normalizeOrderBy(baseQueryObject).orderby,
time_offsets: isTimeComparison(formData, baseQueryObject) time_offsets,
? formData.time_compare
: [],
/* Note that: /* Note that:
1. The resample, rolling, cum, timeCompare operators should be after pivot. 1. The resample, rolling, cum, timeCompare operators should be after pivot.
2. the flatOperator makes multiIndex Dataframe into flat Dataframe 2. the flatOperator makes multiIndex Dataframe into flat Dataframe
@ -100,7 +102,7 @@ export default function buildQuery(formData: QueryFormData) {
timeCompareOperator(formData, baseQueryObject), timeCompareOperator(formData, baseQueryObject),
resampleOperator(formData, baseQueryObject), resampleOperator(formData, baseQueryObject),
renameOperator(formData, baseQueryObject), renameOperator(formData, baseQueryObject),
contributionOperator(formData, baseQueryObject), contributionOperator(formData, baseQueryObject, time_offsets),
sortOperator(formData, baseQueryObject), sortOperator(formData, baseQueryObject),
flattenOperator(formData, baseQueryObject), flattenOperator(formData, baseQueryObject),
// todo: move prophet before flatten // todo: move prophet before flatten

View File

@ -15,10 +15,10 @@
# specific language governing permissions and limitations # specific language governing permissions and limitations
# under the License. # under the License.
from decimal import Decimal from decimal import Decimal
from typing import Optional from typing import Any
from flask_babel import gettext as _ from flask_babel import gettext as _
from pandas import DataFrame from pandas import DataFrame, MultiIndex
from superset.exceptions import InvalidPostProcessingError from superset.exceptions import InvalidPostProcessingError
from superset.utils.core import PostProcessingContributionOrientation from superset.utils.core import PostProcessingContributionOrientation
@ -28,11 +28,12 @@ from superset.utils.pandas_postprocessing.utils import validate_column_args
@validate_column_args("columns") @validate_column_args("columns")
def contribution( def contribution(
df: DataFrame, df: DataFrame,
orientation: Optional[ orientation: (
PostProcessingContributionOrientation PostProcessingContributionOrientation | None
] = PostProcessingContributionOrientation.COLUMN, ) = PostProcessingContributionOrientation.COLUMN,
columns: Optional[list[str]] = None, columns: list[str] | None = None,
rename_columns: Optional[list[str]] = None, time_shifts: list[str] | None = None,
rename_columns: list[str] | None = None,
) -> DataFrame: ) -> DataFrame:
""" """
Calculate cell contribution to row/column total for numeric columns. Calculate cell contribution to row/column total for numeric columns.
@ -40,8 +41,11 @@ def contribution(
If `columns` are specified, only calculate contributions on selected columns. If `columns` are specified, only calculate contributions on selected columns.
Contribution for time shift columns will be calculated separately.
:param df: DataFrame containing all-numeric data (temporal column ignored) :param df: DataFrame containing all-numeric data (temporal column ignored)
:param columns: Columns to calculate values from. :param columns: Columns to calculate values from.
:param time_shifts: The applied time shifts.
:param rename_columns: The new labels for the calculated contribution columns. :param rename_columns: The new labels for the calculated contribution columns.
The original columns will not be removed. The original columns will not be removed.
:param orientation: calculate by dividing cell with row/column total :param orientation: calculate by dividing cell with row/column total
@ -62,15 +66,86 @@ def contribution(
column=col, column=col,
) )
) )
columns = columns or numeric_df.columns actual_columns = columns or numeric_df.columns
rename_columns = rename_columns or columns
if len(rename_columns) != len(columns): rename_columns = rename_columns or actual_columns
if len(rename_columns) != len(actual_columns):
raise InvalidPostProcessingError( raise InvalidPostProcessingError(
_("`rename_columns` must have the same length as `columns`.") _(
"`rename_columns` must have the same length as `columns` + `time_shift_columns`."
)
) )
# limit to selected columns # limit to selected columns
numeric_df = numeric_df[columns] numeric_df_view = numeric_df[actual_columns]
axis = 0 if orientation == PostProcessingContributionOrientation.COLUMN else 1
numeric_df = numeric_df / numeric_df.values.sum(axis=axis, keepdims=True) if orientation == PostProcessingContributionOrientation.COLUMN:
contribution_df[rename_columns] = numeric_df numeric_df_view = numeric_df_view / numeric_df_view.values.sum(
axis=0, keepdims=True
)
contribution_df[rename_columns] = numeric_df_view
return contribution_df
result = get_column_groups(numeric_df_view, time_shifts, rename_columns)
calculate_row_contribution(
contribution_df, result["non_time_shift"][0], result["non_time_shift"][1]
)
for time_shift in result["time_shifts"].items():
calculate_row_contribution(contribution_df, time_shift[1][0], time_shift[1][1])
return contribution_df return contribution_df
def get_column_groups(
df: DataFrame, time_shifts: list[str] | None, rename_columns: list[str]
) -> dict[str, Any]:
"""
Group columns based on whether they have a time shift.
:param df: DataFrame to group columns from
:param time_shifts: List of time shifts to group by
:param rename_columns: List of new column names
:return: Dictionary with two keys: 'non_time_shift' and 'time_shifts'. 'non_time_shift'
maps to a tuple of original and renamed columns without a time shift. 'time_shifts' maps
to a dictionary where each key is a time shift and each value is a tuple of original and
renamed columns with that time shift.
"""
result: dict[str, Any] = {
"non_time_shift": ([], []), # take the form of ([A, B, C], [X, Y, Z])
"time_shifts": {}, # take the form of {A: ([X], [Y]), B: ([Z], [W])}
}
for i, col in enumerate(df.columns):
col_0 = col[0] if isinstance(df.columns, MultiIndex) else col
time_shift = None
if time_shifts and isinstance(col_0, str):
for ts in time_shifts:
if col_0.endswith(ts):
time_shift = ts
break
if time_shift is not None:
if time_shift not in result["time_shifts"]:
result["time_shifts"][time_shift] = ([], [])
result["time_shifts"][time_shift][0].append(col)
result["time_shifts"][time_shift][1].append(rename_columns[i])
else:
result["non_time_shift"][0].append(col)
result["non_time_shift"][1].append(rename_columns[i])
return result
def calculate_row_contribution(
    df: DataFrame, columns: list[str], rename_columns: list[str]
) -> None:
    """
    Calculate the contribution of each column to the row total and update the DataFrame.

    This function calculates the contribution of each selected column to the total of the row,
    and updates the DataFrame with these contribution percentages in place of the original values.

    :param df: The DataFrame to calculate contributions for.
    :param columns: A list of column names to calculate contributions for.
    :param rename_columns: A list of new column names for the contribution columns.
    """
    selected = df.loc[:, columns]
    # per-row total over the selected columns only
    row_totals = selected.sum(axis=1)
    # overwrite (or create) the target columns with the row contribution percentage
    df[rename_columns] = selected.div(row_totals, axis=0)

View File

@ -26,37 +26,43 @@ from superset.exceptions import InvalidPostProcessingError
from superset.utils.core import DTTM_ALIAS, PostProcessingContributionOrientation from superset.utils.core import DTTM_ALIAS, PostProcessingContributionOrientation
from superset.utils.pandas_postprocessing import contribution from superset.utils.pandas_postprocessing import contribution
df_template = DataFrame(
{
DTTM_ALIAS: [
datetime(2020, 7, 16, 14, 49),
datetime(2020, 7, 16, 14, 50),
datetime(2020, 7, 16, 14, 51),
],
"a": [1, 3, nan],
"b": [1, 9, nan],
"c": [nan, nan, nan],
}
)
def test_contribution():
df = DataFrame( def test_non_numeric_columns():
{
DTTM_ALIAS: [
datetime(2020, 7, 16, 14, 49),
datetime(2020, 7, 16, 14, 50),
datetime(2020, 7, 16, 14, 51),
],
"a": [1, 3, nan],
"b": [1, 9, nan],
"c": [nan, nan, nan],
}
)
with pytest.raises(InvalidPostProcessingError, match="not numeric"): with pytest.raises(InvalidPostProcessingError, match="not numeric"):
contribution(df, columns=[DTTM_ALIAS]) contribution(df_template.copy(), columns=[DTTM_ALIAS])
def test_rename_should_have_same_length():
with pytest.raises(InvalidPostProcessingError, match="same length"): with pytest.raises(InvalidPostProcessingError, match="same length"):
contribution(df, columns=["a"], rename_columns=["aa", "bb"]) contribution(df_template.copy(), columns=["a"], rename_columns=["aa", "bb"])
# cell contribution across row
def test_cell_contribution_across_row():
processed_df = contribution( processed_df = contribution(
df, df_template.copy(),
orientation=PostProcessingContributionOrientation.ROW, orientation=PostProcessingContributionOrientation.ROW,
) )
assert processed_df.columns.tolist() == [DTTM_ALIAS, "a", "b", "c"] assert processed_df.columns.tolist() == [DTTM_ALIAS, "a", "b", "c"]
assert_array_equal(processed_df["a"].tolist(), [0.5, 0.25, nan]) assert_array_equal(processed_df["a"].tolist(), [0.5, 0.25, nan])
assert_array_equal(processed_df["b"].tolist(), [0.5, 0.75, nan]) assert_array_equal(processed_df["b"].tolist(), [0.5, 0.75, nan])
assert_array_equal(processed_df["c"].tolist(), [0, 0, nan]) assert_array_equal(processed_df["c"].tolist(), [nan, nan, nan])
# cell contribution across column without temporal column
def test_cell_contribution_across_column_without_temporal_column():
df = df_template.copy()
df.pop(DTTM_ALIAS) df.pop(DTTM_ALIAS)
processed_df = contribution( processed_df = contribution(
df, orientation=PostProcessingContributionOrientation.COLUMN df, orientation=PostProcessingContributionOrientation.COLUMN
@ -66,7 +72,10 @@ def test_contribution():
assert_array_equal(processed_df["b"].tolist(), [0.1, 0.9, 0]) assert_array_equal(processed_df["b"].tolist(), [0.1, 0.9, 0])
assert_array_equal(processed_df["c"].tolist(), [nan, nan, nan]) assert_array_equal(processed_df["c"].tolist(), [nan, nan, nan])
# contribution only on selected columns
def test_contribution_on_selected_columns():
df = df_template.copy()
df.pop(DTTM_ALIAS)
processed_df = contribution( processed_df = contribution(
df, df,
orientation=PostProcessingContributionOrientation.COLUMN, orientation=PostProcessingContributionOrientation.COLUMN,
@ -78,3 +87,40 @@ def test_contribution():
assert_array_equal(processed_df["b"].tolist(), [1, 9, nan]) assert_array_equal(processed_df["b"].tolist(), [1, 9, nan])
assert_array_equal(processed_df["c"].tolist(), [nan, nan, nan]) assert_array_equal(processed_df["c"].tolist(), [nan, nan, nan])
assert processed_df["pct_a"].tolist() == [0.25, 0.75, 0] assert processed_df["pct_a"].tolist() == [0.25, 0.75, 0]
def test_contribution_with_time_shift_columns():
    """Row orientation computes time-shifted columns as their own contribution group."""
    df = DataFrame(
        {
            DTTM_ALIAS: [
                datetime(2020, 7, 16, 14, 49),
                datetime(2020, 7, 16, 14, 50),
            ],
            "a": [3, 6],
            "b": [3, 3],
            "c": [6, 3],
            "a__1 week ago": [2, 2],
            "b__1 week ago": [1, 1],
            "c__1 week ago": [1, 1],
        }
    )
    processed_df = contribution(
        df,
        orientation=PostProcessingContributionOrientation.ROW,
        time_shifts=["1 week ago"],
    )
    assert processed_df.columns.tolist() == [
        DTTM_ALIAS,
        "a",
        "b",
        "c",
        "a__1 week ago",
        "b__1 week ago",
        "c__1 week ago",
    ]
    # current columns and shifted columns are normalized against separate row totals
    expected_values = {
        "a": [0.25, 0.5],
        "b": [0.25, 0.25],
        "c": [0.50, 0.25],
        "a__1 week ago": [0.5, 0.5],
        "b__1 week ago": [0.25, 0.25],
        "c__1 week ago": [0.25, 0.25],
    }
    for column, expected in expected_values.items():
        assert_array_equal(processed_df[column].tolist(), expected)