"""Datasource API endpoints: CRUD, connectivity checks, schema/table browsing
and Excel/CSV upload into the warehouse."""
import asyncio
import hashlib
import os
import traceback
import uuid
from io import StringIO
from typing import List

import orjson
import pandas as pd
from fastapi import APIRouter, File, UploadFile, HTTPException

from apps.db.db import get_schema
from apps.db.engine import get_engine_conn
from common.core.config import settings
from common.core.deps import SessionDep, CurrentUser, Trans
from common.utils.utils import SQLBotLogUtil
from ..crud.datasource import get_datasource_list, check_status, create_ds, update_ds, delete_ds, getTables, getFields, \
    execSql, update_table_and_fields, getTablesByDs, chooseTables, preview, updateTable, updateField, get_ds, fieldEnum, \
    check_status_by_id
from ..crud.field import get_fields_by_table_id
from ..crud.table import get_tables_by_ds_id
from ..models.datasource import CoreDatasource, CreateDatasource, TableObj, CoreTable, CoreField

router = APIRouter(tags=["datasource"], prefix="/datasource")
# Directory where uploaded spreadsheet files are staged before import.
path = settings.EXCEL_PATH


@router.get("/ws/{oid}", include_in_schema=False)
async def query_by_oid(session: SessionDep, user: CurrentUser, oid: int) -> List[CoreDatasource]:
    """List the datasources of workspace *oid*.

    Admin-only: non-admin callers get a 403.  (Previously raised a bare
    ``Exception``, which FastAPI surfaced as an opaque 500; a permission
    failure is a client error and should say so.)
    """
    if not user.isAdmin:
        raise HTTPException(status_code=403, detail="no permission to execute")
    return get_datasource_list(session=session, user=user, oid=oid)


@router.get("/list")
async def datasource_list(session: SessionDep, user: CurrentUser):
    """List all datasources visible to the current user."""
    return get_datasource_list(session=session, user=user)
@router.post("/get/{id}")
async def get_datasource(session: SessionDep, id: int):
    """Fetch one datasource by primary key."""
    return get_ds(session, id)


@router.post("/check")
async def check(session: SessionDep, trans: Trans, ds: CoreDatasource):
    """Run the (blocking) connectivity check for *ds* in a worker thread."""
    return await asyncio.to_thread(check_status, session, trans, ds, True)


@router.get("/check/{ds_id}")
async def check_by_id(session: SessionDep, trans: Trans, ds_id: int):
    """Run the connectivity check for a stored datasource in a worker thread."""
    return await asyncio.to_thread(check_status_by_id, session, trans, ds_id, True)


@router.post("/add", response_model=CoreDatasource)
async def add(session: SessionDep, trans: Trans, user: CurrentUser, ds: CreateDatasource):
    """Create a new datasource; the blocking CRUD call is off-loaded to a thread."""
    return await asyncio.to_thread(create_ds, session, trans, user, ds)


@router.post("/chooseTables/{id}")
async def choose_tables(session: SessionDep, trans: Trans, id: int, tables: List[CoreTable]):
    """Persist the set of tables selected for datasource *id* (no response body)."""
    await asyncio.to_thread(chooseTables, session, trans, id, tables)


@router.post("/update", response_model=CoreDatasource)
async def update(session: SessionDep, trans: Trans, user: CurrentUser, ds: CoreDatasource):
    """Update an existing datasource; blocking call off-loaded to a thread."""
    return await asyncio.to_thread(update_ds, session, trans, user, ds)


@router.post("/delete/{id}", response_model=CoreDatasource)
async def delete(session: SessionDep, id: int):
    """Delete datasource *id*."""
    return delete_ds(session, id)


@router.post("/getTables/{id}")
async def get_tables(session: SessionDep, id: int):
    """List tables of the stored datasource *id*."""
    return getTables(session, id)


@router.post("/getTablesByConf")
async def get_tables_by_conf(session: SessionDep, trans: Trans, ds: CoreDatasource):
    """List tables for an ad-hoc (unsaved) datasource configuration.

    On failure, the connection itself is re-checked first: if the check
    raises, its translated error propagates; if the connection turns out
    to be fine, the listing failure is logged and reported as a 500.
    """
    try:
        return getTablesByDs(session, ds)
    except Exception as e:
        status = await asyncio.to_thread(check_status, session, trans, ds, True)
        if status:
            SQLBotLogUtil.error(f"get table failed: {e}")
            raise HTTPException(status_code=500, detail=f'Get table Failed: {e.args}')
@router.post("/getSchemaByConf")
async def get_schema_by_conf(session: SessionDep, trans: Trans, ds: CoreDatasource):
    """Return the schema list for an ad-hoc datasource configuration.

    Mirrors ``get_tables_by_conf``: on failure the connection is re-checked;
    if the check raises, its translated error propagates, otherwise the
    original failure is logged and reported as a 500.
    """
    try:
        return get_schema(ds)
    except Exception as e:
        status = await asyncio.to_thread(check_status, session, trans, ds, True)
        if status:
            SQLBotLogUtil.error(f"get table failed: {e}")
            raise HTTPException(status_code=500, detail=f'Get table Failed: {e.args}')


@router.post("/getFields/{id}/{table_name}")
async def get_fields(session: SessionDep, id: int, table_name: str):
    """List the fields of *table_name* in datasource *id*."""
    return getFields(session, id, table_name)


from typing import Optional

from pydantic import BaseModel


class TestObj(BaseModel):
    """Request body for /execSql: a single ad-hoc SQL statement."""
    # Optional by design; annotated as such instead of the bare `str = None`.
    sql: Optional[str] = None


@router.post("/execSql/{id}")
async def exec_sql(session: SessionDep, id: int, obj: TestObj):
    """Execute an ad-hoc SQL statement against datasource *id*.

    The blocking call runs in a worker thread.  (A leftover debug dump of
    the full result set to stdout was removed.)
    """
    return await asyncio.to_thread(execSql, session, id, obj.sql)


@router.post("/tableList/{id}")
async def table_list(session: SessionDep, id: int):
    """List the stored table records of datasource *id*."""
    return get_tables_by_ds_id(session, id)


@router.post("/fieldList/{id}")
async def field_list(session: SessionDep, id: int):
    """List the stored field records of table *id*."""
    return get_fields_by_table_id(session, id)


@router.post("/editLocalComment")
async def edit_local(session: SessionDep, data: TableObj):
    """Update a table's comment together with its fields' comments."""
    update_table_and_fields(session, data)


@router.post("/editTable")
async def edit_table(session: SessionDep, table: CoreTable):
    """Update a single table record."""
    updateTable(session, table)


@router.post("/editField")
async def edit_field(session: SessionDep, field: CoreField):
    """Update a single field record."""
    updateField(session, field)


@router.post("/previewData/{id}")
async def preview_data(session: SessionDep, trans: Trans, current_user: CurrentUser, id: int, data: TableObj):
    """Preview rows of a table in datasource *id*.

    On failure the datasource's connectivity is re-checked first; if the
    connection is fine the preview failure is logged and reported as a 500.
    """
    def inner():
        try:
            return preview(session, current_user, id, data)
        except Exception as e:
            ds = session.query(CoreDatasource).filter(CoreDatasource.id == id).first()
            status = check_status(session, trans, ds, True)
            if status:
                SQLBotLogUtil.error(f"Preview failed: {e}")
                raise HTTPException(status_code=500, detail=f'Preview Failed: {e.args}')

    return await asyncio.to_thread(inner)


@router.post("/fieldEnum/{id}")
async def field_enum(session: SessionDep, id: int):
    """Return the distinct-value enumeration for field *id* (worker thread)."""
    return await asyncio.to_thread(fieldEnum, session, id)


def _staged_name(original: str) -> str:
    """Build a collision-safe staging filename, keeping the real extension.

    Uses ``os.path.splitext`` so filenames containing extra dots
    (e.g. ``report.v2.xlsx``) keep their true extension; the previous
    ``split('.')`` approach truncated them.
    """
    stem, ext = os.path.splitext(original)
    token = hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]
    return f"{stem}_{token}{ext}"


@router.post("/uploadExcel")
async def upload_excel(session: SessionDep, file: UploadFile = File(...)):
    """Import an uploaded .xlsx/.xls/.csv into the warehouse, one table per sheet.

    Returns ``{"filename": staged_name, "sheets": [{"tableName", "tableComment"}]}``.
    """
    # Dotted suffixes: the old check `endswith(("xlsx", ...))` also accepted
    # names like "fooxls" that merely end in the letters, without a dot.
    if not file.filename.lower().endswith((".xlsx", ".xls", ".csv")):
        raise HTTPException(400, "Only support .xlsx/.xls/.csv")

    os.makedirs(path, exist_ok=True)
    filename = _staged_name(file.filename)
    save_path = os.path.join(path, filename)
    with open(save_path, "wb") as f:
        f.write(await file.read())

    def inner():
        sheets = []
        engine = get_engine_conn()
        try:
            if filename.lower().endswith(".csv"):
                df = pd.read_csv(save_path, engine='c')
                table_name = f"sheet1_{hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]}"
                sheets.append({"tableName": table_name, "tableComment": ""})
                insert_pg(df, table_name, engine)
            else:
                # Close the workbook handle promptly; only the names are needed here.
                with pd.ExcelFile(save_path) as workbook:
                    sheet_names = workbook.sheet_names
                for sheet_name in sheet_names:
                    table_name = f"{sheet_name}_{hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]}"
                    sheets.append({"tableName": table_name, "tableComment": ""})
                    df = pd.read_excel(save_path, sheet_name=sheet_name, engine='calamine')
                    insert_pg(df, table_name, engine)
            return {"filename": filename, "sheets": sheets}
        finally:
            # Always clean up the staged upload (it was previously leaked).
            os.remove(save_path)

    return await asyncio.to_thread(inner)


def insert_pg(df, tableName, engine):
    """Bulk-load *df* into PostgreSQL table *tableName* via COPY.

    pandas creates/replaces the table with the right columns but NO rows
    (``df.head(0).to_sql``), then the data is streamed once with COPY.
    The previous version wrote the full frame through ``to_sql`` and then
    attempted the COPY on a buffer that was never rewound (``seek(0)`` was
    commented out), so the COPY silently loaded nothing.

    Raises HTTPException(400) on any load failure; the connection and
    cursor are always closed.
    """
    # uint64 can exceed PostgreSQL's signed BIGINT range; store as text.
    for i in range(len(df.dtypes)):
        if str(df.dtypes.iloc[i]) == 'uint64':
            df[str(df.columns[i])] = df[str(df.columns[i])].astype('string')

    conn = engine.raw_connection()
    cursor = conn.cursor()
    try:
        # Schema only — no rows — so COPY below is the single data path.
        df.head(0).to_sql(
            tableName,
            engine,
            if_exists='replace',
            index=False
        )

        output = StringIO()
        df.to_csv(output, sep='\t', header=False, index=False)
        output.seek(0)  # rewind so COPY reads from the start of the buffer

        cursor.copy_expert(
            sql=f"""COPY "{tableName}" FROM STDIN WITH CSV DELIMITER E'\t'""",
            file=output
        )
        conn.commit()
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(400, str(e))
    finally:
        cursor.close()
        conn.close()