微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

python并行读取csv文件并连接数据框

如何解决python并行读取csv文件并连接数据框

我有一个应用程序,每个应用程序会读取50个大型csvs文件,每个文件约400MB。现在,我正在阅读这些内容以创建一个数据帧,并最终将所有这些链接一个单一的数据帧。我想同时进行以加快整个过程。所以我的下面代码看起来像这样:

import numpy as np
import pandas as pd
from multiprocessing.pool import ThreadPool
from time import time 

Class dataProvider:
    def __init__(self):
        self.df=pd.DataFrame()
        self.pool = ThreadPool(processes=40)
        self.df_abc=pd.DataFrame()
        self.df_xyz=pd.DataFrame()
        self.start=time()

     def get_csv_data(self,filename):
        return pd.read_csv(filename)

     def get_all_csv_data(self,filename):
         self.start=time()
         df_1 = self.pool.apply_sync(self.get_csv_data,('1.csv',),callback=concatDf)
         df_2 = self.pool.apply_sync(self.get_csv_data,('2.csv',callback=concatDf)
         total_time=time()-self.start

     def concatDf(self):
         self.df_abc=pd.concat([df_1,df_2])
         self.df_xyz=self.df_abc.iloc[:,1:]
         return self.df_xyz

我看到以下代码问题:

  1. 如果我的apply_sync调用调用了相同的回调,那么我怎么知道当前的回调已被df_1行或df_2以上的哪个调用确切地调用了? 2)我想连接不同apply_sync的输出,如何在concatDf回调函数中做到这一点?
  2. 我如何知道所有apply_sync调用的回调已完成,以便我可以将所有50个csvs返回的串联数据帧返回?
  3. 有没有更好,更有效的方法来做到这一点?

谢谢

解决方法

编辑:仅在有足够的可用RAM时使用此解决方案。

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import pandas as pd
from glob import glob 

files = glob("*.csv")

def read_file(file):
    return pd.read_csv(file)

# I would recommend to try out whether ThreadPoolExecutor or 
# ProcessPoolExecutor is faster on your system:
with ThreadPoolExecutor(4) as pool:
    df = pd.concat(pool.map(read_file,files))
print(df)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。