C++ Actor Framework 1.0.0
caf::async::spsc_buffer< T > Class Template Reference

A Single Producer Single Consumer buffer. More...

#include <spsc_buffer.hpp>

Inheritance diagram for caf::async::spsc_buffer< T >:
caf::ref_counted caf::detail::atomic_ref_counted

Classes

struct  flags
 Packs various status flags for the buffer into a single struct. More...
 

Public Types

using value_type = T
 
using lock_type = std::unique_lock<std::mutex>
 
- Public Types inherited from caf::ref_counted
using super = detail::atomic_ref_counted
 

Public Member Functions

 spsc_buffer (uint32_t capacity, uint32_t min_pull_size)
 
size_t push (span< const T > items)
 Appends to the buffer and calls on_producer_wakeup on the consumer if the buffer becomes non-empty.
 
size_t push (const T &item)
 
template<class Policy , class Observer >
std::pair< bool, size_t > pull (Policy policy, size_t demand, Observer &dst)
 Consumes up to demand items from the buffer.
 
bool has_data () const noexcept
 Checks whether there is any pending data in the buffer.
 
bool has_consumer_event () const noexcept
 Checks whether there is data available or whether the producer has closed or aborted the flow.
 
size_t available () const noexcept
 Returns how many items are currently available.
 
error abort_reason () const
 Returns the error from the producer or a default-constructed error if abort was not called yet.
 
void close ()
 Closes the buffer by request of the producer.
 
void abort (error reason)
 Closes the buffer by request of the producer and signals an error to the consumer.
 
void cancel ()
 Closes the buffer by request of the consumer.
 
void set_consumer (consumer_ptr consumer)
 Consumer callback for the initial handshake between producer and consumer.
 
void set_producer (producer_ptr producer)
 Producer callback for the initial handshake between producer and consumer.
 
size_t capacity () const noexcept
 Returns the capacity as passed to the constructor of the buffer.
 
auto & mtx () const noexcept
 Returns the mutex for this object.
 
size_t available_unsafe () const noexcept
 Returns how many items are currently available.
 
const error & abort_reason_unsafe () const noexcept
 Returns the error from the producer.
 
void await_consumer_ready (lock_type &guard, std::condition_variable &cv)
 Blocks until there is at least one item available or the producer stopped.
 
template<class TimePoint >
bool await_consumer_ready (lock_type &guard, std::condition_variable &cv, TimePoint timeout)
 Blocks until there is at least one item available, the producer stopped, or a timeout occurs.
 
template<class Policy , class Observer >
std::pair< bool, size_t > pull_unsafe (lock_type &guard, Policy, size_t demand, Observer &dst)
 
- Public Member Functions inherited from caf::detail::atomic_ref_counted
 atomic_ref_counted (const atomic_ref_counted &)
 
atomic_ref_counted & operator= (const atomic_ref_counted &)
 
void ref () const noexcept
 Increases reference count by one.
 
void deref () const noexcept
 Decreases reference count by one and calls request_deletion when it drops to zero.
 
bool unique () const noexcept
 Queries whether there is exactly one reference.
 
size_t get_reference_count () const noexcept
 Queries the current reference count for this object.
 

Related Symbols

(Note that these are not member symbols.)

template<class T >
using spsc_buffer_ptr = intrusive_ptr<spsc_buffer<T>>
 

Additional Inherited Members

- Protected Attributes inherited from caf::detail::atomic_ref_counted
std::atomic< size_t > rc_
 

Detailed Description

template<class T>
class caf::async::spsc_buffer< T >

A Single Producer Single Consumer buffer.

The buffer uses a "soft bound": the producer announces a desired maximum for in-flight items, which the buffer uses for its bookkeeping, but the producer may add more than that number of items. Allowing producers to go "beyond the limit" is intended for producers that transform inputs into outputs, where one input event can produce multiple output items.

Aside from providing storage, this buffer also resumes the consumer if data is available and signals demand to the producer whenever the consumer takes data out of the buffer.
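The soft-bound bookkeeping can be illustrated with a minimal, single-threaded sketch in standard C++. This is not CAF's implementation: soft_bounded_queue and its members are hypothetical names chosen for illustration, and reporting a remaining capacity of zero once the buffer is overfull is an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical illustration only; not part of CAF.
template <class T>
class soft_bounded_queue {
public:
  explicit soft_bounded_queue(std::uint32_t capacity) : capacity_(capacity) {
    assert(capacity > 0);
  }

  // Accepts all items, even if this exceeds the announced capacity.
  // Returns the remaining capacity, clamped at zero (assumption).
  std::size_t push(const std::vector<T>& items) {
    buf_.insert(buf_.end(), items.begin(), items.end());
    return capacity_ > buf_.size() ? capacity_ - buf_.size() : 0;
  }

  // Removes up to `demand` items from the front and returns them.
  std::vector<T> take(std::size_t demand) {
    auto n = static_cast<std::ptrdiff_t>(std::min(demand, buf_.size()));
    std::vector<T> out(buf_.begin(), buf_.begin() + n);
    buf_.erase(buf_.begin(), buf_.begin() + n);
    return out;
  }

  // May be greater than capacity() because the bound is soft.
  std::size_t available() const noexcept { return buf_.size(); }

  std::size_t capacity() const noexcept { return capacity_; }

private:
  std::size_t capacity_;
  std::vector<T> buf_;
};
```

With a capacity of 4, pushing three items leaves a remaining capacity of 1. Pushing three more is still accepted, so available() reports 6 while the remaining capacity is reported as 0.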

Member Function Documentation

◆ abort_reason_unsafe()

template<class T >
const error & caf::async::spsc_buffer< T >::abort_reason_unsafe ( ) const
noexcept

Returns the error from the producer.

Precondition
'mtx()' is locked.

◆ available()

template<class T >
size_t caf::async::spsc_buffer< T >::available ( ) const
noexcept

Returns how many items are currently available.

This may be greater than the capacity.

◆ available_unsafe()

template<class T >
size_t caf::async::spsc_buffer< T >::available_unsafe ( ) const
noexcept

Returns how many items are currently available.

Precondition
'mtx()' is locked.
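The pairing of a locking accessor with an "_unsafe" variant that expects the caller to hold mtx() can be sketched as follows. The class guarded_items and the helper snapshot are hypothetical and only mirror the naming convention used on this page; they make no claim about how CAF implements these functions.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>

// Hypothetical illustration of the locked / "_unsafe" accessor pairing.
class guarded_items {
public:
  using lock_type = std::unique_lock<std::mutex>;

  // Exposes the mutex so callers can group several reads under one lock.
  std::mutex& mtx() const noexcept { return mtx_; }

  // Acquires the lock itself; call this without holding mtx().
  std::size_t available() const {
    lock_type guard{mtx_};
    return available_unsafe();
  }

  // Precondition: the caller holds mtx().
  std::size_t available_unsafe() const noexcept { return count_; }

  void add(std::size_t n) {
    lock_type guard{mtx_};
    count_ += n;
  }

private:
  mutable std::mutex mtx_;
  std::size_t count_ = 0;
};

// Reads the state while holding the lock explicitly.
inline std::size_t snapshot(const guarded_items& items) {
  guarded_items::lock_type guard{items.mtx()};
  assert(guard.owns_lock());
  return items.available_unsafe();
}
```

Calling available() while already holding mtx() would deadlock on a non-recursive mutex, which is why code that already owns the lock uses the "_unsafe" variant instead.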

◆ await_consumer_ready() [1/2]

template<class T >
void caf::async::spsc_buffer< T >::await_consumer_ready ( lock_type & guard,
std::condition_variable & cv )

Blocks until there is at least one item available or the producer stopped.

Precondition
the consumer calls cv.notify_all() in its on_producer_wakeup

◆ await_consumer_ready() [2/2]

template<class T >
template<class TimePoint >
bool caf::async::spsc_buffer< T >::await_consumer_ready ( lock_type & guard,
std::condition_variable & cv,
TimePoint timeout )

Blocks until there is at least one item available, the producer stopped, or a timeout occurs.

Precondition
the consumer calls cv.notify_all() in its on_producer_wakeup
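The waiting pattern behind both overloads can be sketched with a standard condition variable and a predicate. Everything below is a hypothetical stand-in: toy_state, await_ready, and produce are not CAF names, and returning false on timeout is an assumption that simply mirrors std::condition_variable::wait_until. The produce function plays the role of a consumer's on_producer_wakeup calling cv.notify_all().

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical stand-in for the buffer state; not CAF's data layout.
struct toy_state {
  std::mutex mtx;
  std::size_t items = 0;
  bool closed = false;
};

using lock_type = std::unique_lock<std::mutex>;

// Blocks until at least one item is available or the producer stopped.
inline void await_ready(toy_state& st, lock_type& guard,
                        std::condition_variable& cv) {
  assert(guard.owns_lock());
  cv.wait(guard, [&st] { return st.items > 0 || st.closed; });
}

// Same as above, but gives up at `timeout`.
// Returns false if the timeout expired without data or a stop (assumption).
template <class TimePoint>
bool await_ready(toy_state& st, lock_type& guard,
                 std::condition_variable& cv, TimePoint timeout) {
  assert(guard.owns_lock());
  return cv.wait_until(guard, timeout,
                       [&st] { return st.items > 0 || st.closed; });
}

// Producer side: changes the state under the lock, then notifies waiters.
inline void produce(toy_state& st, std::condition_variable& cv,
                    std::size_t n) {
  {
    lock_type guard{st.mtx};
    st.items += n;
  }
  cv.notify_all();
}
```

Because the predicate is checked before blocking, a notification that arrives before the consumer starts waiting is not lost. Without the notify_all() call required by the precondition, a consumer waiting on an empty state would only return on timeout or on a spurious wakeup.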

◆ pull()

template<class T >
template<class Policy , class Observer >
std::pair< bool, size_t > caf::async::spsc_buffer< T >::pull ( Policy policy,
size_t demand,
Observer & dst )

Consumes up to demand items from the buffer.

Template Parameters
Policy	Either instant_error_t, delay_error_t, or ignore_errors_t.
Returns
a pair indicating whether the consumer may call pull again and how many items were consumed. When the first element is false, the function has called on_complete or on_error on the observer.
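The return-value protocol can be sketched with a single-threaded toy model. The types toy_source and collecting_observer and the function drain are hypothetical; the Policy parameter and error handling are left out for brevity, so this shows only the "pull until the first element is false" loop, not CAF's actual behavior.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical observer; real CAF observers have a richer interface.
struct collecting_observer {
  std::vector<int> items;
  bool completed = false;
  void on_next(int x) { items.push_back(x); }
  void on_complete() { completed = true; }
};

// Hypothetical single-threaded stand-in for the buffer.
struct toy_source {
  std::deque<int> buf;
  bool closed = false;

  // Delivers up to `demand` items to `dst` and returns {again, consumed}.
  // When `again` is false, on_complete has been called on `dst`.
  template <class Observer>
  std::pair<bool, std::size_t> pull(std::size_t demand, Observer& dst) {
    std::size_t n = std::min(demand, buf.size());
    for (std::size_t i = 0; i < n; ++i) {
      dst.on_next(buf.front());
      buf.pop_front();
    }
    if (buf.empty() && closed) {
      dst.on_complete();
      return {false, n};
    }
    return {true, n};
  }
};

// Drains a closed source in batches and returns the total consumed.
inline std::size_t drain(toy_source& src, collecting_observer& dst,
                         std::size_t batch) {
  assert(src.closed && batch > 0);
  std::size_t total = 0;
  for (;;) {
    auto res = src.pull(batch, dst);
    total += res.second;
    if (!res.first)
      return total; // on_complete was already delivered to dst
  }
}
```

The consumer must not call pull again after it returned false as the first element, since the observer has already received its terminal event.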

◆ push()

template<class T >
size_t caf::async::spsc_buffer< T >::push ( span< const T > items)

Appends to the buffer and calls on_producer_wakeup on the consumer if the buffer becomes non-empty.

Returns
the remaining capacity after inserting the items.
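The "wake the consumer only when the buffer becomes non-empty" behavior can be sketched as an empty-to-non-empty edge check. The names notifying_queue and counting_consumer are hypothetical, the sketch is single-threaded, and the clamped return value is an assumption rather than documented CAF behavior.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical consumer that only counts its wakeups.
struct counting_consumer {
  int wakeups = 0;
  void on_producer_wakeup() { ++wakeups; }
};

// Hypothetical illustration only; not part of CAF.
template <class T>
class notifying_queue {
public:
  notifying_queue(std::size_t capacity, counting_consumer& consumer)
    : capacity_(capacity), consumer_(&consumer) {
    assert(capacity > 0);
  }

  // Appends the items and wakes the consumer only on the transition from
  // empty to non-empty. Returns the remaining capacity, clamped at zero.
  std::size_t push(const std::vector<T>& items) {
    bool was_empty = buf_.empty();
    buf_.insert(buf_.end(), items.begin(), items.end());
    if (was_empty && !buf_.empty())
      consumer_->on_producer_wakeup();
    return capacity_ > buf_.size() ? capacity_ - buf_.size() : 0;
  }

  // Simulates the consumer taking everything out of the buffer.
  void clear() { buf_.clear(); }

private:
  std::size_t capacity_;
  counting_consumer* consumer_;
  std::vector<T> buf_;
};
```

Pushing into a buffer that already holds data does not trigger another wakeup in this sketch; the consumer is expected to keep pulling until the buffer is empty before it relies on the next notification.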

The documentation for this class was generated from the following files: