[Submission] MQTT Library and Sample

@bpr, No, I’ve stopped using the spark because of this…

FWIW,
I’ve stumbled upon what seems to be a stable means of communicating between two cores with the MQTT IDE library. As far as I can tell, the two critical elements appear to be 1) omitting client.loop() from the loop section of the receiving core and 2) using the retain-flag version of the client.publish function. I don’t know why this makes it stable, but it has stayed stable for over 3000 transmissions. (Also, #include “MQTT/MQTT.h” didn’t work for me.) Code for both cores below:

// This #include statement was automatically added by the Spark IDE.
#include "MQTT.h"

/* mqttreceiver.ino
employs MQTT library Version 0.1: initial version 2014 Hirotaka Niisato in spark core IDE
This core receives messages from another core running mqttsender.ino to cycle its rgb led through red-green-blue
using mosquitto broker on raspberry pi
Can also display messages on the broker terminal using:
mosquitto_sub -v -h 192.168.1.99 -t sparkcore/RGBled       //192.168.1.99 is ip of rpi broker
*/

// receive message
void callback(char* topic, byte* payload, unsigned int length) {
    char p[length + 1];
    memcpy(p, payload, length);
    p[length] = '\0';
    String message(p);

    if (message.equals("RED")) 
        RGB.color(255, 0, 0);

    else if (message.equals("GREEN")) 
        RGB.color(0, 255, 0);

    else if (message.equals("BLUE")) 
        RGB.color(0, 0, 255);

    else    
        RGB.color(255, 255, 255);
}//callback

byte server[] = { 192,168,1,99 }; //address of rpi mosquitto server
MQTT client(server, 1883, callback);

void setup() {
    RGB.control(true);
    // connect to the server
    client.connect("spark_client");  //works with local
    // publish/subscribe
    if (client.isConnected()) 
        client.subscribe("sparkcore/RGBled");
}//setup

void loop() {
    //NOTE: client.loop() NOT USED
}//loop

Now the Sender code

// This #include statement was automatically added by the Spark IDE.
#include "MQTT.h"

/* mqttsender.ino 
employs MQTT library Version 0.1: initial version 2014 Hirotaka Niisato in spark core IDE
mqttsender.ino on this core sends messages to another core running mqttreceiver.ino 
to cause that core to cycle its rgb led through red-green-blue
via the mosquitto broker running on a raspberry pi
*/

void callback(char* topic, byte* payload, unsigned int length);

byte server[] = { 192,168,1,99 }; //address of mosquitto server on raspberry pi
MQTT client(server, 1883,callback);
byte bytebuffer[30];
long loopcounter;

// receive message nothing to do here - we're only sending
void callback(char* topic, byte* payload, unsigned int length) {
}//callback

void setup() {
    // connect to the server and send message to mosquitto and other core
    client.connect("spark_client2");  //works with local
    Time.zone(-7); 
    loopcounter = 0;
}//setup

void loop() {
    if (client.isConnected()) {
        String message3 = "RED";
        message3.getBytes(bytebuffer, 4);
        client.publish("sparkcore/RGBled", bytebuffer, 4, true); // length now matches the bytes copied
    }
    delay(500);

    if (client.isConnected()) {
        String message4 = "GREEN";
        message4.getBytes(bytebuffer, 6);
        client.publish("sparkcore/RGBled", bytebuffer, 6, true);
    }
    delay(500);

    if (client.isConnected()) {
        String message5 = "BLUE";
        message5.getBytes(bytebuffer, 5);
        client.publish("sparkcore/RGBled", bytebuffer, 5, true); // length now matches the bytes copied
    }
    delay(500);

    if (client.isConnected()) {
        String message6 = Time.timeStr();
        char cstr[32]; // leaves room for the string terminator
        strcpy(cstr, message6.c_str());
        client.publish("sparkcore/lasttime", (uint8_t *)cstr, 24, true); //publish time message sent
    }
    delay(500);

    if (client.isConnected()) {
        loopcounter++;
        String string2(loopcounter, (uint8_t)10);
        string2.getBytes(bytebuffer, string2.length()+1);
        client.publish("sparkcore/loopcounter", bytebuffer, string2.length()+1, true); //publish number of loop cycles
    }
    delay(500);
}//loop

Well, I spoke too soon. A brief wifi outage completely shuts it down, and only a hard reset of both cores resuscitates it. Calling System.reset programmatically doesn’t even restore the connection. I cannot figure out why.

Not wanting to give up on MQTT, I fiddled around some more and now have code that has been stable for over 350,000 loops and can recover from wifi outages after adding the reconnect() code and a few other things. It’s still not clear which changes made the difference (sorry). I restored the client.loop() call, threw in some Spark.process() calls, and left the variable declarations in the callback in mqttsender.ino. Using the retain flag didn’t seem to be important. (Wish I knew more!) In case it helps anyone else, the code for the two cores is below:

// This #include statement was automatically added by the Spark IDE.
#include "MQTT.h"

/* mqttsender.ino 
employs MQTT library Version 0.1: initial version 2014 Hirotaka Niisato in spark core IDE
mqttsender.ino on this core sends messages to another core running mqttreceiver.ino 
to cause that core to cycle its rgb led through red-green-blue
via the mosquitto broker running on a raspberry pi
Can also display messages on the broker terminal using:
mosquitto_sub -v -h 192.168.1.xx -t RGBled       //192.168.1.xx is ip of rpi mosquitto broker
*/

void callback(char* topic, byte* payload, unsigned int length);
byte server[] = { 192,168,1,81 }; //192.168.1.xx is ip of rpi mosquitto broker
MQTT client(server, 1883, callback);
byte bytebuffer[30];
long loopcounter;
// receive message
void callback(char* topic, byte* payload, unsigned int length) {
    char p[length + 1];
    memcpy(p, payload, length);
    p[length] = '\0';
    String message(p);
}//callback

void reconnect() {
  while (Spark.connected() == false) {
    Spark.connect();
    delay(1000);
  }//while (Spark.connected()
}//reconnect()

void setup() {
    Spark.syncTime();
    Time.zone(-7);
    // connect to the server
    client.connect("clientS");
    Spark.process();
    // publish/subscribe
    if (client.isConnected()) {
        //String message = "SparkCore Connected";
        Spark.process();
        String message6 = Time.timeStr();
        char cstr[32]; // leaves room for the string terminator
        strcpy(cstr, message6.c_str());
        Spark.process();
        client.publish("lasttime", (uint8_t *)cstr, 24, true); //publish time starts up
        delay(100);
    }//if (client.isConnected())
    loopcounter = 0;
}//setup

void loop() {
    Spark.process();
    if (client.isConnected()) {
        client.loop();
        delay(100);
        String message = "RED";
        message.getBytes(bytebuffer, 4);
        client.publish("RGBled", bytebuffer, 4);
        delay(100);
        message = "GREEN";
        message.getBytes(bytebuffer, 6);
        client.publish("RGBled", bytebuffer, 6);
        delay(100);
        Spark.process();
        message = "BLUE";
        message.getBytes(bytebuffer, 5);
        client.publish("RGBled", bytebuffer, 5);
        delay(100);
        loopcounter++;
        String string2(loopcounter, (uint8_t)10);
        string2.getBytes(bytebuffer, string2.length()+1);
        client.publish("loopcounter", bytebuffer, string2.length()+1, true); //publish number of loop cycles
        delay(100);
    }
    else {
        Spark.process();
        reconnect();
        Spark.process();
        client.connect("clientS");
        Spark.process();
        String message7 = Time.timeStr();
        char cstr[32]; // leaves room for the string terminator
        strcpy(cstr, message7.c_str());
        Spark.process();
        client.publish("lasttime", (uint8_t *)cstr, 24, true); //publish time core reconnected
        delay(100);
    }//else
}//loop

// This #include statement was automatically added by the Spark IDE.
#include "MQTT.h"

/* mqttreceiver.ino
employs MQTT library Version 0.1: initial version 2014 Hirotaka Niisato in spark core IDE
This core receives messages from another core running mqttsender.ino to cycle its rgb led through red-green-blue
using mosquitto broker on raspberry pi
Can also display messages on the broker terminal using:
mosquitto_sub -v -h 192.168.1.xx -t RGBled       //192.168.1.xx is ip of rpi mosquitto broker
*/

// receive message
void callback(char* topic, byte* payload, unsigned int length) {
    char p[length + 1];
    memcpy(p, payload, length);
    p[length] = '\0';
    String message(p);

    if (message.equals("RED")) 
        RGB.color(255, 0, 0);
    else if (message.equals("GREEN")) 
        RGB.color(0, 255, 0);
    else if (message.equals("BLUE")) 
        RGB.color(0, 0, 255);
    else    
        RGB.color(255, 255, 255);
}//callback

void reconnect() {
  while (Spark.connected() == false) {
    Spark.connect();
    delay(1000);
  }
}//reconnect

byte server[] = { 192,168,1,81 }; //address of rpi mosquitto server
MQTT client(server, 1883, callback);

void setup() {
    RGB.control(true);
    // connect to the server
    client.connect("clientR"); 
    //subscribe
    if (client.isConnected()) 
        client.subscribe("RGBled");
}//setup

void loop() {
    if (client.isConnected()) {
        client.loop();
    }//if (client.isConnected())
    else {
        Spark.process();
        reconnect();
        Spark.process();
        client.connect("clientR");
        Spark.process();
        delay(100);
    }
}//loop

Any update on whether your changes fixed the problem? I have the same problem. My code runs for about a day before it stops communicating with the broker. My loop does include a client.isConnected check:

if(client.isConnected()) {
    // set the LED green  
    RGB.color(0,255,0); 
} else {
    // set the LED red
    RGB.color(255,0,0); 
}

And the light stays green even though it is no longer sending messages to the broker.
Wifi is still connected, because I can reprogram the Spark Core, so I don’t see how the reconnect() is going to help. Besides the reconnect() and the liberal use of Spark.process(), what other changes did you make?

Has anyone had long term connections using the Spark MQTT library and any broker?
My full code is here: https://github.com/ctung/plant-monitor/blob/master/spark_core_client.ino

@kyngston, I haven’t used this in a while, but what I’d do is something like my reconnect function above, and I’d also replace all the delays in my last code samples above with:

            lastTime = millis();
            while(millis() - lastTime < 1000)  {Spark.process();}

The 1000 would be for 1 sec delay. Use 100 for 100 ms delay, etc. lastTime is defined as

uint32_t lastTime;
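
For readers adapting the wait loop above, here is a minimal sketch of the same idea in plain C++, so it can be tried off-device. The names intervalElapsed and waitWhileServicing are hypothetical, and the clock and service callables stand in for millis() and Spark.process(); none of this is part of the Spark firmware or the MQTT library.

```cpp
#include <stdint.h>

// Hypothetical helper: true once interval_ms has passed since start_ms.
// Unsigned subtraction keeps the result correct when the counter wraps.
bool intervalElapsed(uint32_t now_ms, uint32_t start_ms, uint32_t interval_ms) {
    return static_cast<uint32_t>(now_ms - start_ms) >= interval_ms;
}

// Hypothetical stand-in for the wait loop: clock plays the role of millis(),
// service plays the role of Spark.process().
template <typename Clock, typename Service>
void waitWhileServicing(uint32_t interval_ms, Clock clock, Service service) {
    const uint32_t start = clock();
    while (!intervalElapsed(clock(), start, interval_ms)) {
        service();  // keep background work running instead of blocking
    }
}
```

Keeping the comparison as a subtraction of two unsigned values, as in the original snippet, is what lets it survive the millisecond counter rolling over.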

Also, I’ve found my photons are extremely sensitive to my microwave oven, so I’ve had to devise a way for them to gracefully reconnect to the cloud. I think I more or less solved that (Microwave hardening). Some of my photons would go into fast cyan blink when the microwave is used. Are you using the cloud at all?
Let me know if this helps

Thanks, I tried that and it didn’t help. However I wrote a simple MQTT example that sends packets I log into my SQL server.

// This #include statement was automatically added by the Spark IDE.
#include "elapsedMillis/elapsedMillis.h"

// This #include statement was automatically added by the Spark IDE.
#include "MQTT/MQTT.h"

void callback(char* topic, byte* payload, unsigned int length);
byte server[] = { 192,168,1,116 }; //192.168.1.xx is ip of rpi mosquitto broker
MQTT client(server, 1883, callback);

elapsedMillis elapsed = 0;
unsigned int interval = 1000*60;

// receive message
void callback(char* topic, byte* payload, unsigned int length) {
    char p[length + 1];
    memcpy(p, payload, length);
    p[length] = '\0';
}//callback

void setup() {
    client.connect("test_core");
    client.publish("test", "3"); //3 indicates a connection
}//setup


void loop() {
    if (elapsed > interval) {
        client.publish("test","1"); // 1 indicates a heartbeat
        elapsed = 0; // reset interval timer
    }

    if (!client.loop()) {
        client.connect("test_core");
        client.publish("test","2"); // 2 indicates a reconnect
    }
}//loop

So far it’s run for over a day without losing connection. If this continues to work, I can slowly add features to it until it stops working. Then I can identify the code that causes the problem.


Has anyone tried to use MQTT with TLS yet?

@kyngston, So I’ve been having the same problem as you since way back in February of last year. I eventually shut that project down due to the disconnects, but would like to try working on it again. Have you had any further success with not having it disconnect after a day?

Thanks.


I’ve started playing with Particle and MQTT again.
I’ve got two projects with very similar code running at the moment.
One is running on a Core and is needing to be physically reset every 12 hours or so.
The other one is running on a Photon and has been up for a week with no sign of problems.

I intend to post more on this shortly.


Hi,

I’m pretty new to particle but have been playing with mqtt for a while. I have a photon and have followed through the examples in this thread connecting to my mqtt server which is on a public address but it fails to connect every time. I’ve got it down to the

client.connect(mqtt_client);
if (client.isConnected()) {

}
else {
    // This is where I end up
}

I’m using MQTTlens and am subscribed to the same topic with or without the same credentials (tried both with and without). I can send messages and see them arriving in MQTTlens and have also had no issue with arduinos doing the same thing, but can’t for the life of me get my photon to connect.

Does anyone have any clues?

Many thanks

Andy

I think I’ve figured it out…

So whenever I tried connecting using the public url for my server my photon was failing to connect. So I tried my internal IP and that was fine. Knowing there are no firewall issues I tested with my public IP address… Again I had no problem.

So my conclusion is that there is a problem resolving urls in the connect method.

Can anyone else confirm this or has anyone else had success connecting to a url and if so what format are you using?

tcp://mymqttserver.com
mymqttserver.com
http://mymqttserver.com
etc

Below is the code I used to connect successfully (taken from a previous post and tweaked).

Many thanks

Andy

// This #include statement was automatically added by the Spark IDE.
#include "MQTT/MQTT.h"

void callback(char* topic, byte* payload, unsigned int length);
byte server[] = { 192,168,1,3 }; //192.168.1.xx is ip of rpi mosquitto broker
MQTT client(server, 1883, callback);

//elapsedMillis elapsed = 0;
unsigned int interval = 1000*60;

// receive message
void callback(char* topic, byte* payload, unsigned int length) {
    char p[length + 1];
    memcpy(p, payload, length);
    p[length] = '\0';
    String message(p);

    if (message.equals("RED"))    
        RGB.color(255, 0, 0);
    else if (message.equals("GREEN"))    
        RGB.color(0, 255, 0);
    else if (message.equals("BLUE"))    
        RGB.color(0, 0, 255);
    else    
        RGB.color(255, 255, 255);
    delay(1000);
}//callback

void reconnect() {
  while (Spark.connected() == false) {
    RGB.color(255, 0, 0);
    Spark.connect();
    delay(1000);
  }
  RGB.color(0, 255, 0);
  delay(1000);
}//reconnect

void setup() {
    RGB.control(true);
    // connect to the server
    client.connect("clientR"); 
    //subscribe
    if (client.isConnected()) 
        client.subscribe("test");
        
    client.publish("test", "hello"); // announce the connection
}//setup


void loop() {
    if (client.isConnected()) {
        client.loop(); 
    }//if (client.isConnected())
    else
    {
        Spark.process();
        reconnect();
        Spark.process();
        client.connect("clientR");
        Spark.process();
        delay(100);
     }    
}//loop

Hi @mcinnes01

It seems the library asks for an IP address or a hostname, not a URL.
I tried with a hostname and it works, but your examples are domains, not FQDNs.
What happens if you try mymqttserver.mydomain.com?

Claudio
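
As a rough illustration of the URL versus hostname distinction described above, the sketch below reduces a URL-style string to the bare host name. bareHost is a hypothetical helper, not part of the MQTT library, and it ignores IPv6 literals and user info. Whether the URL form is what causes the connection failure reported above is not established in this thread.

```cpp
#include <string>

// Hypothetical helper: strip a leading scheme ("tcp://", "http://") and
// anything after the host (port, path) from a URL-like string.
std::string bareHost(const std::string& input) {
    std::string host = input;

    // Drop everything up to and including "://", if present.
    const std::string::size_type scheme = host.find("://");
    if (scheme != std::string::npos) {
        host.erase(0, scheme + 3);
    }

    // Cut at the first ':' (port) or '/' (path), if present.
    const std::string::size_type end = host.find_first_of("/:");
    if (end != std::string::npos) {
        host.erase(end);
    }
    return host;
}
```

With this, both tcp://test.mosquitto.org/ and http://test.mosquitto.org/ reduce to test.mosquitto.org, while a string that is already a bare host name passes through unchanged.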

Hi Claudio,

It fails with FQDNs. Hostnames are probably fine; I haven’t tried them, but I want my device to be able to connect from outside my domain, so the FQDN is most useful to me.

I have it working without an issue with the public IP, but it would be nice to use FQDNs.

Many thanks

Andy

Ok @mcinnes01 :
What is the public URL of the server you are trying to connect to (if you can disclose it)?

Hi @duffo64

I can’t disclose my server however this is open and using the exact same setup that I have running:

http://test.mosquitto.org/ >> tcp://test.mosquitto.org/

If you use the Chrome extension MQTTlens, you can subscribe and publish, so you can see what is going on with your Particle.

Apart from the FQDN issue, I’ve put together a cool little test with a universal Windows 10 app running on my phone that interprets voice commands and publishes to an MQTT topic. My photon is subscribed to the topic and processes the messages from the queue, controlling the colour of the onboard LED.

It would be interesting to know if you can get the FQDN working?

Many thanks

Andy

@mcinnes01

test.mosquitto.org (lean and mean) IS an FQDN, so at this point some sort of misunderstanding is around…

@duffo64

That is the same structure as the FQDN / url that is assigned for a particular port on my server. If I try to connect to my equivalent of test.mosquitto.org it fails, but if I connect to the public IP of my server everything is fine.

Many thanks

Andy

Have tried every example in this thread and I’m always getting the same error!

In file included from ../hal/src/stm32f2xx/platform_headers.h:19:0,
             from ./inc/application.h:32,
             from MQTT.cpp:2:
../hal/src/stm32f2xx/deepsleep_hal_impl.h:22:80: warning: 'externally_visible' attribute ignored [-Wattributes]
#define retained  __attribute__((externally_visible, section(".retained_user")))
                                                                                ^
MQTT.cpp:231:78: note: in expansion of macro 'retained'
bool MQTT::publish(char* topic, uint8_t* payload, unsigned int plength, bool retained) {

		../hal/src/stm32f2xx/deepsleep_hal_impl.h:22:80: error: section attribute not allowed for '<anonymous>'
 #define retained  __attribute__((externally_visible, section(".retained_user")))
                                                                                ^
	MQTT.cpp:231:78: note: in expansion of macro 'retained'
 bool MQTT::publish(char* topic, uint8_t* payload, unsigned int plength, bool retained) {
                                                                          ^
MQTT.cpp: In member function 'bool MQTT::publish(char*, uint8_t*, unsigned int, bool)':

		../hal/src/stm32f2xx/deepsleep_hal_impl.h:22:19: error: expected primary-expression before '__attribute__'
 #define retained  __attribute__((externally_visible, section(".retained_user")))
               ^
	MQTT.cpp:241:13: note: in expansion of macro 'retained'
     if (retained) {
         ^
		../hal/src/stm32f2xx/deepsleep_hal_impl.h:22:19: error: expected ')' before '__attribute__'
 #define retained  __attribute__((externally_visible, section(".retained_user")))
               ^
	MQTT.cpp:241:13: note: in expansion of macro 'retained'
     if (retained) {
         ^
make[1]: *** [../build/target/user/platform-6MQTT.o] Error 1
make: *** [user] Error 2

Error: Could not compile. Please review your code.

Hi @DanielSheard

This happens because there is a macro in deepsleep_hal_impl.h that has the same name (retained) as a parameter of the MQTT::publish method.

In MQTT.cpp rename it from

bool MQTT::publish(char* topic, uint8_t* payload, unsigned int plength, bool retained)

to

bool MQTT::publish(char* topic, uint8_t* payload, unsigned int plength, bool mqretained)

Do the same thing in the body of the method, of course.

An unfortunate case of name clash.
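
A minimal sketch of the clash and the rename, in plain C++. The macro line is copied from the compiler output above; publishHeader is a hypothetical function, not the library's MQTT::publish, and it only shows how the flag would end up in the PUBLISH fixed header (packet type 0x30, retain in bit 0), assuming the MQTT 3.1.1 layout.

```cpp
#include <stdint.h>

// Copied from deepsleep_hal_impl.h as quoted in the compiler output above.
#define retained  __attribute__((externally_visible, section(".retained_user")))

// A parameter with the same name as the macro would be replaced by the macro
// text inside the parameter list and fail to compile, so a different name
// (mqretained) is used here.
// Hypothetical helper: builds the first byte of an MQTT PUBLISH packet.
uint8_t publishHeader(bool mqretained) {
    uint8_t header = 0x30;   // PUBLISH packet type, QoS 0
    if (mqretained) {
        header |= 0x01;      // retain flag lives in bit 0
    }
    return header;
}
```

Because the preprocessor replaces every use of the identifier, the rename has to cover the declaration in the header as well as each use in the method body.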
