Rx.Observable.prototype.buffer()
The buffer
method periodically gathers items emitted by a source Observable into buffers, and emits these buffers as its own emissions.
Note that if the source Observable issues an onError
notification, buffer
will pass on this notification immediately without first emitting the buffer it is in the process of assembling, even if that buffer contains items that were emitted by the source Observable before it issued the error notification.
There are a number of ways with which you can regulate how buffer
gathers items from the source Observable into buffers:
With buffer closing selector
Rx.Observable.prototype.buffer(bufferClosingSelector);
Returns an Observable that emits buffers of items it collects from the source Observable
. The resulting Observable
emits connected, non-overlapping buffers. It emits the current buffer and replaces it with a new buffer whenever the Observable
produced by the specified bufferClosingSelector
emits an item.
Arguments
bufferClosingSelector
(Function
): A function invoked to define the closing of each produced window.
Returns
(Observable
): An observable sequence of windows.
Example
// With closings
var source = Rx.Observable.timer(0, 50)
.buffer(function () { return Rx.Observable.timer(125); })
.take(3);
var subscription = source.subscribe(
function (x) {
console.log('Next: %s', x);
},
function (err) {
console.log('Error: %s', err);
},
function () {
console.log('Completed');
});
// => Next: 0,1,2
// => Next: 3,4,5
// => Next: 6,7
// => Completed
With buffer opening and buffer closing selector
Rx.Observable.prototype.buffer(bufferOpenings, bufferClosingSelector);
This version of buffer
monitors an Observable
, bufferOpenings
, that emits Observable objects. Each time it observes such an emitted object, it creates a new bundle to begin collecting items emitted by the source Observable and it passes the bufferOpenings
Observable into the bufferClosingSelector
function. That function returns an Observable
. buffer
monitors that Observable
and when it detects an emitted object, it closes its bundle and emits it as its own emission.
bufferOpenings
(Observable
): Observable sequence whose elements denote the creation of new windows.bufferClosingSelector
(Function
): A function invoked to define the closing of each produced window.
Returns
(Observable
): An observable sequence of windows.
Example
/* Using Openings and Closing Selector */
var openings = Rx.Observable.interval(200);
var source = Rx.Observable.interval(50)
.buffer(openings, function (x) { return Rx.Observable.timer(x + 100); })
.take(3);
var subscription = source.subscribe(
function (x) {
console.log('Next: %s', x);
},
function (err) {
console.log('Error: %s', err);
},
function () {
console.log('Completed');
});
// => Next: 3,4
// => Next: 7,8
// => Next: 11,12
// => Completed
With boundaries
Rx.Observable.prototype.buffer(bufferBoundaries);
Returns an Observable
that emits non-overlapping buffered items from the source Observable
each time the specified boundary Observable
emits an item.
Arguments
bufferBoundaries
(Observable
): Sequence of buffer boundary markers. The current buffer is closed and a new buffer is opened upon receiving a boundary marker.
Returns
(Observable
): An observable sequence of windows.
Example
/* Using buffer boundaries */
var openings = Rx.Observable.interval(500);
// Convert the window to an array
var source = Rx.Observable.timer(0, 100)
.buffer(openings)
.take(3);
var subscription = source.subscribe(
function (x) {
console.log('Next: %s', x);
},
function (err) {
console.log('Error: %s', err);
},
function () {
console.log('Completed');
});
// => Next: 0,1,2,3,4
// => Next: 5,6,7,8,9,10
// => Next: 11,12,13,14,15
// => Completed
Location
File:
Dist:
Prerequisites:
NPM Packages:
NuGet Packages:
Unit Tests: