I’ve been experimenting with Home Assistant for the past few weeks, and I wanted to use MQTT to publish data from some of my Particle devices. Having never used MQTT before, and being new to Home Assistant, I decided that I needed to configure Home Assistant with test data. That led to the development of the code listed below.
I compiled and flashed the code to my Photon, and I now have an easy way to publish and subscribe to any MQTT broker via the Particle Console as seen in the following screen-shot:
- The SetBrokerIP function allows me to enter the MQTT broker’s IP address or DNS name.
- The SetPubTopic function allows me to enter an MQTT topic that will be used when I publish a message.
- The Publish-Msg function allows me to publish an MQTT message.
- The Subscribe function allows me to subscribe to an MQTT topic.
- The GetMessage variable allows me to display the last MQTT subscription message received.
I’ve found this to be a very useful tool and wanted to share it. Perhaps someone with more MQTT experience can take this and make it into something even more useful. In the meantime, you are more than welcome to use my code. To date, I’ve only tested it on a Photon.
#include "Particle.h"
#include "MQTT.h"
// ---------------------------------------------- Shared state
// MQTT client instance; created in SetBrokerIP(), so it must not be used before
// mqttIsInitialized is true.
MQTT* mqtt;
char clientName[64] = ""; // MQTT client name; filled in by handler() from the particle/device/name event
byte brokerIP[] = {0,0,0,0}; // broker IPv4 address; parsed in SetBrokerIP() when an IP address is entered
char brokerDNS[256]; // broker host name; set in SetBrokerIP() when a DNS name is entered
char pubTopic[256] = ""; // topic used by Publish(); set by SetPubTopic()
char buffer[512] = ""; // scratch buffer: holds the broker address text in SetBrokerIP() and the payload in Publish()
char subTopic[256] = ""; // topic passed to Subscribe(); also overwritten by callback() with the topic of the last received message
char subMessage[512] = ""; // payload of the last received message (written by callback()), exposed via Particle.variable
Timer connectTimer(15000, displayConnectionState); // periodically calls displayConnectionState (period 15000, presumably ms), which prints + or - on Serial
bool mqttConnectViaDNS = false; // true: connect via brokerDNS, false: connect via brokerIP
bool mqttIsInitialized = false; // true: mqtt points to an initialized MQTT instance
bool mqttIsActive = false; // true: mqttIsInitialized and mqtt->isConnected are true (maintained in loop())
// One-time initialization: opens the serial port, registers the cloud functions
// and variable used to drive the MQTT client, and requests the device name.
void setup() {
Serial.begin();
// Cloud functions: each name is bound to the function of (nearly) the same name below.
Particle.function("SetBrokerIP", SetBrokerIP);
Particle.function("SetPubTopic", SetPubTopic);
Particle.function("Publish-Msg", Publish);
Particle.function("Subscribe", Subscribe);
// Exposes the last received MQTT payload under the name "GetMessage".
Particle.variable("GetMessage", subMessage);
// The subscription is registered before the publish; handler() copies the data
// delivered on this event into clientName.
Particle.subscribe("particle/device/name", handler, MY_DEVICES);
Particle.publish("particle/device/name", NULL, 60, PRIVATE);
}
// Main application loop: records in mqttIsActive whether an MQTT client exists
// and is connected, and services the client only while that is the case.
void loop() {
    const bool connected = mqttIsInitialized && mqtt->isConnected();
    mqttIsActive = connected;
    if (!connected) {
        return;
    }
    mqtt->loop();
}
// --------------------------------------------- Connect to the MQTT Broker via DNS name or IP address
// triggered by Particle.function
//
// s: either a dotted-quad IPv4 address (e.g. "192.168.1.10") or a DNS name.
//    Input made up only of digits and dots is treated as an IP address.
// Returns 0 once a connection attempt has been started, 1 if the input is empty,
// too long for brokerDNS, or looks like an IP address but is not a valid dotted quad.
// On a non-zero return the previously configured broker is left untouched.
int SetBrokerIP(String s) {
    // Must fit in brokerDNS (the smaller of the two buffers involved) including the terminator.
    if (s.length() == 0 || s.length() >= sizeof(brokerDNS)) {
        return 1;
    }
    // The limit is the destination buffer; sizeof(s) would be the size of the String
    // object itself and would silently truncate the address.
    s.toCharArray(buffer, sizeof(buffer));

    // determine if IP address or DNS name was entered ...
    bool viaDNS = false;
    for (const char* p = buffer; *p != '\0'; ++p) {
        if (!isDigit(*p) && *p != '.') {
            viaDNS = true;
            break;
        }
    }

    // Parse into full-width integers first: %u stores an unsigned int, so scanning
    // straight into the byte-sized brokerIP elements would write past each element.
    unsigned int octet[4] = {0, 0, 0, 0};
    if (!viaDNS) {
        char trailing = '\0';
        // The extra %c only matches when something follows the fourth number,
        // so exactly 4 conversions means a complete a.b.c.d with nothing after it.
        if (sscanf(buffer, "%u.%u.%u.%u%c", &octet[0], &octet[1], &octet[2], &octet[3], &trailing) != 4) {
            return 1;
        }
        for (int i = 0; i < 4; i++) {
            if (octet[i] > 255) {
                return 1;
            }
        }
    }

    // Release the client created by an earlier call instead of leaking it. The flags
    // are cleared first so nothing treats the pointer as usable while it is replaced.
    // NOTE(review): assumes the MQTT destructor closes an open connection - confirm
    // against the library version in use.
    if (mqtt != nullptr) {
        mqttIsInitialized = false;
        mqttIsActive = false;
        delete mqtt;
        mqtt = nullptr;
    }

    mqttConnectViaDNS = viaDNS;
    if (viaDNS) {
        // connect via DNS Name -----------------------------
        strncpy(brokerDNS, buffer, sizeof(brokerDNS) - 1);
        brokerDNS[sizeof(brokerDNS) - 1] = '\0';
        Serial.printf("brokerDNS: %s\n\r", brokerDNS);
        mqtt = new MQTT(brokerDNS, 1883, 15, callback);
    } else {
        // connect via IP Address ---------------------------
        for (int i = 0; i < 4; i++) {
            brokerIP[i] = (byte)octet[i];
        }
        Serial.printf("brokerIP: %u.%u.%u.%u\r\n", brokerIP[0], brokerIP[1], brokerIP[2], brokerIP[3]);
        mqtt = new MQTT(brokerIP, 1883, 15, callback);
    }
    mqttIsInitialized = true;

    mqtt->connect(clientName);
    connectTimer.start();
    return 0;
}
// --------------------------------------------- Set the Publish Topic
// triggered by Particle.function
//
// s: topic that Publish() will publish to.
// Returns 0 when the topic was stored, 1 when it does not fit in pubTopic
// (the previously stored topic is kept in that case).
int SetPubTopic(String s) {
    if (s.length() >= sizeof(pubTopic)) {
        return 1;
    }
    // Bound the copy by the destination: pubTopic is smaller than buffer, so using
    // sizeof(buffer) here would allow writes past the end of pubTopic.
    s.toCharArray(pubTopic, sizeof(pubTopic));
    return 0;
}
// --------------------------------------------- Publish an MQTT Message
// triggered by Particle.function
//
// s: message payload; it is published to the topic stored in pubTopic.
// Returns 0 when the client accepted the message, 1 otherwise (no broker has been
// configured yet, the client is not connected, no publish topic has been set, or
// publish() reported failure).
int Publish(String s) {
    // mqtt is only valid once mqttIsInitialized is set; this function can be called
    // from the cloud before that, so check the flag before dereferencing.
    if (!mqttIsInitialized || !mqtt->isConnected()) {
        return 1;
    }
    // An empty topic name is not a valid MQTT topic.
    if (pubTopic[0] == '\0') {
        return 1;
    }
    s.toCharArray(buffer, sizeof(buffer));
    return mqtt->publish(pubTopic, buffer) ? 0 : 1;
}
// --------------------------------------------- Subscribe to a Topic
// triggered by Particle.function
//
// s: topic (filter) to subscribe to.
// Returns 0 when the subscription request was accepted by the client, 1 otherwise
// (no broker has been configured yet, the client is not connected, the topic is
// empty or does not fit in subTopic, or subscribe() reported failure).
int Subscribe(String s) {
    // mqtt is only valid once mqttIsInitialized is set; this function can be called
    // from the cloud before that, so check the flag before dereferencing.
    if (!mqttIsInitialized || !mqtt->isConnected()) {
        return 1;
    }
    if (s.length() == 0 || s.length() >= sizeof(subTopic)) {
        return 1;
    }
    // Bound the copy by the destination buffer; sizeof(s) would be the size of the
    // String object and would cut the topic short.
    s.toCharArray(subTopic, sizeof(subTopic));
    return mqtt->subscribe(subTopic) ? 0 : 1;
}
// --------------------------------------------- Receive an MQTT Message
// triggered when a message is received from the MQTT Broker
//
// topic:   topic the message arrived on; copied into subTopic.
// payload: message bytes (not NUL-terminated); copied into subMessage.
// length:  number of bytes in payload.
// Both copies are truncated to fit their buffers and are always NUL-terminated.
void callback(char* topic, byte* payload, unsigned int length) {
    strncpy(subTopic, topic, sizeof(subTopic) - 1);
    subTopic[sizeof(subTopic) - 1] = '\0'; // strncpy does not terminate when the source fills the buffer

    // Keep one byte for the terminator; a payload larger than subMessage would
    // otherwise be written past the end of the buffer.
    const size_t capacity = sizeof(subMessage) - 1;
    const size_t count = (length < capacity) ? length : capacity;
    memcpy(subMessage, payload, count);
    subMessage[count] = '\0';
}
// --------------------------------------------- Display MQTT connection State
// invoked by connectTimer: prints "+" on Serial while mqttIsActive is set, "-" otherwise
void displayConnectionState(void) {
    const char* marker = mqttIsActive ? "+" : "-";
    Serial.print(marker);
}
// -------------------------------------------- Get ClientName
// triggered by the Particle.subscribe and Particle.publish run in setup()
//
// topic: event name (unused).
// data:  event data; copied into clientName, truncated to fit and always
//        NUL-terminated. An event without data leaves clientName unchanged.
void handler(const char *topic, const char *data) {
    if (data == nullptr) {
        return;
    }
    strncpy(clientName, data, sizeof(clientName) - 1);
    clientName[sizeof(clientName) - 1] = '\0'; // strncpy does not terminate when the source fills the buffer
}