add slackv2 notification

This commit is contained in:
Elizabeth Thompson 2024-06-13 09:39:03 -07:00
parent fc9bc175e6
commit d85d256666
25 changed files with 695 additions and 445 deletions

View File

@ -51,6 +51,7 @@ assists people when migrating to a new version.
translations inside the python package. This includes the .mo files needed by pybabel on the
backend, as well as the .json files used by the frontend. If you were doing anything before
as part of your bundling to expose translation packages, it's probably not needed anymore.
- [29264](https://github.com/apache/superset/pull/29264) Slack has updated its file upload api, and we are now supporting this new api in Superset, although the Slack api is not backward compatible. The original Slack integration is deprecated and we will require a new Slack scope `channels:read` to be added to Slack workspaces in order to use this new api. In an upcoming release, we will make this new Slack scope mandatory and remove the old Slack functionality.
### Potential Downtime

View File

@ -25,6 +25,7 @@ export enum FeatureFlag {
AlertsAttachReports = 'ALERTS_ATTACH_REPORTS',
AlertReports = 'ALERT_REPORTS',
AlertReportTabs = 'ALERT_REPORT_TABS',
AlertReportSlackV2 = 'ALERT_REPORT_SLACK_V2',
AllowFullCsvExport = 'ALLOW_FULL_CSV_EXPORT',
AvoidColorsCollision = 'AVOID_COLORS_COLLISION',
ChartPluginsExperimental = 'CHART_PLUGINS_EXPERIMENTAL',

View File

@ -30,7 +30,7 @@ jest.mock('@superset-ui/core', () => ({
jest.mock('src/features/databases/state.ts', () => ({
useCommonConf: () => ({
ALERT_REPORTS_NOTIFICATION_METHODS: ['Email', 'Slack'],
ALERT_REPORTS_NOTIFICATION_METHODS: ['Email', 'Slack', 'SlackV2'],
}),
}));

View File

@ -517,7 +517,7 @@ const AlertReportModal: FunctionComponent<AlertReportModalProps> = ({
]);
setNotificationAddState(
notificationSettings.length === allowedNotificationMethods.length
notificationSettings.length === allowedNotificationMethodsCount
? 'hidden'
: 'disabled',
);
@ -1235,6 +1235,20 @@ const AlertReportModal: FunctionComponent<AlertReportModalProps> = ({
enforceValidation();
}, [validationStatus]);
const allowedNotificationMethodsCount = useMemo(
() =>
allowedNotificationMethods.reduce((accum: string[], setting: string) => {
if (
accum.some(nm => nm.includes('slack')) &&
setting.toLowerCase().includes('slack')
) {
return accum;
}
return [...accum, setting.toLowerCase()];
}, []).length,
[allowedNotificationMethods],
);
// Show/hide
if (isHidden && show) {
setIsHidden(false);
@ -1743,7 +1757,7 @@ const AlertReportModal: FunctionComponent<AlertReportModalProps> = ({
))}
{
// Prohibit 'add notification method' button if only one present
allowedNotificationMethods.length > notificationSettings.length && (
allowedNotificationMethodsCount > notificationSettings.length && (
<NotificationMethodAdd
data-test="notification-add"
status={notificationAddState}

View File

@ -16,10 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
import { FunctionComponent, useState, ChangeEvent } from 'react';
import {
FunctionComponent,
useState,
ChangeEvent,
useEffect,
useMemo,
} from 'react';
import { styled, t, useTheme } from '@superset-ui/core';
import { Select } from 'src/components';
import {
FeatureFlag,
SupersetClient,
isFeatureEnabled,
styled,
t,
useTheme,
} from '@superset-ui/core';
import rison from 'rison';
import { AsyncSelect, Select } from 'src/components';
import Icons from 'src/components/Icons';
import { NotificationMethodOption, NotificationSetting } from '../types';
import { StyledInputContainer } from '../AlertReportModal';
@ -87,20 +101,57 @@ export const NotificationMethod: FunctionComponent<NotificationMethodProps> = ({
const [recipientValue, setRecipientValue] = useState<string>(
recipients || '',
);
const [slackRecipients, setSlackRecipients] = useState<
{ label: string; value: string }[]
>([]);
const [error, setError] = useState(false);
const theme = useTheme();
if (!setting) {
return null;
}
const [useSlackV1, setUseSlackV1] = useState<boolean>(false);
const onMethodChange = (method: NotificationMethodOption) => {
const mapChannelsToOptions = (result: { name: any; id: any }[]) =>
result.map((result: { name: any; id: any }) => ({
label: result.name,
value: result.id,
}));
const loadChannels = async (
search_string: string | undefined = '',
): Promise<{
data: { label: any; value: any }[];
totalCount: number;
}> => {
const query = rison.encode({ search_string });
const endpoint = `/api/v1/report/slack_channels/?q=${query}`;
const noResults = { data: [], totalCount: 0 };
return SupersetClient.get({ endpoint })
.then(({ json }) => {
const { result, count } = json;
const options: { label: any; value: any }[] =
mapChannelsToOptions(result);
return {
data: options,
totalCount: (count ?? options.length) as number,
};
})
.catch(() => {
// Fallback to slack v1 if slack v2 is not compatible
setUseSlackV1(true);
return noResults;
});
};
const onMethodChange = (selected: {
label: string;
value: NotificationMethodOption;
}) => {
// Since we're swapping the method, reset the recipients
setRecipientValue('');
if (onUpdate) {
if (onUpdate && setting) {
const updatedSetting = {
...setting,
method,
method: selected.value,
recipients: '',
};
@ -108,6 +159,42 @@ export const NotificationMethod: FunctionComponent<NotificationMethodProps> = ({
}
};
useEffect(() => {
// fetch slack channel names from
// ids on first load
if (method && ['Slack', 'SlackV2'].includes(method)) {
loadChannels(recipients).then(response => {
setSlackRecipients(response.data || []);
// if fetch succeeds, set the method to SlackV2
onMethodChange({ label: 'Slack', value: 'SlackV2' });
});
}
}, []);
const formattedOptions = useMemo(
() =>
(options || [])
.filter(
method =>
(isFeatureEnabled(FeatureFlag.AlertReportSlackV2) &&
!useSlackV1 &&
method === 'SlackV2') ||
((!isFeatureEnabled(FeatureFlag.AlertReportSlackV2) ||
useSlackV1) &&
method === 'Slack') ||
method === 'Email',
)
.map(method => ({
label: method === 'SlackV2' ? 'Slack' : method,
value: method,
})),
[options],
);
if (!setting) {
return null;
}
const onRecipientsChange = (event: ChangeEvent<HTMLTextAreaElement>) => {
const { target } = event;
@ -123,6 +210,21 @@ export const NotificationMethod: FunctionComponent<NotificationMethodProps> = ({
}
};
const onSlackRecipientsChange = (
recipients: { label: string; value: string }[],
) => {
setSlackRecipients(recipients);
if (onUpdate) {
const updatedSetting = {
...setting,
recipients: recipients?.map(obj => obj.value).join(','),
};
onUpdate(index, updatedSetting);
}
};
const onSubjectChange = (
event: ChangeEvent<HTMLTextAreaElement | HTMLInputElement>,
) => {
@ -153,15 +255,12 @@ export const NotificationMethod: FunctionComponent<NotificationMethodProps> = ({
<Select
ariaLabel={t('Delivery method')}
data-test="select-delivery-method"
labelInValue
onChange={onMethodChange}
placeholder={t('Select Delivery Method')}
options={(options || []).map(
(method: NotificationMethodOption) => ({
label: method,
value: method,
}),
)}
value={method}
options={formattedOptions}
showSearch
value={formattedOptions.find(option => option.value === method)}
/>
{index !== 0 && !!onRemove ? (
<span
@ -211,19 +310,37 @@ export const NotificationMethod: FunctionComponent<NotificationMethodProps> = ({
<div className="inline-container">
<StyledInputContainer>
<div className="control-label">
{t('%s recipients', method)}
{t('%s recipients', method === 'SlackV2' ? 'Slack' : method)}
<span className="required">*</span>
</div>
<div className="input-container">
<textarea
name="recipients"
data-test="recipients"
value={recipientValue}
onChange={onRecipientsChange}
/>
</div>
<div className="helper">
{t('Recipients are separated by "," or ";"')}
<div>
{['Email', 'Slack'].includes(method) ? (
<>
<div className="input-container">
<textarea
name="recipients"
data-test="recipients"
value={recipientValue}
onChange={onRecipientsChange}
/>
</div>
<div className="helper">
{t('Recipients are separated by "," or ";"')}
</div>
</>
) : (
// for SlackV2
<AsyncSelect
ariaLabel={t('Select channels')}
mode="multiple"
name="recipients"
value={slackRecipients}
options={loadChannels}
onChange={onSlackRecipientsChange}
allowClear
data-test="recipients"
/>
)}
</div>
</StyledInputContainer>
</div>

View File

@ -41,6 +41,10 @@ export default function RecipientIcon({ type }: { type: string }) {
recipientIconConfig.icon = <Icons.Slack css={StyledIcon} />;
recipientIconConfig.label = RecipientIconName.Slack;
break;
case RecipientIconName.SlackV2:
recipientIconConfig.icon = <Icons.Slack css={StyledIcon} />;
recipientIconConfig.label = RecipientIconName.Slack;
break;
default:
recipientIconConfig.icon = null;
recipientIconConfig.label = '';

View File

@ -41,7 +41,7 @@ export type DatabaseObject = {
id: number;
};
export type NotificationMethodOption = 'Email' | 'Slack';
export type NotificationMethodOption = 'Email' | 'Slack' | 'SlackV2';
export type NotificationSetting = {
method?: NotificationMethodOption;
@ -124,6 +124,7 @@ export enum AlertState {
export enum RecipientIconName {
Email = 'Email',
Slack = 'Slack',
SlackV2 = 'SlackV2',
}
export interface AlertsReportsConfig {
ALERT_REPORTS_DEFAULT_WORKING_TIMEOUT: number;

View File

@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import logging
from copy import deepcopy
from datetime import datetime, timedelta
from typing import Any, Optional, Union
from uuid import UUID
@ -25,7 +26,7 @@ from celery.exceptions import SoftTimeLimitExceeded
from superset import app, db, security_manager
from superset.commands.base import BaseCommand
from superset.commands.dashboard.permalink.create import CreateDashboardPermalinkCommand
from superset.commands.exceptions import CommandException
from superset.commands.exceptions import CommandException, UpdateFailedError
from superset.commands.report.alert import AlertCommand
from superset.commands.report.exceptions import (
ReportScheduleAlertGracePeriodError,
@ -64,7 +65,10 @@ from superset.reports.models import (
)
from superset.reports.notifications import create_notification
from superset.reports.notifications.base import NotificationContent
from superset.reports.notifications.exceptions import NotificationError
from superset.reports.notifications.exceptions import (
NotificationError,
SlackV1NotificationError,
)
from superset.tasks.utils import get_executor
from superset.utils import json
from superset.utils.core import HeaderDataType, override_user
@ -72,6 +76,7 @@ from superset.utils.csv import get_chart_csv_data, get_chart_dataframe
from superset.utils.decorators import logs_context
from superset.utils.pdf import build_pdf_from_screenshots
from superset.utils.screenshots import ChartScreenshot, DashboardScreenshot
from superset.utils.slack import get_channels_with_search
from superset.utils.urls import get_url_path
logger = logging.getLogger(__name__)
@ -122,6 +127,30 @@ class BaseReportState:
self._report_schedule.last_eval_dttm = datetime.utcnow()
db.session.commit()
def update_report_schedule_slack_v2(self) -> None:
"""
Update the report schedule type and channels for all slack recipients to v2.
V2 uses ids instead of names for channels.
"""
try:
updated_recipients = []
for recipient in self._report_schedule.recipients:
recipient_copy = deepcopy(recipient)
if recipient_copy.type == ReportRecipientType.SLACK:
recipient_copy.type = ReportRecipientType.SLACKV2
slack_recipients = json.loads(recipient_copy.recipient_config_json)
recipient_copy.recipient_config_json = json.dumps(
{"target": get_channels_with_search(slack_recipients["target"])}
)
updated_recipients.append(recipient_copy)
db.session.commit()
except Exception as ex:
logger.warning(
"Failed to update slack recipients to v2: %s", str(ex), exc_info=True
)
raise UpdateFailedError from ex
def create_log(self, error_message: Optional[str] = None) -> None:
"""
Creates a Report execution log, uses the current computed last_value for Alerts
@ -440,6 +469,19 @@ class BaseReportState:
)
else:
notification.send()
except SlackV1NotificationError as ex:
# The slack notification should be sent with the v2 api
logger.info("Attempting to upgrade the report to Slackv2: %s", str(ex))
try:
self.update_report_schedule_slack_v2()
recipient.type = ReportRecipientType.SLACKV2
notification = create_notification(recipient, notification_content)
notification.send()
except UpdateFailedError as err:
# log the error but keep processing the report with SlackV1
logger.warning(
"Failed to update slack recipients to v2: %s", str(err)
)
except (NotificationError, SupersetException) as ex:
# collect errors but keep processing them
notification_errors.append(

View File

@ -444,6 +444,7 @@ DEFAULT_FEATURE_FLAGS: dict[str, bool] = {
# Enables Alerts and reports new implementation
"ALERT_REPORTS": False,
"ALERT_REPORT_TABS": False,
"ALERT_REPORT_SLACK_V2": False,
"DASHBOARD_RBAC": False,
"ENABLE_ADVANCED_DATA_TYPES": False,
# Enabling ALERTS_ATTACH_REPORTS, the system sends email and slack message

View File

@ -171,6 +171,7 @@ MODEL_API_RW_METHOD_PERMISSION_MAP = {
"excel_metadata": "excel_upload",
"columnar_metadata": "columnar_upload",
"csv_metadata": "csv_upload",
"slack_channels": "write",
}
EXTRA_FORM_DATA_APPEND_KEYS = {

View File

@ -40,15 +40,18 @@ from superset.commands.report.update import UpdateReportScheduleCommand
from superset.constants import MODEL_API_RW_METHOD_PERMISSION_MAP, RouteMethod
from superset.dashboards.filters import DashboardAccessFilter
from superset.databases.filters import DatabaseFilter
from superset.exceptions import SupersetException
from superset.extensions import event_logger
from superset.reports.filters import ReportScheduleAllTextFilter, ReportScheduleFilter
from superset.reports.models import ReportSchedule
from superset.reports.schemas import (
get_delete_ids_schema,
get_slack_channels_schema,
openapi_spec_methods_override,
ReportSchedulePostSchema,
ReportSchedulePutSchema,
)
from superset.utils.slack import get_channels_with_search
from superset.views.base_api import (
BaseSupersetModelRestApi,
RelatedFieldFilter,
@ -71,7 +74,8 @@ class ReportScheduleRestApi(BaseSupersetModelRestApi):
include_route_methods = RouteMethod.REST_MODEL_VIEW_CRUD_SET | {
RouteMethod.RELATED,
"bulk_delete", # not using RouteMethod since locally defined
"bulk_delete",
"slack_channels", # not using RouteMethod since locally defined
}
class_permission_name = "ReportSchedule"
method_permission_name = MODEL_API_RW_METHOD_PERMISSION_MAP
@ -513,3 +517,63 @@ class ReportScheduleRestApi(BaseSupersetModelRestApi):
return self.response_403()
except ReportScheduleDeleteFailedError as ex:
return self.response_422(message=str(ex))
@expose("/slack_channels/", methods=("GET",))
@protect()
@rison(get_slack_channels_schema)
@safe
@statsd_metrics
@event_logger.log_this_with_context(
action=lambda self,
*args,
**kwargs: f"{self.__class__.__name__}.slack_channels",
log_to_statsd=False,
)
def slack_channels(self, **kwargs: Any) -> Response:
"""Get slack channels.
---
get:
summary: Get slack channels
description: Get slack channels
parameters:
- in: query
name: q
content:
application/json:
schema:
$ref: '#/components/schemas/get_slack_channels_schema'
responses:
200:
description: Slack channels
content:
application/json:
schema:
type: object
properties:
result:
type: array
items:
type: object
properties:
id:
type: string
name:
type: string
401:
$ref: '#/components/responses/401'
403:
$ref: '#/components/responses/403'
404:
$ref: '#/components/responses/404'
422:
$ref: '#/components/responses/422'
500:
$ref: '#/components/responses/500'
"""
try:
search_string = kwargs.get("rison", {}).get("search_string")
channels = get_channels_with_search(search_string=search_string)
return self.response(200, result=channels)
except SupersetException as ex:
logger.error("Error fetching slack channels %s", str(ex))
return self.response_422(message=str(ex))

View File

@ -62,6 +62,7 @@ class ReportScheduleValidatorType(StrEnum):
class ReportRecipientType(StrEnum):
EMAIL = "Email"
SLACK = "Slack"
SLACKV2 = "SlackV2"
class ReportState(StrEnum):

View File

@ -18,6 +18,7 @@ from superset.reports.models import ReportRecipients
from superset.reports.notifications.base import BaseNotification, NotificationContent
from superset.reports.notifications.email import EmailNotification # noqa: F401
from superset.reports.notifications.slack import SlackNotification # noqa: F401
from superset.reports.notifications.slackv2 import SlackV2Notification # noqa: F401
def create_notification(

View File

@ -24,6 +24,17 @@ class NotificationError(SupersetException):
"""
class SlackV1NotificationError(SupersetException):
"""
Report should not be run with the slack v1 api
"""
message = """Report should not be run with the Slack V1 api.
Attempting to run with V2 if required Slack scopes are available"""
status = 422
class NotificationParamException(SupersetException):
status = 422

View File

@ -17,14 +17,10 @@
import logging
from collections.abc import Sequence
from io import IOBase
from typing import List, Union
from typing import Union
import backoff
import pandas as pd
from deprecation import deprecated
from flask import g
from flask_babel import gettext as __
from slack_sdk import WebClient
from slack_sdk.errors import (
BotUserAccessError,
SlackApiError,
@ -43,172 +39,72 @@ from superset.reports.notifications.exceptions import (
NotificationMalformedException,
NotificationParamException,
NotificationUnprocessableException,
SlackV1NotificationError,
)
from superset.reports.notifications.slack_mixin import SlackMixin
from superset.utils import json
from superset.utils.core import get_email_address_list
from superset.utils.decorators import statsd_gauge
from superset.utils.slack import get_slack_client
from superset.utils.slack import (
get_slack_client,
should_use_v2_api,
)
logger = logging.getLogger(__name__)
# Slack only allows Markdown messages up to 4k chars
MAXIMUM_MESSAGE_SIZE = 4000
class SlackNotification(BaseNotification): # pylint: disable=too-few-public-methods
# TODO: Deprecated: Remove this class in Superset 6.0.0
class SlackNotification(SlackMixin, BaseNotification): # pylint: disable=too-few-public-methods
"""
Sends a slack notification for a report recipient
"""
type = ReportRecipientType.SLACK
def _get_channels(self, client: WebClient) -> List[str]:
def _get_channel(self) -> str:
"""
Get the recipient's channel(s).
:returns: A list of channel ids: "EID676L"
:raises SlackApiError: If the API call fails
Note Slack SDK uses "channel" to refer to one or more
channels. Multiple channels are demarcated by a comma.
:returns: The comma separated list of channel(s)
"""
recipient_str = json.loads(self._recipient.recipient_config_json)["target"]
channel_recipients: List[str] = get_email_address_list(recipient_str)
conversations_list_response = client.conversations_list(
types="public_channel,private_channel"
)
return [
c["id"]
for c in conversations_list_response["channels"]
if c["name"] in channel_recipients
]
def _message_template(self, table: str = "") -> str:
return __(
"""*%(name)s*
%(description)s
<%(url)s|Explore in Superset>
%(table)s
""",
name=self._content.name,
description=self._content.description or "",
url=self._content.url,
table=table,
)
@staticmethod
def _error_template(name: str, description: str, text: str) -> str:
return __(
"""*%(name)s*
%(description)s
Error: %(text)s
""",
name=name,
description=description,
text=text,
)
def _get_body(self) -> str:
if self._content.text:
return self._error_template(
self._content.name, self._content.description or "", self._content.text
)
if self._content.embedded_data is None:
return self._message_template()
# Embed data in the message
df = self._content.embedded_data
# Flatten columns/index so they show up nicely in the table
df.columns = [
(
" ".join(str(name) for name in column).strip()
if isinstance(column, tuple)
else column
)
for column in df.columns
]
df.index = [
(
" ".join(str(name) for name in index).strip()
if isinstance(index, tuple)
else index
)
for index in df.index
]
# Slack Markdown only works on messages shorter than 4k chars, so we might
# need to truncate the data
for i in range(len(df) - 1):
truncated_df = df[: i + 1].fillna("")
truncated_row = pd.Series({k: "..." for k in df.columns})
truncated_df = pd.concat(
[truncated_df, truncated_row.to_frame().T], ignore_index=True
)
tabulated = df.to_markdown()
table = f"```\n{tabulated}\n```\n\n(table was truncated)"
message = self._message_template(table)
if len(message) > MAXIMUM_MESSAGE_SIZE:
# Decrement i and build a message that is under the limit
truncated_df = df[:i].fillna("")
truncated_row = pd.Series({k: "..." for k in df.columns})
truncated_df = pd.concat(
[truncated_df, truncated_row.to_frame().T], ignore_index=True
)
tabulated = df.to_markdown()
table = (
f"```\n{tabulated}\n```\n\n(table was truncated)"
if len(truncated_df) > 0
else ""
)
break
# Send full data
else:
tabulated = df.to_markdown()
table = f"```\n{tabulated}\n```"
return self._message_template(table)
return ",".join(get_email_address_list(recipient_str))
def _get_inline_files(
self,
) -> Sequence[Union[str, IOBase, bytes]]:
) -> tuple[Union[str, None], Sequence[Union[str, IOBase, bytes]]]:
if self._content.csv:
return [self._content.csv]
return ("csv", [self._content.csv])
if self._content.screenshots:
return self._content.screenshots
return ("png", self._content.screenshots)
if self._content.pdf:
return [self._content.pdf]
return []
return ("pdf", [self._content.pdf])
return (None, [])
@deprecated(deprecated_in="4.1")
def _deprecated_upload_files(
self, client: WebClient, title: str, body: str
) -> None:
"""
Deprecated method to upload files to slack
Should only be used if the new method fails
To be removed in the next major release
"""
file_type, files = (None, [])
if self._content.csv:
file_type, files = ("csv", [self._content.csv])
if self._content.screenshots:
file_type, files = ("png", self._content.screenshots)
if self._content.pdf:
file_type, files = ("pdf", [self._content.pdf])
@backoff.on_exception(backoff.expo, SlackApiError, factor=10, base=2, max_tries=5)
@statsd_gauge("reports.slack.send")
def send(self) -> None:
file_type, files = self._get_inline_files()
title = self._content.name
body = self._get_body(content=self._content)
global_logs_context = getattr(g, "logs_context", {}) or {}
recipient_str = json.loads(self._recipient.recipient_config_json)["target"]
# see if the v2 api will work
if should_use_v2_api():
# if we can fetch channels, then raise an error and use the v2 api
raise SlackV1NotificationError
recipients = get_email_address_list(recipient_str)
for channel in recipients:
if len(files) > 0:
# use the v1 api but warn with a deprecation message
logger.warning(
"Your current Slack scopes are missing `channels:read`. Please add this to your Slack app in order to continue using the v1 API. Support for the old Slack API will be removed in Superset version 6.0.0."
)
try:
client = get_slack_client()
channel = self._get_channel()
# files_upload returns SlackResponse as we run it in sync mode.
if files:
for file in files:
client.files_upload(
channels=channel,
@ -219,46 +115,6 @@ Error: %(text)s
)
else:
client.chat_postMessage(channel=channel, text=body)
@backoff.on_exception(backoff.expo, SlackApiError, factor=10, base=2, max_tries=5)
@statsd_gauge("reports.slack.send")
def send(self) -> None:
global_logs_context = getattr(g, "logs_context", {}) or {}
try:
client = get_slack_client()
title = self._content.name
body = self._get_body()
try:
channels = self._get_channels(client)
except SlackApiError:
logger.warning(
"Slack scope missing. Using deprecated API to get channels. Please update your Slack app to use the new API.",
extra={
"execution_id": global_logs_context.get("execution_id"),
},
)
self._deprecated_upload_files(client, title, body)
return
if channels == []:
raise NotificationParamException("No valid channel found")
files = self._get_inline_files()
# files_upload returns SlackResponse as we run it in sync mode.
for channel in channels:
if len(files) > 0:
for file in files:
client.files_upload_v2(
channel=channel,
file=file,
initial_comment=body,
title=title,
)
else:
client.chat_postMessage(channel=channel, text=body)
logger.info(
"Report sent to slack",
extra={

View File

@ -0,0 +1,124 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pandas as pd
from flask_babel import gettext as __
from superset.reports.notifications.base import NotificationContent
# Slack only allows Markdown messages up to 4k chars
MAXIMUM_MESSAGE_SIZE = 4000
# pylint: disable=too-few-public-methods
class SlackMixin:
def _message_template(
self,
content: NotificationContent,
table: str = "",
) -> str:
return __(
"""*%(name)s*
%(description)s
<%(url)s|Explore in Superset>
%(table)s
""",
name=content.name,
description=content.description or "",
url=content.url,
table=table,
)
@staticmethod
def _error_template(name: str, description: str, text: str) -> str:
return __(
"""*%(name)s*
%(description)s
Error: %(text)s
""",
name=name,
description=description,
text=text,
)
def _get_body(self, content: NotificationContent) -> str:
if content.text:
return self._error_template(
content.name, content.description or "", content.text
)
if content.embedded_data is None:
return self._message_template(content=content)
# Embed data in the message
df = content.embedded_data
# Flatten columns/index so they show up nicely in the table
df.columns = [
(
" ".join(str(name) for name in column).strip()
if isinstance(column, tuple)
else column
)
for column in df.columns
]
df.index = [
(
" ".join(str(name) for name in index).strip()
if isinstance(index, tuple)
else index
)
for index in df.index
]
# Slack Markdown only works on messages shorter than 4k chars, so we might
# need to truncate the data
for i in range(len(df) - 1):
truncated_df = df[: i + 1].fillna("")
truncated_row = pd.Series({k: "..." for k in df.columns})
truncated_df = pd.concat(
[truncated_df, truncated_row.to_frame().T], ignore_index=True
)
tabulated = df.to_markdown()
table = f"```\n{tabulated}\n```\n\n(table was truncated)"
message = self._message_template(table=table, content=content)
if len(message) > MAXIMUM_MESSAGE_SIZE:
# Decrement i and build a message that is under the limit
truncated_df = df[:i].fillna("")
truncated_row = pd.Series({k: "..." for k in df.columns})
truncated_df = pd.concat(
[truncated_df, truncated_row.to_frame().T], ignore_index=True
)
tabulated = df.to_markdown()
table = (
f"```\n{tabulated}\n```\n\n(table was truncated)"
if len(truncated_df) > 0
else ""
)
break
# Send full data
else:
tabulated = df.to_markdown()
table = f"```\n{tabulated}\n```"
return self._message_template(table=table, content=content)

View File

@ -0,0 +1,131 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from collections.abc import Sequence
from io import IOBase
from typing import List, Union
import backoff
from flask import g
from slack_sdk.errors import (
BotUserAccessError,
SlackApiError,
SlackClientConfigurationError,
SlackClientError,
SlackClientNotConnectedError,
SlackObjectFormationError,
SlackRequestError,
SlackTokenRotationError,
)
from superset.reports.models import ReportRecipientType
from superset.reports.notifications.base import BaseNotification
from superset.reports.notifications.exceptions import (
NotificationAuthorizationException,
NotificationMalformedException,
NotificationParamException,
NotificationUnprocessableException,
)
from superset.reports.notifications.slack_mixin import SlackMixin
from superset.utils import json
from superset.utils.core import get_email_address_list
from superset.utils.decorators import statsd_gauge
from superset.utils.slack import get_slack_client
logger = logging.getLogger(__name__)
class SlackV2Notification(SlackMixin, BaseNotification): # pylint: disable=too-few-public-methods
"""
Sends a slack notification for a report recipient with the slack upload v2 API
"""
type = ReportRecipientType.SLACKV2
def _get_channels(self) -> List[str]:
"""
Get the recipient's channel(s).
:returns: A list of channel ids: "EID676L"
:raises NotificationParamException or SlackApiError: If the recipient is not found
"""
recipient_str = json.loads(self._recipient.recipient_config_json)["target"]
return get_email_address_list(recipient_str)
def _get_inline_files(
self,
) -> Sequence[Union[str, IOBase, bytes]]:
if self._content.csv:
return [self._content.csv]
if self._content.screenshots:
return self._content.screenshots
if self._content.pdf:
return [self._content.pdf]
return []
@backoff.on_exception(backoff.expo, SlackApiError, factor=10, base=2, max_tries=5)
@statsd_gauge("reports.slack.send")
def send(self) -> None:
global_logs_context = getattr(g, "logs_context", {}) or {}
try:
client = get_slack_client()
title = self._content.name
body = self._get_body(content=self._content)
channels = self._get_channels()
logger.info("channels: %s", channels)
if not channels:
raise NotificationParamException("No recipients saved in the report")
files = self._get_inline_files()
# files_upload returns SlackResponse as we run it in sync mode.
for channel in channels:
if len(files) > 0:
for file in files:
client.files_upload_v2(
channel=channel,
file=file,
initial_comment=body,
title=title,
)
else:
client.chat_postMessage(channel=channel, text=body)
logger.info(
"Report sent to slack",
extra={
"execution_id": global_logs_context.get("execution_id"),
},
)
except (
BotUserAccessError,
SlackRequestError,
SlackClientConfigurationError,
) as ex:
raise NotificationParamException(str(ex)) from ex
except SlackObjectFormationError as ex:
raise NotificationMalformedException(str(ex)) from ex
except SlackTokenRotationError as ex:
raise NotificationAuthorizationException(str(ex)) from ex
except (SlackClientNotConnectedError, SlackApiError) as ex:
raise NotificationUnprocessableException(str(ex)) from ex
except SlackClientError as ex:
# this is the base class for all slack client errors
# keep it last so that it doesn't interfere with @backoff
raise NotificationUnprocessableException(str(ex)) from ex

View File

@ -49,6 +49,12 @@ openapi_spec_methods_override = {
}
get_delete_ids_schema = {"type": "array", "items": {"type": "integer"}}
get_slack_channels_schema = {
"type": "object",
"properties": {
"seach_string": {"type": "string"},
},
}
type_description = "The report schedule type"
name_description = "The report schedule name."

View File

@ -94,7 +94,7 @@ def execute(self: Celery.task, report_schedule_id: int) -> None:
).run()
except ReportScheduleUnexpectedError:
logger.exception(
"An unexpected occurred while executing the report: %s", task_id
"An unexpected error occurred while executing the report: %s", task_id
)
self.update_state(state="FAILURE")
except CommandException as ex:

View File

@ -16,8 +16,15 @@
# under the License.
import logging
from flask import current_app
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from superset.exceptions import SupersetException
logger = logging.getLogger(__name__)
class SlackClientError(Exception):
@ -31,6 +38,53 @@ def get_slack_client() -> WebClient:
return WebClient(token=token, proxy=current_app.config["SLACK_PROXY"])
def get_channels_with_search(search_string: str = "", limit: int = 200) -> list[str]:
"""
The slack api is paginated but does not include search, so we need to fetch
all channels and filter them ourselves
This will search by slack name or id
"""
try:
client = get_slack_client()
channels = []
cursor = None
while True:
response = client.conversations_list(limit=limit, cursor=cursor)
channels.extend(response.data["channels"])
cursor = response.data.get("response_metadata", {}).get("next_cursor")
if not cursor:
break
# The search string can be multiple channels separated by commas
if search_string:
search_array = search_string.split(",")
channels = [
channel
for channel in channels
if any(
search.lower() in channel["name"].lower()
or search.lower() in channel["id"].lower()
for search in search_array
)
]
return channels
except (SlackClientError, SlackApiError) as ex:
raise SupersetException(f"Failed to list channels: {ex}") from ex
def should_use_v2_api() -> bool:
try:
client = get_slack_client()
client.conversations_list()
logger.info("Slack API v2 is available")
return True
except SlackApiError:
return False
def get_user_avatar(email: str, client: WebClient = None) -> str:
client = client or get_slack_client()
try:

View File

@ -402,6 +402,7 @@ def cached_common_bootstrap_data( # pylint: disable=unused-argument
frontend_config["ALERT_REPORTS_NOTIFICATION_METHODS"] = [
ReportRecipientType.EMAIL,
ReportRecipientType.SLACK,
ReportRecipientType.SLACKV2,
]
else:
frontend_config["ALERT_REPORTS_NOTIFICATION_METHODS"] = [

View File

@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import string
from operator import or_
from random import choice, randint, random, uniform
from typing import Any
@ -146,16 +147,14 @@ def _cleanup(dash_id: int, slices_ids: list[int]) -> None:
def _cleanup_reports(dash_id: int, slices_ids: list[int]) -> None:
reports_with_dash = (
db.session.query(ReportSchedule).filter_by(dashboard_id=dash_id).all()
)
reports_with_slices = (
db.session.query(ReportSchedule)
.filter(ReportSchedule.chart_id.in_(slices_ids))
.all()
reports = db.session.query(ReportSchedule).filter(
or_(
ReportSchedule.dashboard_id == dash_id,
ReportSchedule.chart_id.in_(slices_ids),
)
)
for report in reports_with_dash + reports_with_slices:
for report in reports:
db.session.delete(report)
db.session.commit()

View File

@ -268,6 +268,17 @@ def create_report_slack_chart():
cleanup_report_schedule(report_schedule)
@pytest.fixture()
def create_report_slack_chartv2():
chart = db.session.query(Slice).first()
report_schedule = create_report_notification(
slack_channel="slack_channel_id", chart=chart, name="report_slack_chartv2"
)
yield report_schedule
cleanup_report_schedule(report_schedule)
@pytest.fixture()
def create_report_slack_chart_with_csv():
chart = db.session.query(Slice).first()
@ -1100,13 +1111,17 @@ def test_email_dashboard_report_schedule_force_screenshot(
@pytest.mark.usefixtures(
"load_birth_names_dashboard_with_slices", "create_report_slack_chart"
"load_birth_names_dashboard_with_slices", "create_report_slack_chartv2"
)
@patch("superset.reports.notifications.slack.get_slack_client")
@patch("superset.commands.report.execute.get_channels_with_search")
@patch("superset.reports.notifications.slack.should_use_v2_api", return_value=True)
@patch("superset.reports.notifications.slackv2.get_slack_client")
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
def test_slack_chart_report_schedule(
def test_slack_chart_report_schedule_v2(
screenshot_mock,
slack_client_mock,
slack_should_use_v2_api_mock,
get_channels_with_search_mock,
create_report_slack_chart,
):
"""
@ -1116,11 +1131,9 @@ def test_slack_chart_report_schedule(
screenshot_mock.return_value = SCREENSHOT_FILE
notification_targets = get_target_from_report_schedule(create_report_slack_chart)
channel_name = notification_targets[0]
channel_id = "channel_id"
slack_client_mock.return_value.conversations_list.return_value = {
"channels": [{"id": channel_id, "name": channel_name}]
}
channel_id = notification_targets[0]
get_channels_with_search_mock.return_value = {}
with freeze_time("2020-01-01T00:00:00Z"):
with patch.object(current_app.config["STATS_LOGGER"], "gauge") as statsd_mock:
@ -1139,56 +1152,17 @@ def test_slack_chart_report_schedule(
# Assert logs are correct
assert_log(ReportState.SUCCESS)
statsd_mock.assert_called_once_with("reports.slack.send.ok", 1)
# this will send a warning
assert statsd_mock.call_args_list[0] == call(
"reports.slack.send.warning", 1
)
assert statsd_mock.call_args_list[1] == call("reports.slack.send.ok", 1)
@pytest.mark.usefixtures(
"load_birth_names_dashboard_with_slices", "create_report_slack_chart"
)
@patch("superset.reports.notifications.slack.get_slack_client")
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
def test_slack_chart_report_schedule_deprecated(
screenshot_mock,
slack_client_mock,
create_report_slack_chart,
):
"""
ExecuteReport Command: Test chart slack report schedule
"""
# setup screenshot mock
screenshot_mock.return_value = SCREENSHOT_FILE
notification_targets = get_target_from_report_schedule(create_report_slack_chart)
channel_name = notification_targets[0]
slack_client_mock.return_value.conversations_list.side_effect = SlackApiError(
"Error", "Response"
)
with freeze_time("2020-01-01T00:00:00Z"):
with patch.object(current_app.config["STATS_LOGGER"], "gauge") as statsd_mock:
AsyncExecuteReportScheduleCommand(
TEST_ID, create_report_slack_chart.id, datetime.utcnow()
).run()
assert (
slack_client_mock.return_value.files_upload.call_args[1]["channels"]
== channel_name
)
assert (
slack_client_mock.return_value.files_upload.call_args[1]["file"]
== SCREENSHOT_FILE
)
# Assert logs are correct
assert_log(ReportState.SUCCESS)
statsd_mock.assert_called_once_with("reports.slack.send.ok", 1)
@pytest.mark.usefixtures(
"load_birth_names_dashboard_with_slices", "create_report_slack_chart"
)
@patch("superset.utils.slack.WebClient")
@patch("superset.utils.slack.get_slack_client")
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
def test_slack_chart_report_schedule_with_errors(
screenshot_mock,
@ -1214,7 +1188,7 @@ def test_slack_chart_report_schedule_with_errors(
]
for idx, er in enumerate(slack_errors):
web_client_mock.side_effect = er
web_client_mock.side_effect = [SlackApiError(None, None), er]
with pytest.raises(ReportScheduleClientErrorsException):
AsyncExecuteReportScheduleCommand(
@ -1242,6 +1216,7 @@ def test_slack_chart_report_schedule_with_errors(
@pytest.mark.usefixtures(
"load_birth_names_dashboard_with_slices", "create_report_slack_chart_with_csv"
)
@patch("superset.reports.notifications.slack.should_use_v2_api", return_value=False)
@patch("superset.reports.notifications.slack.get_slack_client")
@patch("superset.utils.csv.urllib.request.urlopen")
@patch("superset.utils.csv.urllib.request.OpenerDirector.open")
@ -1251,10 +1226,11 @@ def test_slack_chart_report_schedule_with_csv(
mock_open,
mock_urlopen,
slack_client_mock_class,
slack_should_use_v2_api_mock,
create_report_slack_chart_with_csv,
):
"""
ExecuteReport Command: Test chart slack report schedule with CSV
ExecuteReport Command: Test chart slack report V1 schedule with CSV
"""
# setup csv mock
response = Mock()
@ -1268,63 +1244,6 @@ def test_slack_chart_report_schedule_with_csv(
)
channel_name = notification_targets[0]
channel_id = "channel_id"
slack_client_mock_class.return_value = Mock()
slack_client_mock_class.return_value.conversations_list.return_value = {
"channels": [{"id": channel_id, "name": channel_name}]
}
with freeze_time("2020-01-01T00:00:00Z"):
AsyncExecuteReportScheduleCommand(
TEST_ID, create_report_slack_chart_with_csv.id, datetime.utcnow()
).run()
assert (
slack_client_mock_class.return_value.files_upload_v2.call_args[1]["channel"]
== channel_id
)
assert (
slack_client_mock_class.return_value.files_upload_v2.call_args[1]["file"]
== CSV_FILE
)
# Assert logs are correct
assert_log(ReportState.SUCCESS)
@pytest.mark.usefixtures(
"load_birth_names_dashboard_with_slices", "create_report_slack_chart_with_csv"
)
@patch("superset.reports.notifications.slack.get_slack_client")
@patch("superset.utils.csv.urllib.request.urlopen")
@patch("superset.utils.csv.urllib.request.OpenerDirector.open")
@patch("superset.utils.csv.get_chart_csv_data")
def test_slack_chart_report_schedule_with_csv_deprecated_api(
csv_mock,
mock_open,
mock_urlopen,
slack_client_mock_class,
create_report_slack_chart_with_csv,
):
"""
ExecuteReport Command: Test chart slack report schedule with CSV
"""
# setup csv mock
response = Mock()
mock_open.return_value = response
mock_urlopen.return_value = response
mock_urlopen.return_value.getcode.return_value = 200
response.read.return_value = CSV_FILE
notification_targets = get_target_from_report_schedule(
create_report_slack_chart_with_csv
)
channel_name = notification_targets[0]
slack_client_mock_class.return_value = Mock()
slack_client_mock_class.return_value.conversations_list.side_effect = SlackApiError(
"Error", "Response"
)
with freeze_time("2020-01-01T00:00:00Z"):
AsyncExecuteReportScheduleCommand(
@ -1347,6 +1266,7 @@ def test_slack_chart_report_schedule_with_csv_deprecated_api(
@pytest.mark.usefixtures(
"load_birth_names_dashboard_with_slices", "create_report_slack_chart_with_text"
)
@patch("superset.reports.notifications.slack.should_use_v2_api", return_value=False)
@patch("superset.utils.csv.urllib.request.urlopen")
@patch("superset.utils.csv.urllib.request.OpenerDirector.open")
@patch("superset.reports.notifications.slack.get_slack_client")
@ -1356,6 +1276,7 @@ def test_slack_chart_report_schedule_with_text(
slack_client_mock_class,
mock_open,
mock_urlopen,
slack_should_use_v2_api_mock,
create_report_slack_chart_with_text,
):
"""
@ -1383,17 +1304,6 @@ def test_slack_chart_report_schedule_with_text(
}
).encode("utf-8")
notification_targets = get_target_from_report_schedule(
create_report_slack_chart_with_text
)
channel_name = notification_targets[0]
channel_id = "channel_id"
slack_client_mock_class.return_value.conversations_list.return_value = {
"channels": [{"id": channel_id, "name": channel_name}]
}
with freeze_time("2020-01-01T00:00:00Z"):
AsyncExecuteReportScheduleCommand(
TEST_ID, create_report_slack_chart_with_text.id, datetime.utcnow()
@ -1420,87 +1330,6 @@ def test_slack_chart_report_schedule_with_text(
assert_log(ReportState.SUCCESS)
@pytest.mark.usefixtures(
"load_birth_names_dashboard_with_slices", "create_report_slack_chart_with_text"
)
@patch("superset.utils.csv.urllib.request.urlopen")
@patch("superset.utils.csv.urllib.request.OpenerDirector.open")
@patch("superset.reports.notifications.slack.get_slack_client")
@patch("superset.utils.csv.get_chart_dataframe")
def test_slack_chart_report_schedule_with_text_deprecated_slack_api(
dataframe_mock,
slack_client_mock_class,
mock_open,
mock_urlopen,
create_report_slack_chart_with_text,
):
"""
ExecuteReport Command: Test chart slack report schedule with text
"""
# setup dataframe mock
response = Mock()
mock_open.return_value = response
mock_urlopen.return_value = response
mock_urlopen.return_value.getcode.return_value = 200
response.read.return_value = json.dumps(
{
"result": [
{
"data": {
"t1": {0: "c11", 1: "c21"},
"t2": {0: "c12", 1: "c22"},
"t3__sum": {0: "c13", 1: "c23"},
},
"colnames": [("t1",), ("t2",), ("t3__sum",)],
"indexnames": [(0,), (1,)],
"coltypes": [1, 1, 0],
},
],
}
).encode("utf-8")
notification_targets = get_target_from_report_schedule(
create_report_slack_chart_with_text
)
channel_name = notification_targets[0]
slack_client_mock_class.return_value.conversations_list.side_effect = SlackApiError(
"Error", "Response"
)
with freeze_time("2020-01-01T00:00:00Z"):
AsyncExecuteReportScheduleCommand(
TEST_ID, create_report_slack_chart_with_text.id, datetime.utcnow()
).run()
table_markdown = """| | t1 | t2 | t3__sum |
|---:|:-----|:-----|:----------|
| 0 | c11 | c12 | c13 |
| 1 | c21 | c22 | c23 |"""
assert (
table_markdown
in slack_client_mock_class.return_value.chat_postMessage.call_args[1][
"text"
]
)
assert (
f"<http://0.0.0.0:8080/explore/?form_data=%7B%22slice_id%22:+{create_report_slack_chart_with_text.chart.id}%7D&force=false|Explore in Superset>"
in slack_client_mock_class.return_value.chat_postMessage.call_args[1][
"text"
]
)
assert (
slack_client_mock_class.return_value.chat_postMessage.call_args[1][
"channel"
]
== channel_name
)
# Assert logs are correct
assert_log(ReportState.SUCCESS)
@pytest.mark.usefixtures("create_report_slack_chart")
def test_report_schedule_not_found(create_report_slack_chart):
"""

View File

@ -59,18 +59,19 @@ def test_send_slack(
description='<p>This is <a href="#">a test</a> alert</p><br />',
)
SlackNotification(
notification = SlackNotification(
recipient=ReportRecipients(
type=ReportRecipientType.SLACK,
recipient_config_json='{"target": "some_channel"}',
),
content=content,
).send()
)
notification.send()
logger_mock.info.assert_called_with(
"Report sent to slack", extra={"execution_id": execution_id}
)
slack_client_mock.return_value.chat_postMessage.assert_called_with(
channel="123",
channel="some_channel",
text="""*test alert*
<p>This is <a href="#">a test</a> alert</p><br />

View File

@ -14,7 +14,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from unittest.mock import Mock
import pandas as pd
@ -55,15 +54,6 @@ def test_get_channel_with_multi_recipients() -> None:
content=content,
)
client = Mock()
client.conversations_list.return_value = {
"channels": [
{"name": "some_channel", "id": "23SDKE"},
{"name": "second_channel", "id": "WD3D8KE"},
{"name": "third_channel", "id": "223DFKE"},
]
}
result = slack_notification._get_channel()
result = slack_notification._get_channels(client)
assert result == ["23SDKE", "WD3D8KE", "223DFKE"]
assert result == "some_channel,second_channel,third_channel"