RxJS Release v2.2.15

10 years ago

This is a slight update to the RxJS v2.2.14 release that fixes some bugs associated with backpressure. It is a much more thoroughly tested solution, complete with the controlled operator for requesting a specific number of items.

BackPressure

This is the first experimental release of backpressure. The idea is to pause and resume a particular observable if the observer cannot keep up for whatever reason. Doing this automatically seems naive to us; we should not punish the producer if the consumer cannot keep up, so we've set a pretty high bar for getting it right. The backpressure operators now live in their own file, rx.backpressure.js; if you're using rx.lite.js, you're in luck because they are already included.

There are many ways around the problem of backpressure. You can use throttle if you're OK with losing some data, or the buffer methods (by time, count, etc.) if you'd like the results in batches for a particular count or timeframe. If you want only a single value in a given time span, you could use sample. In this release, we've added three methods: pausable, pausableBuffered and controlled.
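To make the trade-off between the existing options concrete, here is a minimal sketch in plain JavaScript, with no Rx dependency. The helpers `batchByCount` and `latestPerWindow` are hypothetical names invented for this illustration; they only mimic the spirit of buffering by count (nothing is lost, values arrive in groups) and of keeping one value per time span (values are lost), and they are not the RxJS operators themselves.

```javascript
// Hypothetical helper, not an RxJS API: group values into batches of `count`.
// Nothing is dropped; the consumer simply receives fewer, larger deliveries.
function batchByCount(values, count) {
  var batches = [];
  for (var i = 0; i < values.length; i += count) {
    batches.push(values.slice(i, i + count));
  }
  return batches;
}

// Hypothetical helper, not an RxJS API: keep only the last value seen in each
// time window of `windowMs` milliseconds. Earlier values in a window are lost.
function latestPerWindow(events, windowMs) {
  var result = [];
  var currentWindow = null;
  events.forEach(function (e) {
    var w = Math.floor(e.time / windowMs);
    if (w === currentWindow) {
      // Same window: overwrite the previously kept value
      result[result.length - 1] = e.value;
    } else {
      // New window: start keeping a value for it
      result.push(e.value);
      currentWindow = w;
    }
  });
  return result;
}

var batched = batchByCount([1, 2, 3, 4, 5], 2);
// batched is [[1, 2], [3, 4], [5]]

var timedEvents = [
  { time: 0, value: 'a' },
  { time: 40, value: 'b' },
  { time: 120, value: 'c' },
  { time: 180, value: 'd' },
  { time: 210, value: 'e' }
];
var sampled = latestPerWindow(timedEvents, 100);
// sampled is ['b', 'd', 'e']: 'a' and 'c' were dropped
```

Both approaches decide ahead of time how much the consumer receives. The three new methods instead let the consumer make that decision while the stream is running.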

With pausable, you can pause a hot observable, such as mouse movements, and then resume it, but you will lose any data emitted between the pause and the resume. Below is an example of it in action.

var controller = new Rx.Subject();
var events = Rx.Observable.fromEvent(document, 'mousemove');

// Control the events by the controller
var controlled = events.pausable(controller);

var subscription = controlled.subscribe(function (e) {
  // Do something with events
  // Whoops, too fast
  // Pause the event stream
  controller.onNext(false);

  // When you want to start again, call this: 
  controller.onNext(true);
});

// Start listening
controller.onNext(true);

For a more comprehensive view of it in action, check out the tests for pausable.
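If it helps to see the drop-while-paused behavior without a browser or the library, the following is a rough sketch in plain JavaScript. `createPausableGate` is a hypothetical helper made up for this illustration, not the RxJS implementation, and it assumes (as the example above suggests) that nothing flows until the controller is first sent `true`.

```javascript
// Hypothetical helper, not part of RxJS: forwards values only while open.
function createPausableGate(onValue) {
  var open = false; // assumption: starts paused until explicitly opened

  return {
    // Plays the role of controller.onNext(true) / controller.onNext(false)
    setOpen: function (isOpen) {
      open = !!isOpen;
    },
    // Called by the producer for every value it emits
    push: function (value) {
      if (open) {
        onValue(value);
      }
      // While closed, the value is simply dropped
    }
  };
}

var pausableSeen = [];
var gate = createPausableGate(function (x) {
  pausableSeen.push(x);
});

gate.push(1);         // dropped: the gate starts closed
gate.setOpen(true);
gate.push(2);         // delivered
gate.setOpen(false);
gate.push(3);         // dropped while paused
gate.setOpen(true);
gate.push(4);         // delivered

// pausableSeen now holds 2 and 4; 1 and 3 were lost
```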

The other is [`pausableBuffered`](https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/backpressure/pausablebuffered.js), where you will not lose data; instead, it is kept in a buffer until you are ready to start consuming again.

var controller = new Rx.Subject();
var interval = Rx.Observable.interval(1000).timeInterval();

// Control the events by the controller
var controlled = interval.pausableBuffered(controller);

var subscription = controlled.subscribe(function (x) {
  console.log('x', x.value);
});

// Start it
var shouldRun = true;
controller.onNext(shouldRun);

// Pause and resume every five seconds; on resume, the buffer is emptied to backfill the missed results
setInterval(function () {
  controller.onNext(shouldRun = !shouldRun);
}, 5000);

Once again, for a more comprehensive view of this in action, check the associated tests for pausableBuffered.
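The difference from pausable can be sketched the same way in plain JavaScript. `createBufferedGate` is again a hypothetical helper for illustration only, not the library's code: while the gate is closed it queues values instead of dropping them, and it flushes the queue in arrival order when reopened.

```javascript
// Hypothetical helper, not part of RxJS: buffers values while closed.
function createBufferedGate(onValue) {
  var open = false; // assumption: starts paused until explicitly opened
  var buffer = [];

  // Deliver queued values for as long as the gate stays open.
  // `open` is rechecked on every pass so the consumer can pause mid-flush.
  function drain() {
    while (open && buffer.length > 0) {
      onValue(buffer.shift());
    }
  }

  return {
    setOpen: function (isOpen) {
      open = !!isOpen;
      drain();
    },
    push: function (value) {
      buffer.push(value);
      drain();
    }
  };
}

var bufferedSeen = [];
var bufferedGate = createBufferedGate(function (x) {
  bufferedSeen.push(x);
});

bufferedGate.setOpen(true);
bufferedGate.push(1);          // delivered immediately
bufferedGate.setOpen(false);
bufferedGate.push(2);          // held in the buffer
bufferedGate.push(3);          // held in the buffer
var seenWhilePaused = bufferedSeen.length; // still 1
bufferedGate.setOpen(true);    // flushes 2, then 3
bufferedGate.push(4);          // delivered immediately

// bufferedSeen now holds 1, 2, 3, 4 in order; nothing was lost
```

The cost of losing nothing is memory: this sketch's buffer grows without bound while paused, so a consumer that stays paused for a long time against a fast producer should keep that in mind.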

Finally, we have the controlled operator, which turns the observable sequence into a push/pull scenario in which we can request the number of items we want at a time. This gives the observer the chance to tell the observable sequence how many items it wants at any point, unlike any of the buffer methods.

var source = Rx.Observable.range(0, 1000).controlled();

source.subscribe(function(x) {
  console.log('x', x);
});

// Get 10 items
source.request(10);

// Maybe later get another
source.request(5);

To get a better sense of the method's details, check out the associated tests for controlled.
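As a rough mental model of the push/pull idea, here is a plain JavaScript sketch. `createControlled` is a hypothetical helper invented for this illustration and is not how the operator is implemented. One assumption to note: the sketch accumulates unmet demand when more items are requested than are queued, which may differ from the real operator's behavior.

```javascript
// Hypothetical helper, not part of RxJS: delivers only as many values as requested.
function createControlled(onValue) {
  var queue = [];
  var requested = 0; // outstanding demand from the consumer

  // Deliver values while there is both demand and supply
  function drain() {
    while (requested > 0 && queue.length > 0) {
      requested--;
      onValue(queue.shift());
    }
  }

  return {
    // Producer side: values are queued until someone asks for them
    push: function (value) {
      queue.push(value);
      drain();
    },
    // Consumer side: ask for n more items (assumption: demand accumulates)
    request: function (n) {
      requested += n;
      drain();
    }
  };
}

var controlledSeen = [];
var valve = createControlled(function (x) {
  controlledSeen.push(x);
});

// The producer pushes 0..999, but nothing is delivered yet
for (var n = 0; n < 1000; n++) {
  valve.push(n);
}
var deliveredBeforeRequest = controlledSeen.length; // 0

valve.request(10); // delivers 0 through 9
valve.request(5);  // delivers 10 through 14

// controlledSeen now holds 15 items, the last of which is 14
```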

What's Next?

Before we hit the 2.3 mark, we'd like to get more feedback on the backpressure items, including whether other implementations, such as windowed and stop-and-wait mechanisms, are worthwhile. In addition, we are starting the expression parsing work in order to enable scenarios like remote communication with serialized observable sequences.
