javascript - Design pattern for bidirectional stream translator -


i design networking protocol implementation object entirely socket agnostic, , act purely bidirectional translator. protocol object should fed objects or commands "control" side , emit bytes "network" side, , accept bytes "network" side translated objects/responses , emitted "control" side.

i can't choose elegant design pattern in node.js. want stream compatible, , ended approach far:

socket = getsocketsomehow(); proto = new protocol();  socket.pipe(proto.aux); proto.aux.pipe(socket);  proto.write({ foo: 'this', bar: ['is', 'command'] }); proto.once('data', function(response) {     console.log('this response: ' + response.quux); }); 

proto collection of 2 cross-connected duplex streams, being stream.duplex, aux. incoming network data goes proto.aux, parsed , emitted proto objects. incoming objects go proto , composed bytes , emitted proto.aux.

is there better way same?

i had ended following approach. code samples in coffeescript better readability.


bond class implements duplex stream interface, binds 2 unrelated streams, reads , writes proxied separate streams.

'use strict'  { eventemitter } = require 'events'  class bond extends eventemitter     proxyreadablemethod = (method) =>         @::[method] = -> @_bondstate.readable[method] arguments...      proxywritablemethod = (method) =>         @::[method] = -> @_bondstate.writable[method] arguments...      proxyreadablemethod 'read'     proxyreadablemethod 'setencoding'     proxyreadablemethod 'resume'     proxyreadablemethod 'pause'     proxyreadablemethod 'pipe'     proxyreadablemethod 'unpipe'     proxyreadablemethod 'unshift'     proxyreadablemethod 'wrap'      proxywritablemethod 'write'     proxywritablemethod 'end'      constructor: (readable, writable) ->         super          @_bondstate = {}         @_bondstate.readable = readable         @_bondstate.writable = writable          proxyevent = (obj, event) =>             obj.on event, => @emit event, arguments...          proxyevent readable, 'readable'         proxyevent readable, 'data'         proxyevent readable, 'end'         proxyevent readable, 'close'         # proxyevent readable, 'error'          proxyevent writable, 'drain'         proxyevent writable, 'finish'         proxyevent writable, 'pipe'         proxyevent writable, 'unpipe'         # proxyevent writable, 'error'  module.exports = bond 

protocol aggregates 2 internal transform streams — parser , composer. parser takes data coming aux side, , transforms coming out ctl side, while composer reverse. aux , ctl both bonds parser , composer, in different directions — aux deals "composed" data coming in , out, , ctl side emits , accepts "parsed" data. design decision expose ctl through protocol itself, , aux visible instance variable.

protocol exposes:

  • _parse, _compose _transform-like methods
  • _parseend, _composeend _flush-like methods
  • parsed, composed push-like methods
  • unparse, uncompose unshift-like methods
'use strict'  bond = require './bond' backlogtransform = require './backlog-transform'  class protocol extends bond     constructor: (options) ->         @_protocolstate = {}         @_protocolstate.options = options         parser = @_protocolstate.parser = new parsertransform @         composer = @_protocolstate.composer = new composertransform @          parser.__name = 'parser'         composer.__name = 'composer'          proxyevent = (source, event) =>             source.on event, =>                 @emit event, arguments...          proxyparserevent = (event) =>             proxyevent @_protocolstate.parser, event          proxycomposerevent = (event) =>             proxyevent @_protocolstate.composer, event          proxyparserevent 'error'         proxycomposerevent 'error'          super @_protocolstate.parser, @_protocolstate.composer         @aux = @_protocolstate.aux = new bond @_protocolstate.composer, @_protocolstate.parser         # @_protocolstate.main = @main = new bond @_protocolstate.parser, @_protocolstate.composer      parsed: (chunk, encoding) ->         @_protocolstate.parser.push chunk, encoding      composed: (chunk, encoding) ->         @_protocolstate.composer.push chunk, encoding      unparse: (chunk, encoding) ->         @_protocolstate.parser.unshift chunk, encoding      uncompose: (chunk, encoding) ->         @_protocolstate.composer.unshift chunk, encoding      #     _parse: (chunk, encoding, callback) ->         throw new typeerror 'not implemented'      _compose: (chunk, encoding, callback) ->         throw new typeerror 'not implemented'      _parseend: (callback) ->         callback()      _composeend: (callback) ->         callback()  class parsertransform extends backlogtransform     constructor: (@protocol) ->         options = @protocol._protocolstate.options         super options, options.auxobjectmode, options.mainobjectmode      __transform: (chunk, encoding, callback) ->         @protocol._parse chunk, encoding, callback      __flush: (callback) ->         @protocol._parseend callback  class composertransform extends backlogtransform     constructor: (@protocol) ->         options = @protocol._protocolstate.options         super options, options.mainobjectmode, options.auxobjectmode      __transform: (chunk, encoding, callback) ->         @protocol._compose chunk, encoding, callback      __flush: (callback) ->         @protocol._composeend callback  module.exports = protocol 

backlogtransform utility class, extending transform stream ability unshift untransformed chunk queue calling unshift method somewhere during _transform, unshifted data appear on next _transform, prepended new chunk. implementation not ideal be, unfortunately...

'use strict'  async = require 'async' stream = require 'stream'  class backlogtransform extends stream.transform     constructor: (options, writableobjectmode, readableobjectmode) ->         options ?= {}          super options         @_writablestate.objectmode = writableobjectmode ? options.writableobjectmode         @_readablestate.objectmode = readableobjectmode ? options.readableobjectmode         @_backlogtransformstate = {}         @_backlogtransformstate.backlog = []      unshift: (chunk, encoding = null) ->         if @_writablestate.decodestrings             chunk = new buffer chunk, encoding ? @_writablestate.defaultencoding          @_backlogtransformstate.backlog.unshift { chunk, encoding }      _flushbacklog: (callback) ->         backlog = @_backlogtransformstate.backlog          if backlog.length             if @_writablestate.objectmode                 async.forever(                     (next) =>                         return next {} if not backlog.length                          { chunk, encoding } = backlog.shift()                         @__transform chunk, encoding, (err) ->                             return next { err } if err?                              next null                      ({ err }) ->                         return callback err if err?                          callback()                 )             else                 chunks = (chunk { chunk, encoding } in backlog)                  if @_writablestate.decodestrings                     encoding = 'buffer'                     chunk = buffer.concat chunks                 else                     encoding = backlog[0].encoding                     item in backlog[1..]                         if encoding != item.encoding                             encoding = null                             break                      chunk = chunks.join ''                  @_backlogtransformstate.backlog = []                 @__transform chunk, encoding, callback         else             callback()      _transform: (chunk, encoding, callback) ->         backlog = @_backlogtransformstate.backlog          if backlog.length             backlog.push { chunk, encoding }              @_flushbacklog callback         else             @__transform chunk, encoding, callback      _flush: (callback) ->         @_flushbacklog =>             @__flush callback      __transform: (chunk, encoding, callback) ->         throw new typeerror 'not implemented'      __flush: (callback) ->         callback()  module.exports = backlogtransform 

Comments

Popular posts from this blog

javascript - RequestAnimationFrame not working when exiting fullscreen switching space on Safari -

Python ctypes access violation with const pointer arguments -