C++ Actor Framework 0.18
Loading...
Searching...
No Matches
Public Types | Public Member Functions | List of all members
caf::detail::stream_distribution_tree< Policy > Class Template Reference

A stream distribution tree consists of peers forming an acyclic graph. More...

#include <stream_distribution_tree.hpp>

Inheritance diagram for caf::detail::stream_distribution_tree< Policy >:
caf::stream_manager caf::ref_counted

Public Types

using super = stream_manager
 
using downstream_manager_type = typename Policy::downstream_manager_type
 
- Public Types inherited from caf::stream_manager
using inbound_path_ptr = std::unique_ptr< inbound_path >
 
using inbound_paths_list = std::vector< inbound_path * >
 
using time_point = typename actor_clock::time_point
 Discrete point in time.
 

Public Member Functions

template<class... Ts>
 stream_distribution_tree (scheduled_actor *selfptr, Ts &&... xs)
 
Policy & policy ()
 
const Policy & policy () const
 
void handle (inbound_path *path, downstream_msg::batch &x) override
 
void handle (inbound_path *path, downstream_msg::close &x) override
 
void handle (inbound_path *path, downstream_msg::forced_close &x) override
 
bool handle (stream_slots slots, upstream_msg::ack_open &x) override
 
void handle (stream_slots slots, upstream_msg::drop &x) override
 
void handle (stream_slots slots, upstream_msg::forced_drop &x) override
 
bool done () const override
 Returns whether the manager has reached the end and can be discarded safely.
 
bool idle () const noexcept override
 Returns whether the manager cannot make any progress on its own at the moment.
 
downstream_manager_type & out () override
 Returns the manager for downstream communication.
 
- Public Member Functions inherited from caf::stream_manager
 stream_manager (scheduled_actor *selfptr, stream_priority prio=stream_priority::normal)
 
virtual void handle (inbound_path *from, downstream_msg::batch &x)
 
virtual void handle (inbound_path *from, downstream_msg::close &x)
 
virtual void handle (inbound_path *from, downstream_msg::forced_close &x)
 
virtual bool handle (stream_slots, upstream_msg::ack_open &x)
 
virtual void handle (stream_slots slots, upstream_msg::ack_batch &x)
 
virtual void handle (stream_slots slots, upstream_msg::drop &x)
 
virtual void handle (stream_slots slots, upstream_msg::forced_drop &x)
 
virtual void stop (error reason=none)
 Closes all output and input paths and sends the final result to the client.
 
virtual void shutdown ()
 Marks this stream as shutting down, only allowing flushing of all related buffers of in- and outbound paths.
 
virtual void push ()
 Pushes new data to downstream actors by sending batches.
 
virtual bool congested (const inbound_path &path) const noexcept
 Returns true if the handler is not able to process any further batches since it is unable to make progress sending on its own.
 
virtual void deliver_handshake (response_promise &rp, stream_slot slot, message handshake)
 Sends a handshake to dest.
 
virtual bool generate_messages ()
 Tries to generate new messages for the stream.
 
virtual downstream_manager & out ()=0
 Returns the manager for downstream communication.
 
const downstream_manager & out () const
 Returns the manager for downstream communication.
 
virtual bool done () const =0
 Returns whether the manager has reached the end and can be discarded safely.
 
virtual bool idle () const noexcept=0
 Returns whether the manager cannot make any progress on its own at the moment.
 
virtual void cycle_timeout (size_t cycle_nr)
 Advances time.
 
virtual int32_t acquire_credit (inbound_path *path, int32_t desired)
 Acquires credit on an inbound path.
 
virtual void register_input_path (inbound_path *x)
 Informs the manager that a new input path opens.
 
virtual void deregister_input_path (inbound_path *x) noexcept
 Informs the manager that an input path closes.
 
virtual void remove_input_path (stream_slot slot, error reason, bool silent)
 Removes an input path.
 
bool running () const noexcept
 Returns whether this stream is neither shutting down nor has stopped.
 
bool continuous () const noexcept
 Returns whether this stream remains open even if no in- or outbound paths exist.
 
void continuous (bool x) noexcept
 Sets whether this stream remains open even if no in- or outbound paths exist.
 
const inbound_paths_list & inbound_paths () const noexcept
 Returns the list of inbound paths.
 
inbound_path * get_inbound_path (stream_slot x) const noexcept
 Returns the inbound path at slot x.
 
bool inbound_paths_idle () const noexcept
 Queries whether all inbound paths are up-to-date and have non-zero credit.
 
scheduled_actor * self () noexcept
 Returns the parent actor.
 
void tick (time_point now)
 
- Public Member Functions inherited from caf::ref_counted
 ref_counted (const ref_counted &)
 
ref_counted & operator= (const ref_counted &)
 
void ref () const noexcept
 Increases reference count by one.
 
void deref () const noexcept
 Decreases reference count by one and calls request_deletion when it drops to zero.
 
bool unique () const noexcept
 Queries whether there is exactly one reference.
 
size_t get_reference_count () const noexcept
 

Additional Inherited Members

- Static Public Attributes inherited from caf::stream_manager
static constexpr int is_continuous_flag = 0x0001
 Configures whether this stream shall remain open even if no in- or outbound paths exist.
 
static constexpr int is_shutting_down_flag = 0x0002
 Denotes whether the stream is about to stop, only sending buffered elements.
 
static constexpr int is_stopped_flag = 0x0004
 Denotes whether the manager has stopped.
 
- Protected Member Functions inherited from caf::stream_manager
stream_slot assign_next_slot ()
 
stream_slot assign_next_pending_slot ()
 
virtual void finalize (const error &reason)
 
virtual void input_closed (error reason)
 Called when in().closed() changes to true.
 
virtual void downstream_demand (outbound_path *ptr, long demand)
 Called whenever new credit becomes available.
 
virtual void output_closed (error reason)
 Called when out().closed() changes to true.
 
stream_slot add_unchecked_inbound_path_impl (type_id_t input_type, inbound_path_ptr path)
 Adds the current sender as an inbound path.
 
- Protected Attributes inherited from caf::stream_manager
scheduled_actor * self_
 Points to the parent actor.
 
inbound_paths_list inbound_paths_
 Stores non-owning pointers to all input paths.
 
long pending_handshakes_
 Keeps track of pending handshakes.
 
stream_priority priority_
 Configures the importance of outgoing traffic.
 
int flags_
 Stores individual flags, for continuous streaming or when shutting down.
 
timespan max_batch_delay_
 Stores the maximum amount of time outbound paths should buffer elements before sending underfull batches.
 
- Protected Attributes inherited from caf::ref_counted
std::atomic< size_t > rc_
 

Detailed Description

template<class Policy>
class caf::detail::stream_distribution_tree< Policy >

A stream distribution tree consists of peers forming an acyclic graph.

The user is responsible for making sure peers do not form a loop. Data is flooded along the tree. Each peer serves any number of subscribers. The policy of the tree enables subscriptions to different chunks of the whole stream (substreams).

The tree uses two CAF streams between each pair of peers for transmitting data. This automatically adds backpressure to the system, i.e., no peer can overwhelm others.

Policies need to provide the following member types and functions:

TODO
};

Member Function Documentation

◆ done()

template<class Policy >
bool caf::detail::stream_distribution_tree< Policy >::done ( ) const
override virtual

Returns whether the manager has reached the end and can be discarded safely.

Implements caf::stream_manager.

◆ handle() [1/6]

template<class Policy >
void caf::detail::stream_distribution_tree< Policy >::handle ( inbound_path * path,
downstream_msg::batch & x 
)
override virtual

Reimplemented from caf::stream_manager.

◆ handle() [2/6]

template<class Policy >
void caf::detail::stream_distribution_tree< Policy >::handle ( inbound_path * path,
downstream_msg::close & x 
)
override virtual

Reimplemented from caf::stream_manager.

◆ handle() [3/6]

template<class Policy >
void caf::detail::stream_distribution_tree< Policy >::handle ( inbound_path * path,
downstream_msg::forced_close & x 
)
override virtual

Reimplemented from caf::stream_manager.

◆ handle() [4/6]

template<class Policy >
bool caf::detail::stream_distribution_tree< Policy >::handle ( stream_slots  slots,
upstream_msg::ack_open & x 
)
override virtual

Reimplemented from caf::stream_manager.

◆ handle() [5/6]

template<class Policy >
void caf::detail::stream_distribution_tree< Policy >::handle ( stream_slots  slots,
upstream_msg::drop & x 
)
override virtual

Reimplemented from caf::stream_manager.

◆ handle() [6/6]

template<class Policy >
void caf::detail::stream_distribution_tree< Policy >::handle ( stream_slots  slots,
upstream_msg::forced_drop & x 
)
override virtual

Reimplemented from caf::stream_manager.

◆ idle()

template<class Policy >
bool caf::detail::stream_distribution_tree< Policy >::idle ( ) const
override virtual noexcept

Returns whether the manager cannot make any progress on its own at the moment.

For example, a source is idle if it has filled its output buffer and there isn't any credit left.

Implements caf::stream_manager.

◆ out()

template<class Policy >
downstream_manager_type & caf::detail::stream_distribution_tree< Policy >::out ( )
override virtual

Returns the manager for downstream communication.

Implements caf::stream_manager.


The documentation for this class was generated from the following file: