fix: improve upload ZIP file validation (#25658)

This commit is contained in:
Daniel Vaz Gaspar 2023-10-17 18:28:09 +01:00 committed by GitHub
parent cb963585ad
commit f473d13d0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 83 additions and 0 deletions

View File

@ -26,6 +26,7 @@ from superset import db
from superset.commands.importers.exceptions import IncorrectVersionError
from superset.databases.ssh_tunnel.models import SSHTunnel
from superset.models.core import Database
from superset.utils.core import check_is_safe_zip
METADATA_FILE_NAME = "metadata.yaml"
IMPORT_VERSION = "1.0.0"
@ -207,6 +208,7 @@ def is_valid_config(file_name: str) -> bool:
def get_contents_from_bundle(bundle: ZipFile) -> dict[str, str]:
check_is_safe_zip(bundle)
return {
remove_root(file_name): bundle.read(file_name).decode()
for file_name in bundle.namelist()

View File

@ -1600,6 +1600,11 @@ WELCOME_PAGE_LAST_TAB: (
Literal["examples", "all"] | tuple[str, list[dict[str, Any]]]
) = "all"
# Max allowed size for a zipped file
ZIPPED_FILE_MAX_SIZE = 100 * 1024 * 1024 # 100MB
# Max allowed compression ratio for a zipped file
ZIP_FILE_MAX_COMPRESS_RATIO = 200.0
# Configuration for environment tag shown on the navbar. Setting 'text' to '' will hide the tag.
# 'color' can either be a hex color code, or a dot-indexed theme color (e.g. error.base)
ENVIRONMENT_TAG_CONFIG = {

View File

@ -1917,6 +1917,25 @@ def create_zip(files: dict[str, Any]) -> BytesIO:
return buf
def check_is_safe_zip(zip_file: ZipFile) -> None:
"""
Checks whether a ZIP file is safe, raises SupersetException if not.
:param zip_file:
:return:
"""
uncompress_size = 0
compress_size = 0
for zip_file_element in zip_file.infolist():
if zip_file_element.file_size > current_app.config["ZIPPED_FILE_MAX_SIZE"]:
raise SupersetException("Found file with size above allowed threshold")
uncompress_size += zip_file_element.file_size
compress_size += zip_file_element.compress_size
compress_ratio = uncompress_size / compress_size
if compress_ratio > current_app.config["ZIP_FILE_MAX_COMPRESS_RATIO"]:
raise SupersetException("Zip compress ratio above allowed threshold")
def remove_extra_adhoc_filters(form_data: dict[str, Any]) -> None:
"""
Remove filters from slice data that originate from a filter box or native filter

View File

@ -15,13 +15,17 @@
# specific language governing permissions and limitations
# under the License.
import os
from dataclasses import dataclass
from typing import Any, Optional
from unittest.mock import MagicMock
import pandas as pd
import pytest
from superset.exceptions import SupersetException
from superset.utils.core import (
cast_to_boolean,
check_is_safe_zip,
DateColumn,
is_test,
normalize_dttm_col,
@ -44,6 +48,12 @@ EXTRA_FILTER: QueryObjectFilterClause = {
}
@dataclass
class MockZipInfo:
file_size: int
compress_size: int
@pytest.mark.parametrize(
"original,expected",
[
@ -201,3 +211,50 @@ def test_normalize_dttm_col() -> None:
normalize_dttm_col(df, dttm_cols)
assert df["__time"].astype(str).tolist() == ["2017-07-01"]
def test_check_if_safe_zip_success(app_context: None) -> None:
"""
Test if ZIP files are safe
"""
ZipFile = MagicMock()
ZipFile.infolist.return_value = [
MockZipInfo(file_size=1000, compress_size=10),
MockZipInfo(file_size=1000, compress_size=10),
MockZipInfo(file_size=1000, compress_size=10),
MockZipInfo(file_size=1000, compress_size=10),
MockZipInfo(file_size=1000, compress_size=10),
]
check_is_safe_zip(ZipFile)
def test_check_if_safe_zip_high_rate(app_context: None) -> None:
"""
Test if ZIP files is not highly compressed
"""
ZipFile = MagicMock()
ZipFile.infolist.return_value = [
MockZipInfo(file_size=1000, compress_size=1),
MockZipInfo(file_size=1000, compress_size=1),
MockZipInfo(file_size=1000, compress_size=1),
MockZipInfo(file_size=1000, compress_size=1),
MockZipInfo(file_size=1000, compress_size=1),
]
with pytest.raises(SupersetException):
check_is_safe_zip(ZipFile)
def test_check_if_safe_zip_hidden_bomb(app_context: None) -> None:
"""
Test if ZIP file does not contain a big file highly compressed
"""
ZipFile = MagicMock()
ZipFile.infolist.return_value = [
MockZipInfo(file_size=1000, compress_size=100),
MockZipInfo(file_size=1000, compress_size=100),
MockZipInfo(file_size=1000, compress_size=100),
MockZipInfo(file_size=1000, compress_size=100),
MockZipInfo(file_size=1000 * (1024 * 1024), compress_size=100),
]
with pytest.raises(SupersetException):
check_is_safe_zip(ZipFile)