Auxiliary Tools
The Cumulocity Python API's standard packages (see
Main API classes and
Model classes) represent what the
Cumulocity REST API provides. The c8y_tk (for "Cumulocity
toolkit") module provides additional auxiliary tools that cover useful
functionality beyond the REST API but applicable in many projects.
The c8y_tk.notification2 module provides listener
implementations that allow straightforward development of Notification
2.0 applications without additional overhead.
→ Listener
→ QueueListener
→ AsyncListener
→ AsyncQueueListener
The c8y_tk.analytics module provides helper functions for
parallel query processing, which maximises performance when dealing
with large datasets, and for easy transformation of a Cumulocity
Series into Pandas data frames and series as well as NumPy arrays.
→ ParallelExecutor
→ to_data_frame
→ to_numpy
→ to_series
The c8y_tk.app module provides auxiliary tools for
implementing both interactive and microservice applications.
→ CumulocityApp
→ SubscriptionListener
CumulocityApp
Cumulocity API wrapper to be used for interactive sessions.
As a context manager it ensures that a valid Cumulocity connection is available at runtime. It uses standard environment variables when defined (C8Y_BASEURL, C8Y_TENANT, C8Y_USER, C8Y_PASSWORD, as well as C8Y_TOKEN) and interactively requests updated information in case some data is missing.
with CumulocityApp() as c8y:
    alarms = c8y.alarms.get_all(type='cx_MyAlarm')
    ...
SubscriptionListener
Multi-tenant subscription listener.
When implementing a multi-tenant microservice it is sometimes required to
keep track of the tenants which subscribe to the microservice.
Effectively, this needs to be done via polling the get_subscribers
function of the MultiTenantCumulocityApp class.
The SubscriptionListener class provides a default implementation of
such a polling mechanism which can be easily integrated using callbacks.
add_callback(callback: Callable[[Union[str, List[str]]], None], blocking: bool = True, when: str = 'any') -> SubscriptionListener
Add a callback function to be invoked if a tenant subscribes to/unsubscribes from the monitored multi-tenant microservice.
Note: multiple callbacks (even listening to the same event) can
be defined. The add_callback function supports a fluent interface,
i.e. it can be chained, to ease configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | Callable | A callback function to invoke in case of a change in subscribers. If parameter | required |
| blocking | bool | Whether to invoke the callback function in a blocking fashion (default) or not. If False, a thread is spawned for each invocation. | True |
| when | str | When to invoke this particular callback function. If "added" or "removed" the callback function is invoked with a single tenant ID for every added/removed subscriber respectively. Otherwise (or if "always/any"), the callback function is invoked with a list of the current subscribers' tenant IDs. | 'any' |
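The effect of the `when` parameter can be illustrated without a Cumulocity connection. The following is a minimal sketch, not the library's implementation: `dispatch_changes` is a hypothetical helper that compares two polling results and invokes callbacks in the way the parameter description suggests.

```python
from typing import Callable, Dict, List, Set


def dispatch_changes(previous: Set[str], current: Set[str],
                     callbacks: Dict[str, List[Callable]]) -> None:
    """Invoke callbacks for the difference between two polling results.

    Hypothetical helper; `callbacks` maps a `when` value ('added',
    'removed' or 'any') to the functions registered for it.
    """
    added = sorted(current - previous)
    removed = sorted(previous - current)
    for tenant_id in added:
        for fun in callbacks.get('added', []):
            fun(tenant_id)        # one call per added subscriber
    for tenant_id in removed:
        for fun in callbacks.get('removed', []):
            fun(tenant_id)        # one call per removed subscriber
    if added or removed:
        for fun in callbacks.get('any', []):
            fun(sorted(current))  # full list of current subscribers


log = []
callbacks = {
    'added': [lambda t: log.append(('added', t))],
    'removed': [lambda t: log.append(('removed', t))],
    'any': [lambda ts: log.append(('any', ts))],
}
dispatch_changes({'t1', 't2'}, {'t2', 't3'}, callbacks)
print(log)
```

In this sketch the 'added' and 'removed' callbacks receive a single tenant ID, while the 'any' callback receives the complete list of subscribers after the change.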
await_callbacks(timeout: float = None)
Await running callbacks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float | Maximum wait time (None to wait indefinitely) | None |
Raises:
| Type | Description |
|---|---|
| TimeoutError | if there are still running callbacks after the specified timeout. |
get_callbacks() -> List[Future]
Get currently running callbacks.
This function can be used to gain direct access to the currently running callback threads. Usually, this is not necessary.
See also
Function await_callbacks to await the completion of all
currently running callback threads.
listen()
Start the listener.
This is blocking.
shutdown(timeout: float = None)
Shutdown the listener thread and wait for it to finish.
This function can only be invoked if the listener thread was started
using the start function (i.e. the thread is managed by this class).
Otherwise, the stop function should be used.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float | Maximum wait time (None to wait indefinitely). | None |
Raises:
| Type | Description |
|---|---|
| TimeoutError | if the shutdown could not complete within the specified timeout. The shutdown procedure is not interrupted by this and will complete eventually. |
start() -> threading.Thread
Start the listener in a separate thread.
This function will return immediately. The listening can be stopped
using the shutdown function.
Returns:
| Type | Description |
|---|---|
| Thread | The created Thread. |
stop()
Signal to stop the listening thread.
This function returns immediately; neither the completion of the
listen function nor potentially running callbacks are awaited.
Use this if the listen function is running in a thread managed
by your code.
See also
- Function await_callbacks to await the completion of potentially running callback functions.
- Functions start and shutdown if you don't want to manage the listening thread on your own.
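The two ways of running the listener (start/shutdown versus listen/stop in a thread of your own) can be sketched with the standard library alone. `PollingListener` below is a hypothetical stand-in that only mimics the documented lifecycle functions; it does not poll Cumulocity and is not the actual implementation.

```python
import threading
import time


class PollingListener:
    """Hypothetical stand-in mimicking the documented lifecycle functions."""

    def __init__(self, polling_interval: float = 0.05):
        self.polling_interval = polling_interval
        self.polls = 0
        self._stop_event = threading.Event()
        self._thread = None

    def listen(self):
        # blocking: loops until stop() is signalled
        while not self._stop_event.is_set():
            self.polls += 1
            self._stop_event.wait(self.polling_interval)

    def stop(self):
        # returns immediately; only signals the loop to end
        self._stop_event.set()

    def start(self) -> threading.Thread:
        # the thread is managed by this class
        self._thread = threading.Thread(target=self.listen, daemon=True)
        self._thread.start()
        return self._thread

    def shutdown(self, timeout: float = None):
        # signal the loop and wait for the managed thread to finish
        self.stop()
        self._thread.join(timeout)
        if self._thread.is_alive():
            raise TimeoutError('Listener did not stop in time.')


# Option 1: the listener manages its own thread
managed = PollingListener()
managed.start()
time.sleep(0.12)
managed.shutdown(timeout=1.0)

# Option 2: the calling code manages the thread
own = PollingListener()
own_thread = threading.Thread(target=own.listen)
own_thread.start()
own.stop()
own_thread.join(timeout=1.0)
```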
Listener
Synchronous (blocking) Notification 2.0 listener.
Notification 2.0 events are distributed via Pulsar topics, communicating via websockets.
This class encapsulates the Notification 2.0 communication protocol, providing a standard callback mechanism.
Note: Listening with callbacks requires some sort of parallelism. This
listener is implemented in a blocking fashion and therefore requires
the use of threads or subprocesses to ensure the parallelism.
Class AsyncListener implements the same functionality using a
non-blocking asynchronous approach.
See also: https://cumulocity.com/guides/reference/notifications/
Message
Represents a Notification 2.0 message.
This class is intended to be used with class Listener only.
ack()
Acknowledge the message.
ack(msg_id: str = None, payload: str = None) -> None
Acknowledge a Notification 2.0 message.
Either a valid Notification 2.0 message ID or payload needs to be provided. The message ID is extracted from the payload if necessary and sent to the channel to signal that the message handling is complete.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| msg_id | str | Message ID to acknowledge. | None |
| payload | str | Raw Notification 2.0 message payload. | None |
See also
- Function Message.ack to acknowledge a specific Notification 2.0 message directly.
- Listener parameter auto_ack=True to automatically acknowledge a processed message.
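Extracting the message ID from a raw payload could look like the following sketch. It assumes the raw Notification 2.0 text layout as commonly described for Cumulocity (three header lines holding the acknowledgement ID, the source and the action, then a blank line, then the JSON body); `parse_notification` and `ParsedMessage` are hypothetical names and not part of the library.

```python
import json
from typing import NamedTuple


class ParsedMessage(NamedTuple):
    msg_id: str   # opaque identifier used for acknowledging
    source: str   # e.g. '/<tenant>/<api>/<id>'
    action: str   # e.g. 'CREATE'
    body: dict    # parsed JSON document


def parse_notification(payload: str) -> ParsedMessage:
    """Split a raw message into headers and JSON body (assumed layout)."""
    headers, _, body = payload.partition('\n\n')
    msg_id, source, action = headers.splitlines()[:3]
    return ParsedMessage(msg_id, source, action, json.loads(body))


raw = 'abc123=\n/t12345/alarms/42\nCREATE\n\n{"type": "cx_MyAlarm"}'
message = parse_notification(raw)
print(message.msg_id, message.action)
```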
listen(callback: Callable[[Message], None])
Listen and handle messages.
This function starts listening for new Notification 2.0 messages on
the websocket channel. Each received message is wrapped in a Message
object and forwarded to the callback function for handling.
The messages are not automatically acknowledged. This can be done
via the Message object's ack function by the callback function.
Note: the callback function is invoked as a task and not awaited.
This function will automatically handle the websocket communication
including the authentication via tokens and reconnecting on
connection loss. It will end when the listener is closed using its
close function.
send(payload: str)
Send a custom message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| payload | str | Message payload to send. | required |
start(callback: Callable[[Message], None]) -> threading.Thread
Start the listener.
This function will start the listening process (listen function)
within a thread and register the callback function to be invoked
on every subscribed notification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | Callable[[Message], None] | Function to be invoked on notifications | required |
Returns:
| Type | Description |
|---|---|
| Thread | The listener thread. |
stop()
Stop the listener.
unsubscribe()
Unsubscribe the listener.
Manually unsubscribing is required if the listener wasn't created
with auto_unsubscribe=True.
See also https://cumulocity.com/api/core/#section/Overview/Consumers-and-tokens
wait(timeout=None) -> bool
Wait for the listener to stop.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float | Timeout in seconds. | None |
Returns:
| Type | Description |
|---|---|
| bool | Whether the listener has stopped (before timeout). |
AsyncListener
Asynchronous Notification 2.0 listener.
Notification 2.0 events are distributed via Pulsar topics, communicating via websockets.
This class encapsulates the Notification 2.0 communication protocol, providing a standard callback mechanism.
Note: Listening with callback requires some sort of parallelism. This
listener is implemented in a non-blocking fashion using Python coroutines.
Class Listener implements the same functionality using a classic,
blocking approach.
See also: https://cumulocity.com/guides/reference/notifications/
Message
Represents a Notification 2.0 message.
This class is intended to be used with class AsyncListener only.
ack()
async
Acknowledge the message.
ack(msg_id: str = None, payload: str = None)
async
Acknowledge a Notification 2.0 message.
Either a valid Notification 2.0 message ID or payload needs to be provided. The message ID is extracted from the payload if necessary and sent to the channel to signal that the message handling is complete.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| msg_id | str | Message ID to acknowledge. | None |
| payload | str | Raw Notification 2.0 message payload. | None |
See also
- Function Message.ack to acknowledge a specific Notification 2.0 message directly.
- Listener parameter auto_ack=True to automatically acknowledge a processed message.
listen(callback: Callable[[AsyncListener.Message], Awaitable[None]])
async
Listen and handle messages.
This function starts listening for new Notification 2.0 messages on
the websocket channel. Each received message is wrapped in a Message
object and forwarded to the callback function for handling.
The messages are not automatically acknowledged. This can be done
via the Message object's ack function by the callback function.
Note: the callback function is invoked as a task and not awaited.
This function will automatically handle the websocket communication
including the authentication via tokens and reconnecting on
connection loss. It will end when the listener is closed using its
close function.
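The note that the callback is invoked as a task and not awaited means that a slow callback does not block the handling of subsequent messages. The following sketch demonstrates the effect with plain asyncio; the `listen` function here is a simplified, hypothetical stand-in without any websocket communication.

```python
import asyncio


async def listen(messages, callback):
    """Hypothetical stand-in: dispatch each message as a task, don't await it."""
    tasks = []
    for message in messages:
        # schedule the callback and continue with the next message immediately
        tasks.append(asyncio.create_task(callback(message)))
    return tasks


handled = []


async def slow_callback(message):
    # the first message takes longer to handle than the second
    await asyncio.sleep(0.05 if message == 'first' else 0)
    handled.append(message)


async def main():
    tasks = await listen(['first', 'second'], slow_callback)
    await asyncio.gather(*tasks)  # demo only: wait before the loop closes


asyncio.run(main())
print(handled)
```

Because the handlers run concurrently, the second message completes before the first one. Callbacks therefore should not rely on messages being fully processed in order of arrival.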
send(payload: str)
async
Send a custom message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| payload | str | Message payload to send. | required |
start(callback: Callable[[AsyncListener.Message], Awaitable[None]])
Start the listener.
This function will start the listening process (listen function)
and register the callback function to be invoked on every subscribed
notification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | Callable[[Message], Awaitable[None]] | Function to be invoked on notifications | required |
Returns:
| Type | Description |
|---|---|
| | Created listener task. |
stop()
Signal the listener to be stopped.
unsubscribe()
Unsubscribe the listener.
Manually unsubscribing is required if the listener wasn't created
with auto_unsubscribe=True.
See also https://cumulocity.com/api/core/#section/Overview/Consumers-and-tokens
wait(timeout=None)
async
Wait for the listener task to finish.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | int | The number of seconds to wait for the listener to finish. The listener will be cancelled if the timeout occurs. | None |
QueueListener
Special listener implementation which pushes notification messages into a standard (sync) queue which can be monitored and read.
start()
Start the listener.
stop()
Stop the listener.
wait(timeout=None) -> bool
Wait for the listener task to finish.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | int | The number of seconds to wait for the listener to finish. The listener will be cancelled if the timeout occurs. | None |
Returns:
| Type | Description |
|---|---|
| bool | Whether the listener has stopped (before timeout). |
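Reading from a standard (sync) queue that is filled by another thread typically follows the pattern below. This is a generic sketch using only the standard library; the `produce` function is a hypothetical stand-in for the listener thread, and how the queue of an actual QueueListener is accessed is not shown here.

```python
import queue
import threading


def produce(q: queue.Queue):
    """Stand-in for the listener thread: pushes messages into the queue."""
    for i in range(3):
        q.put(f'message-{i}')


messages = queue.Queue()
producer = threading.Thread(target=produce, args=(messages,))
producer.start()

received = []
while len(received) < 3:
    try:
        # a timeout keeps the consumer responsive if no message arrives
        received.append(messages.get(timeout=1.0))
    except queue.Empty:
        break
producer.join()
print(received)
```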
AsyncQueueListener
Special listener implementation which pushes notification messages into a standard (async) queue which can be monitored and read.
start()
Start the listener.
stop()
Stop the listener.
wait(timeout=None)
async
Wait for the listener task to finish.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | int | The number of seconds to wait for the listener to finish. The listener will be cancelled if the timeout occurs. | None |
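The asynchronous counterpart of queue-based consumption can be sketched with asyncio.Queue. Again, this is a generic illustration under assumptions: `produce` is a hypothetical stand-in for the listener, and the `None` sentinel used to end the loop is a convention of this demo only.

```python
import asyncio


async def produce(q: asyncio.Queue):
    """Stand-in for the listener: pushes messages into the async queue."""
    for i in range(3):
        await q.put(f'message-{i}')
    await q.put(None)  # demo-only sentinel to signal the end


async def consume(q: asyncio.Queue) -> list:
    received = []
    while True:
        item = await q.get()  # suspends until a message is available
        if item is None:
            break
        received.append(item)
    return received


async def main() -> list:
    q = asyncio.Queue()
    producer = asyncio.create_task(produce(q))
    result = await consume(q)
    await producer
    return result


async_received = asyncio.run(main())
print(async_received)
```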
ParallelExecutor
Parallel execution context.
Use this class to run multiple select, get_all (batched) or collect
(for measurements only) API calls in parallel for better throughput and
overall performance by reducing I/O wait time.
The select, get_all, and collect functions can be invoked just as
if the corresponding API is invoked directly; the additional parameters
will be passed directly as-is. This includes the as_values parameter
which can be used to directly parse the JSON into tuples.
This class should be used as a context manager, i.e.
with ParallelExecutor() as executor:
    queue = executor.select()
    ...
However, it defines multiple static methods which handle the context and can be used synchronously, i.e.
# read all devices of type 'myType' using threads
all_devices = ParallelExecutor.as_list(c8y.device_inventory, type='myType')
See also ParallelExecutor.as_list, ParallelExecutor.as_records, ParallelExecutor.as_dataframe
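How the library schedules its page requests internally is not described here. The following sketch merely illustrates the general idea of reading result pages concurrently to reduce I/O wait time; `fetch_page`, `read_all` and the in-memory `DATA` list are hypothetical stand-ins for real API calls.

```python
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

DATA = [f'device-{i}' for i in range(25)]  # stand-in for server-side data


def fetch_page(page: int, page_size: int = 10) -> list:
    """Hypothetical page request; page numbers start at 1."""
    start = (page - 1) * page_size
    return DATA[start:start + page_size]


def read_all(workers: int = 5) -> list:
    results = Queue()
    page = 1
    done = False
    with ThreadPoolExecutor(max_workers=workers) as pool:
        while not done:
            # request a batch of consecutive pages concurrently
            pages = range(page, page + workers)
            for items in pool.map(fetch_page, pages):
                if not items:
                    done = True  # an empty page marks the end of the data
                    break
                results.put(items)
            page += workers
    # drain the queue into a flat list
    collected = []
    while not results.empty():
        collected.extend(results.get())
    return collected


all_devices = read_all(workers=3)
print(len(all_devices))
```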
as_dataframe(api, workers: int = 5, strategy: str = 'pages', columns: list = None, mapping: dict = None, **kwargs) -> pd.DataFrame
staticmethod
Read data via a Cumulocity API concurrently.
If mapping is provided, the API call results are mapped as
corresponding columns within the result dataframe. Otherwise, it is
assumed that the as_values parameter is provided (for the
underlying API calls) and the result tuples are directly mapped to
columns within the result dataframe. The columns parameter can
be used to provide specific column names (default: c0, c1 ...).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| api | CumulocityResource | A Cumulocity API instance, e.g. Events or Alarms. The API needs to support the | required |
| workers | int | The number of parallel processes to use | 5 |
| strategy | str | The strategy to use for parallelization; currently, only 'pages' is supported. | 'pages' |
| mapping | dict | A mapping of simplified JSON paths to columns. | None |
| columns | list | A list of column names. | None |
| **kwargs | | Additional keyword arguments to pass to the underlying API calls. | {} |
Returns:
| Type | Description |
|---|---|
| DataFrame | The collected data as Pandas DataFrame. |
See also c8y_api.model.as_ for more information about the
mapping syntax.
as_list(api, workers: int = 5, strategy: str = 'pages', **kwargs) -> list
staticmethod
Read data via a Cumulocity API concurrently.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| api | CumulocityResource | A Cumulocity API instance, e.g. Events or Alarms. The API needs to support the | required |
| workers | int | The number of parallel processes to use | 5 |
| strategy | str | The strategy to use for parallelization; currently, only 'pages' is supported. | 'pages' |
| **kwargs | | Additional keyword arguments to pass to the underlying API calls. | {} |
Returns:
| Type | Description |
|---|---|
| list | The collected data as list; these can be Python objects or tuples if the |
as_records(api, workers: int = 5, strategy: str = 'pages', mapping: dict = None, **kwargs) -> list[dict]
staticmethod
Read data via a Cumulocity API concurrently.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| api | CumulocityResource | A Cumulocity API instance, e.g. Events or Alarms. The API needs to support the | required |
| workers | int | The number of parallel processes to use | 5 |
| strategy | str | The strategy to use for parallelization; currently, only 'pages' is supported. | 'pages' |
| mapping | dict | A mapping of simplified JSON paths to record field names. | None |
| **kwargs | | Additional keyword arguments to pass to the underlying API calls. | {} |
Returns:
| Type | Description |
|---|---|
| list[dict] | The collected data as list of records/dictionaries. |
See also c8y_api.model.as_records for more information about the
mapping syntax.
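The idea of turning JSON documents into flat tuples or records via simplified paths can be sketched as follows. The dot-separated path syntax and the helper `get_by_path` are assumptions made for this illustration; the actual mapping syntax is documented with the referenced function.

```python
def get_by_path(document: dict, path: str, default=None):
    """Resolve a dot-separated path such as 'c8y_Temperature.T.value'."""
    current = document
    for key in path.split('.'):
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


# two simplified measurement documents (illustrative data)
measurements = [
    {'id': '1', 'source': {'id': '100'},
     'c8y_Temperature': {'T': {'value': 21.5, 'unit': 'C'}}},
    {'id': '2', 'source': {'id': '100'},
     'c8y_Temperature': {'T': {'value': 22.0, 'unit': 'C'}}},
]
paths = ['id', 'source.id', 'c8y_Temperature.T.value']
fields = ['id', 'device', 'temperature']

# tuples, one value per path
rows = [tuple(get_by_path(m, p) for p in paths) for m in measurements]
# records, one dictionary per document
records = [dict(zip(fields, row)) for row in rows]
print(records)
```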
get_all(api: CumulocityResource, strategy: str = 'pages', **kwargs) -> Queue
Perform multiple get_all API calls in parallel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| api | CumulocityResource | A Cumulocity API instance, e.g. Events or Alarms; the API needs to support the | required |
| strategy | str | The strategy to use for parallelization; currently, the strategies 'pages' and 'dates' are supported. The 'dates' strategy requires a defined date range; the upper bound is assumed to be 'now' if omitted. | 'pages' |
| **kwargs | | Additional keyword arguments to pass to | {} |
Returns:
| Type | Description |
|---|---|
| Queue | A Queue instance which is filled asynchronously with the list results returned by the […] list of tuples if the […] the JSON documents. |
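A date-based parallelization requires the overall date range to be divided into sub-ranges that can be queried independently. How the library divides the range is not specified here; the following sketch shows one straightforward approach, with `split_date_range`, `date_from` and `date_to` being hypothetical names.

```python
from datetime import datetime, timezone


def split_date_range(date_from: datetime, date_to: datetime = None,
                     parts: int = 4) -> list:
    """Split a date range into `parts` adjacent sub-ranges of equal length."""
    # the upper bound defaults to 'now' if omitted
    date_to = date_to or datetime.now(timezone.utc)
    step = (date_to - date_from) / parts
    bounds = [date_from + i * step for i in range(parts)] + [date_to]
    return list(zip(bounds[:-1], bounds[1:]))


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 1, 5, tzinfo=timezone.utc)
chunks = split_date_range(start, end, parts=4)
for lower, upper in chunks:
    print(lower.isoformat(), upper.isoformat())
```

Each sub-range could then be passed to a separate query running in its own worker.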
parallel(*functions) -> ParallelExecutorResult
Perform a collection of functions in parallel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| functions | | Iterable of functions to run in parallel; this can be variable args or any iterable including a generator. | () |
Returns:
| Type | Description |
|---|---|
| ParallelExecutorResult | A ParallelExecutorResult object which provides functionality to conveniently collect the results. |
Note: Do not use lambda to define function with bound variables;
use functools.partial instead.
See also: https://stackoverflow.com/questions/23400785
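The reason for this recommendation is Python's late binding of closure variables: a lambda looks up the variable when it is called, not when it is defined. The following sketch shows the difference; the `read` function is a hypothetical placeholder for an API call.

```python
from functools import partial


def read(device_id: int) -> str:
    """Hypothetical placeholder for an API call."""
    return f'device-{device_id}'


# late binding: every lambda sees the final value of device_id
with_lambda = [lambda: read(device_id) for device_id in range(3)]

# partial binds the current value when the function object is created
with_partial = [partial(read, device_id) for device_id in range(3)]

print([f() for f in with_lambda])   # ['device-2', 'device-2', 'device-2']
print([f() for f in with_partial])  # ['device-0', 'device-1', 'device-2']
```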
select(api: CumulocityResource, strategy: str = 'pages', **kwargs) -> Queue
Perform multiple select API calls in parallel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| api | CumulocityResource | A Cumulocity API instance, e.g. Events or Alarms; the API needs to support the | required |
| strategy | str | The strategy to use for parallelization; currently, only 'pages' is supported. | 'pages' |
| **kwargs | | Additional keyword arguments to pass to | {} |
Returns:
| Type | Description |
|---|---|
| Queue | A Queue instance which is filled asynchronously with the results yielded by the |
to_data_frame(data: Series, series: str | list[str] = None, value: str = None, timestamps: bool | str = None)
Build a Pandas DataFrame from a Cumulocity Series object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Series | A c8y_api Series object | required |
| series | str \| list | A series' name or a collection of series names; If omitted, all available series are extracted. The series names will be used as column names (special characters will be replaced) | None |
| value | str | The value (min/max) to extract; If omitted, both min and max will be extracted and the column names will be suffixed accordingly. | None |
| timestamps | bool \| str | Whether to extract the series timestamps as index; If True, the timestamp string will be used; Use 'datetime' or 'epoch' to parse the timestamp string. | None |
Returns:
| Type | Description |
|---|---|
| | A Pandas DataFrame object. |
to_numpy(data: Series, series: str | list[str] = None, value: str = None, timestamps: bool | str = None)
Build a NumPy array from a Cumulocity Series object.
This function extracts the min and/or max values of one or multiple series defined within the Series object. The result is either a 1-dimensional array if only a single series and value is extracted or a 2-dimensional array if multiple series and/or multiple values are extracted.
The array's columns are ordered as defined via the series argument
or as defined in the source Series object. If both min and max values
are extracted, they will be grouped adjacent to each other in the result.
If the timestamps argument is set, the result is a tuple of two NumPy
arrays: the first holding the data, the second holding the isolated
timestamps as a 1-dimensional array.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Series | A c8y_api Series object | required |
| series | str \| list | A series' name or a collection of series names; If omitted, all available series are extracted. | None |
| value | str | The value (min/max) to extract; If omitted, both min and max will be extracted. | None |
| timestamps | bool \| str | Whether to extract the series timestamps; If True, the timestamp strings will be used and this function returns a tuple (data, timestamps); Use 'datetime' or 'epoch' to parse the timestamp strings. | None |
Returns:
| Type | Description |
|---|---|
| | A NumPy array or a 2-tuple of NumPy arrays if timestamps are included. |
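The column layout described for to_numpy (series in order, min and max adjacent) can be illustrated with plain Python lists. This sketch assumes the JSON shape of a Cumulocity series response as the author of this example understands it (a `series` list plus a `values` dictionary keyed by timestamp); `to_rows` is a hypothetical helper, does not handle missing entries and does not use NumPy.

```python
series_json = {
    'series': [
        {'type': 'c8y_Temperature', 'name': 'T', 'unit': 'C'},
        {'type': 'c8y_Humidity', 'name': 'H', 'unit': '%'},
    ],
    'values': {
        '2024-01-01T00:00:00.000Z': [{'min': 20.0, 'max': 21.0},
                                     {'min': 40.0, 'max': 45.0}],
        '2024-01-01T01:00:00.000Z': [{'min': 19.5, 'max': 20.5},
                                     {'min': 42.0, 'max': 44.0}],
    },
}


def to_rows(data: dict, value: str = None):
    """Build rows with min/max grouped adjacent per series, plus timestamps."""
    keys = [value] if value else ['min', 'max']
    timestamps = list(data['values'])
    rows = [[entry[k] for entry in entries for k in keys]
            for entries in data['values'].values()]
    return rows, timestamps


rows, timestamps = to_rows(series_json)
max_rows, _ = to_rows(series_json, value='max')
print(rows)
print(max_rows)
```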
to_series(data: Series, series: str = None, value: str = 'min', timestamps: bool | str = None)
Build a Pandas Series from a Cumulocity Series object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Series | A c8y_api Series object | required |
| series | str | A series' name; can be left blank if | None |
| value | str | The value (min/max) to extract; defaults to 'min' | 'min' |
| timestamps | bool \| str | Whether to extract the series' timestamps as index; If True, the timestamp string will be used; Use 'datetime' or 'epoch' to parse the timestamp string. | None |
Returns:
| Type | Description |
|---|---|
| | A Pandas Series object. |