Auxiliary Tools

The Cumulocity Python API's standard packages (see Main API classes and Model classes) represent what the Cumulocity REST API provides. The c8y_tk (for "Cumulocity toolkit") module provides additional auxiliary tools covering functionality that goes beyond the REST API but is useful in many projects.

The c8y_tk.notification2 module provides listener implementations that allow straightforward development of Notification 2.0 applications without additional overhead.

→ Listener
→ QueueListener
→ AsyncListener
→ AsyncQueueListener

The c8y_tk.analytics module provides helper functions for parallel query processing, which maximises performance when dealing with large datasets, and for easily transforming a Cumulocity Series into Pandas data frames and series or NumPy arrays.

→ ParallelExecutor
→ to_data_frame
→ to_numpy
→ to_series

The c8y_tk.app module provides auxiliary tools for implementing both interactive and microservice applications.

→ CumulocityApp
→ SubscriptionListener


CumulocityApp

Cumulocity API wrapper to be used for interactive sessions.

As a context manager it ensures that a valid Cumulocity connection is available at runtime. It uses standard environment variables when defined (C8Y_BASEURL, C8Y_TENANT, C8Y_USER, C8Y_PASSWORD, as well as C8Y_TOKEN) and interactively requests updated information in case some data is missing.

with CumulocityApp() as c8y:
    alarms = c8y.alarms.get_all(type='cx_MyAlarm')
    ...

SubscriptionListener

Multi-tenant subscription listener.

When implementing a multi-tenant microservice, it is sometimes required to keep track of the tenants which subscribe to the microservice. In practice, this is done by polling the get_subscribers function of the MultiTenantCumulocityApp class.

The SubscriptionListener class provides a default implementation of such a polling mechanism which can be easily integrated using callbacks.

add_callback(callback: Callable[[Union[str, List[str]]], None], blocking: bool = True, when: str = 'any') -> SubscriptionListener

Add a callback function to be invoked if a tenant subscribes to/unsubscribes from the monitored multi-tenant microservice.

Note: multiple callbacks (even listening to the same event) can be defined. The add_callback function supports a fluent interface, i.e. it can be chained, to ease configuration.

Parameters:

Name Type Description Default
callback Callable

A callback function to invoke in case of a change in subscribers. If parameter when is either "added" or "removed", the function is invoked with a single tenant ID for every added/removed subscriber respectively. Otherwise (i.e. for "always" or "any"), the callback function is invoked with a list of the current subscribers' tenant IDs.

required
blocking bool

Whether to invoke the callback function in a blocking fashion (default) or not. If False, a thread is spawned for each invocation.

True
when str

When to invoke this particular callback function. If "added" or "removed", the callback function is invoked with a single tenant ID for every added/removed subscriber respectively. Otherwise (i.e. for "always" or "any"), the callback function is invoked with a list of the current subscribers' tenant IDs.

'any'
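The fluent interface and the when parameter described above can be combined as in the following minimal sketch. It assumes that listener is an already constructed SubscriptionListener (its constructor arguments are not covered here); the callback names and the configure helper are hypothetical and only serve as illustration.

```python
from typing import List


def on_added(tenant_id: str) -> None:
    # Invoked once per newly subscribed tenant (when='added').
    print(f"Tenant subscribed: {tenant_id}")


def on_removed(tenant_id: str) -> None:
    # Invoked once per unsubscribed tenant (when='removed').
    print(f"Tenant unsubscribed: {tenant_id}")


def on_any(tenant_ids: List[str]) -> None:
    # Invoked with the list of all current subscribers (when='any').
    print(f"Current subscribers: {sorted(tenant_ids)}")


def configure(listener):
    """Register all callbacks on a SubscriptionListener-like object.

    Hypothetical helper; it relies only on the documented add_callback
    signature and on add_callback returning the listener for chaining.
    """
    return (
        listener
        .add_callback(on_added, when='added')
        .add_callback(on_removed, when='removed')
        .add_callback(on_any, blocking=False, when='any')
    )
```

Registering on_any with blocking=False means each invocation runs in its own thread, so a slow summary callback would not delay the per-tenant callbacks.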

await_callbacks(timeout: float = None)

Await running callbacks.

Parameters:

Name Type Description Default
timeout float

Maximum wait time (None to wait indefinitely)

None

Raises:

Type Description
TimeoutError

if there are still running callbacks after the specified timeout.

get_callbacks() -> List[Future]

Get currently running callbacks.

This function can be used to gain direct access to the currently running callback threads. Usually, this is not necessary.

See also

Function await_callbacks to await the completion of all currently running callback threads.

listen()

Start the listener.

This is blocking.

shutdown(timeout: float = None)

Shut down the listener thread and wait for it to finish.

This function can only be invoked if the listener thread was started using the start function (i.e. the thread is managed by this class). Otherwise, the stop function should be used.

Parameters:

Name Type Description Default
timeout float

Maximum wait time (None to wait indefinitely).

None

Raises:

Type Description
TimeoutError

if the shutdown could not complete within the specified timeout. The shutdown procedure is not interrupted by this and will complete eventually.

start() -> threading.Thread

Start the listener in a separate thread.

This function will return immediately. The listening can be stopped using the shutdown function.

Returns:

Type Description
Thread

The created Thread.

stop()

Signal to stop the listening thread.

This function returns immediately; neither the completion of the listen function nor potentially running callbacks are awaited. Use this if the listen function is running in a thread managed by your code.

See also

Function await_callbacks, to await the completion of potentially running callback functions. Functions start and shutdown if you don't want to manage the listening thread on your own.
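The two ways of running the listener (a thread managed by the class versus a thread managed by your code) can be sketched as follows. This is a minimal sketch under assumptions: listener is an already constructed SubscriptionListener, and the helper names run_managed and run_self_managed as well as the stop_event parameter are hypothetical.

```python
import threading
from typing import Optional


def run_managed(listener, stop_event: threading.Event,
                timeout: Optional[float] = None) -> None:
    """Let the listener manage its own thread: start() and shutdown()."""
    listener.start()                        # returns immediately
    try:
        stop_event.wait()                   # block until the application wants to exit
    finally:
        listener.shutdown(timeout=timeout)  # may raise TimeoutError


def run_self_managed(listener, stop_event: threading.Event,
                     timeout: Optional[float] = None) -> None:
    """Run the blocking listen() in a caller-owned thread: stop() and await_callbacks()."""
    thread = threading.Thread(target=listener.listen, daemon=True)
    thread.start()
    stop_event.wait()                          # block until the application wants to exit
    listener.stop()                            # only signals; does not wait
    listener.await_callbacks(timeout=timeout)  # may raise TimeoutError
    thread.join(timeout)                       # wait for listen() to return
```

The first variant keeps the thread handling inside the class; the second is only needed when the listening thread has to be controlled by the application itself.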

Listener

Synchronous (blocking) Notification 2.0 listener.

Notification 2.0 events are distributed via Pulsar topics, communicating via websockets.

This class encapsulates the Notification 2.0 communication protocol, providing a standard callback mechanism.

Note: Listening with callback requires some sort of parallelism. This listener is implemented in a blocking fashion; it therefore requires the use of threads or subprocesses to ensure the parallelism. Class AsyncListener implements the same functionality using a non-blocking asynchronous approach.

See also: https://cumulocity.com/guides/reference/notifications/

Message

Represents a Notification 2.0 message.

This class is intended to be used with class Listener only.

ack()

Acknowledge the message.

ack(msg_id: str = None, payload: str = None) -> None

Acknowledge a Notification 2.0 message.

Either a valid Notification 2.0 message ID or payload needs to be provided. If necessary, the message ID is extracted from the payload; it is then sent to the channel to signal that the message handling is complete.

Parameters:

Name Type Description Default
msg_id str

Message ID to acknowledge.

None
payload str

Raw Notification 2.0 message payload.

None
See also
  • Function Message.ack to acknowledge a specific Notification 2.0 message directly.
  • Listener parameter auto_ack=True to automatically acknowledge a processed message

listen(callback: Callable[[Message], None])

Listen and handle messages.

This function starts listening for new Notification 2.0 messages on the websocket channel. Each received message is wrapped in a Message object and forwarded to the callback function for handling.

The messages are not automatically acknowledged. This can be done via the Message object's ack function by the callback function.

Note: the callback function is invoked as a task and not awaited.

This function will automatically handle the websocket communication including the authentication via tokens and reconnecting on connection loss. It will end when the listener is closed using its close function.
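Because messages are not acknowledged automatically, a callback typically calls ack itself once handling has succeeded. The following is a minimal sketch of that pattern; the ack_on_success wrapper is hypothetical and relies only on the documented Message.ack function.

```python
import logging


def ack_on_success(handler):
    """Wrap handler so that a message is acknowledged only if handling succeeds."""
    def callback(msg):
        try:
            handler(msg)
        except Exception:
            # Leave the message unacknowledged if handling failed.
            logging.exception("Message handling failed; message not acknowledged")
            return
        msg.ack()
    return callback
```

The wrapped function could then be passed to listen or start, for example listener.start(ack_on_success(my_handler)), where my_handler is a placeholder for your own function.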

send(payload: str)

Send a custom message.

Parameters:

Name Type Description Default
payload str

Message payload to send.

required

start(callback: Callable[[Message], None]) -> threading.Thread

Start the listener.

This function will start the listening process (listen function) within a thread and register the callback function to be invoked on every subscribed notification.

Parameters:

Name Type Description Default
callback Callable[[Message], None]

Function to be invoked on notifications

required

Returns:

Type Description
Thread

The listener thread.

stop()

Stop the listener.

unsubscribe()

Unsubscribe the listener.

Manually unsubscribing is required if the listener wasn't created with auto_unsubscribe=True.

See also https://cumulocity.com/api/core/#section/Overview/Consumers-and-tokens

wait(timeout=None) -> bool

Wait for the listener to stop.

Parameters:

Name Type Description Default
timeout float

Timeout in seconds.

None

Returns:

Type Description
bool

Whether the listener has stopped (before timeout).

AsyncListener

Asynchronous Notification 2.0 listener.

Notification 2.0 events are distributed via Pulsar topics, communicating via websockets.

This class encapsulates the Notification 2.0 communication protocol, providing a standard callback mechanism.

Note: Listening with callback requires some sort of parallelism. This listener is implemented in a non-blocking fashion using Python coroutines. Class Listener implements the same functionality using a classic, blocking approach.

See also: https://cumulocity.com/guides/reference/notifications/

Message

Represents a Notification 2.0 message.

This class is intended to be used with class AsyncListener only.

ack() async

Acknowledge the message.

ack(msg_id: str = None, payload: str = None) async

Acknowledge a Notification 2.0 message.

Either a valid Notification 2.0 message ID or payload needs to be provided. If necessary, the message ID is extracted from the payload; it is then sent to the channel to signal that the message handling is complete.

Parameters:

Name Type Description Default
msg_id str

Message ID to acknowledge.

None
payload str

Raw Notification 2.0 message payload.

None
See also
  • Function Message.ack to acknowledge a specific Notification 2.0 message directly.
  • Listener parameter auto_ack=True to automatically acknowledge a processed message

listen(callback: Callable[[AsyncListener.Message], Awaitable[None]]) async

Listen and handle messages.

This function starts listening for new Notification 2.0 messages on the websocket channel. Each received message is wrapped in a Message object and forwarded to the callback function for handling.

The messages are not automatically acknowledged. This can be done via the Message object's ack function by the callback function.

Note: the callback function is invoked as a task and not awaited.

This function will automatically handle the websocket communication including the authentication via tokens and reconnecting on connection loss. It will end when the listener is closed using its close function.
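Since the callback is invoked as a task and not awaited, several callbacks may be in flight at the same time. One way to bound this, sketched below under assumptions, is to guard the handler with an asyncio.Semaphore and acknowledge each message after handling. The limit_concurrency wrapper and its max_parallel parameter are hypothetical; only the awaitable Message.ack function is taken from this reference.

```python
import asyncio


def limit_concurrency(handler, max_parallel: int = 4):
    """Wrap an async handler so that at most max_parallel invocations run at once."""
    semaphore = None

    async def callback(msg):
        nonlocal semaphore
        if semaphore is None:
            # Created lazily so that it belongs to the running event loop.
            semaphore = asyncio.Semaphore(max_parallel)
        async with semaphore:
            await handler(msg)
            await msg.ack()  # acknowledge only after successful handling

    return callback
```

If the handler raises, the message is left unacknowledged and the semaphore slot is released by the async with block.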

send(payload: str) async

Send a custom message.

Parameters:

Name Type Description Default
payload str

Message payload to send.

required

start(callback: Callable[[AsyncListener.Message], Awaitable[None]])

Start the listener.

This function will start the listening process (listen function) and register the callback function to be invoked on every subscribed notification.

Parameters:

Name Type Description Default
callback Callable[[Message], Awaitable[None]]

Function to be invoked on notifications

required

Returns:

Type Description

Created listener task.

stop()

Signal the listener to be stopped.

unsubscribe()

Unsubscribe the listener.

Manually unsubscribing is required if the listener wasn't created with auto_unsubscribe=True.

See also https://cumulocity.com/api/core/#section/Overview/Consumers-and-tokens

wait(timeout=None) async

Wait for the listener task to finish.

Parameters:

Name Type Description Default
timeout int

The number of seconds to wait for the listener to finish. The listener will be cancelled if the timeout occurs.

None

QueueListener

Special listener implementation which pushes notification messages into a standard (sync) queue which can be monitored and read.

start()

Start the listener.

stop()

Stop the listener.

wait(timeout=None) -> bool

Wait for the listener task to finish.

Parameters:

Name Type Description Default
timeout int

The number of seconds to wait for the listener to finish. The listener will be cancelled if the timeout occurs.

None

Returns:

Type Description
bool

Whether the listener has stopped (before timeout).
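Reading from a standard (sync) queue usually follows a simple polling loop. The sketch below shows one such loop using only Python's queue module; how the queue is obtained from a QueueListener is not covered in this section, so the hypothetical drain helper simply takes a queue.Queue as its argument.

```python
import queue
from typing import Any, Iterator


def drain(q: "queue.Queue", idle_timeout: float = 1.0) -> Iterator[Any]:
    """Yield items from q until no new item arrives within idle_timeout seconds."""
    while True:
        try:
            item = q.get(timeout=idle_timeout)
        except queue.Empty:
            return  # nothing arrived in time; stop reading
        yield item
```

A consumer could iterate over drain(q) in its own thread while the listener keeps filling the queue.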

AsyncQueueListener

Special listener implementation which pushes notification messages into a standard (async) queue which can be monitored and read.

start()

Start the listener.

stop()

Stop the listener.

wait(timeout=None) async

Wait for the listener task to finish.

Parameters:

Name Type Description Default
timeout int

The number of seconds to wait for the listener to finish. The listener will be cancelled if the timeout occurs.

None

ParallelExecutor

Parallel execution context.

Use this class to run multiple select, get_all (batched) or collect (for measurements only) API calls in parallel for better throughput and overall performance by reducing I/O wait time.

The select, get_all, and collect functions can be invoked just as if the corresponding API is invoked directly; the additional parameters will be passed directly as-is. This includes the as_values parameter which can be used to directly parse the JSON into tuples.

This class should be used as a context manager, i.e.

    with ParallelExecutor() as executor:
        queue = executor.select()
        ...

However, it defines multiple static methods which handle the context and can be used synchronously, i.e.

    # read all devices of type 'myType' using threads
    all_devices = ParallelExecutor.as_list(c8y.device_inventory, type='myType')

See also ParallelExecutor.as_list, ParallelExecutor.as_records, ParallelExecutor.as_dataframe

as_dataframe(api, workers: int = 5, strategy: str = 'pages', columns: list = None, mapping: dict = None, **kwargs) -> pd.DataFrame staticmethod

Read data via a Cumulocity API concurrently.

If mapping is provided, the API call results are mapped as corresponding columns within the result dataframe. Otherwise, it is assumed that the as_values parameter is provided (for the underlying API calls) and the result tuples are directly mapped to columns within the result dataframe. The columns parameter can be used to provide specific column names (default: c0, c1 ...).

Parameters:

Name Type Description Default
api CumulocityResource

A Cumulocity API instance, e.g. Events or Alarms. The API needs to support the get_count and get_all functions.

required
workers int

The number of parallel processes to use

5
strategy str

The strategy to use for parallelization; currently, only 'pages' is supported.

'pages'
mapping dict

A mapping of simplified JSON paths to columns.

None
columns list

A list of column names.

None
**kwargs

Additional keyword arguments to pass to the underlying API calls.

{}

Returns:

Type Description
DataFrame

The collected data as Pandas DataFrame.

See also c8y_api.model.as_ for more information about the mapping syntax.

as_list(api, workers: int = 5, strategy: str = 'pages', **kwargs) -> list staticmethod

Read data via a Cumulocity API concurrently.

Parameters:

Name Type Description Default
api CumulocityResource

A Cumulocity API instance, e.g. Events or Alarms. The API needs to support the get_count and get_all functions.

required
workers int

The number of parallel processes to use

5
strategy str

The strategy to use for parallelization; currently, only 'pages' is supported.

'pages'
**kwargs

Additional keyword arguments to pass to the underlying API calls.

{}

Returns:

Type Description
list

The collected data as a list; the items can be Python objects, or tuples if the as_values parameter is utilized to parse the objects.

as_records(api, workers: int = 5, strategy: str = 'pages', mapping: dict = None, **kwargs) -> list[dict] staticmethod

Read data via a Cumulocity API concurrently.

Parameters:

Name Type Description Default
api CumulocityResource

A Cumulocity API instance, e.g. Events or Alarms. The API needs to support the get_count and get_all functions.

required
workers int

The number of parallel processes to use

5
strategy str

The strategy to use for parallelization; currently, only 'pages' is supported.

'pages'
mapping dict

A mapping of simplified JSON paths to record field names.

None
**kwargs

Additional keyword arguments to pass to the underlying API calls.

{}

Returns:

Type Description
list[dict]

The collected data as list of records/dictionaries.

See also c8y_api.model.as_records for more information about the mapping syntax.

get_all(api: CumulocityResource, strategy: str = 'pages', **kwargs) -> Queue

Perform multiple get_all API calls in parallel.

Parameters:

Name Type Description Default
api CumulocityResource

A Cumulocity API instance, e.g. Events or Alarms; the API needs to support the get_count and get_all functions.

required
strategy str

The strategy to use for parallelization; currently, the 'pages' and 'dates' strategies are supported. The 'dates' strategy requires a defined date range; the upper bound is assumed to be 'now' if omitted.

'pages'
**kwargs

Additional keyword arguments to pass to get_all.

{}

Returns:

Type Description
Queue

A Queue instance which is filled asynchronously with the list results returned by the get_all function. This may also be a list of tuples if the as_values parameter is utilized to parse the JSON documents.

parallel(*functions) -> ParallelExecutorResult

Perform a collection of functions in parallel.

Parameters:

Name Type Description Default
functions

Iterable of functions to run in parallel; this can be variable args or any iterable including a generator.

()

Returns:

Type Description
ParallelExecutorResult

A ParallelExecutorResult object which provides functionality to conveniently collect the results.

Note: Do not use lambda to define function with bound variables; use functools.partial instead. See also: https://stackoverflow.com/questions/23400785
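The reason behind this note is Python's late binding of closure variables: a lambda looks up the variable when it is called, not when it is defined. The following self-contained sketch demonstrates the difference; the fetch function is a hypothetical stand-in for an actual API call.

```python
from functools import partial


def fetch(page: int) -> str:
    # Hypothetical stand-in for an API call reading one page of results.
    return f"page {page}"


# Every lambda refers to the same loop variable, which is evaluated at call time.
late_bound = [lambda: fetch(page) for page in range(3)]

# partial binds the current value of page when the object is created.
bound = [partial(fetch, page) for page in range(3)]

print([f() for f in late_bound])  # ['page 2', 'page 2', 'page 2']
print([f() for f in bound])       # ['page 0', 'page 1', 'page 2']
```

The partial objects can then be passed to parallel as variable arguments, for example executor.parallel(*bound).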

select(api: CumulocityResource, strategy: str = 'pages', **kwargs) -> Queue

Perform multiple select API calls in parallel.

Parameters:

Name Type Description Default
api CumulocityResource

A Cumulocity API instance, e.g. Events or Alarms; the API needs to support the get_count and select functions.

required
strategy str

The strategy to use for parallelization; currently, only 'pages' is supported.

'pages'
**kwargs

Additional keyword arguments to pass to select.

{}

Returns:

Type Description
Queue

A Queue instance which is filled asynchronously with the results yielded by the select function. This may also be a tuple if the as_values parameter is utilized to parse the JSON documents.

to_data_frame(data: Series, series: str | list[str] = None, value: str = None, timestamps: bool | str = None)

Build a Pandas DataFrame from a Cumulocity Series object.

Parameters:

Name Type Description Default
data Series

A c8y_api Series object

required
series str | list

A series' name or a collection of series names; if omitted, all available series are extracted. The series names will be used as column names (special characters will be replaced).

None
value str

The value (min/max) to extract; if omitted, both min and max will be extracted and the column names will be suffixed accordingly.

None
timestamps bool | str

Whether to extract the series timestamps as index; if True, the timestamp string will be used; use 'datetime' or 'epoch' to parse the timestamp string.

None

Returns:

Type Description

A Pandas DataFrame object.

to_numpy(data: Series, series: str | list[str] = None, value: str = None, timestamps: bool | str = None)

Build a NumPy array from a Cumulocity Series object.

This function extracts the min and/or max values of one or multiple series defined within the Series object. The result is a 1-dimensional array if only a single series and value is extracted, or a 2-dimensional array if multiple series and/or multiple values are extracted.

The array's 'columns' are ordered as defined via the series argument or as defined in the source Series object. If both min and max values are extracted, they will be grouped adjacent to each other in the result.

If the timestamps argument is set, the result is a tuple of two NumPy arrays: the first holding the data, the second the isolated timestamps as a 1-dimensional array.

Parameters:

Name Type Description Default
data Series

A c8y_api Series object

required
series str | list

A series' name or a collection of series names; if omitted, all available series are extracted.

None
value str

The value (min/max) to extract; if omitted, both min and max will be extracted.

None
timestamps bool | str

Whether to extract the series timestamps; if True, the timestamp strings will be used and this function returns a tuple (data, timestamps); use 'datetime' or 'epoch' to parse the timestamp strings.

None

Returns:

Type Description

A NumPy array or a 2-tuple of NumPy arrays if timestamps are included.

to_series(data: Series, series: str = None, value: str = 'min', timestamps: bool | str = None)

Build a Pandas Series from a Cumulocity Series object.

Parameters:

Name Type Description Default
data Series

A c8y_api Series object

required
series str

A series' name; can be left blank if data holds only the values of one series

None
value str

The value (min/max) to extract; defaults to 'min'

'min'
timestamps bool | str

Whether to extract the series' timestamps as index; if True, the timestamp string will be used; use 'datetime' or 'epoch' to parse the timestamp string.

None

Returns:

Type Description

A Pandas Series object.