import pandas as pd
from sqlalchemy import create_engine
import time
import re
# ===================== Configuration =====================
# Path of the CSV file to import.
CSV_FILE: str = r"C:\Users\Administrator\Desktop\商品资料.csv"
# Target table in PostgreSQL (rows are appended to it).
TABLE_NAME: str = "商品资料"
# Number of CSV rows parsed per pandas chunk.
BATCH_SIZE: int = 10000

# PostgreSQL connection settings used to build the SQLAlchemy URL.
# NOTE(review): credentials are hard-coded in source; consider moving them
# out of the file before sharing it.
DB_USER: str = "postgres"
DB_PWD: str = "123456"
DB_HOST: str = "127.0.0.1"
DB_PORT: int = 5432
DB_NAME: str = "postgres"
# =========================================================
# Excel-style escape sequence: "_x" + four hex digits + "_" stands for the
# character with that code point. Compiled once: clean_text runs per cell.
_HEX_ESCAPE_RE = re.compile(r'_x([0-9A-Fa-f]{4})_')


def clean_text(text):
    """Normalise a single CSV cell before it is written to PostgreSQL.

    Args:
        text: raw cell value (any type; missing values are NaN/None).

    Returns:
        None for missing values, otherwise a ``str`` in which
        ``_xHHHH_`` escapes are decoded, ``\\r`` is removed, ``\\n`` becomes
        a space, and NUL characters are dropped.
    """
    if pd.isna(text):
        return None
    text = str(text)
    # Restore escaped characters such as _x002B_ -> "+". The scan is
    # left-to-right and non-overlapping, so "_x005F_x0041_" yields the
    # literal "_x0041_".
    text = _HEX_ESCAPE_RE.sub(lambda m: chr(int(m.group(1), 16)), text)
    # Drop carriage returns and flatten newlines so the value stays on one line.
    text = text.replace("\r", "").replace("\n", " ")
    # PostgreSQL text values cannot contain the NUL (0x00) character; an
    # escaped _x0000_ (or a raw NUL in the file) would make the INSERT fail.
    text = text.replace("\x00", "")
    return text
def get_csv_effective_rows(csv_path, chunksize=10000, encoding="utf-8"):
    """Count the data rows of a CSV file, parsing it chunk by chunk.

    The header line is not counted. Entirely blank lines are not counted
    either, because pandas' ``read_csv`` skips them by default.

    Args:
        csv_path: path of the CSV file.
        chunksize: number of rows pandas parses per chunk.
        encoding: text encoding of the file.

    Returns:
        Total number of parsed data rows.
    """
    with pd.read_csv(
        csv_path,
        header=0,
        dtype=str,
        encoding=encoding,
        chunksize=chunksize,
    ) as chunks:
        return sum(len(part) for part in chunks)
def get_pg_table_count(engine, table):
    """Return the current number of rows in ``table``.

    Args:
        engine: connection object accepted by ``pandas.read_sql`` (in this
            script, the SQLAlchemy engine).
        table: unqualified table name. It is emitted as a double-quoted
            identifier, so it is matched case-sensitively.

    Returns:
        Row count as a Python ``int``.
    """
    # Double any embedded '"' so the name stays a single valid quoted
    # identifier instead of terminating the quoting early.
    quoted = '"' + table.replace('"', '""') + '"'
    sql = f"SELECT COUNT(*) AS cnt FROM {quoted}"
    df = pd.read_sql(sql, engine)
    return int(df["cnt"].iloc[0])
def _load_csv_in_batches(engine):
    """Append CSV_FILE to TABLE_NAME in BATCH_SIZE chunks, logging each batch.

    Every chunk is passed through ``clean_text`` cell by cell and written
    with ``DataFrame.to_sql`` (append, no index, to_sql chunksize=2000).

    Returns:
        Total number of rows handed to ``to_sql``.
    """
    write_total = 0
    # Using the reader as a context manager closes the CSV file even if a
    # batch raises part-way through the import.
    with pd.read_csv(
        CSV_FILE,
        header=0,
        dtype=str,
        encoding="utf-8",
        chunksize=BATCH_SIZE,
    ) as reader:
        # Take the timestamp *before* the iterator parses the next chunk, so
        # the read+clean and total figures really include the CSV read time.
        chunk_start = time.time()
        for i, chunk in enumerate(reader):
            # Clean every cell.
            chunk = chunk.map(clean_text)
            t_cleaned = time.time()
            # Write to the database.
            # NOTE(review): presumably each to_sql call commits on its own, so
            # a failure mid-run would leave earlier batches in the table — verify.
            chunk.to_sql(
                name=TABLE_NAME,
                con=engine,
                if_exists="append",
                index=False,
                chunksize=2000,
            )
            t_written = time.time()
            batch_rows = len(chunk)
            write_total += batch_rows
            cost_read_clean = round(t_cleaned - chunk_start, 2)
            cost_write = round(t_written - t_cleaned, 2)
            cost_total = round(t_written - chunk_start, 2)
            print(f"📦 第 {i+1} 批 | 本批:{batch_rows} | 累计写入:{write_total} | 清洗读取:{cost_read_clean}s | 写入:{cost_write}s | 总:{cost_total}s")
            # Restart the clock so the next chunk's read time counts toward it.
            chunk_start = time.time()
    return write_total


def _print_import_summary(csv_effective_rows, write_total, before_pg_count, after_pg_count):
    """Print the row-count reconciliation between the CSV, the loader and the table.

    Passes only when the CSV row count, the rows written by this script and
    the table's growth are all equal.
    """
    actual_add = after_pg_count - before_pg_count
    print("\n" + "-"*60)
    print(f"📄 CSV 有效数据行数: {csv_effective_rows}")
    print(f"📝 代码累计写入行数: {write_total}")
    print(f"🗄️ 导入前PG表行数: {before_pg_count}")
    print(f"🗄️ 导入后PG表行数: {after_pg_count}")
    print(f"📈 PG表实际新增行数: {actual_add}")
    print("-"*60)
    if csv_effective_rows == write_total and write_total == actual_add:
        print("✅ 数据量校验通过:行数完全一致,无重复/丢失")
    else:
        print("❌ 数据量异常!存在重复插入或数据丢失")
        print(f" 差异行数 = {csv_effective_rows - actual_add}")


def main():
    """Import CSV_FILE into PostgreSQL table TABLE_NAME and check row counts.

    Counts the CSV rows and the table's rows, streams the CSV into the table
    in batches, then compares CSV rows, rows written and the table's growth.
    """
    total_start = time.time()
    engine = create_engine(f"postgresql://{DB_USER}:{DB_PWD}@{DB_HOST}:{DB_PORT}/{DB_NAME}")
    try:
        print("📊 正在统计CSV有效数据行数...")
        csv_effective_rows = get_csv_effective_rows(CSV_FILE, chunksize=BATCH_SIZE, encoding="utf-8")
        print(f"📄 CSV 有效数据行数(不含表头、空行): {csv_effective_rows}")
        before_pg_count = get_pg_table_count(engine, TABLE_NAME)
        print(f"🗄️ 导入前 PG 表总行数: {before_pg_count}")

        write_total = _load_csv_in_batches(engine)

        after_pg_count = get_pg_table_count(engine, TABLE_NAME)
        _print_import_summary(csv_effective_rows, write_total, before_pg_count, after_pg_count)
    finally:
        # Release the engine's pooled connections whether or not the import succeeded.
        engine.dispose()
    total_end = time.time()
    print(f"\n⏱️ 程序总用时: {round(total_end - total_start, 2)} s")
# Run the import only when executed as a script, not when imported.
# (The stray pasted text "评论" that followed this guard was a bare name
# evaluated at module level and raised NameError; it has been removed.)
if __name__ == "__main__":
    main()