C++ Actor Framework 1.0.0
Loading...
Searching...
No Matches
caf::flow::observable_def< Materializer, Steps > Class Template Reference

Captures the definition of an observable that has not materialized yet. More...

#include <observable.hpp>

Public Types

using output_type = output_type_t<Materializer, Steps...>
 

Public Member Functions

 observable_def (const observable_def &)=delete
 
observable_defoperator= (const observable_def &)=delete
 
 observable_def (observable_def &&)=default
 
observable_defoperator= (observable_def &&)=default
 
template<size_t N = sizeof...(Steps), class = std::enable_if_t<N == 0>>
 observable_def (Materializer &&materializer)
 
 observable_def (Materializer &&materializer, std::tuple< Steps... > &&steps)
 
template<class NewStep >
observable_def< Materializer, Steps..., NewStep > transform (NewStep step) &&
 Returns a transformation that applies a step function to each input.
 
template<class Fn >
auto compose (Fn &&fn) &&
 Transforms this observable by applying a function object to it.
 
auto element_at (size_t n) &&
 Returns a transformation that selects only the item at index n.
 
auto ignore_elements () &&
 Returns a transformation that ignores all items and only forwards calls to on_complete and on_error.
 
auto skip (size_t n) &&
 Returns a transformation that selects all but the first n items.
 
auto skip_last (size_t n) &&
 Returns a transformation that discards only the last n items.
 
auto take (size_t n) &&
 Returns a transformation that selects only the first n items.
 
auto first () &&
 Returns a transformation that selects only the first item.
 
auto take_last (size_t n) &&
 Returns a transformation that selects only the last n items.
 
auto last () &&
 Returns a transformation that selects only the last item.
 
auto buffer (size_t count) &&
 Emits items in buffers of size count.
 
auto buffer (size_t count, timespan period)
 
auto sample (timespan period)
 Emits the most recent item of the input observable once per interval.
 
template<class Predicate >
auto filter (Predicate predicate) &&
 
template<class Predicate >
auto take_while (Predicate predicate) &&
 
template<class Init , class Reducer >
auto reduce (Init init, Reducer reducer) &&
 
template<class Init , class Scanner >
auto scan (Init init, Scanner scanner) &&
 
auto sum () &&
 
auto to_vector () &&
 
auto distinct () &&
 
template<class F >
auto map (F f) &&
 
template<class F >
auto do_on_next (F f) &&
 
template<class F >
auto do_on_complete (F f) &&
 
template<class F >
auto do_on_error (F f) &&
 
template<class F >
auto do_finally (F f) &&
 
auto on_backpressure_buffer (size_t buffer_size, backpressure_overflow_strategy strategy=backpressure_overflow_strategy::fail)
 When producing items faster than the consumer can consume them, the observable will buffer up to buffer_size items before raising an error.
 
auto on_error_complete ()
 
auto on_error_return_item (output_type item)
 
template<class ErrorHandler >
auto on_error_return (ErrorHandler error_handler) &&
 
observable< output_type > as_observable () &&
 Materializes the observable.
 
template<class OnNext >
auto for_each (OnNext on_next) &&
 Calls on_next for each item emitted by this observable.
 
template<class OnNext , class OnError >
auto for_each (OnNext on_next, OnError on_error) &&
 Calls on_next for each item emitted by this observable.
 
template<class... Inputs>
auto merge (Inputs &&... xs) &&
 Combines the output of multiple observable objects into one by merging their outputs.
 
template<class... Inputs>
auto concat (Inputs &&... xs) &&
 Combines the output of multiple observable objects into one by concatenating their outputs.
 
template<class Input >
auto start_with (Input &&value) &&
 Adds a value or observable to the beginning of current observable.
 
template<class F >
auto flat_map (F f) &&
 Returns a transformation that emits items by merging the outputs of all observables returned by f.
 
template<class F >
auto concat_map (F f) &&
 Returns a transformation that emits items by concatenating the outputs of all observables returned by f.
 
template<class F , class T0 , class... Ts>
auto zip_with (F fn, T0 input0, Ts... inputs)
 Creates an observable that combines the emitted from all passed source observables by applying a function object.
 
auto publish () &&
 Convert this observable into a connectable observable.
 
auto share (size_t subscriber_threshold=1) &&
 Convenience alias for publish().ref_count(subscriber_threshold).
 
observable< cow_tuple< cow_vector< output_type >, observable< output_type > > > prefix_and_tail (size_t prefix_size) &&
 Takes prefix_size elements from this observable and emits it in a tuple containing an observable for the remaining elements as the second value.
 
observable< cow_tuple< output_type, observable< output_type > > > head_and_tail () &&
 Similar to prefix_and_tail(1) but passes the single element directly in the tuple instead of wrapping it in a list.
 
template<class Out >
disposable subscribe (Out &&out) &&
 Subscribes a new observer to the items emitted by this observable.
 
async::consumer_resource< output_type > to_resource () &&
 Creates an asynchronous resource that makes emitted items available in an SPSC buffer.
 
async::consumer_resource< output_type > to_resource (size_t buffer_size, size_t min_request_size) &&
 Creates an asynchronous resource that makes emitted items available in an SPSC buffer.
 
async::publisher< output_type > to_publisher () &&
 Creates a publisher that makes emitted items available asynchronously.
 
template<class U = output_type>
stream to_stream (cow_string name, timespan max_delay, size_t max_items_per_batch) &&
 Creates a type-erased stream that makes emitted items available in batches.
 
template<class U = output_type>
stream to_stream (std::string name, timespan max_delay, size_t max_items_per_batch) &&
 Creates a type-erased stream that makes emitted items available in batches.
 
template<class U = output_type>
auto to_typed_stream (cow_string name, timespan max_delay, size_t max_items_per_batch) &&
 Creates a stream that makes emitted items available in batches.
 
template<class U = output_type>
auto to_typed_stream (std::string name, timespan max_delay, size_t max_items_per_batch) &&
 Creates a stream that makes emitted items available in batches.
 
observable< output_type > observe_on (coordinator *other) &&
 Observes items from this observable on another coordinator.
 
observable< output_type > observe_on (coordinator *other, size_t buffer_size, size_t min_request_size) &&
 Observes items from this observable on another coordinator.
 
bool valid () const noexcept
 

Detailed Description

template<class Materializer, class... Steps>
class caf::flow::observable_def< Materializer, Steps >

Captures the definition of an observable that has not materialized yet.

Member Function Documentation

◆ buffer()

template<class Materializer , class... Steps>
auto caf::flow::observable_def< Materializer, Steps >::buffer ( size_t count) &&

Emits items in buffers of size count.

◆ compose()

template<class Materializer , class... Steps>
template<class Fn >
auto caf::flow::observable_def< Materializer, Steps >::compose ( Fn && fn) &&

Transforms this observable by applying a function object to it.

◆ concat()

template<class Materializer , class... Steps>
template<class... Inputs>
auto caf::flow::observable_def< Materializer, Steps >::concat ( Inputs &&... xs) &&

Combines the output of multiple observable objects into one by concatenating their outputs.

May also be called without arguments if the T is an observable.

◆ concat_map()

template<class Materializer , class... Steps>
template<class F >
auto caf::flow::observable_def< Materializer, Steps >::concat_map ( F f) &&

Returns a transformation that emits items by concatenating the outputs of all observables returned by f.

◆ element_at()

template<class Materializer , class... Steps>
auto caf::flow::observable_def< Materializer, Steps >::element_at ( size_t n) &&

Returns a transformation that selects only the item at index n.

◆ first()

template<class Materializer , class... Steps>
auto caf::flow::observable_def< Materializer, Steps >::first ( ) &&

Returns a transformation that selects only the first item.

◆ flat_map()

template<class Materializer , class... Steps>
template<class F >
auto caf::flow::observable_def< Materializer, Steps >::flat_map ( F f) &&

Returns a transformation that emits items by merging the outputs of all observables returned by f.

◆ for_each() [1/2]

template<class Materializer , class... Steps>
template<class OnNext >
auto caf::flow::observable_def< Materializer, Steps >::for_each ( OnNext on_next) &&

Calls on_next for each item emitted by this observable.

◆ for_each() [2/2]

template<class Materializer , class... Steps>
template<class OnNext , class OnError >
auto caf::flow::observable_def< Materializer, Steps >::for_each ( OnNext on_next,
OnError on_error ) &&

Calls on_next for each item emitted by this observable.

◆ head_and_tail()

template<class Materializer , class... Steps>
observable< cow_tuple< output_type, observable< output_type > > > caf::flow::observable_def< Materializer, Steps >::head_and_tail ( ) &&

Similar to prefix_and_tail(1) but passes the single element directly in the tuple instead of wrapping it in a list.

◆ ignore_elements()

template<class Materializer , class... Steps>
auto caf::flow::observable_def< Materializer, Steps >::ignore_elements ( ) &&

Returns a transformation that ignores all items and only forwards calls to on_complete and on_error.

◆ last()

template<class Materializer , class... Steps>
auto caf::flow::observable_def< Materializer, Steps >::last ( ) &&

Returns a transformation that selects only the last item.

◆ merge()

template<class Materializer , class... Steps>
template<class... Inputs>
auto caf::flow::observable_def< Materializer, Steps >::merge ( Inputs &&... xs) &&

Combines the output of multiple observable objects into one by merging their outputs.

May also be called without arguments if the T is an observable.

◆ observe_on() [1/2]

template<class Materializer , class... Steps>
observable< output_type > caf::flow::observable_def< Materializer, Steps >::observe_on ( coordinator * other) &&

Observes items from this observable on another coordinator.

Warning
The other coordinator must not run at this point.

◆ observe_on() [2/2]

template<class Materializer , class... Steps>
observable< output_type > caf::flow::observable_def< Materializer, Steps >::observe_on ( coordinator * other,
size_t buffer_size,
size_t min_request_size ) &&

Observes items from this observable on another coordinator.

Warning
The other coordinator must not run at this point.

◆ on_backpressure_buffer()

template<class Materializer , class... Steps>
auto caf::flow::observable_def< Materializer, Steps >::on_backpressure_buffer ( size_t buffer_size,
backpressure_overflow_strategy strategy = backpressure_overflow_strategy::fail )

When producing items faster than the consumer can consume them, the observable will buffer up to buffer_size items before raising an error.

◆ prefix_and_tail()

template<class Materializer , class... Steps>
observable< cow_tuple< cow_vector< output_type >, observable< output_type > > > caf::flow::observable_def< Materializer, Steps >::prefix_and_tail ( size_t prefix_size) &&

Takes prefix_size elements from this observable and emits it in a tuple containing an observable for the remaining elements as the second value.

The returned observable either emits a single element (the tuple) or none if this observable never produces sufficient elements for the prefix.

Precondition
prefix_size > 0

◆ publish()

template<class Materializer , class... Steps>
auto caf::flow::observable_def< Materializer, Steps >::publish ( ) &&

Convert this observable into a connectable observable.

◆ sample()

template<class Materializer , class... Steps>
auto caf::flow::observable_def< Materializer, Steps >::sample ( timespan period)

Emits the most recent item of the input observable once per interval.

◆ share()

template<class Materializer , class... Steps>
auto caf::flow::observable_def< Materializer, Steps >::share ( size_t subscriber_threshold = 1) &&

Convenience alias for publish().ref_count(subscriber_threshold).

◆ skip()

template<class Materializer , class... Steps>
auto caf::flow::observable_def< Materializer, Steps >::skip ( size_t n) &&

Returns a transformation that selects all but the first n items.

◆ skip_last()

template<class Materializer , class... Steps>
auto caf::flow::observable_def< Materializer, Steps >::skip_last ( size_t n) &&

Returns a transformation that discards only the last n items.

◆ start_with()

template<class Materializer , class... Steps>
template<class Input >
auto caf::flow::observable_def< Materializer, Steps >::start_with ( Input && value) &&

Adds a value or observable to the beginning of current observable.

◆ subscribe()

template<class Materializer , class... Steps>
template<class Out >
disposable caf::flow::observable_def< Materializer, Steps >::subscribe ( Out && out) &&

Subscribes a new observer to the items emitted by this observable.

◆ take()

template<class Materializer , class... Steps>
auto caf::flow::observable_def< Materializer, Steps >::take ( size_t n) &&

Returns a transformation that selects only the first n items.

◆ take_last()

template<class Materializer , class... Steps>
auto caf::flow::observable_def< Materializer, Steps >::take_last ( size_t n) &&

Returns a transformation that selects only the last n items.

◆ to_publisher()

template<class Materializer , class... Steps>
async::publisher< output_type > caf::flow::observable_def< Materializer, Steps >::to_publisher ( ) &&

Creates a publisher that makes emitted items available asynchronously.

◆ to_resource() [1/2]

template<class Materializer , class... Steps>
async::consumer_resource< output_type > caf::flow::observable_def< Materializer, Steps >::to_resource ( ) &&

Creates an asynchronous resource that makes emitted items available in an SPSC buffer.

◆ to_resource() [2/2]

template<class Materializer , class... Steps>
async::consumer_resource< output_type > caf::flow::observable_def< Materializer, Steps >::to_resource ( size_t buffer_size,
size_t min_request_size ) &&

Creates an asynchronous resource that makes emitted items available in an SPSC buffer.

◆ to_stream() [1/2]

template<class Materializer , class... Steps>
template<class U = output_type>
stream caf::flow::observable_def< Materializer, Steps >::to_stream ( cow_string name,
timespan max_delay,
size_t max_items_per_batch ) &&

Creates a type-erased stream that makes emitted items available in batches.

Requires that this observable runs on an actor, otherwise returns an invalid stream.

Parameters
nameThe human-readable name for this stream.
max_delayThe maximum delay between emitting two batches.
max_items_per_batchThe maximum amount of items per batch.
Returns
a stream that makes this observable available to other actors or an invalid stream if this observable does not run on an actor.

◆ to_stream() [2/2]

template<class Materializer , class... Steps>
template<class U = output_type>
stream caf::flow::observable_def< Materializer, Steps >::to_stream ( std::string name,
timespan max_delay,
size_t max_items_per_batch ) &&

Creates a type-erased stream that makes emitted items available in batches.

Requires that this observable runs on an actor, otherwise returns an invalid stream.

Parameters
nameThe human-readable name for this stream.
max_delayThe maximum delay between emitting two batches.
max_items_per_batchThe maximum amount of items per batch.
Returns
a stream that makes this observable available to other actors or an invalid stream if this observable does not run on an actor.

◆ to_typed_stream() [1/2]

template<class Materializer , class... Steps>
template<class U = output_type>
auto caf::flow::observable_def< Materializer, Steps >::to_typed_stream ( cow_string name,
timespan max_delay,
size_t max_items_per_batch ) &&

Creates a stream that makes emitted items available in batches.

Requires that this observable runs on an actor, otherwise returns an invalid stream.

Parameters
nameThe human-readable name for this stream.
max_delayThe maximum delay between emitting two batches.
max_items_per_batchThe maximum amount of items per batch.
Returns
a typed_stream that makes this observable available to other actors or an invalid stream if this observable does not run on an actor.

◆ to_typed_stream() [2/2]

template<class Materializer , class... Steps>
template<class U = output_type>
auto caf::flow::observable_def< Materializer, Steps >::to_typed_stream ( std::string name,
timespan max_delay,
size_t max_items_per_batch ) &&

Creates a stream that makes emitted items available in batches.

Requires that this observable runs on an actor, otherwise returns an invalid stream.

Parameters
nameThe human-readable name for this stream.
max_delayThe maximum delay between emitting two batches.
max_items_per_batchThe maximum amount of items per batch.
Returns
a typed_stream that makes this observable available to other actors or an invalid stream if this observable does not run on an actor.

◆ transform()

template<class Materializer , class... Steps>
template<class NewStep >
observable_def< Materializer, Steps..., NewStep > caf::flow::observable_def< Materializer, Steps >::transform ( NewStep step) &&

Returns a transformation that applies a step function to each input.

◆ zip_with()

template<class Materializer , class... Steps>
template<class F , class T0 , class... Ts>
auto caf::flow::observable_def< Materializer, Steps >::zip_with ( F fn,
T0 input0,
Ts... inputs )

Creates an observable that combines the emitted from all passed source observables by applying a function object.

Parameters
fnThe zip function. Takes one element from each input at a time and reduces them into a single result.
input0The first additional input.
inputsAdditional inputs, if any.

The documentation for this class was generated from the following files: