python - How to stream results from Multiprocessing.Pool to csv?
I have a Python (2.7) process that takes a key, does a bunch of calculations, and returns a list of results. Here is a much simplified version.
I am using multiprocessing to create threads so this can be processed faster. However, my production data has several million rows, and each loop takes progressively longer to complete. The last time I ran this, each loop took over 6 minutes to complete, while at the start a loop takes a second or less. I think this is because all the threads are adding results to resultset, which continues to grow until it contains all the records.
Is it possible to use multiprocessing to stream the results of each thread (a list) to a csv, or to batch the resultset writes to the csv after a set number of rows?
Any other suggestions for speeding up or optimizing this approach would be appreciated.
```python
import numpy as np
import pandas as pd
import csv
import os
import multiprocessing
from multiprocessing import Pool

keys = [1,2,3,4,5,6,7,8,9,10,11,12]

def key_loop(key):
    test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
    test_list = test_df.ix[0].tolist()
    return test_list

if __name__ == "__main__":
    try:
        pool = Pool(processes=8)
        resultset = pool.imap(key_loop, (key for key in keys))
        loaddata = []
        for sublist in resultset:
            loaddata.append(sublist)
        with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
            writer = csv.writer(file)
            for listitem in loaddata:
                writer.writerow(listitem)
        print "finished load"
    except:
        print 'There was a problem multithreading the key pool'
        raise
```
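To answer the batching part of the question directly: you can consume the `imap` iterator lazily and flush to the csv every N rows, so the full resultset never accumulates in memory. A minimal Python 3 sketch (the `key_loop` body, file name, and batch size here are placeholders, not the real calculation):

```python
import csv
from itertools import islice
from multiprocessing import Pool

def key_loop(key):
    # hypothetical stand-in for the real per-key calculation
    return [key, key * 2, key * 3]

def write_in_batches(keys, path, batch_size=1000):
    # consume the imap iterator lazily, flushing every batch_size rows,
    # so results are written out instead of growing in a list
    with Pool(processes=4) as pool, open(path, 'w', newline='') as f:
        writer = csv.writer(f)
        results = pool.imap(key_loop, keys)
        while True:
            batch = list(islice(results, batch_size))
            if not batch:
                break
            writer.writerows(batch)  # one buffered write per batch

if __name__ == '__main__':
    write_in_batches(range(10), 'out.csv', batch_size=4)
```

`imap` preserves the input order, so the csv rows come out in the same order as `keys` even though the work is done in parallel.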
Here is an answer consolidating the suggestions eevee and I made:
```python
import numpy as np
import pandas as pd
import csv
from multiprocessing import Pool

keys = [1,2,3,4,5,6,7,8,9,10,11,12]

def key_loop(key):
    test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
    test_list = test_df.ix[0].tolist()
    return test_list

if __name__ == "__main__":
    try:
        pool = Pool(processes=8)
        resultset = pool.imap(key_loop, keys, chunksize=200)
        with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
            writer = csv.writer(file)
            for listitem in resultset:
                writer.writerow(listitem)
        print "finished load"
    except:
        print 'There was a problem multithreading the key pool'
        raise
```
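A note for readers on current pandas: `DataFrame.ix` has since been removed, and positional row access is done with `.iloc` instead. A sketch of just the worker function under that assumption:

```python
import numpy as np
import pandas as pd

def key_loop(key):
    # .iloc replaces the removed DataFrame.ix for positional row access
    test_df = pd.DataFrame(np.random.randn(1, 4), columns=['a', 'b', 'c', 'd'])
    return test_df.iloc[0].tolist()
```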
Again, the changes here:

- Iterate over `resultset` directly, rather than needlessly copying it to a list first.
- Feed the `keys` list directly to `pool.imap` instead of creating a generator comprehension out of it.
- Provide a larger `chunksize` to `imap` than the default of 1. A larger `chunksize` reduces the cost of the inter-process communication required to pass the values inside `keys` to the sub-processes in the pool, which can give big performance boosts when `keys` is large (as it is in your case). You should experiment with different values of `chunksize` (try something considerably larger than 200, like 5000) and see how it affects performance. I'm making a wild guess with 200, though it should definitely do better than 1.
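The key point is that `chunksize` only changes how the work is batched over IPC, not the results. A small Python 3 sketch illustrating that (the worker, pool size, and chunk sizes are arbitrary choices for the demo):

```python
from multiprocessing import Pool

def square(key):
    # trivial stand-in for the real per-key work
    return key * key

def run_with_chunksize(keys, chunksize):
    # same work either way; a larger chunksize just sends fewer,
    # bigger messages between the parent and the worker processes
    with Pool(processes=4) as pool:
        return list(pool.imap(square, keys, chunksize=chunksize))

if __name__ == '__main__':
    keys = list(range(10000))
    # identical output; only the IPC overhead differs
    assert run_with_chunksize(keys, 1) == run_with_chunksize(keys, 200)
```

Timing the two calls on a large `keys` list is the easiest way to pick a reasonable `chunksize` for your own workload.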