
multiplex

A binary stream multiplexer. Stream multiple streams of binary data over a single binary stream. Like mux-demux but faster since it only works with binary streams.


api

var multiplex = require('multiplex')([options], [onStream])

Returns a new multiplexer. You can use this to create sub-streams. All data written to sub-streams will be emitted through this stream. If you pipe a multiplex instance to another multiplex instance, all substream data will be multiplexed and demultiplexed on the other end.

onStream will be called with (stream, id) whenever a new remote sub-stream is created with an id that hasn't already been created with .createStream.

Options include:

  • opts.limit - set the max allowed message size. default is no maximum

Any other options set in options are used as default options when creating sub-streams.
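
For example, a minimal sketch of creating an instance with the limit option and a handler for remote sub-streams (the values and names are illustrative):

var multiplex = require('multiplex')

// cap incoming messages at 1 MB and log any remote sub-stream we did not create
var plex = multiplex({ limit: 1024 * 1024 }, function onStream (stream, id) {
  console.log('new remote sub-stream with id', id)
})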

stream = multiplex.createStream([id], [options])

Creates a new sub-stream with an optional whole string id (default is the stream channel id).

Sub-streams are duplex streams.

Options include:

  • opts.chunked - enables chunked mode on all streams (message framing not guaranteed)
  • opts.halfOpen - make channels support half open mode meaning that they can be readable but not writable and vice versa
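
A minimal sketch of creating a named sub-stream with the halfOpen option described above (the id and option values are illustrative):

var multiplex = require('multiplex')
var plex = multiplex()

// a named duplex sub-stream; halfOpen keeps the readable side open even after
// our writable side has ended
var logs = plex.createStream('logs', { halfOpen: true })
logs.write('hello from this end')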

stream = multiplex.receiveStream(id, [options])

Explicitly receive an incoming stream.

This is useful if you have a function that accepts an instance of multiplex and you want to receive a substream.
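
A sketch of that pattern, assuming a hypothetical helper that is handed a multiplex instance and claims a known sub-stream id:

var multiplex = require('multiplex')

// hypothetical helper that receives an existing multiplex instance and
// explicitly claims the incoming sub-stream with id 'metadata'
function attachMetadataChannel (plex) {
  var meta = plex.receiveStream('metadata')
  meta.on('data', function (chunk) {
    console.log('metadata:', chunk.toString())
  })
}

attachMetadataChannel(multiplex())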

stream = multiplex.createSharedStream(id, [options])

Create a shared stream. If both ends create a shared stream with the same id, writing data on one end will emit the same data on the other end.
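
A minimal sketch of two piped instances sharing a stream (the id 'chat' is illustrative):

var multiplex = require('multiplex')

var a = multiplex()
var b = multiplex()
a.pipe(b).pipe(a)

// both ends use the same id, so writes on one side come out on the other
var sharedA = a.createSharedStream('chat')
var sharedB = b.createSharedStream('chat')

sharedB.on('data', function (chunk) {
  console.log('b received:', chunk.toString())
})
sharedA.write('hello')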

events

multiplex.on('error', function (err) {})

Emitted when the outer stream encounters invalid data.

multiplex.on('stream', function (stream, id) {})

Emitted when a new incoming stream arrives.

stream.on('error', function (err) {})

Emitted if the inner stream is destroyed with an error.

example

var multiplex = require('multiplex')
var plex1 = multiplex()
var stream1 = plex1.createStream()
var stream2 = plex1.createStream()

var plex2 = multiplex(function onStream(stream, id) {
  stream.on('data', function(c) {
    console.log('data', id, c.toString())
  })
})

plex1.pipe(plex2)

stream1.write(Buffer.from('stream one!'))
stream2.write(Buffer.from('stream two!'))

contributing

multiplex is an OPEN Open Source Project. This means that:

Individuals making significant and valuable contributions are given commit-access to the project to contribute as they see fit. This project is more like an open wiki than a standard guarded open source project.

See the CONTRIBUTING.md file for more details.

contributors

multiplex is only possible due to the excellent work of the following contributors:

  • maxogden (GitHub/maxogden)
  • 1N50MN14 (GitHub/1N50MN14)
  • substack (GitHub/substack)
  • mafintosh (GitHub/mafintosh)

issues

Bug: invalid invocation of 'events' module, breaks in node 0.10

Multiplex calls the class method EventEmitter.listenerCount(emitter, event) on a variable events, which is defined on line 3 as:

var events = require('events')

From the multiplex code it looks like events is supposed to be the EventEmitter constructor. I think the proper way to use the events package is like this:

var events = require('events').EventEmitter

not

var events = require('events') // how it is now

This can cause a problem in node 0.10, because in this case require('events') does not have all the methods defined on it. You need to access the underlying EventEmitter object. An example of the error is in this build (at the very bottom):
https://travis-ci.org/substack/dataplex/builds/66828146

A potential fix is in the pull request; it works in both node 0.12 and node 0.10.
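
A minimal sketch of the compatibility-safe usage described above (not the actual patch from the pull request):

// require('events') is the EventEmitter class on newer node, but on 0.10 the
// class lives on the .EventEmitter property, so reference it explicitly
var EventEmitter = require('events').EventEmitter

var emitter = new EventEmitter()
emitter.on('data', function () {})

// works on both node 0.10 and 0.12, unlike calling listenerCount on require('events')
console.log(EventEmitter.listenerCount(emitter, 'data')) // 1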

Passing through errors from underlying streams

Been using multiplex for a while and loving it, but I'm not sure that stream error handling is well dealt with by the package. This hasn't been a problem before, but recently WebRTC data channels have been popping up edge cases that are giving my error handling a really solid run.

With some tinkering I found that modifying line 107 to the following code:

decode.pipe(through.obj(decoder)).on('error', dup.emit.bind(dup, 'error'))

the problem goes away, but I'm not sure that is all that is required to ensure that unhandled errors in the local streams are bubbled up to a place where they can be handled.

Any thoughts on what the right way to handle this is before I go all gangbusters with a PR?

Spec Question?

the readme says

onStream will be called with (stream, id) whenever a new remote sub-stream is created with an id that hasn't already been created with .createStream.

I'm curious about "that hasn't already been created". Does this mean that if I run the below on two clients:

conn = getConnection()
mux = multiplex(opts, handler)
stream = mux.createStream("foo")

function handler(stream, id) {
  stream.close() // just close all incoming streams
}

will the stream for foo be properly connected between the two?

(I'm implementing this in Go for interop happiness.)

Emit end event on client stream after source stream pushes null (due to destroy being called on client stream)

A common use case of multiplex is to call the destroy method on the client stream to signal the server stream to stop sending data. When the server is done sending data, the end event should be propagated to the client stream, which it currently is not. Below is an example illustrating the issue. (Just copy and paste it into a file, install the necessary packages, and run it.)

// Emit end event on client stream after source stream pushes null, due to destroy being called on client

var Multiplex = require('multiplex');
var Stream = require('stream');
var net = require('net');

// setup the 'server' side
var server = net.createServer(function (stream) {

    // setup stream log in 'server'
    var multiplex = Multiplex(function (stream, id) {

        // create stream on multiplex channel open
        var s = new Stream.Readable();
        s._read = function() {};
        var intervalId = setInterval(function() {
            s.push('foo');
        }, 500);
        // define destroy method on stream to close it properly when it gets destroy signal from client
        s.destroy = function () {
           clearInterval(intervalId);
           s.push(null);
        };

        // pipe created stream into multiplex
        s.pipe(stream);

        // handle error events from stream, and interpret as destroy call when
        // necessary
        stream.once('error', function (err) {
            if (err.message==='Channel destroyed') {
                if (s.destroy) s.destroy();
            } else {
                s.emit('error', err);
            }
        });

    });

    stream.pipe(multiplex).pipe(stream);
});
server.listen(5000);


// setup the 'client' side
var con = net.connect(5000, function () {

    var multiplex = Multiplex();
    con.pipe(multiplex).pipe(con);

    // open stream on 'client'
    var stream = multiplex.createStream('someId');

    stream.on('data', function(data) {
        console.log('got data from stream:');
        console.log(data);
    });
    stream.on('end', function() {
        console.log('end event emitted! THIS SHOULD BE EMITTED ONLY ONCE THE SOURCE STREAM ON THE CLIENT PUSHES NULL (in this example it is never emitted, and it should be. THIS GIVES THE CLIENT SIDE USEFUL INFO ABOUT WHEN THE SERVER STREAM IS DONE.');
    });
    stream.on('close', function() {
        console.log('close event emitted! THIS GETS EMITTED AS SOON AS DESTROY IS CALLED ON THE CLIENT STREAM (as it is this example). THEREFORE THIS EVENT GIVES NO INFO ABOUT WHEN THE SERVER SIDE STREAM IS DONE. EMITTING THE END EVENT ON THE CLIENT SIDE WHEN THE SERVER SIDE IS DONE IS VERY USEFUL.');
    });

    setTimeout(function() {
        stream.destroy();
        // this should cause the end event to be emitted on this stream, but currently does not
    }, 2000);

});

Send all Error properties through multiplex, not just the message

It looks like only the message property is copied over when an Error is sent through multiplex.

// index.js line 67
this._multiplex._send(this.channel << 3 | (this.initiator ? 6 : 5), err ? new Buffer(err.message) : null)

It is very common to augment Error objects with additional properties. Could we have multiplex copy over the entire Error object (recursively with nested objects)? Currently we lose the majority of the useful information with Errors that have additional properties.

Thanks in advance.
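
In the meantime, a rough sketch of how an application could serialize the full error itself before handing a message to multiplex (encodeError and decodeError are hypothetical helpers, not part of the module):

// serialize an Error, keeping its own properties rather than only .message
function encodeError (err) {
  var obj = {}
  Object.getOwnPropertyNames(err).forEach(function (key) {
    obj[key] = err[key]
  })
  return JSON.stringify(obj)
}

// rebuild an Error with the extra properties on the receiving side
function decodeError (str) {
  var obj = JSON.parse(str)
  var err = new Error(obj.message)
  Object.keys(obj).forEach(function (key) {
    err[key] = obj[key]
  })
  return err
}

var original = new Error('boom')
original.statusCode = 503
console.log(decodeError(encodeError(original)).statusCode) // 503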

"finish" event propagation bug

When calling end() on a client-side stream, the "finish" event should not be emitted on the server side, but it currently is (when a "data" listener is attached on the server).

Calling end() on the client indicates no more data will be written from the client to the server. Calling end() on the client should not cause a "finish" event to be emitted on the server. A "finish" event indicates that you can no longer write to a stream, so it should only be emitted on the server when we no longer want to write data from the server to the client. This is because multiplex supports full-duplex streams: calling end() in one direction (e.g. on the client) should not stop you from being able to send data the other way (e.g. from server to client).

The following example illustrates this bug. A full duplex stream is opened between server and client. Data is sent from server to client, but when client calls end(), it stops data transfer from server to client. It should only be stopping data transfer from client to server.

var multiplex = require('multiplex');
var plex1 = multiplex();
var stream1 = plex1.createStream();


////////////////////////////////////////////////////////////////////////////////
// server side
////////////////////////////////////////////////////////////////////////////////
var plex2 = multiplex(function onStream(stream, id) {

  stream.on('error', function(error) {
    console.log(error);
  });

  stream.on('finish', function() {
    console.log('Client stream emits "finish" event -- this should not happen');
  });


  // commenting this block out makes it work, weird!
  stream.on('data', function(c) {
    console.log('Client received: ' + c.toString());
  });


  setInterval(function () {
    console.log('Client tries to send data: yo');
    stream.write('yo');
  }, 200);

});


////////////////////////////////////////////////////////////////////////////////
// client side
////////////////////////////////////////////////////////////////////////////////

setTimeout(function() {
    console.log('Server calls end()');
    stream1.end('some_end_data_from_server');
}, 1000);

stream1.on('error', function(err) {
    console.log(err);
});

stream1.on('data', function(data) {
    console.log('Server received data from client: ' + data.toString());
});


////////////////////////////////////////////////////////////////////////////////
// pipe the client server together
////////////////////////////////////////////////////////////////////////////////
plex1.pipe(plex2).pipe(plex1);

multiplex hangs with a lot of load

I made a thin shim on top of multiplex to support the interface defined in abstract-stream-muxer, so that I could run the stress tests. I found that on the "mega" test (10000 streams with 10000 messages each), multiplex hangs with:

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory

To run the example, simply turn the mega test on by changing the boolean at:
https://github.com/diasdavid/node-multiplex-stream-muxer/blob/master/tests/test.js#L14

As a note, the spdy-stream-muxer is able to cope with the test.

Btw, thanks for the inspiration with abstract-blob-store :):)

Is there a clean way to close a stream between two muxes?

var mux1 =  multiplex(function () {

});

var mux2 = multiplex(function (stream) {
  stream.on('error', function (err) {
    console.log("Got error on mux2 - " + err);
  });
  stream.on('finish', function () {
    console.log("We are finish in mux2");
  });
  stream.on('close', function () {
    console.log("We are closed in mux2");
  });
});

mux1.pipe(mux2).pipe(mux1);

var stream = mux1.createStream();
stream.on('finish', function () {
  console.log("We are finish");
});
stream.on('close', function () {
  console.log("We are closed");
});
stream.end();

If I run that code then all I get out is "We are finish". In other words, there doesn't seem to be any way for mux2 to find out that the stream it's connected to on mux1 has ended. That seems surprising to me.

Now if I change stream.end() to stream.destroy() then I will get an error in mux2 followed by a close. I suppose that works but it's not terribly gentle. I would have expected that there is some 'clean' way to close a stream.

What am I missing? Thanks!

Pass in any object to `destroy` method?

It looks like the current destroy method only accepts an Error object. Would it make sense for the destroy method to accept any object instead of strictly an Error object?

Thanks in advance.

Update readable-stream

Would a PR updating readable-stream be welcome? I'm mostly interested in getting the new goodies like async iteration.

optimize packing of automatic ids

At the moment when createStream() is called the passed id is converted to a string. This means that when a stream is created with an automatic id you can only have 10 ids until another byte is required. This could be optimized to allow up to 256 automatic ids for a single id byte.

The downside of this would be potential clashes between streams created with automatic id vs manual id. For example createStream('a') would create the same id as the 41st stream created automatically.

What do you think?
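
A rough illustration of the size difference being described, assuming ids are written as raw payload bytes (this is not multiplex's actual framing code):

// decimal-string ids need a second byte as soon as the counter reaches 10
console.log(Buffer.from(String(9)).length)   // 1
console.log(Buffer.from(String(10)).length)  // 2

// a single raw byte covers automatic ids 0 through 255
console.log(Buffer.from([9]).length)         // 1
console.log(Buffer.from([255]).length)       // 1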

collisions in automatic ids

Examining the code, I think there is a problem with the automatic ids. If both ends of the outer stream create automatic ids, then they will have the same value, so these will be routed together.

If both ends create the same id at the same time, that becomes the same stream. For a named stream that may be a feature rather than a bug, but normally connections do not behave like that. If you want to create a particular type of stream, you should send a header at the start of the stream, like an HTTP header.

A better solution would be to use positive numbers for calls you created (when you dial the phone) and negative numbers for calls you received (when you answer the phone).

So, if you see a stream with a negative number, it's a response to the stream you created with id Math.abs(stream.id).

Also, you could use varints instead of strings inside multibuffers.
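
A toy sketch of the dial/answer numbering being proposed here (not how multiplex currently assigns ids):

// locally initiated streams are stored under their positive id,
// remotely initiated ones under the negated id, so the two can never collide
function remoteKey (id) { return -id }
function initiatorId (id) { return Math.abs(id) }

console.log(remoteKey(3))     // -3, how this end would track the peer's stream 3
console.log(initiatorId(-3))  // 3, recovering the id the initiator used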

Memory leak when multiple streams created at the same time

Hi all,

When creating multiple streams at once, the _local and _remote lists can grow continuously because null entries never get cleared out. This gradually causes a memory leak.

Sample gist here

https://gist.github.com/Alan01252/efa75907063ff5c5b63df26098f1a06d

I've naively fixed the issue here; it's not perfect (I'm sure you will have much better ideas on how to do this), but it does seem to work.

https://github.com/Alan01252/multiplex/tree/fix/memoryLeak

If you have a few minutes I'd be very appreciative of your thoughts / solutions :)

Thank you

unable to send data back over substream / reply

I've taken the basic example and am attempting to write data back in the reverse direction (i.e. answer the incoming data), which doesn't seem to work. Any idea why?

var multiplex = require("multiplex")

var plex1 = multiplex()
var stream1 = plex1.createStream()
var stream2 = plex1.createStream()

stream1.on('data',function(c){
	console.log('data back', c.toString())
})
stream2.on('data',function(c){
	console.log('data back', c.toString())
})

var plex2 = multiplex(function onStream(stream, id) {
	stream.on('data', function(c) {
		console.log('data', id, c.toString())
		stream.write('got it!')
	})
})

plex1.pipe(plex2)

stream1.write(new Buffer('stream one!'))
stream2.write(new Buffer('stream two!'))
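
One likely explanation, judging by the bidirectional examples elsewhere on this page: plex2 is never piped back into plex1, so data written on plex2's sub-streams has no return path. A sketch of the change to the example above:

// pipe both directions so replies written on plex2's substreams reach plex1
plex1.pipe(plex2).pipe(plex1)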

issues with old version of through2 in the browser

I'm getting some issues with the old version of through2 that this module depends on (~0.2.3) when used in the browser:

Uncaught TypeError: Object prototype may only be an Object or null: undefined

When I upgrade to the latest version, this error goes away.

document internal state var

this._state is a number, but it's not clear what the numbers represent.

I'm trying to debug an unexpected stream closure and am unsure what each state number represents.

2.0.3 breaks in the browser when used with websocket-stream

websocket-stream receives ArrayBuffer instances on the client (in my case Uint8Array). through2 doesn't play nice with anything that is not a Buffer or String (the older version of through used in <2.0.2 did), so multiplex breaks when used alongside websocket-stream in the browser...

What is the motivation for the different behaviors of stream.end() and stream.destroy()?

var mux =  multiplex(function () {

});
var stream = mux.createStream();
stream.on('finish', function () {
  console.log("We are finish");
});
stream.on('close', function () {
  console.log("We are closed");
});
stream.end();

I would have expected that this code would print both "We are finish" and "We are closed". But in fact, it only prints "We are finish". There never seems to be a close event emitted on the stream.

If I change stream.end() to stream.destroy() then I get the opposite behavior, "We are closed" is printed but not "We are finish".

This was tested with 6.7.0.
