import asyncio
import hashlib
import os
import traceback
import uuid
from io import StringIO
from typing import List

import orjson
import pandas as pd
from fastapi import APIRouter, File, UploadFile, HTTPException
from pydantic import BaseModel

from apps.db.db import get_schema
from apps.db.engine import get_engine_conn
from common.core.config import settings
from common.core.deps import SessionDep, CurrentUser, Trans
from common.utils.utils import SQLBotLogUtil
from ..crud.datasource import get_datasource_list, check_status, create_ds, update_ds, delete_ds, getTables, getFields, \
    execSql, update_table_and_fields, getTablesByDs, chooseTables, preview, updateTable, updateField, get_ds, fieldEnum, \
    check_status_by_id
from ..crud.field import get_fields_by_table_id
from ..crud.table import get_tables_by_ds_id
from ..models.datasource import CoreDatasource, CreateDatasource, TableObj, CoreTable, CoreField

router = APIRouter(tags=["datasource"], prefix="/datasource")

# Staging directory for uploaded Excel/CSV files (see /uploadExcel).
path = settings.EXCEL_PATH


class TestObj(BaseModel):
    """Request body for the ad-hoc SQL endpoint (/execSql/{id})."""
    sql: str = None


@router.get("/ws/{oid}", include_in_schema=False)
async def query_by_oid(session: SessionDep, user: CurrentUser, oid: int) -> List[CoreDatasource]:
    """List the datasources of workspace *oid*. Admin only."""
    if not user.isAdmin:
        # was: raise Exception(...) -> surfaced as an opaque 500; a
        # permission failure should be a proper 403
        raise HTTPException(status_code=403, detail="no permission to execute")
    return get_datasource_list(session=session, user=user, oid=oid)


@router.get("/list")
async def datasource_list(session: SessionDep, user: CurrentUser):
    """List datasources visible to the current user."""
    return get_datasource_list(session=session, user=user)


@router.post("/get/{id}")
async def get_datasource(session: SessionDep, id: int):
    """Fetch one datasource by id."""
    return get_ds(session, id)


@router.post("/check")
async def check(session: SessionDep, trans: Trans, ds: CoreDatasource):
    """Check connectivity of a (possibly unsaved) datasource configuration."""
    # connectivity checks block on network I/O; keep them off the event loop
    return await asyncio.to_thread(check_status, session, trans, ds, True)


@router.get("/check/{ds_id}")
async def check_by_id(session: SessionDep, trans: Trans, ds_id: int):
    """Check connectivity of a stored datasource by id."""
    return await asyncio.to_thread(check_status_by_id, session, trans, ds_id, True)


@router.post("/add", response_model=CoreDatasource)
async def add(session: SessionDep, trans: Trans, user: CurrentUser, ds: CreateDatasource):
    """Create a new datasource."""
    return await asyncio.to_thread(create_ds, session, trans, user, ds)


@router.post("/chooseTables/{id}")
async def choose_tables(session: SessionDep, trans: Trans, id: int, tables: List[CoreTable]):
    """Persist the set of tables selected for datasource *id*."""
    await asyncio.to_thread(chooseTables, session, trans, id, tables)


@router.post("/update", response_model=CoreDatasource)
async def update(session: SessionDep, trans: Trans, user: CurrentUser, ds: CoreDatasource):
    """Update an existing datasource."""
    return await asyncio.to_thread(update_ds, session, trans, user, ds)


@router.post("/delete/{id}", response_model=CoreDatasource)
async def delete(session: SessionDep, id: int):
    """Delete datasource *id*."""
    return delete_ds(session, id)


@router.post("/getTables/{id}")
async def get_tables(session: SessionDep, id: int):
    """List tables of a stored datasource."""
    return getTables(session, id)


@router.post("/getTablesByConf")
async def get_tables_by_conf(session: SessionDep, trans: Trans, ds: CoreDatasource):
    """List tables for an (unsaved) datasource configuration."""
    try:
        return getTablesByDs(session, ds)
    except Exception as e:
        # Distinguish "datasource unreachable" from "listing failed":
        # check_status(..., True) raises its own translated error when the
        # connection itself is broken; if it passes, report the 500 here.
        status = await asyncio.to_thread(check_status, session, trans, ds, True)
        if status:
            SQLBotLogUtil.error(f"get table failed: {e}")
            raise HTTPException(status_code=500, detail=f'Get table Failed: {e.args}')


@router.post("/getSchemaByConf")
async def get_schema_by_conf(session: SessionDep, trans: Trans, ds: CoreDatasource):
    """Return the schema list for an (unsaved) datasource configuration."""
    try:
        return get_schema(ds)
    except Exception as e:
        # same connectivity-first error handling as /getTablesByConf
        status = await asyncio.to_thread(check_status, session, trans, ds, True)
        if status:
            SQLBotLogUtil.error(f"get table failed: {e}")
            raise HTTPException(status_code=500, detail=f'Get table Failed: {e.args}')


@router.post("/getFields/{id}/{table_name}")
async def get_fields(session: SessionDep, id: int, table_name: str):
    """List fields of *table_name* in datasource *id*."""
    return getFields(session, id, table_name)


@router.post("/execSql/{id}")
async def exec_sql(session: SessionDep, id: int, obj: TestObj):
    """Execute *obj.sql* against datasource *id* and return the result."""
    def inner():
        data = execSql(session, id, obj.sql)
        try:
            # debug aid: dump the result rows to stdout; never let a
            # serialization problem break the actual response
            print(orjson.dumps(data.get('data')).decode())
        except Exception:
            traceback.print_exc()
        return data

    return await asyncio.to_thread(inner)


@router.post("/tableList/{id}")
async def table_list(session: SessionDep, id: int):
    """List the saved table records of datasource *id*."""
    return get_tables_by_ds_id(session, id)


@router.post("/fieldList/{id}")
async def field_list(session: SessionDep, id: int):
    """List the saved field records of table *id*."""
    return get_fields_by_table_id(session, id)


@router.post("/editLocalComment")
async def edit_local(session: SessionDep, data: TableObj):
    """Update locally-stored table and field comments."""
    update_table_and_fields(session, data)


@router.post("/editTable")
async def edit_table(session: SessionDep, table: CoreTable):
    """Update a single table record."""
    updateTable(session, table)


@router.post("/editField")
async def edit_field(session: SessionDep, field: CoreField):
    """Update a single field record."""
    updateField(session, field)


@router.post("/previewData/{id}")
async def preview_data(session: SessionDep, trans: Trans, current_user: CurrentUser, id: int, data: TableObj):
    """Preview rows of a table in datasource *id*."""
    def inner():
        try:
            return preview(session, current_user, id, data)
        except Exception as e:
            ds = session.query(CoreDatasource).filter(CoreDatasource.id == id).first()
            # connectivity-first error handling, as in /getTablesByConf
            status = check_status(session, trans, ds, True)
            if status:
                SQLBotLogUtil.error(f"Preview failed: {e}")
                raise HTTPException(status_code=500, detail=f'Preview Failed: {e.args}')

    return await asyncio.to_thread(inner)


@router.post("/fieldEnum/{id}")
async def field_enum(session: SessionDep, id: int):
    """Return the distinct values (enum) of field *id*."""
    return await asyncio.to_thread(fieldEnum, session, id)
f"{file.filename.split('.')[0]}_{hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]}.{file.filename.split('.')[1]}" # save_path = os.path.join(path, filename) # with open(save_path, "wb") as f: # f.write(await file.read()) # # def inner(): # sheets = [] # with get_data_engine() as conn: # if filename.endswith(".csv"): # df = pd.read_csv(save_path, engine='c') # tableName = f"sheet1_{hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]}" # sheets.append({"tableName": tableName, "tableComment": ""}) # column_len = len(df.dtypes) # fields = [] # for i in range(column_len): # # build fields # fields.append({"name": df.columns[i], "type": str(df.dtypes[i]), "relType": ""}) # # create table # create_table(conn, tableName, fields) # # data = [ # {df.columns[i]: None if pd.isna(row[i]) else (int(row[i]) if "int" in str(df.dtypes[i]) else row[i]) # for i in range(len(row))} # for row in df.values # ] # # insert data # insert_data(conn, tableName, fields, data) # else: # excel_engine = 'xlrd' if filename.endswith(".xls") else 'openpyxl' # df_sheets = pd.read_excel(save_path, sheet_name=None, engine=excel_engine) # # build columns and data to insert db # for sheet_name, df in df_sheets.items(): # tableName = f"{sheet_name}_{hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]}" # sheets.append({"tableName": tableName, "tableComment": ""}) # column_len = len(df.dtypes) # fields = [] # for i in range(column_len): # # build fields # fields.append({"name": df.columns[i], "type": str(df.dtypes[i]), "relType": ""}) # # create table # create_table(conn, tableName, fields) # # data = [ # {df.columns[i]: None if pd.isna(row[i]) else ( # int(row[i]) if "int" in str(df.dtypes[i]) else row[i]) # for i in range(len(row))} # for row in df.values # ] # # insert data # insert_data(conn, tableName, fields, data) # # os.remove(save_path) # return {"filename": filename, "sheets": sheets} # # return await asyncio.to_thread(inner) @router.post("/uploadExcel") async def upload_excel(session: 
SessionDep, file: UploadFile = File(...)): ALLOWED_EXTENSIONS = {"xlsx", "xls", "csv"} if not file.filename.lower().endswith(tuple(ALLOWED_EXTENSIONS)): raise HTTPException(400, "Only support .xlsx/.xls/.csv") os.makedirs(path, exist_ok=True) filename = f"{file.filename.split('.')[0]}_{hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]}.{file.filename.split('.')[1]}" save_path = os.path.join(path, filename) with open(save_path, "wb") as f: f.write(await file.read()) def inner(): sheets = [] engine = get_engine_conn() if filename.endswith(".csv"): df = pd.read_csv(save_path, engine='c') tableName = f"sheet1_{hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]}" sheets.append({"tableName": tableName, "tableComment": ""}) insert_pg(df, tableName, engine) else: sheet_names = pd.ExcelFile(save_path).sheet_names for sheet_name in sheet_names: tableName = f"{sheet_name}_{hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]}" sheets.append({"tableName": tableName, "tableComment": ""}) # df_temp = pd.read_excel(save_path, nrows=5) # non_empty_cols = df_temp.columns[df_temp.notna().any()].tolist() df = pd.read_excel(save_path, sheet_name=sheet_name, engine='calamine') insert_pg(df, tableName, engine) # os.remove(save_path) return {"filename": filename, "sheets": sheets} return await asyncio.to_thread(inner) def insert_pg(df, tableName, engine): # fix field type for i in range(len(df.dtypes)): if str(df.dtypes[i]) == 'uint64': df[str(df.columns[i])] = df[str(df.columns[i])].astype('string') conn = engine.raw_connection() cursor = conn.cursor() try: df.to_sql( tableName, engine, if_exists='replace', index=False ) # trans csv output = StringIO() df.to_csv(output, sep='\t', header=False, index=False) # output.seek(0) # pg copy cursor.copy_expert( sql=f"""COPY "{tableName}" FROM STDIN WITH CSV DELIMITER E'\t'""", file=output ) conn.commit() except Exception as e: traceback.print_exc() raise HTTPException(400, str(e)) finally: cursor.close() conn.close()