class datastream.api.Datastream(backend)

Initializes the Datastream API.

Parameters:backend – Backend instance
append(stream_id, value, timestamp=None, check_timestamp=True)

Appends a datapoint into the datastream.

  • stream_id – Stream identifier
  • value – Datapoint value
  • timestamp – Datapoint timestamp, must be equal or larger (newer) than the latest one, monotonically increasing (optional)
  • check_timestamp – Check if timestamp is equal or larger (newer) than the latest one (default: true)

A dictionary containing stream_id, granularity, and datapoint


Appends multiple datapoints into the datastream. Each datapoint should be described by a dictionary with fields stream_id, value and timestamp, which are the same as in append.

Parameters:datapoints – A list of datapoints to append

Requests the backend to backprocess any derived streams.

Parameters:query_tags – Tags that should be matched to streams

Removes (clears) all non-readonly stream tags.

Care should be taken that some tags are set immediately afterwards which uniquely identify a stream to be able to query the stream, in for example, ensure_stream.

Parameters:stream_id – Stream identifier

Deletes datapoints for all streams matching the specified query tags. If no query tags are specified, all datastream-related data is deleted from the backend.

Parameters:query_tags – Tags that should be matched to streams
downsample_streams(query_tags=None, until=None, return_datapoints=False, filter_stream=None)

Requests the backend to downsample all streams matching the specified query tags. Once a time range has been downsampled, new datapoints cannot be added to it anymore.

  • query_tags – Tags that should be matched to streams
  • until – Timestamp until which to downsample, not including datapoints at a timestamp (optional, otherwise all until the current time)
  • return_datapoints – Should newly downsampled datapoints be returned, this can potentially create a huge temporary list and memory consumption when downsampling many streams and datapoints
  • filter_stream – An optional callable which returns false for streams that should be skipped

A list of dictionaries containing stream_id, granularity, and datapoint for each datapoint created while downsampling, if return_datapoints was set

ensure_stream(query_tags, tags, value_downsamplers, highest_granularity, derive_from=None, derive_op=None, derive_args=None, value_type=None, value_type_options=None, derive_backprocess=True)

Ensures that a specified stream exists.

  • query_tags – A dictionary of tags which uniquely identify a stream
  • tags – A dictionary of tags that should be used (together with query_tags) to create a stream when it doesn’t yet exist
  • value_downsamplers – A set of names of value downsampler functions for this stream
  • highest_granularity – Predicted highest granularity of the data the stream will store, may be used to optimize data storage
  • derive_from – Create a derivate stream
  • derive_op – Derivation operation
  • derive_args – Derivation operation arguments
  • value_type – Optional value type (defaults to numeric)
  • value_type_options – Options specific to the value type
  • derive_backprocess – Should a derived stream be backprocessed

A stream identifier


Finds all streams matching the specified query tags.

Parameters:query_tags – Tags that should be matched to streams
Returns:A Streams iterator over matched stream descriptors
get_data(stream_id, granularity, start=None, end=None, start_exclusive=None, end_exclusive=None, reverse=False, value_downsamplers=None, time_downsamplers=None)

Retrieves data from a certain time range and of a certain granularity.

  • stream_id – Stream identifier
  • granularity – Wanted granularity
  • start – Time range start, including the start
  • end – Time range end, excluding the end (optional)
  • start_exclusive – Time range start, excluding the start
  • end_exclusive – Time range end, excluding the end (optional)
  • reverse – Should datapoints be returned in oldest to newest order (false), or in reverse (true)
  • value_downsamplers – The list of downsamplers to limit datapoint values to (optional)
  • time_downsamplers – The list of downsamplers to limit timestamp values to (optional)

A Datapoints iterator over datapoints


Returns the tags for the specified stream.

Parameters:stream_id – Stream identifier
Returns:A dictionary of tags for the stream
remove_tag(stream_id, tag)

Removes a stream tag.

  • stream_id – Stream identifier
  • tag – Dictionary describing the tag(s) to remove (values are ignored)
update_tags(stream_id, tags)

Updates stream tags with new tags, overriding existing ones.

  • stream_id – Stream identifier
  • tags – A dictionary of new tags


API operations are implemented in backends, which are responsible for storing datapoints, performing downsampling, deriving streams, and executing queries.

Implementation Details

Streams are stored in the streams collection, datapoints are stored in the datapoints.<granularity> collections, where <granularity> is one of the possible granularity levels.

When performing downsampling, we have to differentiate between two timestamps:

  • Datapoint timestamp is the timestamp of the datapoint that has been inserted for a given granularity level. On the highest granularity level it is always second precision. On lower granularity levels it is a dictionary of multiple values, depending on time downsamplers settings for a given stream.
  • Internal datapoint timestamp (stored in datapoint’s _id) is based on a timespan for the given granularity level. For example, if a datapoint was inserted at 31-07-2012 12:23:52, then the downsampled internal timestamp for the timespan this datapoint is in for hour granularity would be 31-07-2012 12:00:00 and for month granularity would be 01-07-2012 00:00:00.

Based on highest_granularity value, appended datapoints are stored in the collection configured by highest_granularity and only lower granularity values are downsampled. Requests for granularity higher than highest_granularity simply return values from highest_granularity collection. highest_granularity is just an optimization to not store unnecessary datapoints for granularity levels which would have at most one datapoint for their granularity timespans.

Value Downsamplers

mean(key: m)

Average of all datapoints.

sum(key: s)

Sum of all datapoints.

min(key: l, for lower)

Minimum value of all dataponts.

max(key: u, for upper)

Maximum value of all datapoints.

sum_squares(key: q)

Sum of squares of all datapoints.

std_dev(key: d)

Standard deviation of all datapoints.

count(key: c)

Number of all datapoints.

most_often(key: o, for often)

The most often occurring value of all datapoints.

least_often(key: r, for rare)

The least often occurring value of all datapoints.

frequencies(key: f)

For each value number of occurrences in all datapoints.

Time Downsamplers

mean(key: m)

Average of all timestamps.

first(key: a, is the first in the alphabet)

The first timestamp of all datapoints.

last(key: z, is the last in the alphabet)

The last timestamp of all datapoints.

Derive Operators

sum(src_streams, dst_stream)

Sum of multiple streams.

derivative(src_stream, dst_stream)

Derivative of a stream.

counter_reset(src_stream, dst_stream)

Generates a counter reset stream.

counter_derivative([{'name': 'reset', 'stream': reset_stream_id}, {'stream': data_stream_id}, ]dst_stream, max_value=None)

Derivative of a monotonically increasing counter stream.


exception datastream.exceptions.DatastreamException(*args, **kwargs)

The base class for all datastream API exceptions.

exception datastream.exceptions.StreamNotFound(*args, **kwargs)

Raised when stream queried for is not found.

exception datastream.exceptions.MultipleStreamsReturned(*args, **kwargs)

Raised when multiple streams found when queried for operations which operate on only one stream, like ensure_stream(). Specify more specific query tags.

exception datastream.exceptions.InconsistentStreamConfiguration(*args, **kwargs)

Raised when stream configuration passed to ensure_stream() is inconsistent and/or conflicting.

exception datastream.exceptions.OutstandingDependenciesError(*args, **kwargs)

Raised when stream cannot be deleted because it is a dependency for another stream.

exception datastream.exceptions.UnsupportedDownsampler(*args, **kwargs)

Raised when downsampler requested is unsupported.

exception datastream.exceptions.UnsupportedGranularity(*args, **kwargs)

Raised when granularity level requested is unsupported.

exception datastream.exceptions.UnsupportedDeriveOperator(*args, **kwargs)

Raised when derive operator requested is unsupported.

exception datastream.exceptions.UnsupportedValueType(*args, **kwargs)

Raised when value type requested is unsupported.

exception datastream.exceptions.ReservedTagNameError(*args, **kwargs)

Raised when updating tags with a reserved tag name.

exception datastream.exceptions.InvalidTimestamp(*args, **kwargs)

Raised when an invalid timestamp was provided.

exception datastream.exceptions.IncompatibleGranularities(*args, **kwargs)

Raised when derived stream’s granularity is incompatible with source stream’s granularity.

exception datastream.exceptions.IncompatibleTypes(*args, **kwargs)

Raised when derived stream’s value type is incompatible with source stream’s value type.

exception datastream.exceptions.AppendToDerivedStreamNotAllowed(*args, **kwargs)

Raised when attempting to append to a derived stream.

exception datastream.exceptions.InvalidOperatorArguments(*args, **kwargs)

Raised when derive operators received invalid arguments.

exception datastream.exceptions.LockExpiredMidMaintenance(*args, **kwargs)

Raised when a maintenance lock expires inside a maintenance operation.

exception datastream.exceptions.StreamAppendContended(*args, **kwargs)

Raised when too many processes are trying to append to the same stream.

exception datastream.exceptions.StreamAppendFailed(*args, **kwargs)

Raised when a backend fails while inserting into a stream.

exception datastream.exceptions.DatastreamWarning(*args, **kwargs)

The base class for all datastream API runtime warnings.

exception datastream.exceptions.InvalidValueWarning(*args, **kwargs)

Warning used when an invalid value is encountered.

exception datastream.exceptions.InternalInconsistencyWarning(*args, **kwargs)

Warning used when an internal inconsistency is detected.

exception datastream.exceptions.DownsampleConsistencyNotGuaranteed(*args, **kwargs)

Warning used when consistency of downsampled values with original datapoints is no longer guaranteed due to some condition. Reseting downsample state and redoing downsampling could be necessary.