C++ Actor Framework 0.19
Loading...
Searching...
No Matches
Public Types | Public Member Functions | Related Functions | List of all members
caf::flow::observable< T > Class Template Reference

Represents a potentially unbound sequence of values. More...

#include <observable_decl.hpp>

Public Types

using output_type = T
 The type of emitted items.
 
using pimpl_type = intrusive_ptr< op::base< T > >
 The pointer-to-implementation type.
 
using ignore_t = decltype(std::ignore)
 Type for drop-all subscribers.
 

Public Member Functions

 observable (pimpl_type pimpl) noexcept
 
observableoperator= (std::nullptr_t) noexcept
 
 observable (observable &&) noexcept=default
 
 observable (const observable &) noexcept=default
 
observableoperator= (observable &&) noexcept=default
 
observableoperator= (const observable &) noexcept=default
 
disposable subscribe (observer< T > what)
 Subscribes a new observer to the items emitted by this observable.
 
disposable subscribe (async::producer_resource< T > resource)
 Creates a new observer that pushes all observed items to the resource.
 
disposable subscribe (ignore_t)
 Subscribes a new observer to the items emitted by this observable.
 
template<class OnNext >
disposable for_each (OnNext on_next)
 Calls on_next for each item emitted by this observable.
 
template<class Step >
transformation< Step > transform (Step step)
 Returns a transformation that applies a step function to each input.
 
template<class U = T>
transformation< step::distinct< U > > distinct ()
 Makes all values unique by suppressing items that have been emitted in the past.
 
template<class F >
transformation< step::do_finally< T, F > > do_finally (F f)
 Registers a callback for on_complete and on_error events.
 
template<class F >
transformation< step::do_on_complete< T, F > > do_on_complete (F f)
 Registers a callback for on_complete events.
 
template<class F >
transformation< step::do_on_error< T, F > > do_on_error (F f)
 Registers a callback for on_error events.
 
template<class F >
transformation< step::do_on_next< F > > do_on_next (F f)
 Registers a callback for on_next events.
 
template<class Predicate >
transformation< step::filter< Predicate > > filter (Predicate prediate)
 Returns a transformation that selects only items that satisfy predicate.
 
template<class F >
transformation< step::map< F > > map (F f)
 Returns a transformation that applies f to each input and emits the result of the function application.
 
transformation< step::on_error_complete< T > > on_error_complete ()
 Recovers from errors by converting on_error to on_complete events.
 
template<class Init , class Reducer >
transformation< step::reduce< Reducer > > reduce (Init init, Reducer reducer)
 Reduces the entire sequence of items to a single value.
 
transformation< step::skip< T > > skip (size_t n)
 Returns a transformation that selects all but the first n items.
 
transformation< step::take< T > > take (size_t n)
 Returns a transformation that selects only the first n items.
 
template<class Predicate >
transformation< step::take_while< Predicate > > take_while (Predicate prediate)
 Returns a transformation that selects all value until the predicate returns false.
 
auto sum ()
 Accumulates all values and emits only the final result.
 
auto to_vector ()
 Collects all values and emits all values at once in a cow_vector.
 
observable< cow_vector< T > > buffer (size_t count)
 Emits items in buffers of size count.
 
observable< cow_vector< T > > buffer (size_t count, timespan period)
 Emits items in buffers of size up to count and forces an item at regular intervals .
 
template<class Out = output_type, class... Inputs>
auto merge (Inputs &&... xs)
 Combines the output of multiple observable objects into one by merging their outputs.
 
template<class Out = output_type, class... Inputs>
auto concat (Inputs &&...)
 Combines the output of multiple observable objects into one by concatenating their outputs.
 
template<class Out = output_type, class F >
auto flat_map (F f)
 Returns a transformation that emits items by merging the outputs of all observables returned by f.
 
template<class Out = output_type, class F >
auto concat_map (F f)
 Returns a transformation that emits items by concatenating the outputs of all observables returned by f.
 
template<class F , class T0 , class... Ts>
auto zip_with (F fn, T0 input0, Ts... inputs)
 Creates an observable that combines the emitted from all passed source observables by applying a function object.
 
observable< cow_tuple< cow_vector< T >, observable< T > > > prefix_and_tail (size_t prefix_size)
 Takes prefix_size elements from this observable and emits it in a tuple containing an observable for the remaining elements as the second value.
 
observable< cow_tuple< T, observable< T > > > head_and_tail ()
 Similar to prefix_and_tail(1) but passes the single element directly in the tuple instead of wrapping it in a list.
 
connectable< T > publish ()
 Convert this observable into a connectable observable.
 
observable< T > share (size_t subscriber_threshold=1)
 Convenience alias for publish().ref_count(subscriber_threshold).
 
template<class Fn >
auto compose (Fn &&fn) &
 Transforms this observable by applying a function object to it.
 
template<class Fn >
auto compose (Fn &&fn) &&
 Transforms this observable by applying a function object to it.
 
observable< async::batchcollect_batches (timespan max_delay, size_t max_items)
 Like buffer, but wraps the collected items into type-erased batches.
 
observable observe_on (coordinator *other, size_t buffer_size, size_t min_request_size)
 Observes items from this observable on another coordinator.
 
observable observe_on (coordinator *other)
 Observes items from this observable on another coordinator.
 
async::consumer_resource< T > to_resource (size_t buffer_size, size_t min_request_size)
 Creates an asynchronous resource that makes emitted items available in an SPSC buffer.
 
async::consumer_resource< T > to_resource ()
 Creates an asynchronous resource that makes emitted items available in an SPSC buffer.
 
const observableas_observable () const &noexcept
 
observable && as_observable () &&noexcept
 
const pimpl_typepimpl () const &noexcept
 
pimpl_type pimpl () &&noexcept
 
bool valid () const noexcept
 
 operator bool () const noexcept
 
bool operator! () const noexcept
 
coordinatorctx () const
 
void swap (observable &other)
 
template<class Out , class... Inputs>
auto concat (Inputs &&... xs)
 

Related Functions

(Note that these are not member functions.)

template<class Operator , class CoordinatorType , class... Ts>
observable< typename Operator::output_type > make_observable (CoordinatorType *ctx, Ts &&... xs)
 Convenience function for creating an observable from a concrete operator type.
 

Detailed Description

template<class T>
class caf::flow::observable< T >

Represents a potentially unbound sequence of values.

Member Function Documentation

◆ concat()

template<class T >
template<class Out = output_type, class... Inputs>
auto caf::flow::observable< T >::concat ( Inputs &&  ...)

Combines the output of multiple observable objects into one by concatenating their outputs.

May also be called without arguments if the T is an observable.

◆ ctx()

template<class T >
coordinator * caf::flow::observable< T >::ctx ( ) const
Precondition
valid()

◆ merge()

template<class T >
template<class Out , class... Inputs>
auto caf::flow::observable< T >::merge ( Inputs &&...  xs)

Combines the output of multiple observable objects into one by merging their outputs.

May also be called without arguments if the T is an observable.

◆ observe_on() [1/2]

template<class T >
observable caf::flow::observable< T >::observe_on ( coordinator other)

Observes items from this observable on another coordinator.

Warning
The other coordinator must not run at this point.

◆ observe_on() [2/2]

template<class T >
observable< T > caf::flow::observable< T >::observe_on ( coordinator other,
size_t  buffer_size,
size_t  min_request_size 
)

Observes items from this observable on another coordinator.

Warning
The other coordinator must not run at this point.

◆ prefix_and_tail()

template<class T >
observable< cow_tuple< cow_vector< T >, observable< T > > > caf::flow::observable< T >::prefix_and_tail ( size_t  prefix_size)

Takes prefix_size elements from this observable and emits it in a tuple containing an observable for the remaining elements as the second value.

The returned observable either emits a single element (the tuple) or none if this observable never produces sufficient elements for the prefix.

Precondition
prefix_size > 0

◆ reduce()

template<class T >
template<class Init , class Reducer >
transformation< step::reduce< Reducer > > caf::flow::observable< T >::reduce ( Init  init,
Reducer  reducer 
)

Reduces the entire sequence of items to a single value.

Other names for the algorithm are accumulate and fold.

Parameters
initThe initial value for the reduction.
reducerBinary operation function that will be applied.

◆ zip_with()

template<class T >
template<class F , class T0 , class... Ts>
auto caf::flow::observable< T >::zip_with ( fn,
T0  input0,
Ts...  inputs 
)

Creates an observable that combines the emitted from all passed source observables by applying a function object.

Parameters
fnThe zip function. Takes one element from each input at a time and reduces them into a single result.
input0The first additional input.
inputsAdditional inputs, if any.

The documentation for this class was generated from the following files: