Reference
API
class datastream.api.Datastream(backend)
Initializes the Datastream API.
Parameters: backend – Backend instance
append(stream_id, value, timestamp=None, check_timestamp=True)
Appends a datapoint to the datastream.
Parameters: - stream_id – Stream identifier
- value – Datapoint value
- timestamp – Datapoint timestamp (optional); must be equal to or later (newer) than the latest one, so timestamps are monotonically non-decreasing
- check_timestamp – Whether to check that the timestamp is equal to or later (newer) than the latest one (default: true)
Returns: A dictionary containing stream_id, granularity, and datapoint
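The timestamp rule can be illustrated with a minimal in-memory sketch. The `InMemoryStream` class, its default granularity name, and the `t`/`v` keys of the returned datapoint are hypothetical. The real backend persists datapoints and raises `datastream.exceptions.InvalidTimestamp`, which is redefined locally here so the snippet runs on its own.

```python
from datetime import datetime, timezone


class InvalidTimestamp(Exception):
    """Local stand-in for datastream.exceptions.InvalidTimestamp."""


class InMemoryStream:
    """Hypothetical single-stream store mimicking append()'s timestamp rule."""

    def __init__(self, stream_id, granularity="seconds"):
        self.stream_id = stream_id
        self.granularity = granularity  # hypothetical granularity name
        self.datapoints = []  # (timestamp, value) pairs in append order
        self.latest = None  # newest timestamp seen so far

    def append(self, value, timestamp=None, check_timestamp=True):
        # Default to the current UTC time when no timestamp is given.
        if timestamp is None:
            timestamp = datetime.now(timezone.utc)
        # Equal timestamps are accepted; strictly older ones are rejected.
        if check_timestamp and self.latest is not None and timestamp < self.latest:
            raise InvalidTimestamp(f"{timestamp} is older than {self.latest}")
        self.datapoints.append((timestamp, value))
        if self.latest is None or timestamp > self.latest:
            self.latest = timestamp
        # Shaped like the documented return value; the "t"/"v" keys are assumed.
        return {
            "stream_id": self.stream_id,
            "granularity": self.granularity,
            "datapoint": {"t": timestamp, "v": value},
        }


stream = InMemoryStream("temperature")
t0 = datetime(2012, 7, 31, 12, 23, 52, tzinfo=timezone.utc)
stream.append(21.5, timestamp=t0)
stream.append(21.7, timestamp=t0)  # equal timestamp: accepted
```

Equal timestamps pass the check, so the sequence is non-decreasing rather than strictly increasing. Passing check_timestamp=False skips the comparison entirely.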
backprocess_streams(query_tags=None)
Requests the backend to backprocess any derived streams.
Parameters: query_tags – Tags that should be matched to streams
Removes (clears) all non-readonly stream tags.
Take care to set, immediately afterwards, tags that uniquely identify the stream. Otherwise the stream cannot be queried, for example by ensure_stream.
Parameters: stream_id – Stream identifier
delete_streams(query_tags=None)
Deletes datapoints for all streams matching the specified query tags. If no query tags are specified, all datastream-related data is deleted from the backend.
Parameters: query_tags – Tags that should be matched to streams
downsample_streams(query_tags=None, until=None, return_datapoints=False)
Requests the backend to downsample all streams matching the specified query tags. Once a time range has been downsampled, new datapoints cannot be added to it anymore.
Parameters: - query_tags – Tags that should be matched to streams
- until – Timestamp up to which to downsample; datapoints at exactly this timestamp are not included (optional; defaults to the current time)
- return_datapoints – Whether to return the newly downsampled datapoints; when downsampling many streams and datapoints, this can create a huge temporary list and high memory consumption
Returns: A list of dictionaries containing stream_id, granularity, and datapoint for each datapoint created while downsampling, if return_datapoints was set
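The effect of `until` on which datapoints get downsampled can be sketched in plain Python. The sketch makes several assumptions:

- Buckets are hourly.
- Only hours that have fully elapsed by `until` are downsampled.
- Datapoints use a hypothetical layout. `t` holds the first (`a`) and last (`z`) timestamps, and `v` holds the mean (`m`). These keys are borrowed from the Value Downsamplers and Time Downsamplers lists.

This is not the backend's implementation.

```python
from datetime import datetime, timedelta
from statistics import mean


def downsample_hourly(stream_id, datapoints, until):
    """Hypothetical sketch: downsample (timestamp, value) pairs into hour buckets."""
    buckets = {}
    for timestamp, value in datapoints:
        bucket_start = timestamp.replace(minute=0, second=0, microsecond=0)
        # Assumption: only hours that have fully elapsed by `until` are processed,
        # so datapoints at or after `until` are never included.
        if bucket_start + timedelta(hours=1) > until:
            continue
        buckets.setdefault(bucket_start, []).append((timestamp, value))
    results = []
    for bucket_start in sorted(buckets):
        times = [t for t, _ in buckets[bucket_start]]
        values = [v for _, v in buckets[bucket_start]]
        # Shaped like the documented return value: stream_id, granularity, datapoint.
        results.append({
            "stream_id": stream_id,
            "granularity": "hours",
            "datapoint": {"t": {"a": min(times), "z": max(times)},
                          "v": {"m": mean(values)}},
        })
    return results


points = [
    (datetime(2012, 7, 31, 12, 5), 1.0),
    (datetime(2012, 7, 31, 12, 50), 3.0),
    (datetime(2012, 7, 31, 13, 10), 5.0),
]
result = downsample_hourly("s1", points, until=datetime(2012, 7, 31, 13, 30))
```

Here the 13:00 hour is still open at 13:30, so only the 12:00 hour is downsampled.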
ensure_stream(query_tags, tags, value_downsamplers, highest_granularity, derive_from=None, derive_op=None, derive_args=None, value_type=None, value_type_options=None)
Ensures that a specified stream exists.
Parameters: - query_tags – A dictionary of tags which uniquely identify a stream
- tags – A dictionary of tags that should be used (together with query_tags) to create a stream when it doesn’t yet exist
- value_downsamplers – A set of names of value downsampler functions for this stream
- highest_granularity – Predicted highest granularity of the data the stream will store, may be used to optimize data storage
- derive_from – Source stream(s) from which to create a derived stream (optional)
- derive_op – Derivation operation
- derive_args – Derivation operation arguments
- value_type – Optional value type (defaults to numeric)
- value_type_options – Options specific to the value type
Returns: A stream identifier
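ensure_stream behaves like a get-or-create lookup keyed by query_tags. The in-memory `StreamRegistry` below is a hypothetical sketch of that behavior:

- It returns the existing stream when exactly one stream matches.
- It creates a new stream from tags plus query_tags when none matches.
- It otherwise raises local stand-ins for MultipleStreamsReturned and InconsistentStreamConfiguration.

The tag-matching rule and the configuration fields compared for consistency are assumptions. Derive and value-type options are left out.

```python
class MultipleStreamsReturned(Exception):
    """Local stand-in for datastream.exceptions.MultipleStreamsReturned."""


class InconsistentStreamConfiguration(Exception):
    """Local stand-in for datastream.exceptions.InconsistentStreamConfiguration."""


class StreamRegistry:
    """Hypothetical in-memory registry illustrating ensure_stream()'s get-or-create logic."""

    def __init__(self):
        self.streams = {}  # stream_id -> stream configuration and tags
        self.next_id = 1

    def _matches(self, query_tags):
        # Assumption: a stream matches when every query tag is present with an equal value.
        return [
            sid for sid, stream in self.streams.items()
            if all(stream["tags"].get(k) == v for k, v in query_tags.items())
        ]

    def ensure_stream(self, query_tags, tags, value_downsamplers, highest_granularity):
        found = self._matches(query_tags)
        if len(found) > 1:
            raise MultipleStreamsReturned(query_tags)
        if found:
            stream = self.streams[found[0]]
            # Reusing a stream with a conflicting configuration is an error.
            if (stream["value_downsamplers"] != set(value_downsamplers)
                    or stream["highest_granularity"] != highest_granularity):
                raise InconsistentStreamConfiguration(query_tags)
            return found[0]
        # No match: create a stream tagged with tags together with query_tags.
        stream_id = self.next_id
        self.next_id += 1
        self.streams[stream_id] = {
            "tags": {**tags, **query_tags},
            "value_downsamplers": set(value_downsamplers),
            "highest_granularity": highest_granularity,
        }
        return stream_id
```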
find_streams(query_tags=None)
Finds all streams matching the specified query tags.
Parameters: query_tags – Tags that should be matched to streams
Returns: A Streams iterator over matched stream descriptors
get_data(stream_id, granularity, start=None, end=None, start_exclusive=None, end_exclusive=None, reverse=False, value_downsamplers=None, time_downsamplers=None)
Retrieves data from a certain time range and of a certain granularity.
Parameters: - stream_id – Stream identifier
- granularity – Wanted granularity
- start – Time range start, including the start
- end – Time range end, including the end (optional)
- start_exclusive – Time range start, excluding the start
- end_exclusive – Time range end, excluding the end (optional)
- reverse – Return datapoints from newest to oldest (true) instead of from oldest to newest (false)
- value_downsamplers – The list of downsamplers to limit datapoint values to (optional)
- time_downsamplers – The list of downsamplers to limit timestamp values to (optional)
Returns: A Datapoints iterator over datapoints
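The boundary parameters differ only in whether a datapoint exactly at the boundary is returned. The hypothetical `select_range` helper below sketches start, start_exclusive, end_exclusive, and reverse over an in-memory list. It leaves out end and the downsampler filters, and it does not reflect how the backend builds its queries.

```python
from datetime import datetime


def select_range(datapoints, start=None, start_exclusive=None,
                 end_exclusive=None, reverse=False):
    """Hypothetical filter mirroring some of get_data()'s range parameters.

    `datapoints` is a list of (timestamp, value) pairs sorted oldest first.
    """
    selected = []
    for timestamp, value in datapoints:
        if start is not None and timestamp < start:
            continue  # start keeps datapoints at the boundary
        if start_exclusive is not None and timestamp <= start_exclusive:
            continue  # start_exclusive drops datapoints at the boundary
        if end_exclusive is not None and timestamp >= end_exclusive:
            continue  # end_exclusive drops datapoints at the boundary
        selected.append((timestamp, value))
    # reverse=True yields newest first instead of oldest first.
    return selected[::-1] if reverse else selected


points = [(datetime(2012, 7, 31, 12, 0, s), s) for s in range(5)]
```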
Returns the tags for the specified stream.
Parameters: stream_id – Stream identifier
Returns: A dictionary of tags for the stream
remove_tag(stream_id, tag)
Removes a stream tag.
Parameters: - stream_id – Stream identifier
- tag – Dictionary describing the tag(s) to remove (values are ignored)
Updates stream tags with new tags, overriding existing ones.
Parameters: - stream_id – Stream identifier
- tags – A dictionary of new tags
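Tag updates, tag removal, and clearing of non-read-only tags can be sketched as operations on a plain dictionary. Some parts of the sketch are hypothetical:

- The helper names `merge_tags`, `drop_tags`, and `keep_readonly_tags`.
- The set of read-only tag names.

Only top-level keys are handled. ReservedTagNameError is a local stand-in for the documented exception.

```python
class ReservedTagNameError(Exception):
    """Local stand-in for datastream.exceptions.ReservedTagNameError."""


# Hypothetical read-only tag names; the backend's actual reserved names may differ.
READONLY_TAGS = frozenset({"stream_id", "highest_granularity"})


def merge_tags(tags, new_tags):
    """Return `tags` updated with `new_tags`, new values overriding existing ones."""
    reserved = READONLY_TAGS.intersection(new_tags)
    if reserved:
        raise ReservedTagNameError(sorted(reserved))
    return {**tags, **new_tags}


def drop_tags(tags, tag):
    """Remove every top-level key named in `tag`; the values in `tag` are ignored."""
    return {key: value for key, value in tags.items() if key not in tag}


def keep_readonly_tags(tags):
    """Clear all non-read-only tags, keeping only the read-only ones."""
    return {key: value for key, value in tags.items() if key in READONLY_TAGS}
```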
Backends
API operations are implemented in backends, which are responsible for storing datapoints, performing downsampling, deriving streams, and executing queries.
class datastream.backends.mongodb.Backend(database_name, **connection_settings)
Initializes the MongoDB backend.
Parameters: - database_name – MongoDB database name
- connection_settings – Extra connection settings as defined for mongoengine.register_connection
Implementation Details
Streams are stored in the streams collection, and datapoints are stored in the datapoints.<granularity> collections, where <granularity> is one of the possible granularity levels.
When performing downsampling, we have to differentiate between two timestamps:
- Datapoint timestamp is the timestamp of the datapoint that has been inserted for a given granularity level. On the highest granularity level it always has second precision. On lower granularity levels it is a dictionary of multiple values, depending on the stream’s time downsampler settings.
- Internal datapoint timestamp (stored in the datapoint’s _id) is based on the timespan for the given granularity level. For example, if a datapoint was inserted at 31-07-2012 12:23:52, the internal timestamp of the timespan containing it would be 31-07-2012 12:00:00 for hour granularity and 01-07-2012 00:00:00 for month granularity.
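The internal timestamp is the start of the timespan that contains the datapoint. The sketch below reproduces the 31-07-2012 12:23:52 example with `datetime.replace`. The granularity names are hypothetical, and the backend may support other levels.

```python
from datetime import datetime

# Fields reset to their minimum for each hypothetical granularity name.
_TRUNCATE = {
    "seconds": {"microsecond": 0},
    "minutes": {"second": 0, "microsecond": 0},
    "hours": {"minute": 0, "second": 0, "microsecond": 0},
    "days": {"hour": 0, "minute": 0, "second": 0, "microsecond": 0},
    "months": {"day": 1, "hour": 0, "minute": 0, "second": 0, "microsecond": 0},
}


def internal_timestamp(timestamp, granularity):
    """Start of the timespan at `granularity` that contains `timestamp`."""
    try:
        fields = _TRUNCATE[granularity]
    except KeyError:
        raise ValueError(f"unsupported granularity: {granularity}") from None
    return timestamp.replace(**fields)


inserted = datetime(2012, 7, 31, 12, 23, 52)
```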
Appended datapoints are stored in the collection for the stream’s highest_granularity, and only lower granularity levels are downsampled. Requests for a granularity higher than highest_granularity simply return values from the highest_granularity collection. highest_granularity is purely an optimization: it avoids storing unnecessary datapoints for granularity levels whose timespans would each contain at most one datapoint.
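The fallback for requests finer than highest_granularity can be sketched as a lookup over an ordered list of levels. The level names and their order are hypothetical. Only the datapoints.<granularity> collection naming comes from the description above.

```python
# Hypothetical granularity levels, ordered from highest (finest) to lowest.
GRANULARITIES = ["seconds", "minutes", "hours", "days"]


def collection_for(requested, highest_granularity):
    """Name of the collection that serves a request at `requested` granularity."""
    # Levels finer than highest_granularity are not stored; fall back to it.
    if GRANULARITIES.index(requested) < GRANULARITIES.index(highest_granularity):
        requested = highest_granularity
    return f"datapoints.{requested}"


def downsampled_levels(highest_granularity):
    """Levels that downsampling fills for a stream: every coarser level."""
    return GRANULARITIES[GRANULARITIES.index(highest_granularity) + 1:]
```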
Value Downsamplers
- mean (key: m) – Average of all datapoints.
- sum (key: s) – Sum of all datapoints.
- min (key: l, for lower) – Minimum value of all datapoints.
- max (key: u, for upper) – Maximum value of all datapoints.
- sum_squares (key: q) – Sum of squares of all datapoints.
- std_dev (key: d) – Standard deviation of all datapoints.
- count (key: c) – Number of all datapoints.
- most_often (key: o, for often) – The most frequently occurring value of all datapoints.
- least_often (key: r, for rare) – The least frequently occurring value of all datapoints.
- frequencies (key: f) – For each value, the number of its occurrences in all datapoints.
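All value downsamplers can be computed for one timespan in a single pass over its values, keyed by their one-letter keys. This is a sketch, not the backend's code.

- It assumes the population standard deviation and derives it from sum, sum_squares, and count, which shows why keeping sum_squares is useful.
- Tie-breaking for most_often and least_often follows `collections.Counter.most_common` and is also an assumption.

```python
import math
from collections import Counter


def value_downsample(values):
    """Compute every value downsampler for one timespan, keyed by its one-letter key."""
    n = len(values)
    total = sum(values)
    squares = sum(v * v for v in values)
    freq = Counter(values)
    ranked = freq.most_common()  # most frequent first; ties keep first-seen order
    average = total / n
    return {
        "m": average,         # mean
        "s": total,           # sum
        "l": min(values),     # min (lower)
        "u": max(values),     # max (upper)
        "q": squares,         # sum_squares
        # Population standard deviation from q, s and c (assumption); clamp rounding noise.
        "d": math.sqrt(max(squares / n - average * average, 0.0)),
        "c": n,               # count
        "o": ranked[0][0],    # most_often
        "r": ranked[-1][0],   # least_often
        "f": dict(freq),      # frequencies
    }


stats = value_downsample([1, 2, 2, 3, 3, 3])
```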
Time Downsamplers
- mean (key: m) – Average of all timestamps.
- first (key: a, the first letter of the alphabet) – The first timestamp of all datapoints.
- last (key: z, the last letter of the alphabet) – The last timestamp of all datapoints.
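The time downsamplers reduce a timespan's timestamps in the same way. In this sketch the mean is computed over POSIX timestamps. The sketch assumes timezone-aware UTC datetimes so that `datetime.timestamp()` does not depend on the local time zone.

```python
from datetime import datetime, timezone


def time_downsample(timestamps):
    """Compute the time downsamplers for one timespan, keyed by their one-letter keys."""
    seconds = [ts.timestamp() for ts in timestamps]
    return {
        # mean: average of POSIX timestamps, converted back to a UTC datetime
        "m": datetime.fromtimestamp(sum(seconds) / len(seconds), tz=timezone.utc),
        "a": min(timestamps),  # first
        "z": max(timestamps),  # last
    }


span = [
    datetime(2012, 7, 31, 12, 0, 0, tzinfo=timezone.utc),
    datetime(2012, 7, 31, 12, 0, 10, tzinfo=timezone.utc),
]
```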
Derive Operators
- sum(src_streams, dst_stream) – Sum of multiple streams.
- derivative(src_stream, dst_stream) – Derivative of a stream.
- counter_reset(src_stream, dst_stream) – Generates a counter reset stream.
- counter_derivative([{'name': 'reset', 'stream': reset_stream_id}, {'stream': data_stream_id}], dst_stream, max_value=None) – Derivative of a monotonically increasing counter stream.
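The derivative and counter operators can be sketched over in-memory lists of `(seconds, value)` pairs. The function signatures are hypothetical and take plain lists instead of stream identifiers. Two behaviors are assumptions, not documented facts:

- counter_reset reports a reset wherever the value drops.
- counter_derivative treats a drop without a reset as a wrap-around at max_value.

```python
def derivative(points):
    """Rate of change between consecutive (seconds, value) points."""
    return [(t1, (v1 - v0) / (t1 - t0))
            for (t0, v0), (t1, v1) in zip(points, points[1:])]


def counter_reset(points):
    """Timestamps where the counter dropped, assumed here to mark a reset."""
    return [t1 for (t0, v0), (t1, v1) in zip(points, points[1:]) if v1 < v0]


def counter_derivative(points, resets=frozenset(), max_value=None):
    """Rate of a monotonically increasing counter.

    No rate is produced for an interval ending at a reset. A drop without a
    reset is treated as a wrap-around at `max_value` when one is given.
    """
    rates = []
    for (t0, v0), (t1, v1) in zip(points, points[1:]):
        if t1 in resets:
            continue  # counter restarted; the difference is meaningless
        delta = v1 - v0
        if delta < 0:
            if max_value is None:
                continue  # a drop cannot be interpreted without max_value
            delta += max_value  # assumed wrap-around from max_value back to 0
        rates.append((t1, delta / (t1 - t0)))
    return rates


counter = [(0, 100), (10, 150), (20, 20), (30, 80)]
```

Feeding the output of counter_reset into counter_derivative mirrors how the documented operator takes a reset stream alongside the data stream.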
Exceptions
exception datastream.exceptions.DatastreamException(*args, **kwargs)
The base class for all datastream API exceptions.
exception datastream.exceptions.StreamNotFound(*args, **kwargs)
Raised when the queried stream is not found.
exception datastream.exceptions.MultipleStreamsReturned(*args, **kwargs)
Raised when multiple streams are found for an operation that works on only one stream, such as ensure_stream(). Specify more specific query tags.
exception datastream.exceptions.InconsistentStreamConfiguration(*args, **kwargs)
Raised when the stream configuration passed to ensure_stream() is inconsistent and/or conflicting.
exception datastream.exceptions.OutstandingDependenciesError(*args, **kwargs)
Raised when a stream cannot be deleted because it is a dependency of another stream.
exception datastream.exceptions.UnsupportedDownsampler(*args, **kwargs)
Raised when the requested downsampler is unsupported.
exception datastream.exceptions.UnsupportedGranularity(*args, **kwargs)
Raised when the requested granularity level is unsupported.
exception datastream.exceptions.UnsupportedDeriveOperator(*args, **kwargs)
Raised when the requested derive operator is unsupported.
exception datastream.exceptions.UnsupportedValueType(*args, **kwargs)
Raised when the requested value type is unsupported.
exception datastream.exceptions.ReservedTagNameError(*args, **kwargs)
Raised when updating tags with a reserved tag name.
exception datastream.exceptions.InvalidTimestamp(*args, **kwargs)
Raised when an invalid timestamp is provided.
exception datastream.exceptions.IncompatibleGranularities(*args, **kwargs)
Raised when a derived stream’s granularity is incompatible with its source stream’s granularity.
exception datastream.exceptions.IncompatibleTypes(*args, **kwargs)
Raised when a derived stream’s value type is incompatible with its source stream’s value type.
exception datastream.exceptions.AppendToDerivedStreamNotAllowed(*args, **kwargs)
Raised when attempting to append to a derived stream.
exception datastream.exceptions.InvalidOperatorArguments(*args, **kwargs)
Raised when a derive operator receives invalid arguments.
exception datastream.exceptions.LockExpiredMidMaintenance(*args, **kwargs)
Raised when a maintenance lock expires during a maintenance operation.
exception datastream.exceptions.StreamAppendContended(*args, **kwargs)
Raised when too many processes are trying to append to the same stream.
exception datastream.exceptions.DatastreamWarning(*args, **kwargs)
The base class for all datastream API runtime warnings.
exception datastream.exceptions.InvalidValueWarning(*args, **kwargs)
Warning used when an invalid value is encountered.
exception datastream.exceptions.InternalInconsistencyWarning(*args, **kwargs)
Warning used when an internal inconsistency is detected.
exception datastream.exceptions.DownsampleConsistencyNotGuaranteed(*args, **kwargs)
Warning used when consistency of downsampled values with the original datapoints is no longer guaranteed due to some condition. Resetting the downsample state and redoing downsampling may be necessary.
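DatastreamException and DatastreamWarning are documented as base classes, so callers can handle whole families of conditions at once. The sketch below redefines a few of the classes locally so it runs on its own. Deriving DatastreamWarning from RuntimeWarning is an assumption. `get_stream`, `describe`, and `strict_check` are hypothetical helpers.

```python
import warnings


class DatastreamException(Exception):
    """Local stand-in for datastream.exceptions.DatastreamException."""


class StreamNotFound(DatastreamException):
    """Local stand-in for datastream.exceptions.StreamNotFound."""


class DatastreamWarning(RuntimeWarning):
    """Local stand-in for datastream.exceptions.DatastreamWarning (base class assumed)."""


class InvalidValueWarning(DatastreamWarning):
    """Local stand-in for datastream.exceptions.InvalidValueWarning."""


def get_stream(streams, stream_id):
    """Hypothetical lookup that raises StreamNotFound for unknown identifiers."""
    if stream_id not in streams:
        raise StreamNotFound(stream_id)
    return streams[stream_id]


def describe(streams, stream_id):
    # Catching the base class handles every datastream API error in one place.
    try:
        return get_stream(streams, stream_id)
    except DatastreamException as exc:
        return f"{type(exc).__name__}: {exc}"


def strict_check(value):
    # Escalate datastream warnings (and their subclasses) to errors in this block only.
    with warnings.catch_warnings():
        warnings.simplefilter("error", DatastreamWarning)
        if value != value:  # NaN is the only value not equal to itself
            warnings.warn("NaN value encountered", InvalidValueWarning)
    return value
```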