The buffer operator

The buffer operator groups items emitted on the source observable based on different types of window selectors. These selectors can be defined via another observable, an item count, or time information. The following figure shows the marble diagram of the buffer operator:

Figure 9.1: The buffer operator

This operator has several prototypes:

Observable.buffer(self, buffer_openings=None, 
buffer_closing_selector=None)
Observable.buffer_with_count(self, count, skip=None)
Observable.buffer_with_time(self, timespan,
timeshift=None, scheduler=None)
Observable.buffer_with_time_or_count(self,
timespan, count, scheduler=None)

The first version allows us to use another observable to control when a new buffer begins. Each time the buffer_openings observable emits an item, then the current buffer is emitted, and a new buffer is created. The buffer_closing_selector parameter is an optional parameter. When provided, it must be a function that returns an observable whose completion ends the current buffer. On completion, this function will be called again to start a new buffer. The buffer_closing_selector parameter can also be provided as the first parameter of this operator. If the first parameter is an observable, then it is used as the buffer_openings parameters, and if it is a function, then it is used as the buffer_closing_selector parameter.

The following is the first way to use the buffer operator:

numbers = Subject()
windows = Subject()
numbers.buffer(windows).subscribe(
on_next = lambda i: print("on_next {}".format(i)),
on_error = lambda e: print("on_error: {}".format(e)),
on_completed = lambda: print("on_completed")
)

numbers.on_next(1)
numbers.on_next(2)
windows.on_next(True)
numbers.on_next(3)
numbers.on_next(4)
numbers.on_next(5)
windows.on_next(True)

The numbers observable is the observable that must be buffered. The windows observable emits items each time a new buffering window must start. The following example gives the following result:

on_next [1, 2]
on_next [3, 4, 5]

The first two items are emitted as a single list item. Then, the next three items are emitted as a second item.

The closing_selector parameter can be used in the following way:

window_selector = None
def closing_selector():
print("closing_selector")
global window_selector
window_selector = Subject()
return window_selector

numbers = Subject()
numbers.buffer(closing_selector).subscribe(
on_next = lambda i: print("on_next {}".format(i)),
on_error = lambda e: print("on_error: {}".format(e)),
on_completed = lambda: print("on_completed")
)

numbers.on_next(1)
numbers.on_next(2)
numbers.on_next(3)
window_selector.on_completed()
numbers.on_next(4)
numbers.on_next(5)
window_selector.on_completed()

The numbers observable is the observable that must be buffered. The closing_selector function returns an observable that emits no items. This observable is completed after emitting three items, and then two items. This example gives the following result:

closing_selector
on_next [1, 2, 3]
closing_selector
on_next [4, 5]
closing_selector

First, the closing_selector function is called. This allows the buffer operator to subscribe to its completion. When the window_selector observable completes, then the first item is emitted (the 1, 2, 3 list). After that, the closing_selector function is called again. When the associated observable completes, then a second item is emitted (the 4, 5 list). The closing_selector function is called one last time by the operator to start a new buffer.

The second prototype of the buffer operator creates buffers based on the item, count. The count parameter indicates how many items must be grouped together. The optional skip parameter indicates how many items must be skipped between each buffer creation. The default value is the one provided in count. The skip parameter allows us to use overlapping windows for buffering.

This second variant can be used this way:

numbers = Observable.from_([1, 2, 3, 4, 5, 6])
numbers.buffer_with_count(3).subscribe(
on_next = lambda i: print("on_next {}".format(i)),
on_error = lambda e: print("on_error: {}".format(e)),
on_completed = lambda: print("on_completed")
)

This snippet gives the following result:

on_next [1, 2, 3]
on_next [4, 5, 6]
on_completed

The six items are grouped by three, and so the resulting observable emits two items.

The third variant of this operator allows us to buffer source items based on time information. The timespan parameter is an integer that indicates, in milliseconds, how many time items must be grouped together. The optional timeshift parameter indicates how many times in milliseconds must elapse before creating a new buffer. By default, its value is one of the timespan parameters. The timespan parameter allows us to use overlapping windows for the buffering.

The following is an example using this operator:

numbers = Subject()
dispoable = numbers.buffer_with_time(200).subscribe(
on_next = lambda i: print("on_next {}".format(i)),
on_error = lambda e: print("on_error: {}".format(e)),
on_completed = lambda: print("on_completed")
)

numbers.on_next(1)
numbers.on_next(2)
t1 = threading.Timer(0.250, lambda: numbers.on_next(3))
t1.start()
t2 = threading.Timer(0.450, lambda: numbers.on_next(4))
t2.start()
t3 = threading.Timer(0.750, lambda: dispoable.dispose())
t3.start()

In this example, items of the numbers observable are grouped in a window of 200 ms. Items 1 and 2 are emitted immediately, then item 3 is emitted after 250 milliseconds, item 4 is emitted after 200 more milliseconds, and finally the subscription is disposed of after 300 milliseconds. This gives the following result:

on_next [1, 2]
on_next [3]
on_next [4]

Numbers 1 and 2 are emitted in the same item because they are received within the same 200 milliseconds. Then, 3 and 4 are emitted as two other items because each one is in a separate 200-millisecond window.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.133.141.219