C++ Docs

class StreamReader

The main entry point for River for reading an existing stream. This class is initialized with a stream name corresponding to an existing stream, and allows for batch consumption of the stream. Reads requesting more data than is present in the stream will block. Any attempt to read into a typed buffer will be checked against the stream’s schema to ensure compatibility.

After constructing a StreamReader, you must call Initialize with the name of the stream you wish to read.

Public Functions

explicit StreamReader(const RedisConnection &connection, const int max_fetch_size = 10000)

Construct an instance of a StreamReader. One StreamReader should be used with at most one underlying stream.

Parameters
  • connection – parameters to connect to Redis

  • max_fetch_size – maximum number of elements to fetch from Redis at a time (to prevent untenably large batches if a large number of bytes are consumed).

void Initialize(const std::string &stream_name, int timeout_ms = -1)

Initialize this reader to a particular stream. If timeout_ms is positive, this call waits up to timeout_ms milliseconds for the stream to be created. A StreamReaderException is thrown if the timeout is exceeded, or if no timeout was given and the stream does not exist.

template<class DataT>
inline int64_t Read(DataT *buffer, int64_t num_samples, int **sizes = nullptr, std::string **keys = nullptr, int timeout_ms = -1)

Read from the stream, starting where the previous read left off. This call blocks until the desired number of samples is available in the underlying stream. The return value indicates how many samples were written to the buffer. Once EOF has been reached, Good() returns false and any further read attempt returns -1.

Template Parameters

DataT – The data type of the buffer. The sizeof() of this type should match the stream’s sample size as governed by its schema.

Parameters
  • num_samples – Maximum number of samples to read from the underlying stream.

  • sizes – If given, <return value> entries will be written into this array containing the sizes of each corresponding sample. Particularly useful for VARIABLE_WIDTH_BYTES fields. Pass nullptr to ignore.

  • keys – If given, <return value> std::string entries will be written into this array containing the NULL-terminated unique std::string keys in the underlying database. Pass nullptr to ignore.

  • timeout_ms – If positive, the maximum length of time this entire call can block while waiting for samples. After the timeout, the stream can be partially read, and the return value is needed to determine samples read.

Returns

the number of elements read, which is always less than or equal to num_samples. A timeout can leave the buffer partially filled, and the count can be less than num_samples even when no timeout is given. Returns -1 if EOF is encountered; in that case buffer, sizes, and keys are guaranteed not to have been touched.
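The return-value contract above suggests a read loop that trusts only the returned count and stops on -1. The following is a minimal sketch of that pattern. FakeReader and ReadAll are hypothetical names introduced here so the example compiles without River or Redis; FakeReader models only the documented return values of Read() and is not the library's implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical stand-in for StreamReader (assumption, not part of River).
// It models only the documented return values of Read().
class FakeReader {
 public:
  FakeReader(std::vector<double> data, int64_t max_per_call)
      : data_(std::move(data)), max_per_call_(max_per_call) {}

  // Returns the number of samples copied, or -1 once the data is exhausted.
  int64_t Read(double *buffer, int64_t num_samples) {
    const int64_t remaining = static_cast<int64_t>(data_.size()) - cursor_;
    if (remaining <= 0) {
      good_ = false;
      return -1;  // EOF: buffer is left untouched
    }
    const int64_t n = std::min({num_samples, remaining, max_per_call_});
    std::copy_n(data_.begin() + cursor_, n, buffer);
    cursor_ += n;
    return n;  // may be fewer than num_samples
  }

  bool Good() const { return good_; }

 private:
  std::vector<double> data_;
  int64_t max_per_call_;
  int64_t cursor_ = 0;
  bool good_ = true;
};

// Drains a reader chunk by chunk, using only the return value of Read()
// to decide how much of the chunk is valid.
template <class Reader>
std::vector<double> ReadAll(Reader &reader, int64_t chunk_size) {
  assert(chunk_size > 0);
  std::vector<double> out;
  std::vector<double> chunk(static_cast<std::size_t>(chunk_size));
  while (true) {
    const int64_t n = reader.Read(chunk.data(), chunk_size);
    if (n < 0) break;  // -1 signals EOF
    out.insert(out.end(), chunk.begin(), chunk.begin() + n);
  }
  return out;
}
```

Because ReadAll is a template over the reader type, the same loop shape should apply to any object whose Read(buffer, num_samples) follows the contract described above.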

int64_t ReadBytes(char *buffer, int64_t num_samples, int **sizes = nullptr, std::string **keys = nullptr, int timeout_ms = -1)

Read from the stream, starting where the previous read left off. This call blocks until the desired number of samples is available in the underlying stream. The return value indicates how many samples were written to the buffer. Once EOF has been reached, Good() returns false and any further read attempt returns -1.

Parameters
  • buffer – The buffer into which data will be read from the stream. The return value of this call (if nonnegative) tells how many samples, each of sample_size bytes as told by the schema, were written into the buffer. For VARIABLE_WIDTH_BYTES fields, ensure this buffer is large enough to capture the maximum possible size of num_samples samples.

  • num_samples – Maximum number of samples to read from the underlying stream.

  • sizes – If given, <return value> entries will be written into this array containing the sizes of each corresponding sample. Particularly useful for VARIABLE_WIDTH_BYTES fields. Pass nullptr to ignore.

  • keys – If given, <return value> std::string entries will be written into this array containing the NULL-terminated unique std::string keys in the underlying database. Pass nullptr to ignore.

  • timeout_ms – If positive, the maximum length of time this entire call can block while waiting for samples. After the timeout, the stream can be partially read, and the return value is needed to determine samples read.

Returns

the number of elements read, which is always less than or equal to num_samples. A timeout can leave the buffer partially filled, and the count can be less than num_samples even when no timeout is given. Returns -1 if EOF is encountered; in that case buffer, sizes, and keys are guaranteed not to have been touched.

template<class DataT>
inline int64_t Tail(DataT *buffer, int timeout_ms = -1, char *key = nullptr, int64_t *sample_index = nullptr)

Returns the most recent element in the stream that comes after the previously seen elements. If no timeout is given, blocks until at least one element is available in the stream after the current cursor; otherwise, waits for at most the timeout.

Parameters

timeout_ms – If positive, the maximum length of time this entire call can block while waiting for a sample. After the timeout there can be 0 or 1 elements read, and so the return value is needed to determine samples read.

Returns

the number of elements skipped and/or read, including the last element that might be written into the buffer. Thus, this will return 0 in the event of a timeout; this will return >= 1 iff buffer is changed. Returns -1 if there is an EOF in the stream.

int64_t TailBytes(char *buffer, int timeout_ms = -1, char *key = nullptr, int64_t *sample_index = nullptr)

Returns the last element in the stream after the previously seen elements. Blocks until there’s at least one element available in the stream after the current cursor if no timeout is given; else, waits for the timeout.

Parameters

timeout_ms – If positive, the maximum length of time this entire call can block while waiting for a sample. After the timeout there will be 0 or 1 elements read, and so the return value is needed to determine samples read.

Returns

the number of elements skipped and/or read, including the last element that might be written into the buffer. Thus, this will return 0 in the event of a timeout; this will return >= 1 iff buffer is changed. Returns -1 if there is an EOF in the stream.
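The three possible outcomes of Tail and TailBytes (a positive count, 0, or -1) can be illustrated with a toy in-memory model. FakeTailStream below is a hypothetical class written for this sketch only; it assumes that a drained stream reports 0 while it is still open and -1 once the writer has stopped, and it does not block or talk to Redis.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Toy in-memory model of the documented Tail() return values.
// Hypothetical: not part of River.
class FakeTailStream {
 public:
  void Append(int32_t value) { samples_.push_back(value); }
  void Close() { closed_ = true; }  // models the writer ending the stream

  // Writes the newest unseen sample into *buffer and returns how many samples
  // the cursor advanced over (skipped plus the one that was read).
  // Returns 0 when nothing new is available, which is what a timed-out call
  // would report, and -1 once the stream is closed and fully consumed.
  int64_t Tail(int32_t *buffer) {
    assert(buffer != nullptr);
    const int64_t size = static_cast<int64_t>(samples_.size());
    const int64_t unseen = size - cursor_;
    if (unseen == 0) return closed_ ? -1 : 0;  // buffer untouched in both cases
    *buffer = samples_.back();
    cursor_ = size;
    return unseen;
  }

 private:
  std::vector<int32_t> samples_;
  int64_t cursor_ = 0;
  bool closed_ = false;
};
```

In this model, appending three samples and calling Tail once yields 3 with the third sample in the buffer; a second call with nothing new yields 0 and leaves the buffer unchanged.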

int64_t Seek(const std::string &key)

Seeks the internal cursor to the given key. Any elements subsequently returned by Read or Tail will be after this element.

If the given key is in the past (i.e., this StreamReader has already consumed past this key), then the cursor will not be moved, and no exception will be thrown.

Returns

the number of elements skipped. Thus, it returns 0 if the key given is in the past of the stream or if it is the current key. Returns -1 if EOF is hit while attempting to seek to this key (indicating the key given is greater than any key in the stream).
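The Seek return values can be sketched with a toy model over an in-memory list of keys. FakeKeyedStream is hypothetical and makes several simplifying assumptions that the text above does not confirm: keys are matched exactly, a key that is not found is treated as lying past the end of a finished stream, and the cursor is moved to the end in that case. The key strings are illustrative only.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Toy model of the documented Seek() return values. Hypothetical: not part
// of River, and it never blocks or contacts Redis.
class FakeKeyedStream {
 public:
  explicit FakeKeyedStream(std::vector<std::string> keys)
      : keys_(std::move(keys)) {}

  // Marks the next n keys as already consumed by this reader.
  void Consume(int64_t n) {
    assert(n >= 0);
    cursor_ = std::min<int64_t>(cursor_ + n, size());
  }

  int64_t Seek(const std::string &key) {
    const auto it = std::find(keys_.begin(), keys_.end(), key);
    if (it == keys_.end()) {
      cursor_ = size();  // assumption: the search consumed the whole stream
      return -1;         // EOF was hit before the key was found
    }
    const int64_t after_key = static_cast<int64_t>(it - keys_.begin()) + 1;
    if (after_key <= cursor_) return 0;  // key is in the past or is current
    const int64_t skipped = after_key - cursor_;
    cursor_ = after_key;  // later reads start after the sought key
    return skipped;
  }

  int64_t cursor() const { return cursor_; }

 private:
  int64_t size() const { return static_cast<int64_t>(keys_.size()); }

  std::vector<std::string> keys_;
  int64_t cursor_ = 0;
};
```

With four keys and two already consumed, seeking to either consumed key returns 0 and leaves the cursor alone, while seeking to the fourth key returns 2.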

inline bool is_initialized()

Whether this stream has been initialized.

inline bool Good() const

Whether this stream is “good” for reading (similar to std::ifstream’s good()). Synonymous with casting to bool.

inline explicit operator bool() const

Synonym for Good().

inline std::string eof_key()

If the stream has reached EOF, this will be the key that contained the EOF signal. Empty if EOF has not been reached.

inline int64_t initialized_at_us()

Time in microseconds since epoch of when this stream was initialized, with respect to the server time.

inline int64_t total_samples_read()

Number of samples that have been read since initialization of this stream.

void AddListener(internal::StreamReaderListener *listener)

Add a listener to this reader. Can be called at any point, even before initialization of the stream. See StreamReaderListener for more details.

const StreamSchema &schema()

The schema of this stream; only valid after Initialize() has been called. This schema contains useful information on the fields, in particular the name, order, and size of the fields within a single sample of this stream.

std::unordered_map<std::string, std::string> Metadata()

User metadata attached to this stream.

int64_t local_minus_server_clock_us()

Get the difference between the “local” clock of the WRITER of the stream and the server clock, i.e. the clock of the Redis server. The difference is returned in microseconds.
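As the function name suggests, the returned value is the writer's local time minus the server time. Under that reading, converting between the two clocks is a single addition or subtraction. The helper names below are hypothetical and exist only to make the arithmetic concrete.

```cpp
#include <cstdint>

// local_minus_server_us stands for the value returned by
// local_minus_server_clock_us(): writer-local time minus server time,
// in microseconds. Both helpers are hypothetical, not part of River.

// Converts a server-clock timestamp to the writer's local clock.
constexpr int64_t ServerToWriterLocalUs(int64_t server_us,
                                        int64_t local_minus_server_us) {
  return server_us + local_minus_server_us;
}

// Converts a writer-local timestamp back to the server clock.
constexpr int64_t WriterLocalToServerUs(int64_t local_us,
                                        int64_t local_minus_server_us) {
  return local_us - local_minus_server_us;
}

// The two conversions are inverses of each other.
static_assert(WriterLocalToServerUs(ServerToWriterLocalUs(1000000, -250),
                                    -250) == 1000000,
              "round trip must return the original timestamp");
```

For example, a server-clock value such as the one returned by initialized_at_us() could be passed as server_us to express it on the writer's clock.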

void Stop()

Stops this reader from being used in the future. Redis connections are freed; Read() will no longer work; Good() will return false.

class StreamWriter

The main entry point for River for writing a new stream. Streams are defined by a schema and a stream name, both of which are given in the Initialize() call. All samples written to this stream must belong to the same schema. Once there are no more elements in this stream, call Stop(); this will signal to any other readers that the stream has ended.

Public Functions

explicit StreamWriter(const RedisConnection &connection, int64_t keys_per_redis_stream = int64_t{1LL << 24}, int batch_size = 1536)

Construct an instance of StreamWriter. One StreamWriter belongs to at most one stream.

Parameters
  • connection – Parameters to connect to Redis.

  • batch_size – Number of samples in a batch that will be written to or read from Redis. Increasing this number makes batches bigger and thus reduces the number of writes and reads to Redis, but also increases the average latency of the stream.

  • keys_per_redis_stream – the number of keys in each underlying Redis stream. The default follows from 2^24 ≈ 16.8M keys per stream, which is roughly 350MB of memory on 64-bit Redis with 8-byte fields.

void Initialize(const std::string &stream_name, const StreamSchema &schema, const std::unordered_map<std::string, std::string> &user_metadata = std::unordered_map<std::string, std::string>())

Initialize this stream for writing. The given stream name must be unique within the Redis instance used. This initialization puts necessary information (e.g. schemas and timestamps) into Redis. Optionally, it can accept an unordered_map of user metadata to put into Redis atomically.

template<class DataT>
inline void Write(DataT *data, int64_t num_samples, const int *sizes = nullptr)

Writes data to the stream. The given data buffer of type DataT will be recast to a raw (e.g. char *) array and written to Redis according to each sample size. If the schema has only fixed-width fields, then the data buffer will be advanced according to the fixed-width size given in Initialize(); otherwise (i.e. if it has variable-width fields), the sizes buffer is necessary to determine the size of each sample.

void WriteBytes(const char *data, int64_t num_samples, const int *sizes = nullptr)

Writes raw bytes to the stream. For fixed-width fields, each sample will be assumed to be of the size defined in the schema from Initialize(); otherwise, for variable-width fields, the sizes array is necessary.
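For variable-width data, the caller has to supply one byte buffer plus a parallel sizes array. The sketch below shows one way to build both from a list of strings, and how to split them apart again. It assumes that samples sit back to back in the buffer with no padding, which is suggested by the description of the buffer being advanced by each sample's size but is not stated outright. PackedSamples, Pack, and Unpack are hypothetical helpers, not part of River.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical container for a batch of variable-width samples.
struct PackedSamples {
  std::vector<char> data;  // all samples back to back (assumed layout)
  std::vector<int> sizes;  // sizes[i] is the byte length of sample i
};

// Concatenates the samples and records each sample's length.
PackedSamples Pack(const std::vector<std::string> &samples) {
  PackedSamples packed;
  for (const std::string &s : samples) {
    packed.data.insert(packed.data.end(), s.begin(), s.end());
    packed.sizes.push_back(static_cast<int>(s.size()));
  }
  return packed;
}

// Splits a packed buffer back into samples by walking the sizes array.
std::vector<std::string> Unpack(const char *data, const int *sizes,
                                int64_t num_samples) {
  assert(num_samples == 0 || sizes != nullptr);
  std::vector<std::string> samples;
  const char *cursor = data;
  for (int64_t i = 0; i < num_samples; ++i) {
    samples.emplace_back(cursor, static_cast<std::size_t>(sizes[i]));
    cursor += sizes[i];  // advance by this sample's size
  }
  return samples;
}
```

Given a PackedSamples value p, the arguments for a call shaped like WriteBytes(data, num_samples, sizes) would be p.data.data(), the number of entries in p.sizes, and p.sizes.data().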

const StreamSchema &schema()

A copy of the stream’s schema that was provided on Initialize().

const std::string &stream_name()

The stream name belonging to this stream. Empty if it has not been initialized.

int64_t total_samples_written()

Number of samples written to this stream since initialization.

int64_t initialized_at_us()

Time in microseconds since epoch of when this stream was initialized, with respect to the server time.

std::unordered_map<std::string, std::string> Metadata()

User metadata attached to this stream.

void SetMetadata(const std::unordered_map<std::string, std::string> &metadata)

Sets the user metadata attached to this stream.

void Stop()

Stops this stream permanently. This method must be called once the stream is finished in order to notify readers that the stream has terminated.

class RedisConnection

class StreamSchema

The schema for a particular stream. A stream has exactly one schema over its lifetime; this schema defines both the writing and reading structure of the stream (and, if in use, the on-disk representation of the stream).

struct FieldDefinition

One or more fields that are present in each sample of a particular stream. This definition governs how this will be serialized to Redis and the columns in the persisted file.

Most field definitions are fixed-width; VARIABLE_WIDTH_BYTES is the exception. To use variable-width bytes (e.g. a dynamic-length string or byte array), specify VARIABLE_WIDTH_BYTES, which must then be the only field in the schema; this restriction keeps serialization and deserialization simple. In this case, the size should correspond to the MAX size possible for this field, which is needed when serializing and deserializing.

enum river::FieldDefinition::type

Values:

enumerator DOUBLE
enumerator FLOAT
enumerator INT16
enumerator INT32
enumerator INT64
enumerator FIXED_WIDTH_BYTES
enumerator VARIABLE_WIDTH_BYTES
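Since typed reads are checked against the schema's sample size, it can help to compute that size from the field list and compare it with sizeof of the struct used as the buffer type. The sketch below does this with a hypothetical mirror of the enumerators above. FieldType, Field, FieldSize, SampleSize, and ExampleSample are illustrative names, and the assumption that fields are laid out without padding (hence the packed struct) is not confirmed by this document.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical mirror of the enumerators listed above (not River's types).
enum class FieldType {
  DOUBLE, FLOAT, INT16, INT32, INT64, FIXED_WIDTH_BYTES, VARIABLE_WIDTH_BYTES
};

struct Field {
  FieldType type;
  int size;  // only consulted for the two *_BYTES types
};

// Byte width of a single field.
int FieldSize(const Field &field) {
  switch (field.type) {
    case FieldType::DOUBLE: return 8;
    case FieldType::FLOAT: return 4;
    case FieldType::INT16: return 2;
    case FieldType::INT32: return 4;
    case FieldType::INT64: return 8;
    case FieldType::FIXED_WIDTH_BYTES: return field.size;
    case FieldType::VARIABLE_WIDTH_BYTES: return field.size;  // maximum size
  }
  return 0;  // not reached for valid enumerators
}

// Total bytes per sample, assuming fields are stored without padding.
int SampleSize(const std::vector<Field> &fields) {
  int total = 0;
  for (const Field &field : fields) {
    const int size = FieldSize(field);
    assert(size >= 0);
    total += size;
  }
  return total;
}

// A buffer type matching {DOUBLE, INT32, INT16, FIXED_WIDTH_BYTES(4)}.
// Packing removes compiler-inserted padding so sizeof equals the field sum.
#pragma pack(push, 1)
struct ExampleSample {
  double value;
  int32_t count;
  int16_t flag;
  char tag[4];
};
#pragma pack(pop)

static_assert(sizeof(ExampleSample) == 18, "8 + 4 + 2 + 4 bytes, no padding");
```

Without the packing pragma, most compilers would pad ExampleSample to 24 bytes, and a size comparison against an 18-byte sample would fail.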
class RedisException : public std::exception