var protocol = require('dnode-protocol'); var Stream = require('stream'); var json = typeof JSON === 'object' ? JSON : require('jsonify'); module.exports = dnode; dnode.prototype = {}; (function () { // browsers etc for (var key in Stream.prototype) { dnode.prototype[key] = Stream.prototype[key]; } })(); function dnode (cons, opts) { Stream.call(this); var self = this; self.opts = opts || {}; self.cons = typeof cons === 'function' ? cons : function () { return cons || {} } ; self.readable = true; self.writable = true; process.nextTick(function () { if (self._ended) return; self.proto = self._createProto(); self.proto.start(); if (!self._handleQueue) return; for (var i = 0; i < self._handleQueue.length; i++) { self.handle(self._handleQueue[i]); } }); } dnode.prototype._createProto = function () { var self = this; var proto = protocol(function (remote) { if (self._ended) return; var ref = self.cons.call(this, remote, self); if (typeof ref !== 'object') ref = this; self.emit('local', ref, self); return ref; }, self.opts.proto); proto.on('remote', function (remote) { self.emit('remote', remote, self); self.emit('ready'); // backwards compatability, deprecated }); proto.on('request', function (req) { if (!self.readable) return; if (self.opts.emit === 'object') { self.emit('data', req); } else self.emit('data', json.stringify(req) + '\n'); }); proto.on('fail', function (err) { // errors that the remote end was responsible for self.emit('fail', err); }); proto.on('error', function (err) { // errors that the local code was responsible for self.emit('error', err); }); return proto; }; dnode.prototype.write = function (buf) { if (this._ended) return; var self = this; var row; if (buf && typeof buf === 'object' && buf.constructor && buf.constructor.name === 'Buffer' && buf.length && typeof buf.slice === 'function') { // treat like a buffer if (!self._bufs) self._bufs = []; // treat like a buffer for (var i = 0, j = 0; i < buf.length; i++) { if (buf[i] === 0x0a) { self._bufs.push(buf.slice(j, i)); var line = ''; for (var k = 0; k < self._bufs.length; k++) { line += String(self._bufs[k]); } try { row = json.parse(line) } catch (err) { return self.end() } j = i + 1; self.handle(row); self._bufs = []; } } if (j < buf.length) self._bufs.push(buf.slice(j, buf.length)); } else if (buf && typeof buf === 'object') { // .isBuffer() without the Buffer // Use self to pipe JSONStream.parse() streams. self.handle(buf); } else { if (typeof buf !== 'string') buf = String(buf); if (!self._line) self._line = ''; for (var i = 0; i < buf.length; i++) { if (buf.charCodeAt(i) === 0x0a) { try { row = json.parse(self._line) } catch (err) { return self.end() } self._line = ''; self.handle(row); } else self._line += buf.charAt(i) } } }; dnode.prototype.handle = function (row) { if (!this.proto) { if (!this._handleQueue) this._handleQueue = []; this._handleQueue.push(row); } else this.proto.handle(row); }; dnode.prototype.end = function () { if (this._ended) return; this._ended = true; this.writable = false; this.readable = false; this.emit('end'); }; dnode.prototype.destroy = function () { this.end(); };