c# - Producer/Consumer pattern with a batched producer -


i'm attempting implement simple producer/consumer style application multiple producers , 1 consumer.

research has led me onto blockingcollection<t> useful , allowed me implement long-running consumer task below:

var c1 = task.factory.startnew(() => {     var buffer = new list<int>(batch_buffer_size);      foreach (var value in blockingcollection.getconsumingenumerable())     {         buffer.add(value);         if (buffer.count == batch_buffer_size)         {             processitems(buffer);             buffer.clear();         }     } }); 

the processitems function submits buffer database , work in batches. solution sub optimal however. in baron production periods time until buffer filled meaning database out of date.

a more ideal solution either run task on 30 second timer or short circuit foreach timeout.

i ran timer idea , came this:

synctimer = new timer(new timercallback(timerelapsed), blockingcollection, 5000, 5000);  private static void timerelapsed(object state) {     var buffer = new list<int>();     var collection = ((blockingcollection<int>)state).getconsumingenumerable();      foreach (var value in collection)     {         buffer.add(value);     }      processitems(buffer);     buffer.clear(); } 

this has clear problem foreach blocked until end, defeating purpose of timer.

can offer direction take? need snapshot blockingcollection periodically , process contents clear it. perhaps blockingcollection wrong type?

instead of using getconsumingenumerable in timer callback, use 1 of these methods instead, adding results list until returns false or you've reached satisfactory batch size.

blockingcollection.trytake method (t) - need, don't want perform further waiting @ all.

blockingcollection.trytake method (t, int32)

blockingcollection.trytake method (t, timespan)

you can extract out extension (untested):

public static ilist<t> flush<t> (this blockingcollection<t> collection, int maxsize = int.maxvalue) {      // argument checking.       t next;      var result = new list<t>();       while(result.count < maxsize && collection.trytake(out next))      {          result.add(next);      }       return result; } 

Comments

Popular posts from this blog

javascript - RequestAnimationFrame not working when exiting fullscreen switching space on Safari -

Python ctypes access violation with const pointer arguments -