C# - How do I await a response from an RX Subject without introducing a race condition?
I have a service that allows a caller to send commands and receive responses asynchronously. In a real application, these actions are fairly disconnected (some action will send a command, and the responses will be processed independently).
However, in my tests, I need to be able to send a command and then wait for the (first) response before continuing the test.
The responses are published using RX, and my first attempt at the code was something like this:
    service.SendCommand("blah");
    await service.Responses.FirstAsync();
The problem with this is that FirstAsync will only work if the response arrives after the await has already been hit. If the service processes the command very quickly, the test will hang on the await.
My next attempt to fix this was to call FirstAsync() prior to sending the command, so that it would have the result even if it arrived before awaiting:
    var firstResponse = service.Responses.FirstAsync();
    service.SendCommand("blah");
    await firstResponse;
However, this still fails in the same way. It seems that it only starts listening when the await is hit (when GetAwaiter is called), so the exact same race condition exists.
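The suspicion above can be checked directly. The following is a minimal sketch, assuming the System.Reactive package; the class name LazySubscriptionDemo is made up for illustration. It uses Subject<T>.HasObservers to show that building the FirstAsync() query does not subscribe, whereas ToTask() subscribes at the point of the call, so a value pushed afterwards is not missed.

```csharp
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original question.
public static class LazySubscriptionDemo
{
    // Returns: (has observers after FirstAsync(), has observers after ToTask(), received value).
    public static async Task<(bool, bool, bool)> RunAsync()
    {
        var subject = new Subject<bool>();

        // Building the query does not subscribe to the subject.
        var first = subject.FirstAsync();
        bool afterFirstAsync = subject.HasObservers;

        // ToTask() subscribes immediately, before any value is sent.
        Task<bool> pending = first.ToTask();
        bool afterToTask = subject.HasObservers;

        // The value is delivered to the already-active subscription.
        subject.OnNext(true);
        bool value = await pending;

        return (afterFirstAsync, afterToTask, value);
    }
}
```

Under these assumptions, RunAsync() should return (false, true, true), and the final await should not hang, because the subscription exists before OnNext is called.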
If I change my Subject to a ReplaySubject with a buffer (or timer), then I can "work around" this; however, it doesn't make sense to do that in my production classes, since it would only be for testing.
What's the "correct" way to do this in RX? How can I set up something that will receive the first event on a stream in a way that won't introduce a race condition?
Here's a small test that illustrates the issue in a "single-threaded" way. This test will hang indefinitely:
    [Fact]
    public async Task MyTest()
    {
        var x = new Subject<bool>();

        // Subscribe to the first bool (but don't await it yet)
        var firstBool = x.FirstAsync();

        // Send the first bool
        x.OnNext(true);

        // Await the task that receives the first bool
        var b = await firstBool; // <-- hangs here; presumably because firstBool didn't start monitoring until GetAwaiter was called?

        Assert.Equal(true, b);
    }
I also tried calling Replay() in my test, thinking it would buffer the results, but that doesn't change anything:
    [Fact]
    public async Task MyTest()
    {
        var x = new Subject<bool>();

        var firstBool = x.Replay();

        // Send the first bool
        x.OnNext(true);

        // Await the task that receives the first bool
        var b = await firstBool.FirstAsync(); // <-- still hangs here

        Assert.Equal(true, b);
    }
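A likely reason the Replay() attempt changes nothing: Replay() returns an IConnectableObservable<T>, which does not subscribe to its source (and so buffers nothing) until Connect() is called. The following is a minimal sketch of the same test shape with the connection made before the value is sent, assuming the System.Reactive package; the class name ReplayConnectDemo and the buffer size of 1 are choices made for this illustration.

```csharp
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original question.
public static class ReplayConnectDemo
{
    public static async Task<bool> RunAsync()
    {
        var subject = new Subject<bool>();

        // Replay(1) keeps the most recent value for later subscribers.
        var replayed = subject.Replay(1);

        // Connect() is what actually subscribes the replay buffer to the subject.
        using (replayed.Connect())
        {
            // Sent before anyone awaits, but captured by the connected buffer.
            subject.OnNext(true);

            // A late subscriber receives the buffered value.
            return await replayed.FirstAsync();
        }
    }
}
```

This keeps the buffering inside the test, so the production class can continue to expose a plain Subject.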
You can do this with an AsyncSubject:
    [Fact]
    public async Task MyTest()
    {
        var x = new Subject<bool>();

        var firstBool = x.FirstAsync().PublishLast(); // PublishLast wraps an AsyncSubject
        firstBool.Connect();

        // Send the first bool
        x.OnNext(true);

        // Await the task that receives the first bool
        var b = await firstBool;

        Assert.Equal(true, b);
    }
AsyncSubject caches the last value received before OnCompleted is called and then replays it to subscribers.
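To make that caching behaviour concrete, here is a minimal sketch that uses an AsyncSubject directly rather than through PublishLast(), assuming the System.Reactive package; the class name AsyncSubjectDemo is made up for illustration.

```csharp
using System.Reactive.Subjects;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original answer.
public static class AsyncSubjectDemo
{
    public static async Task<int> RunAsync()
    {
        var subject = new AsyncSubject<int>();

        subject.OnNext(1);
        subject.OnNext(2);     // only the most recent value is kept
        subject.OnCompleted(); // the cached value becomes available on completion

        // Awaiting after completion still yields the cached last value.
        return await subject;
    }
}
```

Here RunAsync() should return 2. In the answer's test, FirstAsync() completes right after the first value, which is what lets the AsyncSubject behind PublishLast() release that value to the later await.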