The buffer operator groups items emitted on the source observable based on different types of window selectors. These selectors can be defined via another observable, an item count, or time information. The following figure shows the marble diagram of the buffer operator:
This operator has several prototypes:
Observable.buffer(self, buffer_openings=None, buffer_closing_selector=None)
Observable.buffer_with_count(self, count, skip=None)
Observable.buffer_with_time(self, timespan, timeshift=None, scheduler=None)
Observable.buffer_with_time_or_count(self, timespan, count, scheduler=None)
The first version uses another observable to control when a new buffer begins. Each time the buffer_openings observable emits an item, the current buffer is emitted and a new buffer is created. The buffer_closing_selector parameter is optional. When provided, it must be a function that returns an observable whose completion ends the current buffer. When that observable completes, the function is called again to start a new buffer. The buffer_closing_selector parameter can also be passed as the first parameter of this operator: if the first parameter is an observable, it is used as the buffer_openings parameter, and if it is a function, it is used as the buffer_closing_selector parameter.
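The boundary behavior described above can be modeled without any reactive library. The following is only a minimal sketch in plain Python, not the way the operator is implemented: the buffer_by_boundaries function and the (kind, value) event tuples are hypothetical names introduced for illustration, and the sketch assumes that all events are already available as a time-ordered list.

```python
def buffer_by_boundaries(events):
    """Group "item" events into lists, closing a list at each "boundary".

    `events` is a hypothetical, time-ordered list of (kind, value) tuples,
    where kind is "item" (emitted by the source) or "boundary" (emitted by
    the observable that opens new buffers).
    """
    buffers = []
    current = []
    for kind, value in events:
        if kind == "item":
            # Source items accumulate in the current buffer.
            current.append(value)
        elif kind == "boundary":
            # A boundary emits the current buffer and starts a new one.
            buffers.append(current)
            current = []
    # Items received after the last boundary stay in `current`; this sketch
    # does not model completion of the source, so they are not returned.
    return buffers


events = [
    ("item", 1), ("item", 2), ("boundary", True),
    ("item", 3), ("item", 4), ("item", 5), ("boundary", True),
]
print(buffer_by_boundaries(events))  # prints [[1, 2], [3, 4, 5]]
```

The value carried by a boundary event is ignored: only the fact that the boundary source emitted something matters for closing the current buffer.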
The following is the first way to use the buffer operator:
# Import path for RxPY 1.x, the version these prototypes belong to
from rx.subjects import Subject

numbers = Subject()
windows = Subject()
numbers.buffer(windows).subscribe(
    on_next=lambda i: print("on_next {}".format(i)),
    on_error=lambda e: print("on_error: {}".format(e)),
    on_completed=lambda: print("on_completed")
)
numbers.on_next(1)
numbers.on_next(2)
windows.on_next(True)
numbers.on_next(3)
numbers.on_next(4)
numbers.on_next(5)
windows.on_next(True)
The numbers observable is the one to be buffered. The windows observable emits an item each time a new buffering window must start. The preceding example gives the following result: