JavaScript — Design pattern for a bidirectional stream translator
I am designing a networking protocol implementation object that is entirely socket-agnostic and acts purely as a bidirectional translator. The protocol object should be fed objects or commands on the "control" side and emit bytes on the "network" side, and it should accept bytes on the "network" side to be translated into objects/responses and emitted on the "control" side.
I can't settle on an elegant design pattern in Node.js. I want the object to be
stream-compatible, and this is the approach I have ended up with so far:
// Example usage: wire the protocol's auxiliary (network) side to a socket,
// then talk to the protocol on its control side with plain objects.
socket = getSocketSomehow();
proto = new Protocol();

// Network bytes flow: socket -> proto.aux (parsed) and proto.aux -> socket (composed).
socket.pipe(proto.aux);
proto.aux.pipe(socket);

// Control side: write a command object, receive a response object.
proto.write({ foo: 'this', bar: ['is', 'command'] });
proto.once('data', function(response) {
  console.log('this response: ' + response.quux);
});
`proto` is a collection of two cross-connected duplex streams: `proto` itself, being a `stream.Duplex`, and `aux`. Incoming network data goes into `proto.aux`, is parsed, and is emitted from `proto` as objects. Incoming objects go into `proto`, are composed into bytes, and are emitted from `proto.aux`.
Is there a better way to achieve the same?
I ended up with the following approach. The code samples are in CoffeeScript for better readability.
The `Bond` class implements the duplex stream interface and binds two otherwise
unrelated streams: reads and writes are proxied to the two separate underlying
streams.
'use strict'

{ EventEmitter } = require 'events'

# Bond presents two unrelated streams as a single duplex-looking object:
# readable-side methods are proxied to one stream, writable-side methods to
# the other, and the relevant events of both are re-emitted from the bond.
class Bond extends EventEmitter
  # Class-level helpers that install proxy methods on the prototype.
  # NOTE(review): these run at class-definition time; `@::` is the prototype.
  proxyReadableMethod = (method) =>
    @::[method] = -> @_bondState.readable[method] arguments...

  proxyWritableMethod = (method) =>
    @::[method] = -> @_bondState.writable[method] arguments...

  # Readable interface is forwarded to the wrapped readable stream.
  proxyReadableMethod 'read'
  proxyReadableMethod 'setEncoding'
  proxyReadableMethod 'resume'
  proxyReadableMethod 'pause'
  proxyReadableMethod 'pipe'
  proxyReadableMethod 'unpipe'
  proxyReadableMethod 'unshift'
  proxyReadableMethod 'wrap'

  # Writable interface is forwarded to the wrapped writable stream.
  proxyWritableMethod 'write'
  proxyWritableMethod 'end'

  # @param readable  stream whose readable side this bond exposes
  # @param writable  stream whose writable side this bond exposes
  constructor: (readable, writable) ->
    super

    @_bondState = {}
    @_bondState.readable = readable
    @_bondState.writable = writable

    # Re-emit events from the underlying streams on the bond itself so
    # consumers can listen on the bond alone.
    proxyEvent = (obj, event) =>
      obj.on event, => @emit event, arguments...

    proxyEvent readable, 'readable'
    proxyEvent readable, 'data'
    proxyEvent readable, 'end'
    proxyEvent readable, 'close'
    # proxyEvent readable, 'error'

    proxyEvent writable, 'drain'
    proxyEvent writable, 'finish'
    proxyEvent writable, 'pipe'
    proxyEvent writable, 'unpipe'
    # proxyEvent writable, 'error'

module.exports = Bond
`Protocol` aggregates two internal `Transform` streams — a parser and a composer. The parser takes data coming in on the `aux` side and transforms it into what comes out on the `ctl` side, while the composer does the reverse. `aux` and `ctl` are both bonds of the parser and the composer, in different directions — the `aux` side deals with "composed" data coming in and out, while the `ctl` side emits and accepts "parsed" data. A design decision was to expose the `ctl` side through the `Protocol` instance itself, with `aux` visible as an instance variable.

`Protocol` exposes:

- `_parse` and `_compose` — `_transform`-like methods
- `_parseEnd` and `_composeEnd` — `_flush`-like methods
- `parsed` and `composed` — `push`-like methods
- `unparse` and `uncompose` — `unshift`-like methods
'use strict'

Bond = require './bond'
BacklogTransform = require './backlog-transform'

# Protocol is a socket-agnostic bidirectional translator. Its control side
# (the Protocol instance itself) speaks objects; its network side (@aux)
# speaks bytes. Parsing and composing are delegated to two internal
# transform streams, cross-bonded in opposite directions.
class Protocol extends Bond
  # @param options  passed through to the parser/composer transforms;
  #                 must carry auxObjectMode / mainObjectMode flags
  constructor: (options) ->
    @_protocolState = {}
    @_protocolState.options = options
    parser = @_protocolState.parser = new ParserTransform @
    composer = @_protocolState.composer = new ComposerTransform @
    parser.__name = 'parser'
    composer.__name = 'composer'

    # Forward errors from either transform to the protocol instance;
    # 'error' is deliberately not proxied by Bond itself.
    proxyEvent = (source, event) =>
      source.on event, => @emit event, arguments...
    proxyParserEvent = (event) => proxyEvent @_protocolState.parser, event
    proxyComposerEvent = (event) => proxyEvent @_protocolState.composer, event

    proxyParserEvent 'error'
    proxyComposerEvent 'error'

    # Control side: read parsed objects, write objects to be composed.
    super @_protocolState.parser, @_protocolState.composer
    # Network side: read composed bytes, write bytes to be parsed.
    @aux = @_protocolState.aux = new Bond @_protocolState.composer, @_protocolState.parser
    # @_protocolState.main = @main = new Bond @_protocolState.parser, @_protocolState.composer

  # push-like: emit a parsed object on the control side.
  parsed: (chunk, encoding) -> @_protocolState.parser.push chunk, encoding
  # push-like: emit composed bytes on the network side.
  composed: (chunk, encoding) -> @_protocolState.composer.push chunk, encoding
  # unshift-like: return an unconsumed chunk to the parser's input queue.
  unparse: (chunk, encoding) -> @_protocolState.parser.unshift chunk, encoding
  # unshift-like: return an unconsumed chunk to the composer's input queue.
  uncompose: (chunk, encoding) -> @_protocolState.composer.unshift chunk, encoding

  # Subclasses implement the actual wire format here (_transform-like).
  _parse: (chunk, encoding, callback) -> throw new TypeError 'Not implemented'
  _compose: (chunk, encoding, callback) -> throw new TypeError 'Not implemented'
  # _flush-like hooks; default to a no-op.
  _parseEnd: (callback) -> callback()
  _composeEnd: (callback) -> callback()

# Network bytes in -> parsed objects out.
class ParserTransform extends BacklogTransform
  constructor: (@protocol) ->
    options = @protocol._protocolState.options
    super options, options.auxObjectMode, options.mainObjectMode

  __transform: (chunk, encoding, callback) -> @protocol._parse chunk, encoding, callback
  __flush: (callback) -> @protocol._parseEnd callback

# Control objects in -> composed bytes out.
class ComposerTransform extends BacklogTransform
  constructor: (@protocol) ->
    options = @protocol._protocolState.options
    super options, options.mainObjectMode, options.auxObjectMode

  __transform: (chunk, encoding, callback) -> @protocol._compose chunk, encoding, callback
  __flush: (callback) -> @protocol._composeEnd callback

module.exports = Protocol
`BacklogTransform` is a utility class extending the `Transform` stream with the ability to unshift untransformed chunks back into the queue: calling the `unshift` method at some point during `_transform` makes the unshifted data appear in the next `_transform` call, prepended to the new chunk. The implementation is not as ideal as it could be, unfortunately...
'use strict'

async = require 'async'
stream = require 'stream'

# A Transform stream that supports returning (unshifting) untransformed
# chunks to its input queue: data unshifted during __transform is replayed,
# prepended to the next incoming chunk, before that chunk is transformed.
# Subclasses implement __transform/__flush instead of _transform/_flush.
class BacklogTransform extends stream.Transform
  # @param options             regular Transform options (may be null)
  # @param writableObjectMode  overrides options.writableObjectMode if given
  # @param readableObjectMode  overrides options.readableObjectMode if given
  constructor: (options, writableObjectMode, readableObjectMode) ->
    options ?= {}
    super options
    # Set per-side object modes directly on the internal states, since the
    # stock Transform only takes a single objectMode flag in older Node.
    @_writableState.objectMode = writableObjectMode ? options.writableObjectMode
    @_readableState.objectMode = readableObjectMode ? options.readableObjectMode
    @_backlogTransformState = {}
    @_backlogTransformState.backlog = []

  # Return a chunk to the front of the input queue; it is replayed before
  # the next incoming chunk.
  unshift: (chunk, encoding = null) ->
    if @_writableState.decodeStrings
      # Mirror what Writable would do on write(): decode strings to Buffers.
      chunk = new Buffer chunk, encoding ? @_writableState.defaultEncoding
    @_backlogTransformState.backlog.unshift { chunk, encoding }

  # Drain the backlog through __transform before processing new data.
  _flushBacklog: (callback) ->
    backlog = @_backlogTransformState.backlog
    if backlog.length
      if @_writableState.objectMode
        # Object mode: replay backlog entries one at a time, in order.
        async.forever(
          (next) =>
            return next {} if not backlog.length
            { chunk, encoding } = backlog.shift()
            @__transform chunk, encoding, (err) ->
              return next { err } if err?
              next null
          ({ err }) ->
            return callback err if err?
            callback()
        )
      else
        # Byte/string mode: coalesce the backlog into a single chunk.
        chunks = (chunk for { chunk, encoding } in backlog)
        if @_writableState.decodeStrings
          encoding = 'buffer'
          chunk = Buffer.concat chunks
        else
          # Keep a common encoding only if every entry agrees on it.
          encoding = backlog[0].encoding
          for item in backlog[1..]
            if encoding != item.encoding
              encoding = null
              break
          chunk = chunks.join ''
        @_backlogTransformState.backlog = []
        @__transform chunk, encoding, callback
    else
      callback()

  _transform: (chunk, encoding, callback) ->
    backlog = @_backlogTransformState.backlog
    if backlog.length
      # New data joins the backlog so it is processed after replayed chunks.
      backlog.push { chunk, encoding }
      @_flushBacklog callback
    else
      @__transform chunk, encoding, callback

  _flush: (callback) ->
    # Replay any leftover backlog before signalling end of stream.
    @_flushBacklog => @__flush callback

  # Subclass hooks (analogous to _transform/_flush).
  __transform: (chunk, encoding, callback) -> throw new TypeError 'Not implemented'
  __flush: (callback) -> callback()

module.exports = BacklogTransform
Comments
Post a Comment