RxJS for Highland.js Users
Highland.js is a general purpose utility belt for handling both synchronous and asynchronous code. It is built upon node.js with EventEmitters and Streams with a focus on composition.
But, before we get started, why use RxJS over Highland.js?
Uses an Array#extras interface instead of node.js Streams
Highland.js is built upon node.js infrastructure, including Streams. This is great if you're a node developer using streams, but there are many who do not and simply use callbacks and events. In addition, Streams are hard to get right, hence the changes between Streams 1, 2 and 3 with regards to pause/resume, high water marks, draining, etc. The Streams Handbook is a great way to get started looking at streams as an approach.
The Reactive Extensions are built on a language and runtime neutral approach that works great across languages, whether it is Java, .NET, Python, Ruby, Scala, Clojure, and JavaScript. This approach allows you easily to switch between Java, JavaScript, and .NET, and get a familiar feel between all the versions. RxJS was built upon the concept with the Array#extras in mind, where if you know Arrays in JavaScript, then you know RxJS. The only real difference is the possible element of time, and a push model instead of the Array's pull model.
For example, you can write the following using Arrays in JavaScript.
var source = Array.of(1,2,3,4,5);
source
.map(function (x, i, a) { return x * x; })
.filter(function (x, i, a) { return x % 3 === 0; })
.forEach(function (x, i, a) {
console.log('next', x);
});
The same can then be done using RxJS with only one small code change of changing Array
to Rx.Observable
.
var source = Rx.Observable.of(1,2,3,4,5);
source
.map(function (x, i, a) { return x * x; })
.filter(function (x, i, a) { return x % 3 === 0; })
.forEach(function (x, i, a) {
console.log('next', x);
});
No Reliance on node.js Infrastructure
Highland.js is built upon node.js infrastructure using both the EventEmitter
and Stream
class which bring a number of dependencies with it for Browserify to make it work for the browser., including the many drawbacks of the ever changing Streams designs within node.js. As it interoperates with existing streams, they may or may not be well behaved in whether they will actually pause or not and whether backpressure actually works. These were key reasons for changes between Streams 1, 2 and 3.
Instead, RxJS takes a clean room approach with no external dependencies, which allows for a cleaner design without the technical debt of the Stream
and EventEmitter
designs in node.js. RxJS is built from scratch on a solid platform of the core objects of Observer
, Observable
, operators for composition, and Scheduler
for controlling concurrency.
Interoperability With Libraries and the DOM
One of the most important parts of when choosing a library is how well it works with the libraries you already use. Highland.js, by default, ties itself directly to jQuery or any kind of event emitter with an on
method, such as Zepto.js for all DOM event binding, with no deterministic way of unbinding that added event handler. For everything else, EventEmitters
are used for eventing.
RxJS, on the other hand is more flexible about binding to the libraries you use or to the plain old DOM. You can bind directly to the DOM elements using Rx.Observable.fromEvent
which binds not only to a standard DOM Element, but also DOM NodeLists, etc directly, with deterministic disposal of events.
For external libraries, if you use jQuery or Zepto.js, Rx.Observable.fromEvent
will work perfectly for you. In addition, we also support the other libraries out of the box:
You can also override this behavior in fromEvent
so that you use native DOM or Node.js events via the EventEmitter
directly by setting the Rx.config.useNativeEvents
flag to true, so that it's never in doubt which event system you are using. When using native DOM events, you can attach a listener to one item, or you can attach listeners to a NodeList's children, we figure that out for you without you having to change your code.
In order to support the libraries you use, it's very simple to add your own event binding, for example we can bind our events to the Dojo Toolkit using the Rx.Observable.fromEventPattern
method:
require(['dojo/on', 'dojo/dom', 'rx'], function (on, dom, rx) {
var input = dom.byId('input');
var source = Rx.Observable.fromEventPattern(
function addHandler (h) {
return on(input, 'click', h);
},
function delHandler (_, signal) {
signal.remove();
}
);
var subscription = source.subscribeOnNext(function (x) {
console.log('Next: Clicked!');
});
on.emit(input, 'click');
// => Next: Clicked!
});
In addition, transducers hold a great amount of potential for high performance querying, and to that end, RxJS has added transducers support via the transduce
method.
var t = transducers;
var source = Rx.Observable.range(1, 4);
function increment(x) { return x + 1; }
function isEven(x) { return x % 2 === 0; }
var transduced = source.transduce(t.comp(t.map(increment), t.filter(isEven)));
transduced.subscribe(
function (x) { console.log('Next: %s', x); },
function (e) { console.log('Error: %s', e); },
function () { console.log('Completed'); });
// => Next: 2
// => Next: 4
// => Completed
Browser Compatibility
Browser and runtime compatibility is important to RxJS. For example, it can run not only in the browser, but also Node.js, RingoJS, Narwhal, Nashorn, and event Windows Scripting Host (WSH). We realize that there are many users out there without access to the latest browser, so we do the best to accommodate this with our compat
builds. These builds bridge all the way back to IE6 for support which includes using attachEvent
with event normalization, to DOM Level 1 events. Highland.js does not have this behavior, and instead relies solely on EventEmitter
or jQuery for events support.
We also want to build RxJS for speed with asynchronous operations, so we optimize for the browser and runtime you are using. For example, if your environment supports setImmediate
for immediate execution, we will use that. Else, if you're in Node.js and setImmediate
is not available, it falls back to process.nextTick
. For browsers and runtimes that do not support setImmediate
, we will fall back to postMessage
, to MessageChannel
, to asynchronous script loading, and finally defaulting back to setTimeout
or event WScript.Sleep
for Windows Scripting Host.
These compat
files are important as we will shim behavior that we require such as Array#extras
, Function#bind
and so forth, so there is no need to bring in your own compatibility library such as html5shiv, although if they do exist already, we will use the native or shimmed methods directly.
Standards Based
Iterables and Array#extras
RxJS has a firm commitment to standards in JavaScript. Whether it is supporting the Array#extras
standard method signatures such as map
, filter
, reduce
, every
and some
, or to some new emerging standard on collections, RxJS will implement these features accordingly from pull collections to push. Unlike Highland.js, RxJS conforms to Array#extras
syntax to have the callback style of function (item, index, collection)
in addition to accepting a thisArg
for the calling context of the callback. This helps as you can easily reuse your code from the Array version to Observable version.
An example of forward thinking is the introduction of ES6+ operators on Array with implementations on Observable such as:
includes
find
findIndex
from
of
In addition, RxJS supports ES6 iterables in many methods which allow you to accept Map
, Set
, Array
and array-like objects, for example in Rx.Observable.from
, flatMap
, concatMap
among others. RxJS also has the capability of converting to Maps and Sets via the toMap
and toSet
methods if available in your runtime.
This makes the following code possible to yield an array from an observable sequence and have it automatically converted into an observable sequence.
Rx.Observable.range(1, 3)
.flatMap(function (x, i) { return [x, i]; })
.subscribeOnNext(function (value) {
console.log('Value: %o', value);
});
// => 1
// => 0
// => 2
// => 1
// => 3
// => 2
Generators
Generators also play a big part in the next version of JavaScript. RxJS also takes full advantage of generators as iterables in such methods as the aforementioned Rx.Observable.from
, flatMap
, concatMap
and other methods. For example, you can yield values like the following:
var source = Rx.Observable.from(
function* () { yield 1; yield 2; yield 3; }()
);
source.subscribeOnNext(function (value) {
console.log('Next: %s', value);
});
// => 1
// => 2
// => 3
Generators also give the developer pretty powerful capabilities when dealing with asynchronous actions. RxJS introduced the Rx.spawn
method which allows you to write async/await style code over Observables, Promises, Arrays, and just plain objects.
Rx.spawn(function* () {
var x = yield Rx.Observable.just(42);
var y = yield Promise.resolve(42);
console.log(x + y);
try {
var source = yield Rx.Observable.throwError(new Error('woops'));
} catch (e) {
console.log(e.message);
}
})();
// => 84
// => woops
Promises
Promises have been a great way for developers to express single value asynchronous values. With ES6, they have now been standardized and are starting to appear in all browsers. RxJS also supports Promise
values as arguments in many places as noted in our Bridging to Promises documentation. No more having to call Rx.Observable.fromPromise
everywhere, RxJS automatically converts them for you.
var source = Rx.Observable.fromEvent(input, 'keyup')
.map(function (e) { return e.target.value; })
.flatMapLatest(function (text) {
return queryPromise(text);
});
RxJS also allows an Observable sequence to be converted to an ES6 compliant Promise
with the toPromise
method.
Rx.Observable.just(42)
.toPromise()
.then(function (value) {
console.log(value);
});
// => 42
Swappable Concurrency Layer
RxJS can easily be described in three pieces. First the Observer
and Observable
objects, secondly by the operators for composition on top of them, and finally a swappable concurrency layer which allows you to swap out your concurrency model at any time. This last part is key which distinguishes RxJS from many other libraries. There are a number of advantages to this approach that may be subtle at first, but invaluable as you start to use them.
The first advantage is that you can switch where callbacks are executed, for example, instead of using setImmediate
or any of its fallbacks, you can execute the callback on window.requestAnimationFrame
for smooth animations.
// Available in RxJS-DOM project
var scheduler = Rx.Scheduler.requestAnimationFrame;
function render(value) {
// Do something to render the value
}
Rx.Observable.range(1, 100, scheduler)
.subscribe(render);
Schedulers also have advantages with virtual time, which means we can say what time it really is. This is great for deterministic testing in which you can verify the behavior of every single operator. RxJS, through the TestScheduler
can record when items happen and what values were yielded, thus no need for asynchronous testing.
var scheduler = new TestScheduler();
var xs = scheduler.createHotObservable(
Rx.ReactiveTest.onNext(201, 1),
Rx.ReactiveTest.onCompleted(202)
);
var results = scheduler.startWithCreate(function () {
return xs.map(function (x, i) { return x + x + i });
});
// Some custom collection assertion for values
collectionAssert.assertEqual(results.messages, [
Rx.ReactiveTest.onNext(201, 2),
Rx.ReactiveTest.onCompleted(202)
]);
// Some custom collection assertion for subscriptions
collectionAssert.assertEqual(xs.subscriptions, [
Rx.ReactiveTest.subscribe(200, 202)
])
Every operator within RxJS has tests written in this style where all data is easily verified. Not only can we create observables through the TestScheduler
, but we can create Promises via the createResolvedPromise
and createRejectedPromise
methods.
Virtual time has more advantages as well. Imagine if you had some historical stock data that you wanted to run through a simulation. Using the Historical
scheduler, you can easily accomplish this.
var scheduler = new Rx.HistoricalScheduler(new Date(2014, 1));
var source = new Rx.Subject();
getStockData().forEach(function (stock) {
scheduler.scheduleWithAbsolute(stock.date, function () {
stock.onNext(stock);
});
});
// Calculate with the data
source.groupBy(function (stock) { return stock.symbol })
/* Do something with the data */
.subscribe(function (info) {
// Process the data
});
Multicast Backpressure
In our applications, we consume a lot of data from external sources. But, what if our consumers cannot handle the load from the sequence?
Highland.js uses the standard backpressure mechanisms such as pause
/resume
from standard node.js streams, as well as throttle
and debounce
for lossy operations. The pause/resume from normal node.js streams work for multicast streams to share backpressure via fork
, or to keep them separate with observe
, but this can sometimes lead to confusion, especially when mixed.
RxJS has a number of mechanisms to handle this in the Backpressure and Observable Sequences documentation. This could come in the form of lossy operations such as debounce
, throttleFirst
, sample
, pausable
, to loss-less operations such as pausableBuffered
, buffer
, windowed
, stopAndWait
, etc. These work on the idea of multicast streams so that one consumer is not punished by another's inability to process the data in a timely manner.
In RxJS, you can synchronize across streams with backpressure with the pause/resume semantics using a Pauser
or just any ordinary Subject
with onNext(true)
to resume and onNext(false)
to pause:
var pauser = new Rx.Pauser();
var source1 = getData().pausableBuffered(pauser);
var source2 = getOtherData().pausableBuffered(pauser);
// To pause both streams
pauser.pause();
// To resume both streams
pauser.resume();
Build What You Want
RxJS has a rather large surface area, so we give you the ability to build RxJS with only the things you want and none of the things you don't via the rx-cli
project. For example, we can build a compat
version of RxJS with only the map
, flatMap
, takeUntil
, and fromEvent
methods, which keeps your RxJS version lean and mean.
rx --lite --compat --methods map,flatmap,takeuntil,fromevent
Long Stack Trace Support
Debugging programming with callbacks can be quite cumbersome. To that end, RxJS has introduced a notion of "Long Stack Traces" which allows you to quickly isolate your code from the plumbing not only of RxJS, but also node.js and the browser cruft, thus getting you to the real cause of the issue.
For example, without "long stack trace" support, typically, an error would look like the following:
var Rx = require('rx');
var source = Rx.Observable.range(0, 100)
.timestamp()
.map(function (x) {
if (x.value > 98) throw new Error();
return x;
});
source.subscribeOnError(
function (err) {
console.log(err.stack);
});
$ node example.js
Error
at C:\GitHub\example.js:6:29
at AnonymousObserver._onNext (C:\GitHub\rxjs\dist\rx.all.js:4013:31)
at AnonymousObserver.Rx.AnonymousObserver.AnonymousObserver.next (C:\GitHub\rxjs\dist\rx.all.js:1863:12)
at AnonymousObserver.Rx.internals.AbstractObserver.AbstractObserver.onNext (C:\GitHub\rxjs\dist\rx.all.js:1795:35)
at AutoDetachObserverPrototype.next (C:\GitHub\rxjs\dist\rx.all.js:9226:23)
at AutoDetachObserver.Rx.internals.AbstractObserver.AbstractObserver.onNext (C:\GitHub\rxjs\dist\rx.all.js:1795:35)
at AnonymousObserver._onNext (C:\GitHub\rxjs\dist\rx.all.js:4018:18)
at AnonymousObserver.Rx.AnonymousObserver.AnonymousObserver.next (C:\GitHub\rxjs\dist\rx.all.js:1863:12)
at AnonymousObserver.Rx.internals.AbstractObserver.AbstractObserver.onNext (C:\GitHub\rxjs\dist\rx.all.js:1795:35)
at AutoDetachObserverPrototype.next (C:\GitHub\rxjs\dist\rx.all.js:9226:23)
This can be remedied using "long stack trace" support by setting the following flag:
Rx.config.longStackSupport = true;
Then we can run our program again using this support
$ node example.js
Error
at C:\GitHub\example.js:6:29
From previous event:
at Object.<anonymous> (C:\GitHub\example.js:3:28)
From previous event:
at Object.<anonymous> (C:\GitHub\example.js:4:4)
From previous event:
at Object.<anonymous> (C:\GitHub\example.js:5:4)
Many Examples and Tutorials
As people try to learn RxJS, it's always great to have examples to get them started. To that end, RxJS ships a number of examples out of the box including simple scenarios such as Autocomplete, Follow the Mouse, Drawing, to more complete examples like databinding using a little demo project called TKO
, to a complete game of Alphabet Invasion.
Want to learn RxJS at your own pace? We also have tutorials for that as well called LearnRx which will walk you through the basics of learning to compose arrays, and then how that applies to observable sequences.
There are also many community resources to learn RxJS from videos, to presentations, to examples with integration with such libraries as AngularJS and React.
Extensive Documentation
As stated before, RxJS has a rather large surface area so sometimes it's hard to know where to start. To that end, RxJS has complete API documentation, as well as a Getting Started Guide as well as Guidelines, a How Do I? section as well as a growing list of comparisons with other libraries.