Python Docs

class river.StreamReader

The main entry point for River for reading an existing stream. This class is initialized with a stream name corresponding to an existing stream, and allows for batch consumption of the stream. Reads requesting more data than is present in the stream will block until the data arrives or, if a timeout is given, until the timeout expires.

After constructing a StreamReader, you must call initialize() with the name of the stream you wish to read.

good

Whether this stream is “good” for reading (similar to std::ifstream::good()). Synonymous with casting to bool. Indicates whether more samples can be read from this stream via this StreamReader.

initialize(self, stream_name: str, timeout_ms: int = -1)

Initializes this reader to a particular stream. If timeout_ms is positive, this call waits up to timeout_ms milliseconds for the stream to be created. If the timeout expires, or if no timeout was given and the stream does not exist, a StreamReaderException is raised.

initialized_at_us
metadata
new_buffer(self, n: int) → np.ndarray

Returns a NumPy buffer of n elements with a dtype matching the stream’s schema. Like numpy.empty, the buffer is allocated but not zeroed, so it is likely to contain “junk” values until data is written into it.
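Because the buffer is not zeroed, any element that a read does not fill keeps arbitrary contents. The sketch below uses plain NumPy and assumes that new_buffer(n) is equivalent to np.empty(n, dtype=schema.dtype()); the two-field dtype is hypothetical and stands in for a real stream's schema.

```python
import numpy as np

# Hypothetical sample layout standing in for a stream's schema.dtype().
sample_dtype = np.dtype([("timestamp_us", np.int64), ("value", np.float64)])

# ASSUMPTION: new_buffer(4) behaves like this np.empty call, so the
# field values are arbitrary until something overwrites them.
buffer = np.empty(4, dtype=sample_dtype)

# If code may inspect elements that a read did not fill, zero them first.
# A scalar assigned to a structured array is applied to every field.
buffer[:] = 0

print(sample_dtype.itemsize)  # → 16 (bytes per sample: 8 + 8)
```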

read(self, arr: np.ndarray, timeout_ms: int = -1) → int

Reads from the stream, starting where the previous read left off. This call blocks until the requested number of samples (arr.size) is available in the underlying stream, unless the timeout expires or EOF is reached first. The return value indicates how many samples were written into the buffer. Once EOF has been reached, good() returns false and every subsequent call to read() returns -1.

Parameters
  • arr – The buffer into which data will be read from the stream. At most arr.size samples will be read from the stream and written into arr. The return value of this call (if nonnegative) tells how many samples, each of sample_size bytes as defined by the schema, were written into the buffer. For VARIABLE_WIDTH_BYTES fields, ensure this buffer is large enough to hold the maximum possible read size.

  • timeout_ms – If positive, the maximum length of time this entire call can block while waiting for samples. If the timeout expires, the buffer may be only partially filled, so use the return value to determine how many samples were read.

Returns

the number of elements read, which is always less than or equal to arr.size. It can be less than arr.size if the timeout expires (leaving a partially filled buffer) or, even when no timeout is given, if the stream reaches EOF. Returns -1 if EOF is encountered; in that case the buffer is guaranteed not to have been touched.
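Since read() can return fewer samples than requested and returns -1 at EOF, a consumer should always slice the buffer by the return value. The sketch below drains a stream in batches; drain is a hypothetical helper, not part of River. So that it runs without Redis, it is exercised against FakeReader, a hypothetical in-memory stand-in that mimics the documented read()/good contract; with River you would pass an initialized river.StreamReader instead.

```python
import numpy as np


def drain(reader, batch_size=1024, timeout_ms=1000):
    """Read from `reader` in batches until EOF and return every sample read."""
    buffer = reader.new_buffer(batch_size)
    chunks = []
    # `good` is read as an attribute, as it is listed in this reference.
    while reader.good:
        count = reader.read(buffer, timeout_ms)
        if count == -1:
            break  # EOF: the buffer was left untouched
        if count > 0:
            # Only the first `count` elements are valid. Copy them out because
            # `buffer` is reused by the next read().
            chunks.append(buffer[:count].copy())
    if not chunks:
        return buffer[:0].copy()
    return np.concatenate(chunks)


class FakeReader:
    """Hypothetical in-memory stand-in following the documented read() contract."""

    def __init__(self, samples):
        self._samples = samples
        self._cursor = 0
        self.good = True

    def new_buffer(self, n):
        return np.empty(n, dtype=self._samples.dtype)

    def read(self, arr, timeout_ms=-1):
        remaining = len(self._samples) - self._cursor
        if not self.good or remaining == 0:
            self.good = False  # EOF: report -1 without touching arr
            return -1
        count = min(arr.size, remaining)
        arr[:count] = self._samples[self._cursor:self._cursor + count]
        self._cursor += count
        return count


# Usage: ten samples drained in batches of four (4 + 4 + 2).
samples = np.zeros(10, dtype=[("value", np.float64)])
samples["value"] = np.arange(10)
result = drain(FakeReader(samples), batch_size=4)
print(len(result))  # → 10
```

A timed-out read that returns 0 simply loops again, so against a live stream this helper keeps polling until the writer calls stop().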

schema
stop(self) → None

Stops this reader from being used in the future. Redis connections are freed; read() will no longer work; good() will return false.

stream_name
tail(self, arr: np.ndarray, timeout_ms: int = -1) → int

Reads the last element in the stream after the previously seen elements, skipping any elements in between, and writes it into arr. If no timeout is given, blocks until at least one element is available after the current cursor; otherwise, waits at most timeout_ms milliseconds.

Parameters

  • arr – The buffer into which the last element is written. At most one element is written.

  • timeout_ms – If positive, the maximum length of time this entire call can block while waiting for a sample. When the timeout expires, either 0 or 1 elements will have been read, so use the return value to determine which.

Returns

the number of elements skipped and/or read, counting the last element, which is the one written into the buffer. It is 0 if the timeout expires before any element becomes available, and it is >= 1 if and only if the buffer was changed. Returns -1 if the stream has reached EOF.
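tail() suits “latest value” consumers that only care about the most recent sample, and its three kinds of return value (-1, 0, >= 1) map onto a small helper. The sketch below is hypothetical: latest_sample is not part of River, it assumes the newest element lands in buffer[0], and FakeTailReader is an in-memory stand-in following the documented tail() contract so the example runs without Redis.

```python
import numpy as np


def latest_sample(reader, buffer, timeout_ms=100):
    """Return the newest unseen sample as a length-1 array, or None on timeout.

    Raises EOFError once the stream has ended.
    """
    count = reader.tail(buffer, timeout_ms)
    if count == -1:
        raise EOFError("stream has ended")
    if count == 0:
        return None  # timed out; the buffer was not changed
    # count >= 1: count - 1 older elements were skipped.
    # ASSUMPTION: the newest element is written to buffer[0].
    return buffer[:1].copy()


class FakeTailReader:
    """Hypothetical stand-in: unseen samples sit in `pending` until tail()."""

    def __init__(self, dtype):
        self.pending = np.empty(0, dtype=dtype)
        self.ended = False

    def tail(self, arr, timeout_ms=-1):
        if len(self.pending) == 0:
            # No waiting in this fake: report EOF or an immediate timeout.
            return -1 if self.ended else 0
        count = len(self.pending)
        arr[0] = self.pending[-1]  # only the newest element is written
        self.pending = self.pending[:0]
        return count


dtype = np.dtype([("value", np.float64)])
reader = FakeTailReader(dtype)
buf = np.empty(1, dtype=dtype)
print(latest_sample(reader, buf))  # → None
reader.pending = np.array([(1.0,), (2.0,), (3.0,)], dtype=dtype)
print(latest_sample(reader, buf)["value"][0])  # → 3.0
```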

total_samples_read
class river.StreamWriter

The main entry point for River for writing a new stream. Streams are defined by a schema and a stream name, both of which are given in the initialize() call. All samples written to this stream must belong to the same schema. Once there are no more elements to write, call stop(); this signals to any readers that the stream has ended.

initialize(self, stream_name: str, schema: StreamSchema, user_metadata: dict = None)

Initializes this stream for writing. The given stream name must be unique within the Redis instance in use. Initialization stores the necessary information (e.g. schemas and timestamps) in Redis. Optionally, a dict of user metadata can be passed, which is written to Redis atomically.

initialized_at_us
metadata

Returns all metadata set for this stream. Implementation note: this makes a call to Redis under the hood, so it can incur some overhead.

new_buffer(self, n: int) → np.ndarray

Returns a NumPy buffer of n elements with a dtype matching the stream’s schema. Like numpy.empty, the buffer is allocated but not zeroed, so it is likely to contain “junk” values until data is written into it.

schema
stop(self) → None

Stops this stream permanently. This method must be called once the stream is finished in order to notify readers that the stream has terminated.

stream_name
total_samples_written
write(self, arr: np.ndarray) → None

Writes arr.size samples to the stream. Each element of arr is assumed to have the correct sample size (as defined by the stream’s schema), and its bytes are written in the order of the schema’s fields.
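Readers only learn that a stream has ended when stop() is called, so it is worth guaranteeing that call with try/finally even if producing data fails. A minimal sketch, assuming writer is an already-initialized river.StreamWriter; publish_batches is a hypothetical helper, and FakeWriter is an in-memory stand-in used only so the example runs without Redis.

```python
import numpy as np


def publish_batches(writer, batches):
    """Write each NumPy batch in order, then always signal end-of-stream."""
    try:
        for batch in batches:
            # write() sends batch.size samples; the dtype must match the schema.
            writer.write(batch)
    finally:
        writer.stop()  # readers see EOF even if producing a batch raised


class FakeWriter:
    """Hypothetical stand-in that only counts written samples."""

    def __init__(self):
        self.total_samples_written = 0
        self.stopped = False

    def write(self, arr):
        self.total_samples_written += arr.size

    def stop(self):
        self.stopped = True


dtype = np.dtype([("value", np.float64)])
writer = FakeWriter()
publish_batches(writer, [np.zeros(3, dtype=dtype), np.zeros(5, dtype=dtype)])
print(writer.total_samples_written, writer.stopped)  # → 8 True
```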

class river.RedisConnection
redis_hostname
redis_password
redis_port
class river.StreamSchema

The schema for a particular stream. A stream has exactly one schema over its lifetime; this schema defines both the writing and reading structure of the stream (and, if in use, the on-disk representation of the stream).

dtype(self) → np.dtype

Returns the equivalent NumPy dtype for this schema instance.

field_definitions
static from_dtype(dtype: np.dtype)

Creates a StreamSchema from a given NumPy dtype. The dtype should be a “structured array” dtype, i.e. a collection of named fields. See https://numpy.org/doc/stable/user/basics.rec.html for more details.
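A structured dtype lists named fields in a fixed order, and its itemsize is the number of bytes per sample. The sketch below builds one with plain NumPy; the field names are hypothetical, and the River call is left as a comment because it requires the river package.

```python
import numpy as np

# A "structured array" dtype: an ordered collection of named, typed fields.
sample_dtype = np.dtype([
    ("timestamp_us", np.int64),
    ("channel", np.int16),
    ("voltage", np.float32),
])

print(sample_dtype.names)     # → ('timestamp_us', 'channel', 'voltage')
print(sample_dtype.itemsize)  # → 14 (8 + 2 + 4 bytes, packed by default)

# With River installed, a schema could then be created from this dtype:
#   schema = river.StreamSchema.from_dtype(sample_dtype)
# and schema.dtype() is documented to return an equivalent NumPy dtype.
```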

class river.FieldDefinition

One or more fields that are present in each sample of a particular stream. This definition governs how those fields are serialized to Redis and how they appear as columns in the persisted file. A StreamSchema holds a collection of field definitions.

While most field types are fixed-width (e.g. doubles and floats), VARIABLE_WIDTH_BYTES is different. To store variable-width bytes (e.g. a dynamic-length string or byte array), specify VARIABLE_WIDTH_BYTES; to keep serialization and deserialization simple, it must then be the only field in the schema. In this case, the size should be the maximum possible size of the field, which is needed for serialization and deserialization.
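Because the declared size of a VARIABLE_WIDTH_BYTES field is a maximum in bytes, string payloads should be measured after encoding rather than by character count. A minimal sketch with a hypothetical encode_payload helper (not part of River) that rejects oversized payloads before they are written:

```python
def encode_payload(text: str, max_size: int) -> bytes:
    """Encode `text` as UTF-8 and check it fits the field's maximum size."""
    payload = text.encode("utf-8")
    if len(payload) > max_size:
        raise ValueError(
            f"payload is {len(payload)} bytes; field maximum is {max_size}"
        )
    return payload


print(len("héllo"))                     # → 5 (characters)
print(len(encode_payload("héllo", 8)))  # → 6 (bytes: "é" is 2 bytes in UTF-8)
```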

name
type
class river.FieldType(value)

An enumeration.

DOUBLE = 0
FIXED_WIDTH_BYTES = 5
FLOAT = 1
INT16 = 2
INT32 = 3
INT64 = 4
VARIABLE_WIDTH_BYTES = 6
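The numeric member names suggest a correspondence with NumPy scalar types, which makes it easy to estimate the per-sample size of a schema. The mapping below is an assumption inferred from the names only, not taken from River's source; the two bytes members are left out because their NumPy representation is not documented here, and sample_size is a hypothetical helper.

```python
import numpy as np

# ASSUMPTION: correspondence inferred from the FieldType member names only.
# FIXED_WIDTH_BYTES and VARIABLE_WIDTH_BYTES are intentionally omitted.
ASSUMED_NUMPY_TYPES = {
    "DOUBLE": np.float64,
    "FLOAT": np.float32,
    "INT16": np.int16,
    "INT32": np.int32,
    "INT64": np.int64,
}


def sample_size(kinds):
    """Bytes per sample for a list of numeric FieldType member names."""
    return sum(np.dtype(ASSUMED_NUMPY_TYPES[kind]).itemsize for kind in kinds)


print(sample_size(["INT64", "INT16", "FLOAT"]))  # → 14
```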
class river.RedisException
class river.StreamReaderException
class river.StreamWriterException
class river.StreamDoesNotExistException
class river.StreamExistsException