Reference

API

class datastream.api.Datastream(backend)

Initializes the Datastream API.

Parameters: backend – Backend instance

append(stream_id, value, timestamp=None, check_timestamp=True)

Appends a datapoint into the datastream.

Parameters:
  • stream_id – Stream identifier
  • value – Datapoint value
  • timestamp – Datapoint timestamp, which must be equal to or larger (newer) than the latest one so that timestamps are monotonically increasing (optional)
  • check_timestamp – Check whether the timestamp is equal to or larger (newer) than the latest one (default: true)
Returns:

A dictionary containing stream_id, granularity, and datapoint
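To make the timestamp contract of append() concrete, here is a minimal in-memory sketch. It is not the MongoDB backend: ToyStream, InvalidTimestampError, the "t"/"v" datapoint keys, and defaulting a missing timestamp to the current UTC time are all assumptions made for illustration.

```python
from datetime import datetime, timezone


class InvalidTimestampError(ValueError):
    """Hypothetical stand-in for datastream.exceptions.InvalidTimestamp."""


class ToyStream:
    """In-memory model of the append() contract (not the real backend)."""

    def __init__(self, stream_id, granularity="seconds"):
        self.stream_id = stream_id
        self.granularity = granularity
        self.datapoints = []

    def append(self, value, timestamp=None, check_timestamp=True):
        if timestamp is None:
            # Assumption: a missing timestamp means "now" (UTC).
            timestamp = datetime.now(timezone.utc)
        if check_timestamp and self.datapoints:
            latest = self.datapoints[-1]["t"]
            # Equal timestamps are accepted; only older ones are rejected.
            if timestamp < latest:
                raise InvalidTimestampError(f"{timestamp} is older than {latest}")
        datapoint = {"t": timestamp, "v": value}
        self.datapoints.append(datapoint)
        # Mirrors the documented return value: stream_id, granularity, datapoint.
        return {
            "stream_id": self.stream_id,
            "granularity": self.granularity,
            "datapoint": datapoint,
        }
```

Passing check_timestamp=False skips the ordering check entirely, so out-of-order datapoints are stored as given.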

backprocess_streams(query_tags=None)

Requests the backend to backprocess any derived streams.

Parameters: query_tags – Tags that should be matched to streams

clear_tags(stream_id)

Removes (clears) all non-readonly stream tags.

Take care to set tags that uniquely identify the stream immediately afterwards; otherwise the stream cannot be queried, for example with ensure_stream.

Parameters: stream_id – Stream identifier

delete_streams(query_tags=None)

Deletes datapoints for all streams matching the specified query tags. If no query tags are specified, all datastream-related data is deleted from the backend.

Parameters: query_tags – Tags that should be matched to streams

downsample_streams(query_tags=None, until=None, return_datapoints=False)

Requests the backend to downsample all streams matching the specified query tags. Once a time range has been downsampled, new datapoints cannot be added to it anymore.

Parameters:
  • query_tags – Tags that should be matched to streams
  • until – Timestamp up to which to downsample, excluding datapoints at exactly that timestamp (optional; defaults to the current time)
  • return_datapoints – Whether newly downsampled datapoints should be returned; this can create a huge temporary list and high memory consumption when downsampling many streams and datapoints
Returns:

A list of dictionaries containing stream_id, granularity, and datapoint for each datapoint created while downsampling, if return_datapoints was set

ensure_stream(query_tags, tags, value_downsamplers, highest_granularity, derive_from=None, derive_op=None, derive_args=None, value_type=None, value_type_options=None)

Ensures that a specified stream exists.

Parameters:
  • query_tags – A dictionary of tags which uniquely identify a stream
  • tags – A dictionary of tags that should be used (together with query_tags) to create a stream when it doesn’t yet exist
  • value_downsamplers – A set of names of value downsampler functions for this stream
  • highest_granularity – Predicted highest granularity of the data the stream will store, may be used to optimize data storage
  • derive_from – Source stream(s) from which to create a derived stream (optional)
  • derive_op – Derivation operation
  • derive_args – Derivation operation arguments
  • value_type – Optional value type (defaults to numeric)
  • value_type_options – Options specific to the value type
Returns:

A stream identifier

find_streams(query_tags=None)

Finds all streams matching the specified query tags.

Parameters: query_tags – Tags that should be matched to streams
Returns: A Streams iterator over matched stream descriptors

get_data(stream_id, granularity, start=None, end=None, start_exclusive=None, end_exclusive=None, reverse=False, value_downsamplers=None, time_downsamplers=None)

Retrieves data from a certain time range and of a certain granularity.

Parameters:
  • stream_id – Stream identifier
  • granularity – Wanted granularity
  • start – Time range start, including the start
  • end – Time range end, including the end (optional)
  • start_exclusive – Time range start, excluding the start
  • end_exclusive – Time range end, excluding the end (optional)
  • reverse – Whether datapoints should be returned from newest to oldest (true) instead of from oldest to newest (false)
  • value_downsamplers – The list of downsamplers to limit datapoint values to (optional)
  • time_downsamplers – The list of downsamplers to limit timestamp values to (optional)
Returns:

A Datapoints iterator over datapoints
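The interaction of the inclusive and exclusive bounds with reverse can be sketched as a plain filter over (timestamp, value) pairs. This is an illustrative model, not the backend's query: select_range is a hypothetical helper, and it assumes start and end are inclusive while start_exclusive and end_exclusive are exclusive, as their names suggest.

```python
def select_range(datapoints, start=None, end=None, start_exclusive=None,
                 end_exclusive=None, reverse=False):
    """Filter (timestamp, value) pairs by the get_data()-style bounds.

    Hypothetical helper: start/end are treated as inclusive bounds and
    start_exclusive/end_exclusive as exclusive bounds; all are optional.
    """
    selected = []
    for ts, value in sorted(datapoints, key=lambda dp: dp[0]):
        if start is not None and ts < start:
            continue
        if start_exclusive is not None and ts <= start_exclusive:
            continue
        if end is not None and ts > end:
            continue
        if end_exclusive is not None and ts >= end_exclusive:
            continue
        selected.append((ts, value))
    # reverse=False: oldest to newest; reverse=True: newest to oldest.
    return selected[::-1] if reverse else selected
```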

get_tags(stream_id)

Returns the tags for the specified stream.

Parameters: stream_id – Stream identifier
Returns: A dictionary of tags for the stream

remove_tag(stream_id, tag)

Removes a stream tag.

Parameters:
  • stream_id – Stream identifier
  • tag – Dictionary describing the tag(s) to remove (values are ignored)

update_tags(stream_id, tags)

Updates stream tags with new tags, overriding existing ones.

Parameters:
  • stream_id – Stream identifier
  • tags – A dictionary of new tags

Backends

API operations are implemented in backends, which are responsible for storing datapoints, performing downsampling, deriving streams, and executing queries.

class datastream.backends.mongodb.Backend(database_name, **connection_settings)

Initializes the MongoDB backend.

Parameters:
  • database_name – MongoDB database name
  • connection_settings – Extra connection settings as defined for mongoengine.register_connection

Implementation Details

Streams are stored in the streams collection; datapoints are stored in the datapoints.<granularity> collections, where <granularity> is one of the possible granularity levels.

When performing downsampling, we have to differentiate between two timestamps:

  • Datapoint timestamp is the timestamp of the datapoint that has been inserted for a given granularity level. On the highest granularity level it always has second precision. On lower granularity levels it is a dictionary of multiple values, depending on the time downsampler settings of the given stream.
  • Internal datapoint timestamp (stored in the datapoint’s _id) is the start of the timespan that contains the datapoint at the given granularity level. For example, if a datapoint was inserted at 31-07-2012 12:23:52, its internal timestamp would be 31-07-2012 12:00:00 at hour granularity and 01-07-2012 00:00:00 at month granularity.
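The mapping from a datapoint timestamp to its internal timestamp can be sketched as truncating the datetime to the start of its timespan. The granularity names below are hypothetical; the backend's actual granularity levels and their identifiers may differ.

```python
from datetime import datetime

# Hypothetical granularity names mapped to the datetime fields reset by truncation.
_TRUNCATE = {
    "seconds": {"microsecond": 0},
    "minutes": {"second": 0, "microsecond": 0},
    "hours": {"minute": 0, "second": 0, "microsecond": 0},
    "days": {"hour": 0, "minute": 0, "second": 0, "microsecond": 0},
    "months": {"day": 1, "hour": 0, "minute": 0, "second": 0, "microsecond": 0},
}


def internal_timestamp(ts: datetime, granularity: str) -> datetime:
    """Return the start of the timespan that contains ts at the given granularity."""
    try:
        fields = _TRUNCATE[granularity]
    except KeyError:
        raise ValueError(f"unsupported granularity: {granularity}") from None
    return ts.replace(**fields)


inserted = datetime(2012, 7, 31, 12, 23, 52)
print(internal_timestamp(inserted, "hours"))   # 2012-07-31 12:00:00
print(internal_timestamp(inserted, "months"))  # 2012-07-01 00:00:00
```

Levels that do not align with a single calendar field, such as 10-minute buckets, would need arithmetic like minute - minute % 10 instead of a plain replace().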

Appended datapoints are stored in the collection that corresponds to the stream’s highest_granularity, and only lower granularity levels are downsampled. Requests for a granularity higher than highest_granularity simply return values from the highest_granularity collection. highest_granularity is purely an optimization: it avoids storing unnecessary datapoints for granularity levels whose timespans would each contain at most one datapoint.
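The routing rule for highest_granularity can be sketched as follows. The ordered list of level names is a hypothetical example, not the backend's actual set of granularities; the collection names follow the datapoints.<granularity> pattern described above.

```python
# Hypothetical granularity levels, ordered from highest (finest) to lowest.
GRANULARITIES = ["seconds", "minutes", "hours", "days"]


def collection_for(requested, highest_granularity):
    """Name of the datapoints collection that would serve a request."""
    req = GRANULARITIES.index(requested)
    highest = GRANULARITIES.index(highest_granularity)
    # A smaller index is a finer (higher) granularity; anything finer than
    # highest_granularity is served from the highest_granularity collection.
    return f"datapoints.{GRANULARITIES[max(req, highest)]}"


def downsampled_levels(highest_granularity):
    """Levels below highest_granularity, which are filled by downsampling."""
    return GRANULARITIES[GRANULARITIES.index(highest_granularity) + 1:]
```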

Value Downsamplers

mean(key: m)

Average of all datapoints.

sum(key: s)

Sum of all datapoints.

min(key: l, for lower)

Minimum value of all datapoints.

max(key: u, for upper)

Maximum value of all datapoints.

sum_squares(key: q)

Sum of squares of all datapoints.

std_dev(key: d)

Standard deviation of all datapoints.

count(key: c)

Number of all datapoints.

most_often(key: o, for often)

The most often occurring value of all datapoints.

least_often(key: r, for rare)

The least often occurring value of all datapoints.

frequencies(key: f)

For each value, the number of its occurrences in all datapoints.
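As a sketch of what the value downsamplers compute for one timespan, the function below returns a dictionary keyed by the documented single-letter keys. The population standard deviation and the tie-breaking for o and r (first value encountered wins) are assumptions; the backend's exact formulas are not specified here.

```python
import math
from collections import Counter


def downsample_values(values):
    """Apply every value downsampler to the datapoint values of one timespan."""
    if not values:
        raise ValueError("need at least one value")
    c = len(values)
    s = sum(values)
    q = sum(v * v for v in values)
    m = s / c
    counts = Counter(values)
    return {
        "m": m,
        "s": s,
        "l": min(values),
        "u": max(values),
        "q": q,
        # Population standard deviation derived from c, s and q (assumption).
        "d": math.sqrt(max(q / c - m * m, 0.0)),
        "c": c,
        # max()/min() return the first value encountered among ties.
        "o": max(counts, key=counts.get),
        "r": min(counts, key=counts.get),
        "f": dict(counts),
    }
```

Deriving d from c, s and q rather than from the raw values means the statistic can be recomputed after combining the counts and sums of several timespans; whether the backend relies on this is not stated here.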

Time Downsamplers

mean(key: m)

Average of all timestamps.

first(key: a, the first letter of the alphabet)

The first timestamp of all datapoints.

last(key: z, the last letter of the alphabet)

The last timestamp of all datapoints.
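The time downsamplers can be sketched in the same way, returning the documented keys m, a and z. Averaging offsets from the Unix epoch is one straightforward way to take the mean of datetimes; it is an illustrative choice, not necessarily how the backend computes it.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def downsample_times(timestamps):
    """Apply the time downsamplers (m, a, z) to the timestamps of one timespan."""
    if not timestamps:
        raise ValueError("need at least one timestamp")
    # Datetimes cannot be summed directly, so average their offsets from an epoch.
    offsets = [(ts - EPOCH).total_seconds() for ts in timestamps]
    mean = EPOCH + timedelta(seconds=sum(offsets) / len(offsets))
    return {"m": mean, "a": min(timestamps), "z": max(timestamps)}
```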

Derive Operators

sum(src_streams, dst_stream)

Sum of multiple streams.
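A minimal sketch of summing streams, modelling each source stream as a hypothetical {timestamp: value} dictionary. It assumes a derived datapoint is produced only at timestamps present in every source stream; how the backend aligns streams whose timestamps differ is not described here.

```python
def sum_streams(*streams):
    """Sum {timestamp: value} streams at the timestamps they all share."""
    if not streams:
        raise ValueError("need at least one stream")
    shared = set(streams[0]).intersection(*streams[1:])
    return {ts: sum(stream[ts] for stream in streams) for ts in sorted(shared)}
```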

derivative(src_stream, dst_stream)

Derivative of a stream.
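A derivative over discrete datapoints can be sketched as the rate of change between consecutive points. Here timestamps are plain seconds, each rate is assigned to the later of the two timestamps, and non-increasing timestamps are skipped; all three are assumptions for illustration.

```python
def derivative(datapoints):
    """Rate of change between consecutive (timestamp_seconds, value) pairs."""
    result = []
    for (t0, v0), (t1, v1) in zip(datapoints, datapoints[1:]):
        dt = t1 - t0
        if dt <= 0:
            continue  # skip duplicate or out-of-order timestamps
        result.append((t1, (v1 - v0) / dt))
    return result
```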

counter_reset(src_stream, dst_stream)

Generates a counter reset stream.
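One way to picture a counter reset stream is as the list of timestamps at which a monotonically increasing counter went down. The sketch assumes that any decrease signals a reset; the backend may apply different or additional rules.

```python
def counter_resets(datapoints):
    """Timestamps at which a (timestamp, value) counter decreased."""
    return [
        t1
        for (_, v0), (t1, v1) in zip(datapoints, datapoints[1:])
        if v1 < v0
    ]
```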

counter_derivative([{'name': 'reset', 'stream': reset_stream_id}, {'stream': data_stream_id}], dst_stream, max_value=None)

Derivative of a monotonically increasing counter stream.
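The following sketch combines a counter with its reset timestamps, mirroring the two inputs named in the signature. Its behaviour is assumed, not taken from the backend: no rate is emitted for an interval that ends at a reset, and when max_value is given, a decrease is treated as the counter wrapping around to 0 after reaching max_value.

```python
def counter_derivative(datapoints, resets=(), max_value=None):
    """Rate of a monotonically increasing (timestamp_seconds, value) counter."""
    reset_times = set(resets)
    result = []
    for (t0, v0), (t1, v1) in zip(datapoints, datapoints[1:]):
        dt = t1 - t0
        if dt <= 0 or t1 in reset_times:
            continue  # no meaningful rate across a reset or bad timestamps
        delta = v1 - v0
        if delta < 0:
            if max_value is None:
                continue  # unflagged decrease without max_value: skip it
            delta = (max_value - v0) + v1  # assumed wraparound to 0
        result.append((t1, delta / dt))
    return result
```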

Exceptions

exception datastream.exceptions.DatastreamException(*args, **kwargs)

The base class for all datastream API exceptions.

exception datastream.exceptions.StreamNotFound(*args, **kwargs)

Raised when the queried stream is not found.

exception datastream.exceptions.MultipleStreamsReturned(*args, **kwargs)

Raised when multiple streams are found for an operation that works on only one stream, such as ensure_stream(). Use more specific query tags.

exception datastream.exceptions.InconsistentStreamConfiguration(*args, **kwargs)

Raised when the stream configuration passed to ensure_stream() is inconsistent or conflicting.

exception datastream.exceptions.OutstandingDependenciesError(*args, **kwargs)

Raised when a stream cannot be deleted because another stream depends on it.

exception datastream.exceptions.UnsupportedDownsampler(*args, **kwargs)

Raised when the requested downsampler is unsupported.

exception datastream.exceptions.UnsupportedGranularity(*args, **kwargs)

Raised when the requested granularity level is unsupported.

exception datastream.exceptions.UnsupportedDeriveOperator(*args, **kwargs)

Raised when the requested derive operator is unsupported.

exception datastream.exceptions.UnsupportedValueType(*args, **kwargs)

Raised when the requested value type is unsupported.

exception datastream.exceptions.ReservedTagNameError(*args, **kwargs)

Raised when updating tags with a reserved tag name.

exception datastream.exceptions.InvalidTimestamp(*args, **kwargs)

Raised when an invalid timestamp was provided.

exception datastream.exceptions.IncompatibleGranularities(*args, **kwargs)

Raised when derived stream’s granularity is incompatible with source stream’s granularity.

exception datastream.exceptions.IncompatibleTypes(*args, **kwargs)

Raised when derived stream’s value type is incompatible with source stream’s value type.

exception datastream.exceptions.AppendToDerivedStreamNotAllowed(*args, **kwargs)

Raised when attempting to append to a derived stream.

exception datastream.exceptions.InvalidOperatorArguments(*args, **kwargs)

Raised when a derive operator receives invalid arguments.

exception datastream.exceptions.LockExpiredMidMaintenance(*args, **kwargs)

Raised when a maintenance lock expires inside a maintenance operation.

exception datastream.exceptions.StreamAppendContended(*args, **kwargs)

Raised when too many processes are trying to append to the same stream.

exception datastream.exceptions.DatastreamWarning(*args, **kwargs)

The base class for all datastream API runtime warnings.

exception datastream.exceptions.InvalidValueWarning(*args, **kwargs)

Warning used when an invalid value is encountered.

exception datastream.exceptions.InternalInconsistencyWarning(*args, **kwargs)

Warning used when an internal inconsistency is detected.

exception datastream.exceptions.DownsampleConsistencyNotGuaranteed(*args, **kwargs)

Warning used when the consistency of downsampled values with the original datapoints can no longer be guaranteed due to some condition. Resetting the downsample state and redoing the downsampling may be necessary.