72 lines
2.2 KiB
Python
72 lines
2.2 KiB
Python
# Author: Junjun
|
|
# Date: 2025/5/19
|
|
import urllib.parse
|
|
from typing import List
|
|
|
|
from sqlalchemy import create_engine, text, MetaData, Table
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
from apps.datasource.models.datasource import DatasourceConf
|
|
from common.core.config import settings
|
|
|
|
|
|
def get_engine_config():
|
|
return DatasourceConf(username=settings.POSTGRES_USER, password=settings.POSTGRES_PASSWORD,
|
|
host=settings.POSTGRES_SERVER, port=settings.POSTGRES_PORT, database=settings.POSTGRES_DB,
|
|
dbSchema="public", timeout=30) # read engine config
|
|
|
|
|
|
def get_engine_uri(conf: DatasourceConf):
|
|
return f"postgresql+psycopg2://{urllib.parse.quote(conf.username)}:{urllib.parse.quote(conf.password)}@{conf.host}:{conf.port}/{urllib.parse.quote(conf.database)}"
|
|
|
|
|
|
def get_engine_conn():
|
|
conf = get_engine_config()
|
|
db_url = get_engine_uri(conf)
|
|
engine = create_engine(db_url,
|
|
connect_args={"options": f"-c search_path={conf.dbSchema}", "connect_timeout": conf.timeout},
|
|
pool_timeout=conf.timeout)
|
|
return engine
|
|
|
|
|
|
def get_data_engine():
|
|
engine = get_engine_conn()
|
|
session_maker = sessionmaker(bind=engine)
|
|
session = session_maker()
|
|
return session
|
|
|
|
|
|
def create_table(session, table_name: str, fields: List[any]):
|
|
# field type relation
|
|
list = []
|
|
for f in fields:
|
|
if "object" in f["type"]:
|
|
f["relType"] = "text"
|
|
elif "int" in f["type"]:
|
|
f["relType"] = "bigint"
|
|
elif "float" in f["type"]:
|
|
f["relType"] = "numeric"
|
|
elif "datetime" in f["type"]:
|
|
f["relType"] = "timestamp"
|
|
else:
|
|
f["relType"] = "text"
|
|
list.append(f'"{f["name"]}" {f["relType"]}')
|
|
|
|
sql = f"""
|
|
CREATE TABLE "{table_name}" (
|
|
{", ".join(list)}
|
|
);
|
|
"""
|
|
session.execute(text(sql))
|
|
session.commit()
|
|
|
|
|
|
def insert_data(session, table_name: str, fields: List[any], data: List[any]):
|
|
engine = get_engine_conn()
|
|
metadata = MetaData()
|
|
table = Table(table_name, metadata, autoload_with=engine)
|
|
with engine.connect() as conn:
|
|
stmt = table.insert().values(data)
|
|
conn.execute(stmt)
|
|
conn.commit()
|