MQTT Broadcaster

Description

This document provides a guide on using the MQTT broadcaster executable. The service forwards signal values from the IPC manager to MQTT topics, providing a connection between IPC and MQTT.

Configuration

In order to configure the executable, locate a file under /etc/tfc/mqtt-broadcaster/def/mqtt_broadcaster.json and set the following variables:

{
   "_mqtt_broker_address": "",
   "_mqtt_broker_port": "8883",
   "_mqtt_broker_username": "",
   "_mqtt_broker_password": "",
   "_sparkplug_b_edge_node_id": "",
   "_sparkplug_b_group_id": "",
   "_client_id": ""
}

The following command line arguments are optional:

--stdout --log-level trace   # enables logging

Testing

In order to see functionality run:

ipc-ruler --stdout --log-level trace
tfcctl --signal bool.test --stdout --log-level trace   # send values on this signal to see functionality

Key elements of coroutines

The executable uses a coroutine which it posts to in order to perform tasks. This is an effective way to have the benefits of multithreading and single-threading combined. There are a number of elements of this feature that need to be understood when developing.

Detached and Awaitable

asio::co_spawn(mqtt_client_->strand(), tfc::base::exit_signals(ctx_), asio::detached);
co_await asio::co_spawn(mqtt_client_->strand(), connect_to_broker(), asio::use_awaitable);

If a coroutine is spawned with asio::detached the coroutine will run in the background and will not be waited on, also known as “fire-and-forget”. These are most typically tasks that are not critical to the operation of the executable and can run alongside other tasks.

If a coroutine is spawned with asio::use_awaitable the coroutine will be waited on and will block the thread until it has completed. An example of this can be when the program is connecting to the broker. Other parts of the program cannot function until the connection has been established and therefore the coroutine must be waited on.

Context and Strand

co_await asio::co_spawn(mqtt_client_->strand(), mqtt_client_->close(asio::use_awaitable), asio::use_awaitable);
co_await asio::co_spawn(ctx_, mqtt_client_->close(asio::use_awaitable), asio::use_awaitable);

If a coroutine is spawned in a strand, it will run in the strand. This means that all routines will run synchronously and the risk of concurrency is avoided. This is useful when you want to ensure that a routine is not interrupted by any other task than the one running. This executable uses a strand in order to guarantee memory safety.

If a coroutine is spawned in a context this consecutive execution is not guaranteed. Multiple jobs can run at the same time and memory safety is not guaranteed.

Key elements of the MQTT protocol

The executable uses MQTT v5 and the Sparkplug B specification.

Assumption: The reader has a basic understanding of the MQTT protocol.

The executable utilizes 2 very important flags:

  • session expiry interval

  • clean session / clean start

When a client connects to the broker it starts a session. This session is kept alive by PING messages. If the client disconnects the broker stores information about the client for a certain amount of time. This is the session expiry interval. If the client doesn’t set a session expiry interval then it is set to 0 and the broker immediately deletes the session as soon as it detects a disconnect.

The clean session flag determines weather the client should use a previous session or start a new one. If the clean session flag is set to true then the broker will always start a new session upon every new connection. Otherwise, it will try to use the previous session if there is one available. This executable sets the clean session flag to false.

SparkPlug B

The SparkPlug B specification is a specification for MQTT payloads. It is a specification that is used by the MQTT broker to determine how to handle the payload, topic structure and other things.

NBIRTH

In order to create a new session, the client must send an NBIRTH message. This message contains information about the available devices and their signal values.

NDEATH

When a client disconnects it sends an NDEATH message, this is handled through MQTT Will messages.

NDATA

When a client wants to update a signal value it sends an NDATA message. This message contains information about the signal value and the device it belongs to. It should only be used for updates, not for repeated values.

Prerequisites for Local Testing

The following components must be available on your host machine for testing, development, and a better understanding of the executable:

  • MQTT broker that supports SparkPlugB (Ignition … etc)

  • socat (For simulating offline state)

  • IPC-ruler

Simulating Offline State

You can simulate an offline state by forwarding the TCP traffic through socat.

socat TCP4-LISTEN:1234,reuseaddr TCP4:<ip>:8883

To simulate offline state, kill the socat process and restart it.