Hi all,
I have been playing around with Nick O’leary’s PubSubClient for MQTT (Arduino based) and now have it running on the
. Only a small number of mods to get it up and running.
For the broker I used Mosquitto (mosquitto.org) which supports several OS’s and is very quick to get up and running. I have the library working for unauthenticated pub/sub to the broker as well as user id and password based authentication.
Once connected it is very simple to publish to a queue using
client.publish("Spark Status","I'm Alive...");
Or subscribe to an exiting queue / topic using
client.subscribe("led");
There is a call back function that is used to trap incoming messages for a subscribed topic which can be tailored to do what you need.
I have attached the libary and an sample with callback below. Right now all it does is allow you to control the led on D7. Send a mesage to topic led and use on, off or flash as the payload.
Publishing to a queue
To post a message to a topic you can use the mosquitto_pub tool and specify the topic and payload as follows
mosquitto_pub -h 192.168.0.50 led -m flash
or if using authentication
mosquitto_pub -h 192.168.0.50 -u userid -P password -t led -m flash
Subscribing to a queue
You can the mosquitto_sub program as a method to listen to topics - this might be a reasonable alternative to printing via the serial port for logging purposed. # is all topics.
mosquitto_sub -h 192.168.0.50 -t “#” -v
Library Code and Sample
Code below - you need to amend the ip address (byte arrange ip[]) and the client.connect call for your own use.
/*
PubSubClient.cpp - A simple client for MQTT.
Original Code - Nicholas O'Leary
http://knolleary.net
Adapted for Spark Core by Chris Howard - chris@kitard.com
Based on PubSubClient 1.9.1
Changes
- Added gcc pragma to avoid warnings being thrown as errors (deprecated conversion from string constant to 'char*')
- publish_P function removed due to lack of Arduino PROGMEN support on the Spark Core
- Obvious includes commented out
- Using Spark TCPClient instead of Arduino EthernetClient
*/
#pragma GCC diagnostic ignored "-Wwrite-strings"
// #include "PubSubClient.h"
// #include <Arduino.h>
#define ARDUINO_H
#include <stdint.h>
#include <stddef.h>
#include <stdlib.h>
//#include "Client.h"
// MQTT_MAX_PACKET_SIZE : Maximum packet size
#define MQTT_MAX_PACKET_SIZE 128
// MQTT_KEEPALIVE : keepAlive interval in Seconds
#define MQTT_KEEPALIVE 15
#define MQTTPROTOCOLVERSION 3
#define MQTTCONNECT 1 << 4 // Client request to connect to Server
#define MQTTCONNACK 2 << 4 // Connect Acknowledgment
#define MQTTPUBLISH 3 << 4 // Publish message
#define MQTTPUBACK 4 << 4 // Publish Acknowledgment
#define MQTTPUBREC 5 << 4 // Publish Received (assured delivery part 1)
#define MQTTPUBREL 6 << 4 // Publish Release (assured delivery part 2)
#define MQTTPUBCOMP 7 << 4 // Publish Complete (assured delivery part 3)
#define MQTTSUBSCRIBE 8 << 4 // Client Subscribe request
#define MQTTSUBACK 9 << 4 // Subscribe Acknowledgment
#define MQTTUNSUBSCRIBE 10 << 4 // Client Unsubscribe request
#define MQTTUNSUBACK 11 << 4 // Unsubscribe Acknowledgment
#define MQTTPINGREQ 12 << 4 // PING Request
#define MQTTPINGRESP 13 << 4 // PING Response
#define MQTTDISCONNECT 14 << 4 // Client is Disconnecting
#define MQTTReserved 15 << 4 // Reserved
#define MQTTQOS0 (0 << 1)
#define MQTTQOS1 (1 << 1)
#define MQTTQOS2 (2 << 1)
class PubSubClient {
private:
//Client* _client;
TCPClient* _client; // CH 14Jan2014 - changed Client* to TCPClient*
uint8_t buffer[MQTT_MAX_PACKET_SIZE];
uint16_t nextMsgId;
unsigned long lastOutActivity;
unsigned long lastInActivity;
bool pingOutstanding;
void (*callback)(char*,uint8_t*,unsigned int);
uint16_t readPacket(uint8_t*);
uint8_t readByte();
bool write(uint8_t header, uint8_t* buf, uint16_t length);
uint16_t writeString(char* string, uint8_t* buf, uint16_t pos);
uint8_t *ip;
char* domain;
uint16_t port;
public:
PubSubClient();
PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),TCPClient& client); // CH 14Jan2014 - changed Client& to TCPClient&
PubSubClient(char *, uint16_t, void(*)(char *,uint8_t*,unsigned int),TCPClient& client); // CH 14Jan2014 - changed Client& to TCPClient&
//bool connect(const char *);
bool connect(char *);
bool connect(char *, char *, char *);
bool connect(char *, char *, uint8_t, uint8_t, char *);
bool connect(char *, char *, char *, char *, uint8_t, uint8_t, char *);
void disconnect();
bool publish(char *, char *);
bool publish(char *, uint8_t *, unsigned int);
bool publish(char *, uint8_t *, unsigned int, bool);
bool subscribe(char *);
bool subscribe(char *, uint8_t qos);
bool unsubscribe(char *);
bool puback(uint16_t msgId);
bool loop();
bool connected();
};
#include <string.h>
PubSubClient::PubSubClient() {
this->_client = NULL;
}
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), TCPClient& client) { // CH 14Jan2014 - Changed Client& to TCPClient&
this->_client = &client;
this->callback = callback;
this->ip = ip;
this->port = port;
this->domain = NULL;
}
PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), TCPClient& client) { // CH 14Jan2014 - Changed Client& to TCPClient&
this->_client = &client;
this->callback = callback;
this->domain = domain;
this->port = port;
}
// CONNECT
//bool PubSubClient::connect(const char *id) {
bool PubSubClient::connect(char *id) {
return connect(id,NULL,NULL,0,0,0,0);
}
bool PubSubClient::connect(char *id, char *user, char *pass) {
return connect(id,user,pass,0,0,0,0);
}
bool PubSubClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage)
{
return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
}
bool PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) {
if (!connected()) {
int result = 0;
if (domain != NULL) {
result = _client->connect(this->domain, this->port);
} else {
result = _client->connect(this->ip, this->port);
}
if (result) {
nextMsgId = 1;
uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
// Leave room in the buffer for header and variable length field
uint16_t length = 5;
unsigned int j;
for (j = 0;j<9;j++) {
buffer[length++] = d[j];
}
uint8_t v;
if (willTopic) {
v = 0x06|(willQos<<3)|(willRetain<<5);
} else {
v = 0x02;
}
if(user != NULL) {
v = v|0x80;
if(pass != NULL) {
v = v|(0x80>>1);
}
}
buffer[length++] = v;
buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
length = writeString(id,buffer,length);
if (willTopic) {
length = writeString(willTopic,buffer,length);
length = writeString(willMessage,buffer,length);
}
if(user != NULL) {
length = writeString(user,buffer,length);
if(pass != NULL) {
length = writeString(pass,buffer,length);
}
}
write(MQTTCONNECT,buffer,length-5);
lastInActivity = lastOutActivity = millis();
while (!_client->available()) {
unsigned long t = millis();
if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) {
_client->stop();
return false;
}
}
uint8_t llen;
uint16_t len = readPacket(&llen);
if (len == 4 && buffer[3] == 0) {
lastInActivity = millis();
pingOutstanding = false;
return true;
}
}
_client->stop();
}
return false;
}
uint8_t PubSubClient::readByte() {
while(!_client->available()) {}
return _client->read();
}
uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
uint16_t len = 0;
buffer[len++] = readByte();
uint32_t multiplier = 1;
uint16_t length = 0;
uint8_t digit = 0;
do {
digit = readByte();
buffer[len++] = digit;
length += (digit & 127) * multiplier;
multiplier *= 128;
} while ((digit & 128) != 0);
*lengthLength = len-1;
for (uint16_t i = 0;i<length;i++)
{
if (len < MQTT_MAX_PACKET_SIZE) {
buffer[len++] = readByte();
} else {
readByte();
len = 0; // This will cause the packet to be ignored.
}
}
return len;
}
bool PubSubClient::loop() {
if (connected()) {
unsigned long t = millis();
if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
if (pingOutstanding) {
_client->stop();
return false;
} else {
buffer[0] = MQTTPINGREQ;
buffer[1] = 0;
_client->write(buffer,2);
lastOutActivity = t;
lastInActivity = t;
pingOutstanding = true;
}
}
if (_client->available()) {
uint8_t llen;
uint16_t len = readPacket(&llen);
uint16_t msgId = 0;
uint8_t *payload;
if (len > 0) {
lastInActivity = t;
uint8_t type = buffer[0]&0xF0;
if (type == MQTTPUBLISH) {
if (callback) {
uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2];
char topic[tl+1];
for (uint16_t i=0;i<tl;i++) {
topic[i] = buffer[llen+3+i];
}
topic[tl] = 0;
// msgId only present for QOS>0
if (buffer[0]&MQTTQOS1) {
msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
payload = buffer+llen+3+tl+2;
callback(topic,payload,len-llen-3-tl-2);
puback(msgId);
} else {
payload = buffer+llen+3+tl;
callback(topic,payload,len-llen-3-tl);
}
}
} else if (type == MQTTPINGREQ) {
buffer[0] = MQTTPINGRESP;
buffer[1] = 0;
_client->write(buffer,2);
} else if (type == MQTTPINGRESP) {
pingOutstanding = false;
}
}
}
return true;
}
return false;
}
// PUBLISH
bool PubSubClient::publish(char* topic, char* payload) {
return publish(topic,(uint8_t*)payload,strlen(payload),false);
}
bool PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plength) {
return publish(topic, payload, plength, false);
}
bool PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plength, bool retained) {
if (connected()) {
// Leave room in the buffer for header and variable length field
uint16_t length = 5;
length = writeString(topic,buffer,length);
uint16_t i;
for (i=0;i<plength;i++) {
buffer[length++] = payload[i];
}
uint8_t header = MQTTPUBLISH;
if (retained) {
header |= 1;
}
return write(header,buffer,length-5);
}
return false;
}
bool PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
uint8_t lenBuf[4];
uint8_t llen = 0;
uint8_t digit;
uint8_t pos = 0;
uint8_t rc;
uint8_t len = length;
do {
digit = len % 128;
len = len / 128;
if (len > 0) {
digit |= 0x80;
}
lenBuf[pos++] = digit;
llen++;
} while(len>0);
buf[4-llen] = header;
for (int i=0;i<llen;i++) {
buf[5-llen+i] = lenBuf[i];
}
rc = _client->write(buf+(4-llen),length+1+llen);
lastOutActivity = millis();
return (rc == 1+llen+length);
}
bool PubSubClient::subscribe(char* topic) {
return subscribe(topic, 0);
}
// SUBSCRIBE
bool PubSubClient::subscribe(char* topic, uint8_t qos) {
if (qos < 0 || qos > 1)
return false;
if (connected()) {
// Leave room in the buffer for header and variable length field
uint16_t length = 5;
nextMsgId++;
if (nextMsgId == 0) {
nextMsgId = 1;
}
buffer[length++] = (nextMsgId >> 8);
buffer[length++] = (nextMsgId & 0xFF);
length = writeString(topic, buffer,length);
buffer[length++] = qos;
return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
}
return false;
}
bool PubSubClient::puback(uint16_t msgId) {
if(connected()) {
// Leave room in the buffer for header and variable length field
uint16_t length = 5;
buffer[length++] = (msgId >> 8);
buffer[length++] = (msgId & 0xFF);
return write(MQTTPUBACK,buffer,length-5);
}
return false;
}
// HELPERS
//bool PubSubClient::unsubscribe(char* topic) {
bool PubSubClient::unsubscribe(char* topic) {
if (connected()) {
uint16_t length = 5;
nextMsgId++;
if (nextMsgId == 0) {
nextMsgId = 1;
}
buffer[length++] = (nextMsgId >> 8);
buffer[length++] = (nextMsgId & 0xFF);
length = writeString(topic, buffer,length);
return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
}
return false;
}
void PubSubClient::disconnect() {
buffer[0] = MQTTDISCONNECT;
buffer[1] = 0;
_client->write(buffer,2);
_client->stop();
lastInActivity = lastOutActivity = millis();
}
uint16_t PubSubClient::writeString(char* string, uint8_t* buf, uint16_t pos) {
char* idp = string;
uint16_t i = 0;
pos += 2;
while (*idp) {
buf[pos++] = *idp++;
i++;
}
buf[pos-i-2] = (i >> 8);
buf[pos-i-1] = (i & 0xFF);
return pos;
}
bool PubSubClient::connected() {
bool rc;
if (_client == NULL ) {
rc = false;
} else {
rc = (int)_client->connected();
if (!rc) _client->stop();
}
return rc;
}
// MAIN APPLICATOIN CODE STARTS HERE
// Update these with values suitable for your network.
byte ip[] = { 192, 168, 0, 50 };
int LED = D7; // for demo only
void callback(char* topic, byte* payload, unsigned int length) {
// handle message arrived - we are only subscribing to one topic so assume all are led related
byte ledOn[] = {0x6F, 0x6E}; // hex for on
byte ledOff[] = {0x6F, 0x66, 0x66}; // hex for off
byte ledFlash[] ={0x66, 0x6C, 0x61, 0x73, 0x68}; // hex for flash
if (!memcmp(ledOn, payload, sizeof(ledOn)))
digitalWrite(LED, HIGH);
if (!memcmp(ledOff, payload, sizeof(ledOff)))
digitalWrite(LED, LOW);
if (!memcmp(ledFlash, payload, sizeof(ledFlash))) {
for (int flashLoop=0;flashLoop < 3; flashLoop++) {
digitalWrite(LED, HIGH);
delay(250);
digitalWrite(LED, LOW);
delay(250);
}
}
}
TCPClient tcpClient;
PubSubClient client(ip, 1883, callback, tcpClient);
// Simple MQTT demo to allow the blue led (D7) to be turned on or off. Send message to topic "led" with payload of "on" or "off"
void setup()
{
pinMode(LED, OUTPUT); // Use for a simple test of the led on or off by subscribing to a topical called led
//if (client.connect("Spark")) { // Anonymous authentication enabled
if (client.connect("spark", "userid", "password")) { // uid:pwd based authentication
client.publish("Spark Status","I'm Alive...");
client.subscribe("led");
}
}
void loop()
{
client.loop();
}
I am sure the callback can be improved on by somebody with more experience, but it works for now. Full credit to Nick for his original MQTT library.
Cheers
Chris
Hoping my problem might be due to whatever you guys are working with TI to fix and it’ll resolve itself once that’s been implemented.