C++ Actor Framework 0.18
|
Manages a single stream with any number of in- and outbound paths. More...
#include <stream_manager.hpp>
Public Types | |
using | inbound_path_ptr = std::unique_ptr< inbound_path > |
using | inbound_paths_list = std::vector< inbound_path * > |
using | time_point = typename actor_clock::time_point |
Discrete point in time. | |
Public Member Functions | |
stream_manager (scheduled_actor *selfptr, stream_priority prio=stream_priority::normal) | |
virtual void | handle (inbound_path *from, downstream_msg::batch &x) |
virtual void | handle (inbound_path *from, downstream_msg::close &x) |
virtual void | handle (inbound_path *from, downstream_msg::forced_close &x) |
virtual bool | handle (stream_slots, upstream_msg::ack_open &x) |
virtual void | handle (stream_slots slots, upstream_msg::ack_batch &x) |
virtual void | handle (stream_slots slots, upstream_msg::drop &x) |
virtual void | handle (stream_slots slots, upstream_msg::forced_drop &x) |
virtual void | stop (error reason=none) |
Closes all output and input paths and sends the final result to the client. | |
virtual void | shutdown () |
Mark this stream as shutting down, only allowing flushing all related buffers of in- and outbound paths. | |
virtual void | push () |
Pushes new data to downstream actors by sending batches. | |
virtual bool | congested (const inbound_path &path) const noexcept |
Returns true if the handler is not able to process any further batches since it is unable to make progress sending on its own. | |
virtual void | deliver_handshake (response_promise &rp, stream_slot slot, message handshake) |
Sends a handshake to dest . | |
virtual bool | generate_messages () |
Tries to generate new messages for the stream. | |
virtual downstream_manager & | out ()=0 |
Returns the manager for downstream communication. | |
const downstream_manager & | out () const |
Returns the manager for downstream communication. | |
virtual bool | done () const =0 |
Returns whether the manager has reached the end and can be discarded safely. | |
virtual bool | idle () const noexcept=0 |
Returns whether the manager cannot make any progress on its own at the moment. | |
virtual void | cycle_timeout (size_t cycle_nr) |
Advances time. | |
virtual int32_t | acquire_credit (inbound_path *path, int32_t desired) |
Acquires credit on an inbound path. | |
virtual void | register_input_path (inbound_path *x) |
Informs the manager that a new input path opens. | |
virtual void | deregister_input_path (inbound_path *x) noexcept |
Informs the manager that an input path closes. | |
virtual void | remove_input_path (stream_slot slot, error reason, bool silent) |
Removes an input path. | |
bool | running () const noexcept |
Returns whether this stream is neither shutting down nor has stopped. | |
bool | continuous () const noexcept |
Returns whether this stream remains open even if no in- or outbound paths exist. | |
void | continuous (bool x) noexcept |
Sets whether this stream remains open even if no in- or outbound paths exist. | |
const inbound_paths_list & | inbound_paths () const noexcept |
Returns the list of inbound paths. | |
inbound_path * | get_inbound_path (stream_slot x) const noexcept |
Returns the inbound paths at slot x . | |
bool | inbound_paths_idle () const noexcept |
Queries whether all inbound paths are up-to-date and have non-zero credit. | |
scheduled_actor * | self () noexcept |
Returns the parent actor. | |
void | tick (time_point now) |
![]() | |
ref_counted (const ref_counted &) | |
ref_counted & | operator= (const ref_counted &) |
void | ref () const noexcept |
Increases reference count by one. | |
void | deref () const noexcept |
Decreases reference count by one and calls request_deletion when it drops to zero. | |
bool | unique () const noexcept |
Queries whether there is exactly one reference. | |
size_t | get_reference_count () const noexcept |
Static Public Attributes | |
static constexpr int | is_continuous_flag = 0x0001 |
Configures whether this stream shall remain open even if no in- or outbound paths exist. | |
static constexpr int | is_shutting_down_flag = 0x0002 |
Denotes whether the stream is about to stop, only sending buffered elements. | |
static constexpr int | is_stopped_flag = 0x0004 |
Denotes whether the manager has stopped. | |
Protected Member Functions | |
stream_slot | assign_next_slot () |
stream_slot | assign_next_pending_slot () |
virtual void | finalize (const error &reason) |
virtual void | input_closed (error reason) |
Called when in().closed() changes to true . | |
virtual void | downstream_demand (outbound_path *ptr, long demand) |
Called whenever new credit becomes available. | |
virtual void | output_closed (error reason) |
Called when out().closed() changes to true . | |
stream_slot | add_unchecked_inbound_path_impl (type_id_t input_type, inbound_path_ptr path) |
Adds the current sender as an inbound path. | |
Protected Attributes | |
scheduled_actor * | self_ |
Points to the parent actor. | |
inbound_paths_list | inbound_paths_ |
Stores non-owning pointers to all input paths. | |
long | pending_handshakes_ |
Keeps track of pending handshakes. | |
stream_priority | priority_ |
Configures the importance of outgoing traffic. | |
int | flags_ |
Stores individual flags, for continuous streaming or when shutting down. | |
timespan | max_batch_delay_ |
Stores the maximum amount of time outbound paths should buffer elements before sending underful batches. | |
![]() | |
std::atomic< size_t > | rc_ |
Related Functions | |
(Note that these are not member functions.) | |
using | stream_manager_ptr = intrusive_ptr< stream_manager > |
A reference counting pointer to a stream_manager . | |
![]() | |
template<class T , class... Ts> | |
intrusive_cow_ptr< T > | make_copy_on_write (Ts &&... xs) |
Constructs an object of type T in an intrusive_cow_ptr . | |
template<class T , class... Ts> | |
intrusive_ptr< T > | make_counted (Ts &&... xs) |
Constructs an object of type T in an intrusive_ptr . | |
void | intrusive_ptr_add_ref (const ref_counted *p) |
void | intrusive_ptr_release (const ref_counted *p) |
Manages a single stream with any number of in- and outbound paths.
|
virtual |
Acquires credit on an inbound path.
The calculated credit to fill our queue for two cycles is desired
, but the manager is allowed to return any non-negative value.
|
protected |
Adds the current sender as an inbound path.
open_stream_msg
.
|
noexcept |
Returns whether this stream remains open even if no in- or outbound paths exist.
The default is false
. Does not keep a source alive past the point where its driver returns done() == true
.
|
virtual |
Sends a handshake to dest
.
dest != nullptr
|
virtualnoexcept |
Informs the manager that an input path closes.
inbound_path
.
|
pure virtual |
Returns whether the manager has reached the end and can be discarded safely.
Implemented in caf::detail::stream_distribution_tree< Policy >.
|
protectedvirtual |
Called whenever new credit becomes available.
The default implementation logs an error (sources are expected to override this hook).
|
virtual |
Tries to generate new messages for the stream.
This member function does nothing on stages and sinks, but can trigger a source to produce more messages.
|
pure virtualnoexcept |
Returns whether the manager cannot make any progress on its own at the moment.
For example, a source is idle if it has filled its output buffer and there isn't any credit left.
Implemented in caf::detail::stream_distribution_tree< Policy >.
|
noexcept |
Queries whether all inbound paths are up-to-date and have non-zero credit.
A sink is idle if this function returns true
.
|
protectedvirtual |
Called when in().closed()
changes to true
.
The default implementation does nothing.
|
pure virtual |
Returns the manager for downstream communication.
Implemented in caf::detail::stream_distribution_tree< Policy >.
|
protectedvirtual |
Called when out().closed()
changes to true
.
The default implementation does nothing.
|
virtual |
Pushes new data to downstream actors by sending batches.
The amount of pushed data is limited by the available credit.
|
virtual |
Informs the manager that a new input path opens.
inbound_path
.
|
staticconstexpr |
Denotes whether the manager has stopped.
Calling member functions such as stop() or abort() on it no longer has any effect.