大数据处理解决方案

由于某些需求,需要处理n个1000w+数据的csv/excel文件,折腾了一天终于解决。

注意:excel 单个 sheet 超过100w行(1048576)时,需要分sheet 处理。

解决方案:使用python的pandas及openpyxl。首先在安装好python的基础上,安装这两个模块

pip install pandas openpyxl

如果只是需要导入数据库,那这个操作比较简单,直接读取文件然后导入即可。

csv文件,参考 Python多进程导入CSV大文件到数据库

excel文件

from openpyxl import *
import pandas as pd
import time
import sqlite3
# 连接到数据库(如果不存在,则会创建)
conn = sqlite3.connect('test.db')
cursor = conn.cursor()

# 创建表
cursor.execute('''CREATE TABLE IF NOT EXISTS data
               (key text PRIMARY KEY, c1 text)''')



ws_save = []
print(f'start>>>>')
t0 = time.time()
print(f'start time >>> {t0}')

read_only = True

t1 = time.time()
# 读取excel数据
# print('耗时%0.3f秒钟'%(t1-t0))
wb2 = load_workbook(r'test.xlsx', read_only=read_only) # c1 1 注意读取使用readonly 否则时间会很久
t2 = time.time()
print('耗时%0.3f秒钟'%(t2-t1))
sheet2 = wb2.active

i = 0

print(f'start load data>>>')

BATCH = 10000


for row in sheet2.iter_rows(values_only=True):
    cursor.execute('''INSERT OR REPLACE INTO data (key, c1) 
          VALUES (?, ?)''', (row[2], row[3]))

    i = i + 1
    if i % BATCH == 0:
        print(f'process total >>> {i}')

conn.commit()
conn.close()
print(f'end load data>>>')

如果还需要做比对,则把需要比较的表先导入数据库(去除非必要的字段),导入后进行比对

csv处理方式

import pandas as pd
import time
import sqlite3
# 连接到数据库(如果不存在,则会创建)
conn = sqlite3.connect('test.db')
cursor = conn.cursor()

ws_save = []
print(f'start>>>>')
t0 = time.time()
print(f'start time >>> {t0}')
df1 = pd.read_csv(r'test.csv', sep=',', iterator=True, usecols=[2, 22, 93, 162])
#query
#df1.query('')
t1 = time.time()
print(f'read file cose {int(t1-t0)} seconds')
t2 = time.time()

i = 0
print(f'start load data>>>')
BATCH = 10000

print(f'end load data>>>')

print(f'process start>>> ')
i = 0

loop = True
while loop:
    try:
        print(f'process total >>> {i * BATCH}')
        i = i + 1
        # 返回N行数据块  DataFrame类型
        data = df1.get_chunk(BATCH)  
        data_list = data.values.tolist()

        for row in data_list:
            row = list(row)
            # 查询数据
            cursor.execute('SELECT c1 FROM data WHERE key = ?', (row[0],))
            db_row = cursor.fetchone()
            if not db_row:
                continue
                #db_row = ['']
            row.append(db_row[0])
            ws_save.append(row)

    except StopIteration:
        # 错误数据写入文件
        loop = False

conn.close()
print(f'process end>>> ')

t3 = time.time()
print(f'load data cose {int(t3-t2)} seconds') 
print(f'write data to csv')
# 保存修改后的第一个excel数据到新的文件
if ws_save:
    df = pd.DataFrame(ws_save)
    df.to_csv("result.csv", index=False, header=['c1', 'c2', '...'])

t4 = time.time()
print(f'write data to csv cose {int(t4-t3)} seconds')
print(f'end>>>>')

excel处理方式类似

from openpyxl import *
import pandas as pd
import time
import sqlite3
# 连接到数据库(如果不存在,则会创建)
conn = sqlite3.connect('test.db')
cursor = conn.cursor()

ws_save = []
print(f'start>>>>')
t0 = time.time()
print(f'start time >>> {t0}')

read_only = True

# 读取excel1和excel2数据
wb = load_workbook(r'test.xlsx', read_only=read_only) #C_11 1 i注意读取使用readonly 否则时间会很久
t1 = time.time()
print('耗时%0.3f秒钟'%(t1-t0))
#wb2 = load_workbook(r'1.xlsx', read_only=read_only) # c1 1
t2 = time.time()
print('耗时%0.3f秒钟'%(t2-t1))
sheet = wb.active
#sheet2 = wb2.active

i = 0

print(f'start load data>>>')

BATCH = 10000


print(f'end load data>>>')

print(f'process start>>> ')
i = 0

for row in sheet.iter_rows(values_only=True):
    i = i + 1
    if i % 10000 == 0:
        print(f'process total >>> {i}')
    row = list(row)
    # 查询数据
    cursor.execute('SELECT c1 FROM data WHERE key = ?', (row[2],))
    value = cursor.fetchone()
    if not value:
        continue
    row.append(value[0])
    ws_save.append(row) # 拿指定字段

print(f'process end>>> ')

t3 = time.time()
print('耗时%0.3f秒钟'%(t3-t2)) 
print(f'write data to csv')
# 保存修改后的第一个excel数据到新的文件
if ws_save:
    df = pd.DataFrame(ws_save)
    df.to_csv("result.csv", index=False, header=['c1', 'c2', 'c3'])

t4 = time.time()
print('耗时%0.3f秒钟'%(t4-t3)) 
print(f'end>>>>')

结果(不使用多线程的情况下)

1500w数据导入数据库,平均花费 2000 秒

500w+与1500w进行比对,去除非必要字段后提取相关信息花费30秒

This entry was posted in 应用. Bookmark the permalink.