python - How to stream results from Multiprocessing.Pool to csv?


I have a Python (2.7) process that takes a key, does a bunch of calculations, and returns a list of results. Here is a simplified version.

I am using multiprocessing to create worker processes so the keys can be processed faster. However, my production data has several million rows, and each loop takes progressively longer to complete. The last time I ran this, each loop took over 6 minutes to complete, while at the start a loop took a second or less. I think this is because the workers keep adding results to resultset, and it continues to grow until it contains all of the records.

Is it possible to use multiprocessing to stream the results of each worker (a list) to a CSV, or to batch the resultset and write it to the CSV after a set number of rows?
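For the batching variant, here is a minimal sketch of the idea (my own illustration, not code from the question or answer: the batch_size threshold is a made-up knob, and the stub key_loop stands in for the real calculation):

import csv
from multiprocessing import Pool

def key_loop(key):
    # stand-in for the real per-key calculation; returns one row of results
    return [key, key * 2, key * 3, key * 4]

keys = range(1000)

if __name__ == "__main__":
    batch_size = 1000   # hypothetical flush threshold; tune for your data
    buffer = []
    pool = Pool(processes=8)
    with open("mp_batch_test.csv", 'w') as f:
        writer = csv.writer(f)
        for row in pool.imap(key_loop, keys):
            buffer.append(row)
            if len(buffer) >= batch_size:
                writer.writerows(buffer)   # write the whole batch in one call
                buffer = []
        if buffer:                          # flush the final partial batch
            writer.writerows(buffer)
    pool.close()
    pool.join()
    print "Finished batched load"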

Any other suggestions for speeding up or optimizing this approach would be appreciated.

import numpy as np
import pandas as pd
import csv
import os
import multiprocessing
from multiprocessing import Pool

global keys
keys = [1,2,3,4,5,6,7,8,9,10,11,12]

def key_loop(key):
    test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
    test_list = test_df.ix[0].tolist()
    return test_list

if __name__ == "__main__":
    try:
        pool = Pool(processes=8)
        resultset = pool.imap(key_loop, (key for key in keys))

        loaddata = []
        for sublist in resultset:
            loaddata.append(sublist)

        with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
            writer = csv.writer(file)
            for listitem in loaddata:
                writer.writerow(listitem)

        print "Finished load"
    except:
        print 'There was a problem multithreading the key pool'
        raise

Here is an answer consolidating the suggestions eevee and I made:

import numpy as np
import pandas as pd
import csv
from multiprocessing import Pool

keys = [1,2,3,4,5,6,7,8,9,10,11,12]

def key_loop(key):
    test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
    test_list = test_df.ix[0].tolist()
    return test_list

if __name__ == "__main__":
    try:
        pool = Pool(processes=8)
        resultset = pool.imap(key_loop, keys, chunksize=200)

        with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
            writer = csv.writer(file)
            for listitem in resultset:
                writer.writerow(listitem)

        print "Finished load"
    except:
        print 'There was a problem multithreading the key pool'
        raise
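Note that pool.imap returns a lazy iterator that yields results in submission order, so the loop above writes each row to disk as soon as it becomes available instead of accumulating the whole resultset in memory first; that should keep memory use roughly flat no matter how many keys there are.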

Again, the changes here:

  1. Iterate over resultset directly, rather than needlessly copying it to a list first.
  2. Feed the keys list directly to pool.imap instead of creating a generator comprehension out of it.
  3. Provide a larger chunksize to imap than the default of 1. A larger chunksize reduces the cost of the inter-process communication required to pass the values inside keys to the sub-processes in the pool, which can give big performance boosts when keys is very large (as it is in your case). You should experiment with different values of chunksize (try something considerably larger than 200, like 5000, etc.) and see how it affects performance. I'm making a wild guess with 200, though it should definitely do better than 1. A rough timing sketch follows this list.
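One way to run that experiment is a timing harness along these lines (my own sketch, not from the answer: benchmark_chunksizes is a hypothetical helper, and the stub key_loop stands in for the real per-key work):

import time
from multiprocessing import Pool

def key_loop(key):
    # stand-in for the real per-key work
    return [key, key * 2, key * 3, key * 4]

keys = range(100000)

def benchmark_chunksizes(sizes=(1, 200, 5000)):
    for size in sizes:
        pool = Pool(processes=8)
        start = time.time()
        for _ in pool.imap(key_loop, keys, chunksize=size):
            pass   # just consume the iterator; we only care about elapsed time
        pool.close()
        pool.join()
        print "chunksize=%5d: %.2f seconds" % (size, time.time() - start)

if __name__ == "__main__":
    benchmark_chunksizes()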
