MQTT + Sparkplug Support¶
While NF provides native APIs for exchanging data with other systems, it also implements Sparkplug over MQTT.
- MQTT is a standardized pub/sub protocol widely adopted in the IoT space. Azure IoT, AWS IoT, and GCP IoT all implement portions of the standard; although none are full implementations and have various restrictions. MQTT does not define any particular structure for topics or message payloads.
- Sparkplug B is a specification for how to implement IoT and Industrial IoT application on top of Sparkplug. It is not possible to fully implement when using a cloud provider as a broker, due to restictions on topic format and lack of support for certain protocol features.
Developers Guide¶
Protocol Overview¶
The Sparkplug protocol defines a topic and payload format on top of MQTT, specified when the messages should be sent.
In the Sparkplug protocol, there are main participants:
- Nodes are essentially gateways which read from underlying devices and communicate over MQTT.
- Devices are devices which are actually generating the data; for instance, controllers or other sensors.
In this architecture, NF is a node, and BACnet devices are exposed as devices. When a node or device starts up, it publishes an NBIRTH or DBIRTH message which contains the metadata for all points available on that node or device -- in Sparkplug language, these are called Metrics.
In NF's implementation, each BACnet object is mapped to a Sparkplug Metric, whose name is the UUID identifier in NF. The DBIRTH message contains all BACnet properties which were read during the latest scan within the metric's PropertySet.
After the DBIRTH has been sent, NF will send DDATA messages as
new data become available. After an outage when the MQTT broker was
not reachable, NF can also send buffered data. In this case those
values will have the is_historical flag set to indicate they are no
longer current.`
Payload Format¶
The "Sparkplug B" protocol defines its protocol format using a protocol buffer. Protocol buffers generate an efficient binary format from a simple descriptoin file, called a proto file. The full set of definitions is available in sparkplug_b.proto.
Normal Framework supports several different encodings of this format,
controlled by the SPARKPLUG_PAYLOAD_FORMAT environment variable.
The options are:
proto+gzip(default): binary protobuf format, then gzipped. This generates the most compact messages.proto: binary protobufjson: the json-equivalent encoding of the protobuf message
Info
If you are using either of the binary formats, you will need to use the protocol libraries for your language in order to decode the payload.
To use a proto file, you first use the protocol buffer compiler for your target language to generate bindings. If you don't have the compiler installed already, you can get it here.
$ protoc --js_out=import_style=commonjs,binary:. sparkplug_b.proto
$ protoc --python_out=. sparkplug_b.proto
If you don't want want to install the compiler you can also just download the results: sparkplug_b_pb.js sparkplug_b_pb2.py.
In addition to the protobuf encoding, NF also compresses the payload using GZip. To unpack a message, you need to decompress it and then parse it using the protocol buffer definition.
// only dependency for node is google-protobuf
var fs = require("fs");
var zlib = require("zlib")
var sparkplug_b = require("./sparkplug_b_pb.js")
fs.readFile("payload.pb", (err, data) => {
var buf = zlib.unzipSync(data)
var message = new sparkplug_b.Payload.deserializeBinary(buf)
console.log(JSON.stringify(message.toObject()))
})
});
# need pip install protobuf
import gzip
import sparkplug_b_pb2
with gzip.open("payload.pb") as fp:
payload = sparkplug_b_pb2.Payload()
payload.ParseFromString(fp.read())
print (payload)
Reliability Considerations¶
Because MQTT is a message bus, data can be published with no guarantee that a consumer is available to insert it into a database. If data are sent and no one is listening, that data will typically be lost. Therefore NF contains several mechanisms to help ensure that all data collected can be reliably saved to a database.
-
Sender-based reliability: if the MQTT broker or receiver can be run in a high-availability, persistent configuration so that all data which are sent are guaranteed to be archived, NF can be run with the
SPARKPLUG_AUTO_RECOVERYflag. In this mode, NF will ensure all data are sent to the broker at least once; after an outage where the broker isn't reachable, it will send buffered data which was generated during the outage. -
Receiver-based reliability: if the broker or upstream system isn't highly-available, NF can run where data are only sent while the broker is available. After an outage, the consumer is responsible for tracking which data have been inserted and generating a retry request asking NF to resend data which occurred during the outage.
As a rule of thumb, sender-based reliabilty is a good fit if using services like Azure IoT since they already contain mechanisms for persisting messages and are highly available. If you are running your own database and broker, receiver-based reliability allows you to guarantee all data will be archived without needing complex system configurations to run an HA broker and database.
Considerations when using a Cloud Provider¶
Cloud providers vary slighly, but they impose certain restrictions on the use of MQTT:
- They all require provider-specific authentication logic, specifying their use of X.509 certificates, MQTT client IDs, and passwords;
- Payload size restictions make compressing messages highly desirable;
- Lack of support for retained messages means other mechanisms should be used to record device databases.
- Restrictions on topic structure break conformance with the specification.
Sparkplug over HTTP¶
Although Sparkplug is defined over MQTT, we also support a simplified version over HTTP for convenience in environments where an MQTT connection is not possible or troublesome.
When enabled, the Sparkplug driver will send each Sparkplug paylod to an HTTP server in the body of a POST request. The request in influenced by a number of environment variables; see the service configuration page for details.
Writing to Points via DCMD¶
By default, the Sparkplug service only publishes data outbound (DBIRTH/DDATA). With DCMD support enabled, external SCADA systems and applications can write to points by publishing Sparkplug DCMD (Device Command) messages.
Writes received via DCMD are dispatched through NF's Command service, which handles protocol-specific details (BACnet WriteProperty, Modbus write, etc.) automatically.
Enabling DCMD¶
Set the following environment variable to enable inbound DCMD handling:
SPARKPLUG_DCMD_ENABLED=true
DCMD is disabled by default to ensure existing deployments are unaffected.
DCMD Topic Format¶
DCMD messages follow the standard Sparkplug topic format:
spBv1.0/{group_id}/DCMD/{node_id}/{device_id}
For example, with the default configuration:
spBv1.0/normalgw/DCMD/001/260001
The service subscribes to spBv1.0/{group_id}/DCMD/{node_id}/+ to receive commands for all devices.
Metric Resolution¶
When a DCMD message arrives, each metric in the payload must be mapped back to an NF point UUID. The resolution order is:
- Alias lookup: If the metric has a non-zero
aliasfield, it is looked up in a reverse alias map built during DBIRTH. - Name as UUID: If
SPARKPLUG_USE_POINT_NAMEisfalse(default), the metric'snamefield is treated as the point UUID directly. - Name lookup: If
SPARKPLUG_USE_POINT_NAMEistrue, the metric'snamefield is looked up in a reverse name map.
Metrics that cannot be resolved are logged and skipped.
Configuration Reference¶
| Variable | Default | Description |
|---|---|---|
SPARKPLUG_DCMD_ENABLED |
false |
Enable DCMD subscription for inbound writes |
SPARKPLUG_DCMD_COMMAND_CONTEXT |
false |
Wrap writes in a command context (enables write reversion) |
SPARKPLUG_DCMD_PRIMARY_HOST_ID |
"" |
Primary Host application ID for STATE tracking |
SPARKPLUG_DCMD_COMMAND_DURATION |
5m |
Lifetime for command contexts created by DCMD |
Example: Publishing a DCMD¶
First, parse the DBIRTH message to build an alias-to-name mapping. Then construct a DCMD payload referencing those aliases.
import json
import paho.mqtt.client as mqtt
# Step 1: Parse DBIRTH to get alias mappings
alias_map = {}
def on_message(client, userdata, msg):
if "/DBIRTH/" in msg.topic:
payload = json.loads(msg.payload)
for m in payload.get("metrics", []):
if "alias" in m and "name" in m:
alias_map[m["alias"]] = m["name"]
client = mqtt.Client()
client.on_message = on_message
client.connect("localhost", 1883)
client.subscribe("spBv1.0/normalgw/DBIRTH/#")
client.loop_start()
# ... wait for DBIRTH messages ...
# Step 2: Publish DCMD with a known alias
target_alias = list(alias_map.keys())[0]
dcmd = {
"metrics": [{
"alias": target_alias,
"datatype": 9, # Float
"floatValue": 72.5
}]
}
client.publish(
"spBv1.0/normalgw/DCMD/001/260001",
json.dumps(dcmd)
)
# Subscribe to DBIRTH to see alias mappings
mosquitto_sub -t 'spBv1.0/normalgw/DBIRTH/#' -C 1 | jq '.metrics[] | {alias, name}'
# Publish a DCMD with alias 11 and float value
mosquitto_pub -t 'spBv1.0/normalgw/DCMD/001/260001' \
-m '{"metrics":[{"alias":11,"datatype":9,"floatValue":72.5}]}'
Command Context (Advanced)¶
When SPARKPLUG_DCMD_COMMAND_CONTEXT is enabled, DCMD writes are wrapped in NF command contexts. This provides automatic write reversion: when the command expires or is cancelled, all writes made through it are reverted.
Primary Host STATE tracking: If SPARKPLUG_DCMD_PRIMARY_HOST_ID is set, the service subscribes to the Sparkplug STATE topic (spBv1.0/STATE/{host_id}). When the primary host goes online, a command context is started. When it goes offline, the command is cancelled and writes are reverted.
| Event | Action |
|---|---|
STATE {"online": true} |
Start command context |
| DCMD received | Write with active command, auto-extend lifetime |
STATE {"online": false} |
Cancel command, revert all writes |
| Command timeout | Writes revert after SPARKPLUG_DCMD_COMMAND_DURATION |
If no primary host is configured but command context is enabled, a session-scoped command is created on the first DCMD and extended on each subsequent DCMD.
Error Handling¶
- Unknown alias/name: Metric is skipped, counter incremented, warning logged.
- Write failure: Error logged per-point; other metrics in the same DCMD are still processed.
- Command service unavailable: DCMD handler initialization fails; service starts without DCMD support and logs an error.
Security Considerations¶
DCMD allows external systems to write to field devices. Consider:
- Configure MQTT broker ACLs to restrict which clients can publish to
DCMDtopics. - Use TLS (
MQTT_PROTOCOL=tls) and authentication (MQTT_USERNAME/MQTT_PASSWORD) for broker connections. - The command context feature provides automatic reversion if the controlling application goes offline.
Cloud Providers¶
Here are provider-specific setup instructions: