python - How to stream results from multiprocessing.Pool to CSV?
I have a Python process (2.7) that takes a key, does a bunch of calculations, and returns a list of results. Here is a much simplified version.

I am using multiprocessing so this can be processed faster. However, my production data has several million rows, and each loop takes progressively longer to complete. The last time I ran this, each loop took over 6 minutes to complete, while at the start a loop took a second or less. I think this is because the threads keep adding results to resultset, which continues to grow until it contains all the records.

Is it possible to use multiprocessing to stream the results of each thread (a list) to a CSV, or to batch the resultset writes to the CSV after a set number of rows?

Any other suggestions for speeding up or optimizing this approach would be appreciated.
```python
import numpy as np
import pandas as pd
import csv
import os
import multiprocessing
from multiprocessing import Pool

global keys
keys = [1,2,3,4,5,6,7,8,9,10,11,12]

def key_loop(key):
    test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
    test_list = test_df.ix[0].tolist()
    return test_list

if __name__ == "__main__":
    try:
        pool = Pool(processes=8)
        resultset = pool.imap(key_loop, (key for key in keys))
        loaddata = []
        for sublist in resultset:
            loaddata.append(sublist)
        with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
            writer = csv.writer(file)
            for listitem in loaddata:
                writer.writerow(listitem)
        print "Finished load"
    except:
        print 'There was a problem multithreading the key pool'
        raise
```
Here is an answer consolidating the suggestions Eevee and I made:
```python
import numpy as np
import pandas as pd
import csv
from multiprocessing import Pool

keys = [1,2,3,4,5,6,7,8,9,10,11,12]

def key_loop(key):
    test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
    test_list = test_df.ix[0].tolist()
    return test_list

if __name__ == "__main__":
    try:
        pool = Pool(processes=8)
        resultset = pool.imap(key_loop, keys, chunksize=200)
        with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
            writer = csv.writer(file)
            for listitem in resultset:
                writer.writerow(listitem)
        print "Finished load"
    except:
        print 'There was a problem multithreading the key pool'
        raise
```

Again, the changes here are:
- Iterate over `resultset` directly, rather than needlessly copying it to a list first.
- Feed the `keys` list directly to `pool.imap` instead of creating a generator comprehension out of it.
- Provide a larger `chunksize` to `imap` than the default of 1. A larger `chunksize` reduces the cost of the inter-process communication required to pass the values inside `keys` to the sub-processes in the pool, which can give big performance boosts when `keys` is large (as it is in your case). You should experiment with different values of `chunksize` (try something considerably larger than 200, like 5000, etc.) and see how it affects performance. I'm making a wild guess with 200, though it should do better than 1.
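The question also asked about batching the writes after a set number of rows. That can be layered on top of the same `imap` iterator. Here is a minimal Python 3 sketch of the idea (the function name `write_in_batches` and its parameters are my own, not from the code above):

```python
import csv
from itertools import islice

def write_in_batches(rows, path, batch_size=1000):
    """Write an iterable of rows to a CSV, flushing in fixed-size batches.

    `rows` can be any iterator (e.g. the iterator returned by pool.imap),
    so each row is consumed as soon as a worker produces it and the full
    result set is never held in memory at once.
    """
    rows = iter(rows)
    written = 0
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        while True:
            # Pull at most batch_size rows off the iterator.
            batch = list(islice(rows, batch_size))
            if not batch:
                break
            writer.writerows(batch)
            f.flush()  # push each completed batch to disk
            written += len(batch)
    return written
```

You would pass it the `imap` iterator directly, e.g. `write_in_batches(pool.imap(key_loop, keys, chunksize=200), "C:\\Users\\mp_streaming_test.csv", batch_size=5000)`.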