C# - Producer/Consumer pattern with a batched producer
I'm attempting to implement a simple producer/consumer style application with multiple producers and one consumer.
Research has led me to BlockingCollection<T>, which is useful and has allowed me to implement a long-running consumer task, as below:
    var c1 = Task.Factory.StartNew(() =>
    {
        var buffer = new List<int>(BATCH_BUFFER_SIZE);

        foreach (var value in blockingCollection.GetConsumingEnumerable())
        {
            buffer.Add(value);
            if (buffer.Count == BATCH_BUFFER_SIZE)
            {
                ProcessItems(buffer);
                buffer.Clear();
            }
        }
    });

The ProcessItems function submits the buffer to a database, and it does work in batches. This solution is suboptimal, however. In barren production periods it could be some time until the buffer is filled, meaning the database is out of date.
A more ideal solution would be either to run the task on a 30 second timer or to short-circuit the foreach with a timeout.
I ran with the timer idea and came up with this:
    syncTimer = new Timer(new TimerCallback(TimerElapsed), blockingCollection, 5000, 5000);

    private static void TimerElapsed(object state)
    {
        var buffer = new List<int>();
        var collection = ((BlockingCollection<int>)state).GetConsumingEnumerable();

        foreach (var value in collection)
        {
            buffer.Add(value);
        }

        ProcessItems(buffer);
        buffer.Clear();
    }

This has the clear problem that the foreach will block until the collection is marked complete, so ProcessItems is not reached on each tick, defeating the purpose of the timer.
Can anyone offer a direction to take? I essentially need to snapshot the BlockingCollection periodically, process the contents, and then clear it. Perhaps BlockingCollection is the wrong type?
Instead of using GetConsumingEnumerable in the timer callback, use one of these methods, adding the results to a list until it returns false or you've reached a satisfactory batch size.
BlockingCollection<T>.TryTake Method (T) - probably what you need, since you don't want to perform any further waiting at all.
BlockingCollection<T>.TryTake Method (T, Int32)
BlockingCollection<T>.TryTake Method (T, TimeSpan)
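The timed overloads also make the other idea from the question workable: short-circuiting the consumer loop with a timeout, with no timer involved. What follows is only a minimal sketch under some assumptions. The class name BatchingConsumer, the Run method and its parameters are made up for illustration, and the processItems delegate stands in for the question's ProcessItems.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class BatchingConsumer
{
    // Hypothetical helper (not part of the original post): consumes `source`
    // until CompleteAdding() has been called and the collection is empty,
    // handing items to `processItems` in batches of at most `batchSize`.
    public static void Run(
        BlockingCollection<int> source,
        int batchSize,
        TimeSpan maxWait,
        Action<List<int>> processItems)
    {
        var buffer = new List<int>(batchSize);

        while (!source.IsCompleted)
        {
            int value;
            // Waits up to maxWait for an item; returns false on timeout
            // or when the collection is completed and empty.
            if (source.TryTake(out value, maxWait))
            {
                buffer.Add(value);
                if (buffer.Count < batchSize)
                {
                    continue; // batch not full yet, keep filling
                }
            }

            // Either the batch is full or the wait produced nothing.
            FlushBuffer(buffer, processItems);
        }

        // Items taken just before completion may still be buffered.
        FlushBuffer(buffer, processItems);
    }

    private static void FlushBuffer(List<int> buffer, Action<List<int>> processItems)
    {
        if (buffer.Count == 0)
        {
            return;
        }

        // Pass a copy so the callback may keep the list after we clear ours.
        processItems(new List<int>(buffer));
        buffer.Clear();
    }
}
```

Note that maxWait here applies to each individual take, not to the batch as a whole, so a slow but steady trickle of items can still delay a flush. If a hard upper bound per batch is needed, tracking a deadline (for example with a Stopwatch) and passing the remaining time to TryTake would be one way to get it.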
You can even extract the draining loop out into an extension method (untested):
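    public static class BlockingCollectionExtensions
    {
        public static IList<T> Flush<T>(this BlockingCollection<T> collection, int maxSize = int.MaxValue)
        {
            // Argument checking.
            if (collection == null) throw new ArgumentNullException("collection");

            T next;
            var result = new List<T>();

            // TryTake(out T) never blocks; it returns false as soon as the collection is empty.
            while (result.Count < maxSize && collection.TryTake(out next))
            {
                result.Add(next);
            }

            return result;
        }
    }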
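For completeness, here is one way the extension might be wired to a timer. Treat it as a sketch, not a tested implementation: PeriodicFlusher and FlushOnce are hypothetical names, the processItems delegate stands in for the question's ProcessItems, and the Flush extension is repeated only so the snippet compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BlockingCollectionExtensions
{
    // Same non-blocking drain as the extension above.
    public static IList<T> Flush<T>(this BlockingCollection<T> collection, int maxSize = int.MaxValue)
    {
        if (collection == null) throw new ArgumentNullException("collection");

        T next;
        var result = new List<T>();
        while (result.Count < maxSize && collection.TryTake(out next))
        {
            result.Add(next);
        }
        return result;
    }
}

// Hypothetical wrapper: drains the collection on every timer tick.
public sealed class PeriodicFlusher : IDisposable
{
    private readonly BlockingCollection<int> _collection;
    private readonly Action<IList<int>> _processItems;
    private readonly object _gate = new object();
    private readonly Timer _timer;

    public PeriodicFlusher(
        BlockingCollection<int> collection,
        TimeSpan period,
        Action<IList<int>> processItems)
    {
        _collection = collection;
        _processItems = processItems;
        // First tick after `period`, then repeating every `period`.
        _timer = new Timer(_ => FlushOnce(), null, period, period);
    }

    public void FlushOnce()
    {
        // Timer callbacks can overlap if processing outlasts the period;
        // skip this tick if a previous flush is still running.
        if (!Monitor.TryEnter(_gate))
        {
            return;
        }

        try
        {
            var batch = _collection.Flush();
            if (batch.Count > 0)
            {
                _processItems(batch);
            }
        }
        finally
        {
            Monitor.Exit(_gate);
        }
    }

    public void Dispose()
    {
        _timer.Dispose();
    }
}
```

Because Flush uses the non-blocking TryTake, each tick returns as soon as the collection is empty, which avoids the blocked foreach from the question. Skipping overlapping ticks is a design choice made here; items left behind are simply picked up on the next tick.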