C++ Actor Framework 0.18
Loading...
Searching...
No Matches
Classes | Public Types | Public Member Functions | Protected Member Functions | Protected Attributes | List of all members
caf::downstream_manager Class Reference

Manages downstream communication for a stream_manager. More...

#include <downstream_manager.hpp>

Inheritance diagram for caf::downstream_manager:
caf::downstream_manager_base caf::fused_downstream_manager< T, Ts > caf::buffered_downstream_manager< T >

Classes

struct  path_predicate
 Predicate object for paths. More...
 
struct  path_visitor
 Function object for iterating over all paths. More...
 

Public Types

enum  path_algorithm {
  all_of ,
  any_of ,
  none_of
}
 Selects a check algorithms.
 
using path_type = outbound_path
 Outbound path.
 
using path_ptr = path_type *
 Pointer to an outbound path.
 
using const_path_ptr = const path_type *
 Pointer to an immutable outbound path.
 
using unique_path_ptr = std::unique_ptr< path_type >
 Unique pointer to an outbound path.
 
using time_point = typename actor_clock::time_point
 Discrete point in time, as reported by the actor clock.
 

Public Member Functions

 downstream_manager (stream_manager *parent)
 
 downstream_manager (const downstream_manager &)=delete
 
downstream_manageroperator= (const downstream_manager &)=delete
 
scheduled_actor * self () const noexcept
 
stream_managerparent () const noexcept
 
virtual bool terminal () const noexcept
 Returns true if this manager belongs to a sink, i.e., terminates the stream and never has outbound paths.
 
void tick (time_point now, timespan max_batch_delay)
 Forces underful batches after reaching the maximum delay.
 
template<class F >
void for_each_path (F f)
 Applies f to each path.
 
template<class F >
void for_each_path (F f) const
 Applies f to each path.
 
std::vector< stream_slotpath_slots ()
 Returns all used slots.
 
std::vector< stream_slotopen_path_slots ()
 Returns all open slots, i.e., slots assigned to outbound paths with closing == false.
 
template<class Predicate >
bool all_paths (Predicate predicate) const noexcept
 Checks whether predicate holds true for all paths.
 
template<class Predicate >
bool any_path (Predicate predicate) const noexcept
 Checks whether predicate holds true for any path.
 
template<class Predicate >
bool no_path (Predicate predicate) const noexcept
 Checks whether predicate holds true for no path.
 
virtual size_t num_paths () const noexcept
 Returns the current number of paths.
 
path_ptr add_path (stream_slot slot, strong_actor_ptr target)
 Adds a pending path to target to the manager.
 
virtual bool remove_path (stream_slot slot, error reason, bool silent) noexcept
 Removes a path from the manager.
 
virtual path_ptr path (stream_slot slot) noexcept
 Returns the path associated to slot or nullptr.
 
const_path_ptr path (stream_slot slot) const noexcept
 Returns the path associated to slot or nullptr.
 
bool clean () const noexcept
 Returns true if there is no data pending and all batches are acknowledged batch on all paths.
 
bool clean (stream_slot slot) const noexcept
 Returns true if slot is unknown or if there is no data pending and all batches are acknowledged on slot.
 
virtual void close ()
 Removes all paths gracefully.
 
virtual void close (stream_slot slot)
 Removes path slot gracefully by sending pending batches before removing it.
 
virtual void abort (error reason)
 Removes all paths with an error message.
 
bool empty () const noexcept
 Returns num_paths() == 0.
 
size_t min_credit () const
 Returns the minimum amount of credit on all output paths.
 
size_t max_credit () const
 Returns the maximum amount of credit on all output paths.
 
size_t total_credit () const
 Returns the total amount of credit on all output paths, i.e., the sum of all individual credits.
 
virtual void emit_batches ()
 Sends batches to sinks.
 
virtual void force_emit_batches ()
 Sends batches to sinks regardless of whether or not the batches reach the desired batch size.
 
virtual size_t capacity () const noexcept
 Queries the currently available capacity for the output buffer.
 
virtual size_t buffered () const noexcept
 Queries the size of the output buffer.
 
virtual size_t buffered (stream_slot slot) const noexcept
 Queries an estimate of the size of the output buffer for slot.
 
bool stalled () const noexcept
 Queries whether the manager cannot make any progress, because its buffer is full and no more credit is available.
 
virtual void clear_paths ()
 Silently removes all paths.
 

Protected Member Functions

virtual bool insert_path (unique_path_ptr ptr)
 Inserts ptr to the implementation-specific container.
 
virtual void for_each_path_impl (path_visitor &f)
 Applies f to each path.
 
virtual bool check_paths_impl (path_algorithm algo, path_predicate &pred) const noexcept
 Dispatches the predicate to std::all_of, std::any_of, or std::none_of.
 
virtual void about_to_erase (path_ptr ptr, bool silent, error *reason)
 Emits a regular (reason == nullptr) or irregular (reason != nullptr) shutdown if silent == false.
 
template<class Predicate >
bool check_paths (path_algorithm algorithm, Predicate predicate) const noexcept
 Delegates to check_paths_impl.
 

Protected Attributes

stream_managerparent_
 
time_point last_send_
 Stores the time stamp of our last batch.
 

Detailed Description

Manages downstream communication for a stream_manager.

The downstream manager owns the outbound_path objects, has a buffer for storing pending output and is responsible for the dispatching policy (broadcasting, for example). The default implementation terminates the stream and never accepts any paths.

Member Function Documentation

◆ abort()

virtual void caf::downstream_manager::abort ( error  reason)
virtual

Removes all paths with an error message.

Reimplemented in caf::fused_downstream_manager< T, Ts >.

◆ about_to_erase()

virtual void caf::downstream_manager::about_to_erase ( path_ptr  ptr,
bool  silent,
error reason 
)
protectedvirtual

Emits a regular (reason == nullptr) or irregular (reason != nullptr) shutdown if silent == false.

Warning
moves *reason if reason == nullptr

◆ add_path()

path_ptr caf::downstream_manager::add_path ( stream_slot  slot,
strong_actor_ptr  target 
)

Adds a pending path to target to the manager.

Returns
The added path on success, nullptr otherwise.

◆ buffered() [1/2]

virtual size_t caf::downstream_manager::buffered ( ) const
virtualnoexcept

Queries the size of the output buffer.

Reimplemented in caf::buffered_downstream_manager< T >, and caf::fused_downstream_manager< T, Ts >.

◆ buffered() [2/2]

virtual size_t caf::downstream_manager::buffered ( stream_slot  slot) const
virtualnoexcept

Queries an estimate of the size of the output buffer for slot.

Reimplemented in caf::fused_downstream_manager< T, Ts >.

◆ capacity()

virtual size_t caf::downstream_manager::capacity ( ) const
virtualnoexcept

Queries the currently available capacity for the output buffer.

Reimplemented in caf::buffered_downstream_manager< T >, and caf::fused_downstream_manager< T, Ts >.

◆ check_paths_impl()

virtual bool caf::downstream_manager::check_paths_impl ( path_algorithm  algo,
path_predicate pred 
) const
protectedvirtualnoexcept

Dispatches the predicate to std::all_of, std::any_of, or std::none_of.

Reimplemented in caf::downstream_manager_base, and caf::fused_downstream_manager< T, Ts >.

◆ clean()

bool caf::downstream_manager::clean ( stream_slot  slot) const
noexcept

Returns true if slot is unknown or if there is no data pending and all batches are acknowledged on slot.

The default implementation returns false for all paths, even if clean() return true.

◆ clear_paths()

virtual void caf::downstream_manager::clear_paths ( )
virtual

Silently removes all paths.

Reimplemented in caf::downstream_manager_base, and caf::fused_downstream_manager< T, Ts >.

◆ close() [1/2]

virtual void caf::downstream_manager::close ( )
virtual

Removes all paths gracefully.

Reimplemented in caf::fused_downstream_manager< T, Ts >, and caf::fused_downstream_manager< T, Ts >.

◆ close() [2/2]

virtual void caf::downstream_manager::close ( stream_slot  slot)
virtual

Removes path slot gracefully by sending pending batches before removing it.

Effectively calls path(slot)->closing = true.

Reimplemented in caf::fused_downstream_manager< T, Ts >.

◆ emit_batches()

virtual void caf::downstream_manager::emit_batches ( )
virtual

Sends batches to sinks.

Reimplemented in caf::fused_downstream_manager< T, Ts >.

◆ for_each_path_impl()

virtual void caf::downstream_manager::for_each_path_impl ( path_visitor f)
protectedvirtual

Applies f to each path.

Reimplemented in caf::downstream_manager_base, and caf::fused_downstream_manager< T, Ts >.

◆ force_emit_batches()

virtual void caf::downstream_manager::force_emit_batches ( )
virtual

Sends batches to sinks regardless of whether or not the batches reach the desired batch size.

Reimplemented in caf::fused_downstream_manager< T, Ts >.

◆ insert_path()

virtual bool caf::downstream_manager::insert_path ( unique_path_ptr  ptr)
protectedvirtual

Inserts ptr to the implementation-specific container.

Reimplemented in caf::downstream_manager_base, and caf::fused_downstream_manager< T, Ts >.

◆ num_paths()

virtual size_t caf::downstream_manager::num_paths ( ) const
virtualnoexcept

Returns the current number of paths.

Reimplemented in caf::downstream_manager_base, and caf::fused_downstream_manager< T, Ts >.

◆ path()

virtual path_ptr caf::downstream_manager::path ( stream_slot  slot)
virtualnoexcept

Returns the path associated to slot or nullptr.

Reimplemented in caf::fused_downstream_manager< T, Ts >, and caf::downstream_manager_base.

◆ remove_path()

virtual bool caf::downstream_manager::remove_path ( stream_slot  slot,
error  reason,
bool  silent 
)
virtualnoexcept

Removes a path from the manager.

Reimplemented in caf::fused_downstream_manager< T, Ts >, and caf::downstream_manager_base.

◆ terminal()

virtual bool caf::downstream_manager::terminal ( ) const
virtualnoexcept

Returns true if this manager belongs to a sink, i.e., terminates the stream and never has outbound paths.

Reimplemented in caf::buffered_downstream_manager< T >, and caf::fused_downstream_manager< T, Ts >.


The documentation for this class was generated from the following file: