C++ Actor Framework 1.0.0
Loading...
Searching...
No Matches
caf::flow::observable< T > Class Template Reference

Represents a potentially unbound sequence of values. More...

#include <observable_decl.hpp>

Public Types

using output_type = T
 The type of emitted items.
 
using pimpl_type = intrusive_ptr<op::base<T>>
 The pointer-to-implementation type.
 
using ignore_t = decltype(std::ignore)
 Type for drop-all subscribers.
 

Public Member Functions

 observable (pimpl_type pimpl) noexcept
 
observableoperator= (std::nullptr_t) noexcept
 
template<class Operator >
std::enable_if_t< std::is_base_of_v< op::base< T >, Operator >, observable & > operator= (intrusive_ptr< Operator > ptr) noexcept
 
 observable (observable &&) noexcept=default
 
 observable (const observable &) noexcept=default
 
observableoperator= (observable &&) noexcept=default
 
observableoperator= (const observable &) noexcept=default
 
disposable subscribe (observer< T > what)
 Subscribes a new observer to the items emitted by this observable.
 
disposable subscribe (async::producer_resource< T > resource)
 Creates a new observer that pushes all observed items to the resource.
 
disposable subscribe (ignore_t)
 Subscribes a new observer that discards all items it receives.
 
template<class OnNext >
disposable for_each (OnNext on_next)
 Calls on_next for each item emitted by this observable.
 
template<class OnNext , class OnError >
disposable for_each (OnNext on_next, OnError on_error)
 Calls on_next for each item emitted by this observable and on_error in case of an error.
 
template<class Step >
transformation< Step > transform (Step step)
 Returns a transformation that applies a step function to each input.
 
template<class U = T>
transformation< step::distinct< U > > distinct ()
 Makes all values unique by suppressing items that have been emitted in the past.
 
template<class F >
transformation< step::do_finally< T, F > > do_finally (F f)
 Registers a callback for on_complete and on_error events.
 
template<class F >
transformation< step::do_on_complete< T, F > > do_on_complete (F f)
 Registers a callback for on_complete events.
 
template<class F >
transformation< step::do_on_error< T, F > > do_on_error (F f)
 Registers a callback for on_error events.
 
template<class F >
transformation< step::do_on_next< F > > do_on_next (F f)
 Registers a callback for on_next events.
 
template<class Predicate >
transformation< step::filter< Predicate > > filter (Predicate prediate)
 Returns a transformation that selects only items that satisfy predicate.
 
transformation< step::ignore_elements< T > > ignore_elements ()
 Returns a transformation that ignores all items and only forwards calls to on_complete and on_error.
 
template<class F >
transformation< step::map< F > > map (F f)
 Returns a transformation that applies f to each input and emits the result of the function application.
 
observable< T > on_backpressure_buffer (size_t buffer_size, backpressure_overflow_strategy strategy=backpressure_overflow_strategy::fail)
 When producing items faster than the consumer can consume them, the observable will buffer up to buffer_size items before raising an error.
 
transformation< step::on_error_complete< T > > on_error_complete ()
 Recovers from errors by converting on_error to on_complete events.
 
template<class ErrorHandler >
transformation< step::on_error_return< ErrorHandler > > on_error_return (ErrorHandler error_handler)
 Recovers from errors by returning an item.
 
transformation< step::on_error_return_item< T > > on_error_return_item (T item)
 Recovers from errors by returning an item.
 
template<class Init , class Reducer >
transformation< step::reduce< Reducer > > reduce (Init init, Reducer reducer)
 Reduces the entire sequence of items to a single value.
 
template<class Init , class Scanner >
transformation< step::scan< Scanner > > scan (Init init, Scanner scanner)
 Applies a function to a sequence of items, and emit each successive value.
 
transformation< step::skip< T > > skip (size_t n)
 Returns a transformation that selects all but the first n items.
 
transformation< step::element_at< T > > element_at (size_t n)
 Returns a transformation that selects only the item at index n.
 
transformation< step::skip_last< T > > skip_last (size_t n)
 Returns a transformation that discards only the last n items.
 
transformation< step::take< T > > take (size_t n)
 Returns a transformation that selects only the first n items.
 
transformation< step::take< T > > first ()
 Returns a transformation that selects only the first item.
 
transformation< step::take_last< T > > take_last (size_t n)
 Returns a transformation that selects only the last n items.
 
transformation< step::take_last< T > > last ()
 Returns a transformation that selects only the last item.
 
template<class Predicate >
transformation< step::take_while< Predicate > > take_while (Predicate prediate)
 Returns a transformation that selects all value until the predicate returns false.
 
auto sum ()
 Accumulates all values and emits only the final result.
 
template<class Input >
auto start_with (Input value)
 Adds a value or observable to the beginning of current observable.
 
auto to_vector ()
 Collects all values and emits all values at once in a cow_vector.
 
observable< cow_vector< T > > buffer (size_t count)
 Emits items in buffers of size count.
 
observable< cow_vector< T > > buffer (size_t count, timespan period)
 Emits items in buffers of size up to count and forces an item at regular intervals .
 
observable< T > sample (timespan period)
 Emits the most recent item of the input observable once per interval.
 
template<class Out = output_type, class... Inputs>
auto merge (Inputs &&... xs)
 Combines the output of multiple observable objects into one by merging their outputs.
 
template<class Out = output_type, class... Inputs>
auto concat (Inputs &&...)
 Combines the output of multiple observable objects into one by concatenating their outputs.
 
template<class Out = output_type, class F >
auto flat_map (F f)
 Returns a transformation that emits items by merging the outputs of all observables returned by f.
 
template<class Out = output_type, class F >
auto concat_map (F f)
 Returns a transformation that emits items by concatenating the outputs of all observables returned by f.
 
template<class F , class T0 , class... Ts>
auto zip_with (F fn, T0 input0, Ts... inputs)
 Creates an observable that combines the emitted from all passed source observables by applying a function object.
 
observable< cow_tuple< cow_vector< T >, observable< T > > > prefix_and_tail (size_t prefix_size)
 Takes prefix_size elements from this observable and emits it in a tuple containing an observable for the remaining elements as the second value.
 
observable< cow_tuple< T, observable< T > > > head_and_tail ()
 Similar to prefix_and_tail(1) but passes the single element directly in the tuple instead of wrapping it in a list.
 
connectable< T > publish ()
 Convert this observable into a connectable observable.
 
observable< T > share (size_t subscriber_threshold=1)
 Convenience alias for publish().ref_count(subscriber_threshold).
 
template<class Fn >
auto compose (Fn &&fn) &
 Transforms this observable by applying a function object to it.
 
template<class Fn >
auto compose (Fn &&fn) &&
 Transforms this observable by applying a function object to it.
 
observable< async::batchcollect_batches (timespan max_delay, size_t max_items)
 Like buffer, but wraps the collected items into type-erased batches.
 
observable observe_on (coordinator *other, size_t buffer_size, size_t min_request_size)
 Observes items from this observable on another coordinator.
 
observable observe_on (coordinator *other)
 Observes items from this observable on another coordinator.
 
async::consumer_resource< T > to_resource (size_t buffer_size, size_t min_request_size)
 Creates an asynchronous resource that makes emitted items available in an SPSC buffer.
 
async::consumer_resource< T > to_resource ()
 Creates an asynchronous resource that makes emitted items available in an SPSC buffer.
 
async::publisher< T > to_publisher ()
 Creates a publisher that makes emitted items available asynchronously.
 
template<class U = T>
stream to_stream (cow_string name, timespan max_delay, size_t max_items_per_batch)
 Creates a type-erased stream that makes emitted items available in batches.
 
template<class U = T>
stream to_stream (std::string name, timespan max_delay, size_t max_items_per_batch)
 Creates a type-erased stream that makes emitted items available in batches.
 
template<class U = T>
typed_stream< U > to_typed_stream (cow_string name, timespan max_delay, size_t max_items_per_batch)
 Creates a stream that makes emitted items available in batches.
 
template<class U = T>
typed_stream< U > to_typed_stream (std::string name, timespan max_delay, size_t max_items_per_batch)
 Creates a stream that makes emitted items available in batches.
 
const observableas_observable () const &noexcept
 
observable && as_observable () &&noexcept
 
const pimpl_typepimpl () const &noexcept
 
pimpl_type pimpl () &&noexcept
 
bool valid () const noexcept
 
 operator bool () const noexcept
 
bool operator! () const noexcept
 
coordinatorparent () const
 
void swap (observable &other)
 
template<class Out , class... Inputs>
auto concat (Inputs &&... xs)
 

Detailed Description

template<class T>
class caf::flow::observable< T >

Represents a potentially unbound sequence of values.

Member Function Documentation

◆ concat()

template<class T >
template<class Out = output_type, class... Inputs>
auto caf::flow::observable< T >::concat ( Inputs && ...)

Combines the output of multiple observable objects into one by concatenating their outputs.

May also be called without arguments if the T is an observable.

◆ merge()

template<class T >
template<class Out , class... Inputs>
auto caf::flow::observable< T >::merge ( Inputs &&... xs)

Combines the output of multiple observable objects into one by merging their outputs.

May also be called without arguments if the T is an observable.

◆ observe_on() [1/2]

template<class T >
observable caf::flow::observable< T >::observe_on ( coordinator * other)

Observes items from this observable on another coordinator.

Warning
The other coordinator must not run at this point.

◆ observe_on() [2/2]

template<class T >
observable< T > caf::flow::observable< T >::observe_on ( coordinator * other,
size_t buffer_size,
size_t min_request_size )

Observes items from this observable on another coordinator.

Warning
The other coordinator must not run at this point.

◆ parent()

template<class T >
coordinator * caf::flow::observable< T >::parent ( ) const
Precondition
valid()

◆ prefix_and_tail()

template<class T >
observable< cow_tuple< cow_vector< T >, observable< T > > > caf::flow::observable< T >::prefix_and_tail ( size_t prefix_size)

Takes prefix_size elements from this observable and emits it in a tuple containing an observable for the remaining elements as the second value.

The returned observable either emits a single element (the tuple) or none if this observable never produces sufficient elements for the prefix.

Precondition
prefix_size > 0

◆ reduce()

template<class T >
template<class Init , class Reducer >
transformation< step::reduce< Reducer > > caf::flow::observable< T >::reduce ( Init init,
Reducer reducer )

Reduces the entire sequence of items to a single value.

Other names for the algorithm are accumulate and fold.

Parameters
initThe initial value for the reduction.
reducerBinary operation function that will be applied.

◆ scan()

template<class T >
template<class Init , class Scanner >
transformation< step::scan< Scanner > > caf::flow::observable< T >::scan ( Init init,
Scanner scanner )

Applies a function to a sequence of items, and emit each successive value.

Other name for the algorithm is accumulator.

Parameters
initThe initial value for the reduction.
scannerBinary operation function that will be applied.

◆ to_stream() [1/2]

template<class T >
template<class U >
stream caf::flow::observable< T >::to_stream ( cow_string name,
timespan max_delay,
size_t max_items_per_batch )

Creates a type-erased stream that makes emitted items available in batches.

Requires that this observable runs on an actor, otherwise returns an invalid stream.

Parameters
nameThe human-readable name for this stream.
max_delayThe maximum delay between emitting two batches.
max_items_per_batchThe maximum amount of items per batch.
Returns
a stream that makes this observable available to other actors or an invalid stream if this observable does not run on an actor.

◆ to_stream() [2/2]

template<class T >
template<class U >
stream caf::flow::observable< T >::to_stream ( std::string name,
timespan max_delay,
size_t max_items_per_batch )

Creates a type-erased stream that makes emitted items available in batches.

Requires that this observable runs on an actor, otherwise returns an invalid stream.

Parameters
nameThe human-readable name for this stream.
max_delayThe maximum delay between emitting two batches.
max_items_per_batchThe maximum amount of items per batch.
Returns
a stream that makes this observable available to other actors or an invalid stream if this observable does not run on an actor.

◆ to_typed_stream() [1/2]

template<class T >
template<class U >
typed_stream< U > caf::flow::observable< T >::to_typed_stream ( cow_string name,
timespan max_delay,
size_t max_items_per_batch )

Creates a stream that makes emitted items available in batches.

Requires that this observable runs on an actor, otherwise returns an invalid stream.

Parameters
nameThe human-readable name for this stream.
max_delayThe maximum delay between emitting two batches.
max_items_per_batchThe maximum amount of items per batch.
Returns
a typed_stream that makes this observable available to other actors or an invalid stream if this observable does not run on an actor.

◆ to_typed_stream() [2/2]

template<class T >
template<class U >
typed_stream< U > caf::flow::observable< T >::to_typed_stream ( std::string name,
timespan max_delay,
size_t max_items_per_batch )

Creates a stream that makes emitted items available in batches.

Requires that this observable runs on an actor, otherwise returns an invalid stream.

Parameters
nameThe human-readable name for this stream.
max_delayThe maximum delay between emitting two batches.
max_items_per_batchThe maximum amount of items per batch.
Returns
a typed_stream that makes this observable available to other actors or an invalid stream if this observable does not run on an actor.

◆ zip_with()

template<class T >
template<class F , class T0 , class... Ts>
auto caf::flow::observable< T >::zip_with ( F fn,
T0 input0,
Ts... inputs )

Creates an observable that combines the emitted from all passed source observables by applying a function object.

Parameters
fnThe zip function. Takes one element from each input at a time and reduces them into a single result.
input0The first additional input.
inputsAdditional inputs, if any.

The documentation for this class was generated from the following files: