C++ Actor Framework 1.0.0
|
Represents a potentially unbounded sequence of values. More...
#include <observable_decl.hpp>
Public Types | |
using | output_type = T |
The type of emitted items. | |
using | pimpl_type = intrusive_ptr<op::base<T>> |
The pointer-to-implementation type. | |
using | ignore_t = decltype(std::ignore) |
Type for drop-all subscribers. | |
Public Member Functions | |
observable (pimpl_type pimpl) noexcept | |
observable & | operator= (std::nullptr_t) noexcept |
template<class Operator > | |
std::enable_if_t< std::is_base_of_v< op::base< T >, Operator >, observable & > | operator= (intrusive_ptr< Operator > ptr) noexcept |
observable (observable &&) noexcept=default | |
observable (const observable &) noexcept=default | |
observable & | operator= (observable &&) noexcept=default |
observable & | operator= (const observable &) noexcept=default |
disposable | subscribe (observer< T > what) |
Subscribes a new observer to the items emitted by this observable. | |
disposable | subscribe (async::producer_resource< T > resource) |
Creates a new observer that pushes all observed items to the resource. | |
disposable | subscribe (ignore_t) |
Subscribes a new observer that discards all items it receives. | |
template<class OnNext > | |
disposable | for_each (OnNext on_next) |
Calls on_next for each item emitted by this observable. | |
template<class OnNext , class OnError > | |
disposable | for_each (OnNext on_next, OnError on_error) |
Calls on_next for each item emitted by this observable and on_error in case of an error. | |
template<class Step > | |
transformation< Step > | transform (Step step) |
Returns a transformation that applies a step function to each input. | |
template<class U = T> | |
transformation< step::distinct< U > > | distinct () |
Makes all values unique by suppressing items that have been emitted in the past. | |
template<class F > | |
transformation< step::do_finally< T, F > > | do_finally (F f) |
Registers a callback for on_complete and on_error events. | |
template<class F > | |
transformation< step::do_on_complete< T, F > > | do_on_complete (F f) |
Registers a callback for on_complete events. | |
template<class F > | |
transformation< step::do_on_error< T, F > > | do_on_error (F f) |
Registers a callback for on_error events. | |
template<class F > | |
transformation< step::do_on_next< F > > | do_on_next (F f) |
Registers a callback for on_next events. | |
template<class Predicate > | |
transformation< step::filter< Predicate > > | filter (Predicate prediate) |
Returns a transformation that selects only items that satisfy predicate. | |
transformation< step::ignore_elements< T > > | ignore_elements () |
Returns a transformation that ignores all items and only forwards calls to on_complete and on_error. | |
template<class F > | |
transformation< step::map< F > > | map (F f) |
Returns a transformation that applies f to each input and emits the result of the function application. | |
observable< T > | on_backpressure_buffer (size_t buffer_size, backpressure_overflow_strategy strategy=backpressure_overflow_strategy::fail) |
When producing items faster than the consumer can consume them, the observable will buffer up to buffer_size items before raising an error. | |
transformation< step::on_error_complete< T > > | on_error_complete () |
Recovers from errors by converting on_error to on_complete events. | |
template<class ErrorHandler > | |
transformation< step::on_error_return< ErrorHandler > > | on_error_return (ErrorHandler error_handler) |
Recovers from errors by returning an item. | |
transformation< step::on_error_return_item< T > > | on_error_return_item (T item) |
Recovers from errors by returning an item. | |
template<class Init , class Reducer > | |
transformation< step::reduce< Reducer > > | reduce (Init init, Reducer reducer) |
Reduces the entire sequence of items to a single value. | |
template<class Init , class Scanner > | |
transformation< step::scan< Scanner > > | scan (Init init, Scanner scanner) |
Applies a function to a sequence of items, and emits each successive value. | |
transformation< step::skip< T > > | skip (size_t n) |
Returns a transformation that selects all but the first n items. | |
transformation< step::element_at< T > > | element_at (size_t n) |
Returns a transformation that selects only the item at index n. | |
transformation< step::skip_last< T > > | skip_last (size_t n) |
Returns a transformation that discards only the last n items. | |
transformation< step::take< T > > | take (size_t n) |
Returns a transformation that selects only the first n items. | |
transformation< step::take< T > > | first () |
Returns a transformation that selects only the first item. | |
transformation< step::take_last< T > > | take_last (size_t n) |
Returns a transformation that selects only the last n items. | |
transformation< step::take_last< T > > | last () |
Returns a transformation that selects only the last item. | |
template<class Predicate > | |
transformation< step::take_while< Predicate > > | take_while (Predicate prediate) |
Returns a transformation that selects all values until the predicate returns false. | |
auto | sum () |
Accumulates all values and emits only the final result. | |
template<class Input > | |
auto | start_with (Input value) |
Adds a value or observable to the beginning of the current observable. | |
auto | to_vector () |
Collects all values and emits all values at once in a cow_vector. | |
observable< cow_vector< T > > | buffer (size_t count) |
Emits items in buffers of size count. | |
observable< cow_vector< T > > | buffer (size_t count, timespan period) |
Emits items in buffers of size up to count and forces an item at regular intervals. | |
observable< T > | sample (timespan period) |
Emits the most recent item of the input observable once per interval. | |
template<class Out = output_type, class... Inputs> | |
auto | merge (Inputs &&... xs) |
Combines the output of multiple observable objects into one by merging their outputs. | |
template<class Out = output_type, class... Inputs> | |
auto | concat (Inputs &&...) |
Combines the output of multiple observable objects into one by concatenating their outputs. | |
template<class Out = output_type, class F > | |
auto | flat_map (F f) |
Returns a transformation that emits items by merging the outputs of all observables returned by f. | |
template<class Out = output_type, class F > | |
auto | concat_map (F f) |
Returns a transformation that emits items by concatenating the outputs of all observables returned by f. | |
template<class F , class T0 , class... Ts> | |
auto | zip_with (F fn, T0 input0, Ts... inputs) |
Creates an observable that combines the items emitted from all passed source observables by applying a function object. | |
observable< cow_tuple< cow_vector< T >, observable< T > > > | prefix_and_tail (size_t prefix_size) |
Takes prefix_size elements from this observable and emits them in a tuple containing an observable for the remaining elements as the second value. | |
observable< cow_tuple< T, observable< T > > > | head_and_tail () |
Similar to prefix_and_tail(1) but passes the single element directly in the tuple instead of wrapping it in a list. | |
connectable< T > | publish () |
Converts this observable into a connectable observable. | |
observable< T > | share (size_t subscriber_threshold=1) |
Convenience alias for publish().ref_count(subscriber_threshold) . | |
template<class Fn > | |
auto | compose (Fn &&fn) & |
Transforms this observable by applying a function object to it. | |
template<class Fn > | |
auto | compose (Fn &&fn) && |
Transforms this observable by applying a function object to it. | |
observable< async::batch > | collect_batches (timespan max_delay, size_t max_items) |
Like buffer , but wraps the collected items into type-erased batches. | |
observable | observe_on (coordinator *other, size_t buffer_size, size_t min_request_size) |
Observes items from this observable on another coordinator. | |
observable | observe_on (coordinator *other) |
Observes items from this observable on another coordinator. | |
async::consumer_resource< T > | to_resource (size_t buffer_size, size_t min_request_size) |
Creates an asynchronous resource that makes emitted items available in an SPSC buffer. | |
async::consumer_resource< T > | to_resource () |
Creates an asynchronous resource that makes emitted items available in an SPSC buffer. | |
async::publisher< T > | to_publisher () |
Creates a publisher that makes emitted items available asynchronously. | |
template<class U = T> | |
stream | to_stream (cow_string name, timespan max_delay, size_t max_items_per_batch) |
Creates a type-erased stream that makes emitted items available in batches. | |
template<class U = T> | |
stream | to_stream (std::string name, timespan max_delay, size_t max_items_per_batch) |
Creates a type-erased stream that makes emitted items available in batches. | |
template<class U = T> | |
typed_stream< U > | to_typed_stream (cow_string name, timespan max_delay, size_t max_items_per_batch) |
Creates a stream that makes emitted items available in batches. | |
template<class U = T> | |
typed_stream< U > | to_typed_stream (std::string name, timespan max_delay, size_t max_items_per_batch) |
Creates a stream that makes emitted items available in batches. | |
const observable & | as_observable () const &noexcept |
observable && | as_observable () &&noexcept |
const pimpl_type & | pimpl () const &noexcept |
pimpl_type | pimpl () &&noexcept |
bool | valid () const noexcept |
operator bool () const noexcept | |
bool | operator! () const noexcept |
coordinator * | parent () const |
void | swap (observable &other) |
template<class Out , class... Inputs> | |
auto | concat (Inputs &&... xs) |
Represents a potentially unbounded sequence of values.
auto caf::flow::observable< T >::concat | ( | Inputs && | ... | ) |
Combines the output of multiple observable objects into one by concatenating their outputs.
May also be called without arguments if T is an observable.
auto caf::flow::observable< T >::merge | ( | Inputs &&... | xs | ) |
Combines the output of multiple observable objects into one by merging their outputs.
May also be called without arguments if T is an observable.
observable caf::flow::observable< T >::observe_on | ( | coordinator * | other | ) |
Observes items from this observable on another coordinator.
Precondition: other coordinator must not run at this point.
observable< T > caf::flow::observable< T >::observe_on | ( | coordinator * | other, |
size_t | buffer_size, | ||
size_t | min_request_size ) |
Observes items from this observable on another coordinator.
Precondition: other coordinator must not run at this point.
coordinator * caf::flow::observable< T >::parent | ( | ) | const |
Precondition: valid()
observable< cow_tuple< cow_vector< T >, observable< T > > > caf::flow::observable< T >::prefix_and_tail | ( | size_t | prefix_size | ) |
Takes prefix_size elements from this observable and emits them in a tuple containing an observable for the remaining elements as the second value.
The returned observable either emits a single element (the tuple) or none if this observable never produces sufficient elements for the prefix.
Precondition: prefix_size > 0
transformation< step::reduce< Reducer > > caf::flow::observable< T >::reduce | ( | Init | init, |
Reducer | reducer ) |
Reduces the entire sequence of items to a single value.
Other names for the algorithm are accumulate and fold.
init | The initial value for the reduction. |
reducer | Binary operation function that will be applied. |
transformation< step::scan< Scanner > > caf::flow::observable< T >::scan | ( | Init | init, |
Scanner | scanner ) |
Applies a function to a sequence of items, and emits each successive value.
Another name for the algorithm is accumulator.
init | The initial value for the reduction. |
scanner | Binary operation function that will be applied. |
stream caf::flow::observable< T >::to_stream | ( | cow_string | name, |
timespan | max_delay, | ||
size_t | max_items_per_batch ) |
Creates a type-erased stream that makes emitted items available in batches.
Requires that this observable runs on an actor, otherwise returns an invalid stream.
name | The human-readable name for this stream. |
max_delay | The maximum delay between emitting two batches. |
max_items_per_batch | The maximum number of items per batch. |
stream caf::flow::observable< T >::to_stream | ( | std::string | name, |
timespan | max_delay, | ||
size_t | max_items_per_batch ) |
Creates a type-erased stream that makes emitted items available in batches.
Requires that this observable runs on an actor, otherwise returns an invalid stream.
name | The human-readable name for this stream. |
max_delay | The maximum delay between emitting two batches. |
max_items_per_batch | The maximum number of items per batch. |
typed_stream< U > caf::flow::observable< T >::to_typed_stream | ( | cow_string | name, |
timespan | max_delay, | ||
size_t | max_items_per_batch ) |
Creates a stream that makes emitted items available in batches.
Requires that this observable runs on an actor, otherwise returns an invalid stream.
name | The human-readable name for this stream. |
max_delay | The maximum delay between emitting two batches. |
max_items_per_batch | The maximum number of items per batch. |
typed_stream< U > caf::flow::observable< T >::to_typed_stream | ( | std::string | name, |
timespan | max_delay, | ||
size_t | max_items_per_batch ) |
Creates a stream that makes emitted items available in batches.
Requires that this observable runs on an actor, otherwise returns an invalid stream.
name | The human-readable name for this stream. |
max_delay | The maximum delay between emitting two batches. |
max_items_per_batch | The maximum number of items per batch. |
auto caf::flow::observable< T >::zip_with | ( | F | fn, |
T0 | input0, | ||
Ts... | inputs ) |
Creates an observable that combines the items emitted from all passed source observables by applying a function object.
fn | The zip function. Takes one element from each input at a time and reduces them into a single result. |
input0 | The first additional input. |
inputs | Additional inputs, if any. |