由于某些需求,需要处理n个1000w+数据的csv/excel文件,折腾了一天终于解决。
注意:excel 单个 sheet 最多 1048576 行(约105w行),超过时需要分 sheet 处理。
解决方案:使用python的pandas及openpyxl。首先在安装好python的基础上,安装这两个模块
pip install pandas openpyxl
如果只是需要导入数据库,那这个操作比较简单,直接读取文件然后导入即可。
csv文件,参考 Python多进程导入CSV大文件到数据库
excel文件
from openpyxl import load_workbook
import pandas as pd
import time
import sqlite3

# Load a large Excel sheet into a SQLite lookup table (key -> c1).
# Connect to the database (created automatically if it does not exist).
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
# Lookup table keyed by the Excel "key" column; INSERT OR REPLACE below
# makes re-runs idempotent.
cursor.execute('''CREATE TABLE IF NOT EXISTS data
    (key text PRIMARY KEY, c1 text)''')
ws_save = []
print(f'start>>>>')
t0 = time.time()
print(f'start time >>> {t0}')
read_only = True
t1 = time.time()
# read_only streams rows instead of materializing the whole workbook,
# which is the difference between seconds and a very long load for 1000w+ rows.
wb2 = load_workbook(r'test.xlsx', read_only=read_only)
t2 = time.time()
print('耗时%0.3f秒钟'%(t2-t1))
sheet2 = wb2.active
i = 0
print(f'start load data>>>')
BATCH = 10000  # commit every BATCH rows to keep transactions bounded
for row in sheet2.iter_rows(values_only=True):
    # Columns 2 and 3 hold the key and the value of interest
    # (assumes that layout — TODO confirm against the actual sheet).
    cursor.execute('''INSERT OR REPLACE INTO data (key, c1)
        VALUES (?, ?)''', (row[2], row[3]))
    i = i + 1
    if i % BATCH == 0:
        print(f'process total >>> {i}')
        conn.commit()
# BUG FIX: commit the tail batch (rows past the last full BATCH) before
# closing — sqlite3 does NOT auto-commit on close(), so without this the
# final partial batch is silently rolled back.
conn.commit()
# Release the read-only workbook's file handle explicitly.
wb2.close()
conn.close()
print(f'end load data>>>')
如果还需要做比对,则把需要比较的表先导入数据库(去除非必要的字段),导入后进行比对
csv处理方式
import pandas as pd
import time
import sqlite3

# Compare a large CSV against the pre-loaded SQLite "data" table and save
# the matching rows (with the db value appended) to result.csv.
# Connect to the database (created automatically if it does not exist).
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
ws_save = []
print(f'start>>>>')
t0 = time.time()
print(f'start time >>> {t0}')
# iterator=True returns a TextFileReader: chunks are pulled on demand with
# get_chunk(), so the 1000w+ row file never sits in memory at once.
# usecols keeps only the 4 columns needed for the comparison.
df1 = pd.read_csv(r'test.csv', sep=',', iterator=True, usecols=[2, 22, 93, 162])
t1 = time.time()
print(f'read file cost {int(t1-t0)} seconds')  # typo fix: was "cose"
t2 = time.time()
print(f'start load data>>>')
BATCH = 10000
print(f'end load data>>>')
print(f'process start>>> ')
i = 0
loop = True
while loop:
    try:
        print(f'process total >>> {i * BATCH}')
        i = i + 1
        # get_chunk returns the next BATCH rows as a DataFrame and raises
        # StopIteration once the file is exhausted.
        data = df1.get_chunk(BATCH)
    except StopIteration:
        loop = False
        continue
    for row in data.values.tolist():
        # First selected column is the join key — look it up in the table
        # built by the import step.
        cursor.execute('SELECT c1 FROM data WHERE key = ?', (row[0],))
        db_row = cursor.fetchone()
        if not db_row:
            # No match: skip the row entirely.
            continue
        row.append(db_row[0])
        ws_save.append(row)
conn.close()
print(f'process end>>> ')
t3 = time.time()
print(f'load data cost {int(t3-t2)} seconds')  # typo fix: was "cose"
print(f'write data to csv')
# Save the matched rows to a new csv file.
if ws_save:
    df = pd.DataFrame(ws_save)
    # BUG FIX: each saved row has the 4 selected csv columns plus the matched
    # db value (5 columns total); a shorter header list makes to_csv raise
    # ValueError, so generate one name per actual column.
    df.to_csv("result.csv", index=False,
              header=[f'c{k + 1}' for k in range(len(ws_save[0]))])
t4 = time.time()
print(f'write data to csv cost {int(t4-t3)} seconds')  # typo fix: was "cose"
print(f'end>>>>')
excel处理方式类似
from openpyxl import load_workbook
import pandas as pd
import time
import sqlite3

# Compare a large Excel sheet against the pre-loaded SQLite "data" table and
# save the matching rows (with the db value appended) to result.csv.
# Connect to the database (created automatically if it does not exist).
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
ws_save = []
print(f'start>>>>')
t0 = time.time()
print(f'start time >>> {t0}')
read_only = True
# read_only streams rows instead of materializing the whole workbook —
# essential for acceptable load time on 1000w+ rows.
wb = load_workbook(r'test.xlsx', read_only=read_only)
t1 = time.time()
print('耗时%0.3f秒钟'%(t1-t0))
t2 = time.time()
print('耗时%0.3f秒钟'%(t2-t1))
sheet = wb.active
print(f'start load data>>>')
BATCH = 10000  # progress-report interval
print(f'end load data>>>')
print(f'process start>>> ')
i = 0
for row in sheet.iter_rows(values_only=True):
    i = i + 1
    # Use the BATCH constant instead of a duplicated magic 10000.
    if i % BATCH == 0:
        print(f'process total >>> {i}')
    row = list(row)
    # Column 2 is the join key — look it up in the table built by the
    # import step (assumes that layout — TODO confirm against the sheet).
    cursor.execute('SELECT c1 FROM data WHERE key = ?', (row[2],))
    value = cursor.fetchone()
    if not value:
        # No match: skip the row entirely.
        continue
    row.append(value[0])
    ws_save.append(row)
# Release the read-only workbook's file handle explicitly.
wb.close()
conn.close()
print(f'process end>>> ')
t3 = time.time()
print('耗时%0.3f秒钟'%(t3-t2))
print(f'write data to csv')
# Save the matched rows to a new csv file.
if ws_save:
    df = pd.DataFrame(ws_save)
    # BUG FIX: row[2] implies the sheet has at least 3 columns, plus the
    # appended db value, so a fixed 3-name header makes to_csv raise
    # ValueError; generate one name per actual column instead.
    df.to_csv("result.csv", index=False,
              header=[f'c{k + 1}' for k in range(len(ws_save[0]))])
t4 = time.time()
print('耗时%0.3f秒钟'%(t4-t3))
print(f'end>>>>')
结果(不使用多线程的情况下)
1500w数据导入数据库,平均花费 2000 秒
500w+与1500w进行比对,去除非必要字段后提取相关信息花费30秒