Files
SQLBot/backend/apps/db/engine.py
2025-09-08 16:36:08 +08:00

72 lines
2.2 KiB
Python

# Author: Junjun
# Date: 2025/5/19
import urllib.parse
from typing import List
from sqlalchemy import create_engine, text, MetaData, Table
from sqlalchemy.orm import sessionmaker
from apps.datasource.models.datasource import DatasourceConf
from common.core.config import settings
def get_engine_config():
return DatasourceConf(username=settings.POSTGRES_USER, password=settings.POSTGRES_PASSWORD,
host=settings.POSTGRES_SERVER, port=settings.POSTGRES_PORT, database=settings.POSTGRES_DB,
dbSchema="public", timeout=30) # read engine config
def get_engine_uri(conf: DatasourceConf):
return f"postgresql+psycopg2://{urllib.parse.quote(conf.username)}:{urllib.parse.quote(conf.password)}@{conf.host}:{conf.port}/{urllib.parse.quote(conf.database)}"
def get_engine_conn():
conf = get_engine_config()
db_url = get_engine_uri(conf)
engine = create_engine(db_url,
connect_args={"options": f"-c search_path={conf.dbSchema}", "connect_timeout": conf.timeout},
pool_timeout=conf.timeout)
return engine
def get_data_engine():
engine = get_engine_conn()
session_maker = sessionmaker(bind=engine)
session = session_maker()
return session
def create_table(session, table_name: str, fields: List[any]):
# field type relation
list = []
for f in fields:
if "object" in f["type"]:
f["relType"] = "text"
elif "int" in f["type"]:
f["relType"] = "bigint"
elif "float" in f["type"]:
f["relType"] = "numeric"
elif "datetime" in f["type"]:
f["relType"] = "timestamp"
else:
f["relType"] = "text"
list.append(f'"{f["name"]}" {f["relType"]}')
sql = f"""
CREATE TABLE "{table_name}" (
{", ".join(list)}
);
"""
session.execute(text(sql))
session.commit()
def insert_data(session, table_name: str, fields: List[any], data: List[any]):
engine = get_engine_conn()
metadata = MetaData()
table = Table(table_name, metadata, autoload_with=engine)
with engine.connect() as conn:
stmt = table.insert().values(data)
conn.execute(stmt)
conn.commit()