C++ Actor Framework 0.18
Loading...
Searching...
No Matches
Public Types | Public Member Functions | Static Public Attributes | Protected Member Functions | Protected Attributes | Related Functions | List of all members
caf::stream_manager Class Referenceabstract

Manages a single stream with any number of in- and outbound paths. More...

#include <stream_manager.hpp>

Inheritance diagram for caf::stream_manager:
caf::ref_counted caf::detail::stream_distribution_tree< Policy >

Public Types

using inbound_path_ptr = std::unique_ptr< inbound_path >
 
using inbound_paths_list = std::vector< inbound_path * >
 
using time_point = typename actor_clock::time_point
 Discrete point in time.
 

Public Member Functions

 stream_manager (scheduled_actor *selfptr, stream_priority prio=stream_priority::normal)
 
virtual void handle (inbound_path *from, downstream_msg::batch &x)
 
virtual void handle (inbound_path *from, downstream_msg::close &x)
 
virtual void handle (inbound_path *from, downstream_msg::forced_close &x)
 
virtual bool handle (stream_slots, upstream_msg::ack_open &x)
 
virtual void handle (stream_slots slots, upstream_msg::ack_batch &x)
 
virtual void handle (stream_slots slots, upstream_msg::drop &x)
 
virtual void handle (stream_slots slots, upstream_msg::forced_drop &x)
 
virtual void stop (error reason=none)
 Closes all output and input paths and sends the final result to the client.
 
virtual void shutdown ()
 Mark this stream as shutting down, only allowing flushing all related buffers of in- and outbound paths.
 
virtual void push ()
 Pushes new data to downstream actors by sending batches.
 
virtual bool congested (const inbound_path &path) const noexcept
 Returns true if the handler is not able to process any further batches since it is unable to make progress sending on its own.
 
virtual void deliver_handshake (response_promise &rp, stream_slot slot, message handshake)
 Sends a handshake to dest.
 
virtual bool generate_messages ()
 Tries to generate new messages for the stream.
 
virtual downstream_managerout ()=0
 Returns the manager for downstream communication.
 
const downstream_managerout () const
 Returns the manager for downstream communication.
 
virtual bool done () const =0
 Returns whether the manager has reached the end and can be discarded safely.
 
virtual bool idle () const noexcept=0
 Returns whether the manager cannot make any progress on its own at the moment.
 
virtual void cycle_timeout (size_t cycle_nr)
 Advances time.
 
virtual int32_t acquire_credit (inbound_path *path, int32_t desired)
 Acquires credit on an inbound path.
 
virtual void register_input_path (inbound_path *x)
 Informs the manager that a new input path opens.
 
virtual void deregister_input_path (inbound_path *x) noexcept
 Informs the manager that an input path closes.
 
virtual void remove_input_path (stream_slot slot, error reason, bool silent)
 Removes an input path.
 
bool running () const noexcept
 Returns whether this stream is neither shutting down nor has stopped.
 
bool continuous () const noexcept
 Returns whether this stream remains open even if no in- or outbound paths exist.
 
void continuous (bool x) noexcept
 Sets whether this stream remains open even if no in- or outbound paths exist.
 
const inbound_paths_list & inbound_paths () const noexcept
 Returns the list of inbound paths.
 
inbound_pathget_inbound_path (stream_slot x) const noexcept
 Returns the inbound paths at slot x.
 
bool inbound_paths_idle () const noexcept
 Queries whether all inbound paths are up-to-date and have non-zero credit.
 
scheduled_actor * self () noexcept
 Returns the parent actor.
 
void tick (time_point now)
 
- Public Member Functions inherited from caf::ref_counted
 ref_counted (const ref_counted &)
 
ref_countedoperator= (const ref_counted &)
 
void ref () const noexcept
 Increases reference count by one.
 
void deref () const noexcept
 Decreases reference count by one and calls request_deletion when it drops to zero.
 
bool unique () const noexcept
 Queries whether there is exactly one reference.
 
size_t get_reference_count () const noexcept
 

Static Public Attributes

static constexpr int is_continuous_flag = 0x0001
 Configures whether this stream shall remain open even if no in- or outbound paths exist.
 
static constexpr int is_shutting_down_flag = 0x0002
 Denotes whether the stream is about to stop, only sending buffered elements.
 
static constexpr int is_stopped_flag = 0x0004
 Denotes whether the manager has stopped.
 

Protected Member Functions

stream_slot assign_next_slot ()
 
stream_slot assign_next_pending_slot ()
 
virtual void finalize (const error &reason)
 
virtual void input_closed (error reason)
 Called when in().closed() changes to true.
 
virtual void downstream_demand (outbound_path *ptr, long demand)
 Called whenever new credit becomes available.
 
virtual void output_closed (error reason)
 Called when out().closed() changes to true.
 
stream_slot add_unchecked_inbound_path_impl (type_id_t input_type, inbound_path_ptr path)
 Adds the current sender as an inbound path.
 

Protected Attributes

scheduled_actor * self_
 Points to the parent actor.
 
inbound_paths_list inbound_paths_
 Stores non-owning pointers to all input paths.
 
long pending_handshakes_
 Keeps track of pending handshakes.
 
stream_priority priority_
 Configures the importance of outgoing traffic.
 
int flags_
 Stores individual flags, for continuous streaming or when shutting down.
 
timespan max_batch_delay_
 Stores the maximum amount of time outbound paths should buffer elements before sending underful batches.
 
- Protected Attributes inherited from caf::ref_counted
std::atomic< size_t > rc_
 

Related Functions

(Note that these are not member functions.)

using stream_manager_ptr = intrusive_ptr< stream_manager >
 A reference counting pointer to a stream_manager.
 

Detailed Description

Manages a single stream with any number of in- and outbound paths.

Member Function Documentation

◆ acquire_credit()

virtual int32_t caf::stream_manager::acquire_credit ( inbound_path path,
int32_t  desired 
)
virtual

Acquires credit on an inbound path.

The calculated credit to fill our queue for two cycles is desired, but the manager is allowed to return any non-negative value.

◆ add_unchecked_inbound_path_impl()

stream_slot caf::stream_manager::add_unchecked_inbound_path_impl ( type_id_t  input_type,
inbound_path_ptr  path 
)
protected

Adds the current sender as an inbound path.

Precondition
Current message is an open_stream_msg.

◆ continuous()

bool caf::stream_manager::continuous ( ) const
noexcept

Returns whether this stream remains open even if no in- or outbound paths exist.

The default is false. Does not keep a source alive past the point where its driver returns done() == true.

◆ deliver_handshake()

virtual void caf::stream_manager::deliver_handshake ( response_promise rp,
stream_slot  slot,
message  handshake 
)
virtual

Sends a handshake to dest.

Precondition
dest != nullptr

◆ deregister_input_path()

virtual void caf::stream_manager::deregister_input_path ( inbound_path x)
virtualnoexcept

Informs the manager that an input path closes.

Note
The lifetime of inbound paths is managed by the downstream queue. This function is called from the destructor of inbound_path.

◆ done()

virtual bool caf::stream_manager::done ( ) const
pure virtual

Returns whether the manager has reached the end and can be discarded safely.

Implemented in caf::detail::stream_distribution_tree< Policy >.

◆ downstream_demand()

virtual void caf::stream_manager::downstream_demand ( outbound_path ptr,
long  demand 
)
protectedvirtual

Called whenever new credit becomes available.

The default implementation logs an error (sources are expected to override this hook).

◆ generate_messages()

virtual bool caf::stream_manager::generate_messages ( )
virtual

Tries to generate new messages for the stream.

This member function does nothing on stages and sinks, but can trigger a source to produce more messages.

◆ idle()

virtual bool caf::stream_manager::idle ( ) const
pure virtualnoexcept

Returns whether the manager cannot make any progress on its own at the moment.

For example, a source is idle if it has filled its output buffer and there isn't any credit left.

Implemented in caf::detail::stream_distribution_tree< Policy >.

◆ inbound_paths_idle()

bool caf::stream_manager::inbound_paths_idle ( ) const
noexcept

Queries whether all inbound paths are up-to-date and have non-zero credit.

A sink is idle if this function returns true.

◆ input_closed()

virtual void caf::stream_manager::input_closed ( error  reason)
protectedvirtual

Called when in().closed() changes to true.

The default implementation does nothing.

◆ out()

virtual downstream_manager & caf::stream_manager::out ( )
pure virtual

Returns the manager for downstream communication.

Implemented in caf::detail::stream_distribution_tree< Policy >.

◆ output_closed()

virtual void caf::stream_manager::output_closed ( error  reason)
protectedvirtual

Called when out().closed() changes to true.

The default implementation does nothing.

◆ push()

virtual void caf::stream_manager::push ( )
virtual

Pushes new data to downstream actors by sending batches.

The amount of pushed data is limited by the available credit.

◆ register_input_path()

virtual void caf::stream_manager::register_input_path ( inbound_path x)
virtual

Informs the manager that a new input path opens.

Note
The lifetime of inbound paths is managed by the downstream queue. This function is called from the constructor of inbound_path.

Member Data Documentation

◆ is_stopped_flag

constexpr int caf::stream_manager::is_stopped_flag = 0x0004
staticconstexpr

Denotes whether the manager has stopped.

Calling member functions such as stop() or abort() on it no longer has any effect.


The documentation for this class was generated from the following file: