python导入数据到postgresql数据库

import pandas as pd
from sqlalchemy import create_engine
import time
import re

# ===================== 配置区 =====================
CSV_FILE = r"C:\Users\Administrator\Desktop\商品资料.csv"
TABLE_NAME = "商品资料"
BATCH_SIZE = 10000        # 读取分块

# PG 连接信息
DB_USER = "postgres"
DB_PWD = "123456"
DB_HOST = "127.0.0.1"
DB_PORT = 5432
DB_NAME = "postgres"
# ===================================================

def clean_text(text):
    if pd.isna(text):
        return None
    text = str(text)
    # 还原 _x002B_ 这类转义字符
    pattern = r'_x([0-9A-Fa-f]{4})_'
    text = re.sub(pattern, lambda m: chr(int(m.group(1), 16)), text)
    # 清除字段内换行、回车,避免解析异常
    text = text.replace("\r", "").replace("\n", " ")
    return text

def get_csv_effective_rows(csv_path, chunksize=10000, encoding="utf-8"):
    total = 0
    reader = pd.read_csv(
        csv_path,
        header=0,
        dtype=str,
        encoding=encoding,
        chunksize=chunksize
    )
    for chunk in reader:
        total += len(chunk)
    return total

def get_pg_table_count(engine, table):
    sql = f'SELECT COUNT(*) AS cnt FROM "{table}"'
    df = pd.read_sql(sql, engine)
    return int(df["cnt"].iloc[0])

def main():
    total_start = time.time()
    engine = create_engine(f"postgresql://{DB_USER}:{DB_PWD}@{DB_HOST}:{DB_PORT}/{DB_NAME}")

    print("📊 正在统计CSV有效数据行数...")
    csv_effective_rows = get_csv_effective_rows(CSV_FILE, chunksize=BATCH_SIZE, encoding="utf-8")
    print(f"📄 CSV 有效数据行数(不含表头、空行): {csv_effective_rows}")

    before_pg_count = get_pg_table_count(engine, TABLE_NAME)
    print(f"🗄️  导入前 PG 表总行数: {before_pg_count}")

    reader = pd.read_csv(
        CSV_FILE,
        header=0,
        dtype=str,
        encoding="utf-8",
        chunksize=BATCH_SIZE
    )

    write_total = 0
    for i, chunk in enumerate(reader):
        # 定义批次起始时间
        chunk_start = time.time()

        # 清洗数据
        t1 = time.time()
        chunk = chunk.map(clean_text)
        t2 = time.time()

        # 写入数据库
        chunk.to_sql(
            name=TABLE_NAME,
            con=engine,
            if_exists="append",
            index=False,
            chunksize=2000
        )
        t3 = time.time()

        chunk_end = time.time()
        batch_rows = len(chunk)
        write_total += batch_rows

        # 分项耗时
        cost_read_clean = round(t2 - t1, 2)
        cost_write = round(t3 - t2, 2)
        cost_total = round(chunk_end - chunk_start, 2)

        # 完整日志:批次、本批行数、累计、分项耗时
        print(f"📦 第 {i+1} 批 | 本批:{batch_rows} | 累计写入:{write_total} | 清洗读取:{cost_read_clean}s | 写入:{cost_write}s | 总:{cost_total}s")

    after_pg_count = get_pg_table_count(engine, TABLE_NAME)
    actual_add = after_pg_count - before_pg_count

    print("\n" + "-"*60)
    print(f"📄 CSV 有效数据行数: {csv_effective_rows}")
    print(f"📝 代码累计写入行数: {write_total}")
    print(f"🗄️  导入前PG表行数: {before_pg_count}")
    print(f"🗄️  导入后PG表行数: {after_pg_count}")
    print(f"📈 PG表实际新增行数: {actual_add}")
    print("-"*60)

    if csv_effective_rows == write_total and write_total == actual_add:
        print("✅ 数据量校验通过:行数完全一致,无重复/丢失")
    else:
        print("❌ 数据量异常!存在重复插入或数据丢失")
        print(f"   差异行数 = {csv_effective_rows - actual_add}")

    total_end = time.time()
    print(f"\n⏱️  程序总用时: {round(total_end - total_start, 2)} s")

if __name__ == "__main__":
    main()