github Reactive-Extensions/RxJS v2.4.7
RxJS version 2.4

latest releases: v4.1.0, v4.0.8, v4.0.7...
9 years ago

This is the first stable release of RxJS 2.4! There have been several things at the forefront of our mind which is performance and modularity.

To that end, we focused on the following for this release:

  • Performance Enhancements
  • New NPM Packages
  • New operators/methods
  • Non-breaking Changes
  • Breaking Changes

Performance Enhancements

Performance has been a key aspect of the past RxJS releases and for 2.4 will continue to be a work in progress. You will note that we have included performance tests using benchmark.js which tests 2.4.x against previous 2.3.x releases in the tests/perf/operators folder.

You can run them to see what kinds of gains we were able to get for example, this is a check of the new map implementation versus the previous version.

function square(x) { return x * x; }
function double(x) { return x + x; }

Rx.Observable.range(0, 50)
  .map(square)
  .map(double).subscribe();

Running this code between versions, we can see a definite speedup:

$ node map.js
old x 12,968 ops/sec ±3.44% (89 runs sampled)
new x 15,079 ops/sec ±4.34% (81 runs sampled)
Fastest is new

We can also test the two different versions of range to see what kinds of gains we were able to make:

Rx.Observable.range(0, 25).subscribe();

Running that sample, we can find the difference pretty staggering.

$ node range.js
old x 26,711 ops/sec ±3.81% (87 runs sampled)
new x 37,831 ops/sec ±4.33% (83 runs sampled)
Fastest is new

What Changes Did We Make?

There were a number of rules that we followed that allowed us to get faster code. They include:

  1. Do not let arguments leave the scope of the method
  2. Do not perform Array.prototype.slice on arguments and instead do a local copy
  3. Avoid try/catch/finally if possible such as the work Bluebird did
  4. Use the state based methods for scheduling which reduced scope chains
  5. Use classes instead of anonymous classes created by scope chains.
  6. Use method fusion when possible to chain together map with map calls, and filter with filter calls.

To cover some of them in more detail, we can for example reduce chained scopes by using the scheduler, let's look at the following example. Let's just use scheduleRecursive which does not allow for state to be passed in. Instead, we would have to close over it in our inner scope.

Observable.range = function (start, count, scheduler) {
  isScheduler(scheduler) || (scheduler = currentThreadScheduler);
  return new AnonymousObservable(function (observer) {
    var i = 0;
    return scheduler.scheduleRecursive(function (self) {
      if (i < count) {
        observer.onNext(start + (i++));
        self();
      } else {
        observer.onCompleted();
      }
    });
  });
};

Instead, we could use our state based which will rid ourselves of the i variable and capture the state solely within our scheduler.

Observable.range = function (start, count, scheduler) {
  isScheduler(scheduler) || (scheduler = currentThreadScheduler);
  return new AnonymousObservable(function (observer) {
    return scheduler.scheduleRecursiveWithState(0, function (i, self) {
      if (i < count) {
        observer.onNext(start + i);
        self(i + 1);
      } else {
        observer.onCompleted();
      }
    });
  });
};

Of course we went further by turning the range operator into its own class. You can find the optimized operators in the src/core/perf/operators folder.

We also looked into method fusion, for example, if the current operation is a map and the next operation is also a map, then we can shortcut it so that we only use a single Observer for them both.

return this instanceof MapObservable ?
  this.internalMap(selectorFn, thisArg) :
  new MapObservable(this, selectorFn, thisArg);

Then the internalMap function might look like the following where we have the two selector functions chained together for our new selector function.

MapObservable.prototype.internalMap = function (selector, thisArg) {
  var self = this;
  return new MapObservable(
    this.source, 
    function (x, i, o) { return selector(self.selector(x, i, o), i, o); }, thisArg);
};

This is by no means the end of our optimizations. We must carefully weigh the changes made here with the performance gains obtained versus the code size that we produce and that we are going for larger optimizations and not micro-optimizations.

New NPM Packages

Many users of RxJS want better modularity and smaller builds of RxJS using only what they need. One of the ways we have done this is by creating rx.lite.js which is a subset of the complete RxJS which has the most used operators in a single package.

We have now made the following packages also available on NPM:

New Operators/Methods

We have introduced two new operators/methods in this release:

  • Rx.Observable.mergeDelayError(..args)
  • Rx.BehaviorSubject.prototype.getValue()

Rx.Observable.mergeDelayError(..args)

The first operator, mergeDelayError flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to receive all successfully emitted items from all of the source Observables without being interrupted by an error notification from one of them.

This behaves like Observable.prototype.mergeAll except that if any of the merged Observables notify of an error via the Observer's onError, mergeDelayError will refrain from propagating that error notification until all of the merged Observables have finished emitting items.

var source1 = Rx.Observable.of(1,2,3);
var source2 = Rx.Observable.throwError(new Error('woops'));
var source3 = Rx.Observable.of(4,5,6);

var source = Rx.Observable.mergeDelayError(source1, source2, source3);

var subscription = source.subscribe(
  function (x) {
    console.log('Next: %s', x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });

// => 1
// => 2
// => 3
// => 4
// => 5
// => 6
// => Error: Error: woops

Rx.BehaviorSubject.prototype.getValue()

Another operator that was added was to properly get the current value from a BehaviorSubject. In previous releases, you could use the value field, but it was not guarded against improper access. This value is frozen in time once onCompleted has been called. If onError has been called, calling this method will cause the Error from the onError call to be thrown.

var b = new Rx.BehaviorSubject(42);
console.log(b.getValue());
// => 42

b.onNext(56);
console.log(b.getValue());

try {
  b.onError(new Error('woops'));
  var x = b.getValue();
} catch (e) {
  console.log(e.message);
}
// => woops

Non-Breaking Changes

Sometimes we feel that in a previous release, we did not consider naming and now we're stuck with it, whether we like it or not.

Aliasing Rx.Scheduler.timeout to Rx.Scheduler.default

Such is the case for Rx.Scheduler.timeout which in the old days of RxJS used setTimeout indeed to schedule items. But, it is much more than that now that it should be the default scheduler used for asynchronous operations. To that, we have created an alias to it for Rx.Scheduler.default to indicate its new status and we have changed our documentation appropriately.

Breaking Changes

For point releases, RxJS will keep any changes to a minimum until we get to version 3 and beyond. There are some instances where there are breaking changes on internal code, not meant for public consumption.

Current Thread and Immediate Scheduler Changes

In previous releases, the Rx.Scheduler.immediate and Rx.Scheduler.currentThread allowed for blocking on the main thread for future scheduling with either relative or absolute time. This is now disallowed and will throw an Error if it is attempted. The affected APIs are:

Rx.Scheduler.immediate.scheduleWithRelative
Rx.Scheduler.immediate.scheduleWithRelativeAndState
Rx.Scheduler.currentThread.scheduleWithRelative
Rx.Scheduler.currentThread.scheduleWithRelativeAndState

Note that these were internal APIs not meant for public consumption, so none of your code should be affected. We wanted to ensure the proper usage of schedulers, hence disallowing anything that might block the main thread.

Don't miss a new RxJS release

NewReleases is sending notifications on new releases.