test: upload excel (#10467)

This commit is contained in:
Hossein Torabi 2020-07-30 00:20:39 +04:30 committed by GitHub
parent 78cad9a4a8
commit 259a344fd1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 1 deletions

View File

@ -37,3 +37,6 @@ requests==2.23.0
statsd==3.3.0
tox==3.11.1
pillow==7.0.0
openpyxl==3.0.3 # Pandas use openpyxl to write excel format(using in unittes)
xlrd==1.2.0

View File

@ -395,7 +395,7 @@ class ExcelToDatabaseView(SimpleFormView):
os.remove(uploaded_tmp_file_path)
# Go back to welcome page / splash screen
message = _(
'CSV file "%(excel_filename)s" uploaded to table "%(table_name)s" in '
'Excel file "%(excel_filename)s" uploaded to table "%(table_name)s" in '
'database "%(db_name)s"',
excel_filename=form.excel_file.data.filename,
table_name=str(excel_table),

View File

@ -811,6 +811,9 @@ class TestCore(SupersetTestCase):
for l in content:
test_file.write(f"{l}\n")
def create_sample_excelfile(self, filename: str, content: Dict[str, str]) -> None:
pd.DataFrame(content).to_excel(filename)
def enable_csv_upload(self, database: models.Database) -> None:
"""Enables csv upload in the given database."""
database.allow_csv_upload = True
@ -837,6 +840,22 @@ class TestCore(SupersetTestCase):
form_data.update(extra)
return self.get_resp("/csvtodatabaseview/form", data=form_data)
def upload_excel(
self, filename: str, table_name: str, extra: Optional[Dict[str, str]] = None
):
form_data = {
"excel_file": open(filename, "rb"),
"name": table_name,
"con": utils.get_example_database().id,
"sheet_name": "Sheet1",
"if_exists": "fail",
"index_label": "test_label",
"mangle_dupe_cols": False,
}
if extra:
form_data.update(extra)
return self.get_resp("/exceltodatabaseview/form", data=form_data)
@mock.patch(
"superset.models.core.config",
{**app.config, "ALLOWED_USER_CSV_SCHEMA_FUNC": lambda d, u: ["admin_database"]},
@ -981,6 +1000,39 @@ class TestCore(SupersetTestCase):
os.remove(f1)
os.remove(f2)
def test_import_excel(self):
self.login(username="admin")
table_name = "".join(random.choice(string.ascii_lowercase) for _ in range(5))
f1 = "testExcel.xlsx"
self.create_sample_excelfile(f1, {"a": ["john", "paul"], "b": [1, 2]})
self.enable_csv_upload(utils.get_example_database())
try:
success_msg_f1 = f'Excel file "{f1}" uploaded to table "{table_name}"'
# initial upload with fail mode
resp = self.upload_excel(f1, table_name)
self.assertIn(success_msg_f1, resp)
# upload again with fail mode; should fail
fail_msg = f'Unable to upload Excel file "{f1}" to table "{table_name}"'
resp = self.upload_excel(f1, table_name)
self.assertIn(fail_msg, resp)
# upload again with append mode
resp = self.upload_excel(f1, table_name, extra={"if_exists": "append"})
self.assertIn(success_msg_f1, resp)
# upload again with replace mode
resp = self.upload_excel(f1, table_name, extra={"if_exists": "replace"})
self.assertIn(success_msg_f1, resp)
# make sure that john and empty string are replaced with None
data = db.session.execute(f"SELECT * from {table_name}").fetchall()
assert data == [(0, "john", 1), (1, "paul", 2)]
finally:
os.remove(f1)
def test_dataframe_timezone(self):
tz = pytz.FixedOffset(60)
data = [