|
template<class... Ts> |
| stream_distribution_tree (scheduled_actor *selfptr, Ts &&... xs) |
|
Policy & | policy () |
|
const Policy & | policy () const |
|
void | handle (inbound_path *path, downstream_msg::batch &x) override |
|
void | handle (inbound_path *path, downstream_msg::close &x) override |
|
void | handle (inbound_path *path, downstream_msg::forced_close &x) override |
|
bool | handle (stream_slots slots, upstream_msg::ack_open &x) override |
|
void | handle (stream_slots slots, upstream_msg::drop &x) override |
|
void | handle (stream_slots slots, upstream_msg::forced_drop &x) override |
|
bool | done () const override |
| Returns whether the manager has reached the end and can be discarded safely.
|
|
bool | idle () const noexcept override |
| Returns whether the manager cannot make any progress on its own at the moment.
|
|
downstream_manager_type & | out () override |
| Returns the manager for downstream communication.
|
|
| stream_manager (scheduled_actor *selfptr, stream_priority prio=stream_priority::normal) |
|
virtual void | handle (inbound_path *from, downstream_msg::batch &x) |
|
virtual void | handle (inbound_path *from, downstream_msg::close &x) |
|
virtual void | handle (inbound_path *from, downstream_msg::forced_close &x) |
|
virtual bool | handle (stream_slots, upstream_msg::ack_open &x) |
|
virtual void | handle (stream_slots slots, upstream_msg::ack_batch &x) |
|
virtual void | handle (stream_slots slots, upstream_msg::drop &x) |
|
virtual void | handle (stream_slots slots, upstream_msg::forced_drop &x) |
|
virtual void | stop (error reason=none) |
| Closes all output and input paths and sends the final result to the client.
|
|
virtual void | shutdown () |
| Marks this stream as shutting down, only allowing the flushing of all related buffers of in- and outbound paths.
|
|
virtual void | push () |
| Pushes new data to downstream actors by sending batches.
|
|
virtual bool | congested (const inbound_path &path) const noexcept |
| Returns true if the handler is not able to process any further batches since it is unable to make progress sending on its own.
|
|
virtual void | deliver_handshake (response_promise &rp, stream_slot slot, message handshake) |
| Sends a handshake to dest.
|
|
virtual bool | generate_messages () |
| Tries to generate new messages for the stream.
|
|
virtual downstream_manager & | out ()=0 |
| Returns the manager for downstream communication.
|
|
const downstream_manager & | out () const |
| Returns the manager for downstream communication.
|
|
virtual bool | done () const =0 |
| Returns whether the manager has reached the end and can be discarded safely.
|
|
virtual bool | idle () const noexcept=0 |
| Returns whether the manager cannot make any progress on its own at the moment.
|
|
virtual void | cycle_timeout (size_t cycle_nr) |
| Advances time.
|
|
virtual int32_t | acquire_credit (inbound_path *path, int32_t desired) |
| Acquires credit on an inbound path.
|
|
virtual void | register_input_path (inbound_path *x) |
| Informs the manager that a new input path opens.
|
|
virtual void | deregister_input_path (inbound_path *x) noexcept |
| Informs the manager that an input path closes.
|
|
virtual void | remove_input_path (stream_slot slot, error reason, bool silent) |
| Removes an input path.
|
|
bool | running () const noexcept |
| Returns whether this stream is neither shutting down nor has stopped.
|
|
bool | continuous () const noexcept |
| Returns whether this stream remains open even if no in- or outbound paths exist.
|
|
void | continuous (bool x) noexcept |
| Sets whether this stream remains open even if no in- or outbound paths exist.
|
|
const inbound_paths_list & | inbound_paths () const noexcept |
| Returns the list of inbound paths.
|
|
inbound_path * | get_inbound_path (stream_slot x) const noexcept |
| Returns the inbound path at slot x.
|
|
bool | inbound_paths_idle () const noexcept |
| Queries whether all inbound paths are up-to-date and have non-zero credit.
|
|
scheduled_actor * | self () noexcept |
| Returns the parent actor.
|
|
void | tick (time_point now) |
|
| ref_counted (const ref_counted &) |
|
ref_counted & | operator= (const ref_counted &) |
|
void | ref () const noexcept |
| Increases reference count by one.
|
|
void | deref () const noexcept |
| Decreases reference count by one and calls request_deletion when it drops to zero.
|
|
bool | unique () const noexcept |
| Queries whether there is exactly one reference.
|
|
size_t | get_reference_count () const noexcept |
|
|
static constexpr int | is_continuous_flag = 0x0001 |
| Configures whether this stream shall remain open even if no in- or outbound paths exist.
|
|
static constexpr int | is_shutting_down_flag = 0x0002 |
| Denotes whether the stream is about to stop, only sending buffered elements.
|
|
static constexpr int | is_stopped_flag = 0x0004 |
| Denotes whether the manager has stopped.
|
|
stream_slot | assign_next_slot () |
|
stream_slot | assign_next_pending_slot () |
|
virtual void | finalize (const error &reason) |
|
virtual void | input_closed (error reason) |
| Called when in().closed() changes to true.
|
|
virtual void | downstream_demand (outbound_path *ptr, long demand) |
| Called whenever new credit becomes available.
|
|
virtual void | output_closed (error reason) |
| Called when out().closed() changes to true.
|
|
stream_slot | add_unchecked_inbound_path_impl (type_id_t input_type, inbound_path_ptr path) |
| Adds the current sender as an inbound path.
|
|
scheduled_actor * | self_ |
| Points to the parent actor.
|
|
inbound_paths_list | inbound_paths_ |
| Stores non-owning pointers to all input paths.
|
|
long | pending_handshakes_ |
| Keeps track of pending handshakes.
|
|
stream_priority | priority_ |
| Configures the importance of outgoing traffic.
|
|
int | flags_ |
| Stores individual flags, for continuous streaming or when shutting down.
|
|
timespan | max_batch_delay_ |
| Stores the maximum amount of time outbound paths should buffer elements before sending underfull batches.
|
|
std::atomic< size_t > | rc_ |
|
using | stream_manager_ptr = intrusive_ptr< stream_manager > |
| A reference-counting pointer to a stream_manager.
|
|
template<class T , class... Ts> |
intrusive_cow_ptr< T > | make_copy_on_write (Ts &&... xs) |
| Constructs an object of type T in an intrusive_cow_ptr.
|
|
template<class T , class... Ts> |
intrusive_ptr< T > | make_counted (Ts &&... xs) |
| Constructs an object of type T in an intrusive_ptr.
|
|
void | intrusive_ptr_add_ref (const ref_counted *p) |
|
void | intrusive_ptr_release (const ref_counted *p) |
|
template<class Policy>
class caf::detail::stream_distribution_tree< Policy >
A stream distribution tree consists of peers forming an acyclic graph.
The user is responsible for making sure peers do not form a loop. Data is flooded along the tree. Each peer serves any number of subscribers. The policy of the tree enables subscriptions to different chunks of the whole stream (substreams).
The tree uses two CAF streams between each pair of peers for transmitting data. This automatically adds backpressure to the system, i.e., no peer can overwhelm others.
Policies need to provide the following member types and functions: