Particle not responding to MQTT topic

Using Mosquitto on my RPi and this library to control a GPIO that turns on a siren. I got this working the other day and now it stopped working as in the photon stopped responding to my command. How do I debug? Where should I look? I tested my broker and it can receive the topics and it’s connected.

Nvm I got it. Realized that I just needed to be publishing a heartbeat signal so that the particle board is able to receive a subscribed payload

1 Like

Actually it just happened again! I think that the particle somewhere down the line times out. I am not certain why this is happening? Under what conditions does the particle disconnect from the broker? Could this be an issue related to the wifi signal? maybe the sensor get’s disconnected, but then why won’t the particle board get reconnected once it establishes a reconnection on the wifi signal?

I see two issues here …

  1. What is causing your specific outage. That is anyone’s guess without more information. I normally start by monitoring the Mosquitto log with the command: “sudo tail /var/log/mosquitto/mosquitto.log -f”.

  2. The second issue is “How do you recover from any outage?”. In short, the client is responsible for reconnecting and resubscribing to topics after a disconnect. I clipped this code from one of my apps. I’m not claiming it to be the “best solution”, but I have found it to be very reliable.

Global variables

    // MQTT VARIABLES
    const unsigned int MAX_MQTT_PACKET_LENGTH = 512;    // sets maximum MQTT packet size (in bytes)
    byte mqttBrokerIP[] = {192,168,86,53};   
    bool mqttSessionActive = false;
    time_t mqttLastConnectTime = 0;
    MQTT* mqtt = new MQTT(mqttBrokerIP, 1883, 30, mqttCallback, MAX_MQTT_PACKET_LENGTH);

This runs in loop()

/*
--------------------------------------------------------------------------------------------------------------------------------------------------
MQTT Session Manager
 *      - Establishes and maintains communications with the MQTT broker
 *          State logic is used, and each state is processed in a separate loop() cycle to minimize
 *                  the performance impact on the primary application.
 *      - Includes a callback function to accept messages from the MQTT broker
 * ------------------------------------------------------------------------------------------------------------
 *	State 1: An MQTT session has been established and the connection still exists.
 *	State 2: An MQTT session has been established but the connection has dropped.
 *	State 3: An MQTT session has not been established, but a connection with the broker is now present
 *	State 4: An MQTT session has not been established, and it is time to perform a connection attempt
 *	State 5: An MQTT session has not been established, and it is too soon to perform a connection attempt
 * ------------------------------------------------------------------------------------------------------------------------------------------------*/

 void mqttSessionManager(void) {
    static const unsigned int MQTT_CONNECT_DELAY = 30;         // Sets number of seconds between MQTT connect attempts
	const char mqttClientName[] = "edison";                    // MQTT Client Name
    static time_t mqttLastDropTime = 0;
    static time_t mqttNextConnectTime = 0;

    if (mqttSessionActive) { // we are in an active session ...
        if (mqtt->isConnected()) { // and we are still connected ...								// State 1
            mqtt->loop();
        } else { // we lost our connection															// State 2
            mqttSessionActive = false;
            mqttLastDropTime = Time.now();
            delete mqtt;
            mqtt = new MQTT(mqttBrokerIP, 1883, 30, mqttCallback, MAX_MQTT_PACKET_LENGTH);
        }
    }else if (mqtt->isConnected()) { // we have a brand new connection ...							// State 3
        mqttSessionActive = true;
        mqttLastConnectTime = Time.now();
        mqttNextConnectTime = 0;
        // ---------- INSERT MQTT SUBSCRIPTIONS HERE -----------
			mqtt->subscribe("time/zone");
			mqtt->subscribe("edison/relay/set");
		// -----------------------------------------------------
    }else if (Time.now() > mqttNextConnectTime) { // it's time to make a connection attempt ...		// State 4
        mqtt->connect(mqttClientName);																
        mqttNextConnectTime = Time.now() + MQTT_CONNECT_DELAY;
    }   																							// State 5
    return;
}

PS … I just noticed that the comments refer to the " callback function" that is not included in this snippet. You can tell from the last line in the global variables that the function is named mqttCallback(). The code in that function parses/processes messages received from the broker related to the two subscriptions listed in the code. I intentionally omitted this function because the code is application specific and irrelevant to this conversation.

1 Like

Just out of interest: Have you had any issues with using a global or a “permanent” dynamic MQTT instance that “forced” you to repeatedly use new and delete?
new and delete are also increasing the risk of heap fragmentation.

I used a permanent instance when I developed the initial version of this code. That version would often get stuck in the recovery cycle. I don’t remember the details, but I do remember feeling like the root problem was a “dirty” MQTT object. That’s when I decided to try the nuclear new/delete approach. Since then, I’ve seen nothing but clean recoveries.

Is it possible that something else was going on? Yes, it is definitely possible. Every time I see the new/delete I grimace and think: “I should go back and retest this”. Then I think about the work to set up a test MQTT broker and how infrequently real recoveries take place. The risk seems small, so I end up kicking the can down the road. You can probably hear it : thud, clang, clang , rattle rattle.

2 Likes

I’m having a similar problem (I think so anyway). My code runs fine and posts MQTT messages to the broker, but only if the interval between posts is ~<15 sec. If more than that the connection seems to get dropped and no more messages get sent to the broker.
Here’s my code;

// This #include statement was automatically added by the Particle IDE.
#include <CE_BME280.h>

// This #include statement was automatically added by the Particle IDE.
#include <OneWire.h>

// Test code for Fridge and Cabin environmentals. Adding MQTT functionality

// This #include statement was automatically added by the Particle IDE.
#include <MQTT.h>

#include "DS18.h"

#define Addr 0x76                            //BME280 address for Cabin Sensor 
#define TEMP_CHECK_INTERVAL 10000    // Check sensors every X ms 300000ms = 5 min
// Define Sea level pressure
#define SEALEVELPRESSURE_HPA (1013.25)

DS18 sensor(D3);    //  Puts (Fridge) DS18B20 sensor on D3

double fridgeTemp = 0;
double cabinTempC;
double cabinTempF;
double cabinPressureMBAR;
double cabinPressureINHG;
double cabinHumidity;
unsigned long lastTempCheck = 0;

void GetBME280SensorData(void);                //Get BME280 Sensor data
byte BME280SensorConnected=0;
CE_BME280 bme;                                      // I2C


// recieve message
void callback(char* topic, byte* payload, unsigned int length) {
    char p[length + 1];
    memcpy(p, payload, length);
    p[length] = NULL;

}

byte server[] = { 192,168,1,109 }; //192.168.1.116 is ip of RPi MQTT broker
MQTT client(server, 1883, callback);


void setup() {
  Serial1.begin(9600);
/* //Temporary suspension of reporting into cloud
  Particle.variable("CabinTemperature",cabinTempF);
  Particle.variable("Pressure", cabinPressureMBAR);
  Particle.variable("Humidity", cabinHumidity);
  Particle.variable("FridgeTemp", fridgeTemp);
*/
  checkTemp();      //  Checks fridge temps
    
  client.connect("sparkclient");  // connect to the server

    
    if (client.isConnected())  // test publish/subscribe
   {
        client.publish("outTopic/message","hello world");
        client.subscribe("inTopic/message");
    }

}  //  END SETUP

void loop() {
    
 if (lastTempCheck + TEMP_CHECK_INTERVAL < millis()) {
   lastTempCheck = millis();
   
    if(!bme.begin(Addr)){          //If a BME280 sensor is not available at that Address than print this 
    
        Serial1.println("BME280 sensor not connected");
        BME280SensorConnected=0;   
        
    }
    else if(bme.begin(Addr)){
       
       if(BME280SensorConnected==0){                                        
        
        Serial1.println("BME280 sensor is connected and setup is complete"); 
        Serial1.println("");                                                 
        BME280SensorConnected=1;                                            
       }

        GetBME280SensorData();  //Get data from Cabin Sensor (bme280) 
        checkTemp();  // Checks Fridge sensor (DS1820)

        displayCabinData();     //  Pushes data to Nextion
        displayFridgeData();    //  Pushes data to Nextion
        /* Temporary suspension of publish
        Particle.publish("CabinTemperature", String(cabinTempF), PRIVATE);
        Particle.publish("FridgeTemperature", String(fridgeTemp), PRIVATE);
        */
    }


    if (client.isConnected())
        {
  	updateMQTT();  //  Updates MQTT broker with sensor values
        }

}
}//  END LOOP

//FUNCTIONS SECTION

// Check DS1820 Fridge sensor
void checkTemp(){
 if (sensor.read()) {
   fridgeTemp = (sensor.fahrenheit() * 100) / 100;
   }
 }

void GetBME280SensorData()	// Check Cabin sensor
    {
    cabinTempC = bme.readTemperature();
    cabinTempF = ((cabinTempC * 9/5) + 32);

    cabinHumidity = bme.readHumidity();


    cabinPressureMBAR = (bme.readPressure()/100);
    cabinPressureINHG = (cabinPressureMBAR * 0.0295301);//To convert to inHg

    }

void displayCabinData()  // updates Cabin data on Nextion
    {
    String command = "page1.t3.txt=\""+String(cabinTempF, 1)+"\"";  //takes the var cabinMedian or Median value, converts it to a string and sends it to the display
    Serial1.print(command);
    endNextionCommand();
    
    String command1 = "page1.t5.txt=\""+String(cabinHumidity, 1)+"\"";  //takes the var cabinHumid or highest, converts it to a string and sends it to the display
    Serial1.print(command1);
    endNextionCommand();

    String command2 = "page1.t9.txt=\""+String(cabinPressureMBAR, 1)+"\"";  //takes the var cabinHumid or highest, converts it to a string and sends it to the display
    Serial1.print(command2);
    endNextionCommand();
    
    String command3 = "page1.t7.txt=\""+String(cabinPressureINHG, 2)+"\"";  //takes the var cabinHumid or highest, converts it to a string and sends it to the display
    Serial1.print(command3);
    endNextionCommand();
    }

void displayFridgeData()   //  updats Fridge stats on Nextion
    {
    String command = "page1.t6.txt=\""+String(fridgeTemp, 1)+"\"";  //takes the var fridgeMedian or Median value, converts it to a string and sends it to the display
    Serial1.print(command);
    endNextionCommand();
    }

    //  function for sending escape char after string to Nextion
void endNextionCommand()
    {
     Serial1.write(0xff);
     Serial1.write(0xff);
     Serial1.write(0xff);
    }

void updateMQTT()                 //Update the temp(F), humidity(%), and pressure(hPa) readings
{
    String fT = String (fridgeTemp);
    String cTF = String(cabinTempF);
    String cH = String(cabinHumidity);
    String cP = String(cabinPressureINHG);

  String payloadout = "{";                                //Create JSON payloadout to send to MQTT server
  payloadout += "\"cabinHumidity\":";                          //Refer to: JSON Style Guide
  payloadout += cH;
  payloadout += ",";
  
  payloadout += "\"cabinPressureINHG\":";
  payloadout += cP;
  payloadout += ",";

  payloadout += "\"cabinTempF\":";
  payloadout += cTF;
  payloadout += ",";
  
  payloadout += "\"fridgeTemp\":";
  payloadout += fT;
  payloadout += "}";
  
  Serial.println(payloadout);      
  char attributes[200];
  payloadout.toCharArray(attributes, 200);            //Create character array
  client.publish("sensor/temps", attributes);             //Publish the name and value pairs to server                                                     // in JSON format              
}

Any thoughts on how to extend the interval and not drop the connection?

Hi,
I do not have any issue to publish to my mosquitto broker on Ubuntu 18.04LTS with range from 200 ms to 10 min. works like charm.
Could you first get rid of all your String as what I learned from @ScruffR Strings are bad bad bad and can make heap fragmentation and even more dangerous stuff
Are you ruining MQTT 0.4.31 available from Web IDE ? if yes could you try to use in your code SYSTEM_THREAD(ENABLED);
and instead of
MQTT client(server, 1883, callback);
MQTT client(server, 1883, callback, true);

4th parameter : bool thread(default false.) SYSTEM_THREAD(ENABLED) settings : thread is true.

Also could you move this part of your code to setup() as IMHO you don't need this in your loop()

Here is some tested example how to get rid of String with valid JSON:

#include <stdio.h>


double fridgeTemp = 7.32;
double cabinTempC = -44.07;
double cabinTempF = -3.76;
double cabinPressureMBAR = 102.3;
double cabinPressureINHG = 74.87;
double cabinHumidity = 88.12;

char playloadout_template[] = "{\"cabinHumidity\":%.02f,\"cabinPressureINHG\":%.02f,\"cabinTempF\":%.02f,\"fridgeTemp\":%.02f}";
char playloadout[sizeof(playloadout_template)];
char command_template[] = "page1.t3.txt=\"%.02f\"";
char command[sizeof(command_template)];

int main()
{
snprintf(command, sizeof(command), command_template, cabinTempF);

snprintf(playloadout, sizeof(playloadout), playloadout_template
                  , cabinHumidity
                  , cabinPressureINHG
                  , cabinTempF
                  , fridgeTemp);
    printf("command -> %s\n",command);
    printf("playloadout -> %s",playloadout);
    return 0;
}


1 Like

Thanks for the reply. I’m a noob so I’ve been parsing through your post trying to catch up.

  1. Yes I’m using MQTT 0.4.31 from the web IDE.
  2. I updated my code with your suggestions about adding SYSTEM_THREAD(ENABLED); and adding “true” to the MQTT client statement.
  3. I’m not sure what you meant by the “4th parameter…” statement. Is that informational or is that supposed to be added somewhere?
  4. I moved the BME sensor check to setup as suggested.
  5. Now to the good stuff - getting rid of strings in JSON;
    Is the stdio.h lib required?
    Can you walk me through what is going on here? Generally I think I get that you’re constructing a JSON “string” with a particular format, then stuffing values into the placeholders, but I’m not sure how it all works. Is all of this part of its own function? In void loop()?
char playloadout_template[] = "{\"cabinHumidity\":%.02f,\"cabinPressureINHG\":%.02f,\"cabinTempF\":%.02f,\"fridgeTemp\":%.02f}";
char playloadout[sizeof(playloadout_template)];
char command_template[] = "page1.t3.txt=\"%.02f\"";
char command[sizeof(command_template)];

int main()
{
snprintf(command, sizeof(command), command_template, cabinTempF);

snprintf(playloadout, sizeof(playloadout), playloadout_template
                  , cabinHumidity
                  , cabinPressureINHG
                  , cabinTempF
                  , fridgeTemp);
    printf("command -> %s\n",command);
    printf("playloadout -> %s",playloadout);
    return 0;
}

Any clarification you could provide would be appreciated. Trying to understand, not just cut and paste…
Thx

  1. perfect :+1:
  2. perfect :+1:

you can find this info on official example in web IDE and on Github here

  1. perfect :+1:
  2. The stdio.h lib is not required, is used on OnlineGDB just for the example how you can build your JSON object and commands for Nextion display (I'm not sure how is the correct format for Nextion :see_no_evil:)

You are absolutely right so I declare global char arrays and then use snprintf() function in main() function (equivalent Loop() in Particle environment) to assign the values into the placeholders. You should use it in your functions:

e.g:

/**
global declaration on the top of your code
**/

char playloadout_template[] = "{\"cabinHumidity\":%.02f,\"cabinPressureINHG\":%.02f,\"cabinTempF\":%.02f,\"fridgeTemp\":%.02f}";
char playloadout[sizeof(playloadout) + 32 ];

/**
other stuff e.g setup(), loop(), rest of your functions
.......................................................
**/

void updateMQTT()                 
{
  snprintf(playloadout, sizeof(playloadout), playloadout_template
                  , cabinHumidity
                  , cabinPressureINHG
                  , cabinTempF
                  , fridgeTemp);             
  client.publish("sensor/temps", playloadout);                                                                              
}

the additional 32 chars in char playloadout[sizeof(playloadout) + 32 ]; is just in case to be safe and have enough memory reserved you should consider this aslo when you gonna build your commands for Nextion display :stuck_out_tongue_winking_eye:
If you have any other questions don't hesitate to ask :slight_smile:

Best Regards,
Arek

Thanks dreamER, this has been very instructive. I’ve been catching up on work :slight_smile:
When I make the changes you suggested I get compile errors “‘payloadout’ was not declared in this scope” for the global declaration here:

char payloadout[sizeof(payloadout) + 32 ];

and further down in the code, here

void updateMQTT()                 
{
  snprintf(payloadout, sizeof(payloadout), payloadout_template
                  , cabinHumidity
                  , cabinPressureINHG
                  , cabinTempF
                  , fridgeTemp);             
  client.publish("sensor/temps", payloadout);                                                                              
}

I declared payloadout prior to setup() making it a global variable. I don’t understand why I’m getting this error. Do I have to declare “payloadout” somewhere else?
Thanks

Hi, you have to declarate both:

char playloadout_template[] = "{\"cabinHumidity\":%.02f,\"cabinPressureINHG\":%.02f,\"cabinTempF\":%.02f,\"fridgeTemp\":%.02f}";
char playloadout[sizeof(playloadout) + 32 ];

On the top of your code right after all of the:
#include
to make it global
Also check for all mistyping :+1: I mean that the declarated vars name on the top has to be exactly the same as in a function call here:

I guess it should rather be

char playloadout[sizeof(playload_template) + 32 ];

You cannot use the size of payloadout in the definition of itself as it isn't defined yet and hence hasn't got a size :wink:

BTW, payload_template can and should be a const char to save RAM space.

1 Like

:man_facepalming:t3: Right thank you @ScruffR as always for your help :+1:

1 Like

Ok, not much luck. I’m going back to basics and will build back up from there. I put together a test MQTT sketch that will publish an incrementing number (0-10) to an MQTT topic to validate 1) MQTT communication, 2) basic elements of sketch. Here’s the code -

// This #include statement was automatically added by the Particle IDE.
#include <MQTT.h>

//  Code Description
//  Tests MQTT communication with rPI

#define TEMP_CHECK_INTERVAL 10000    // do something every X ms
unsigned long lastTempCheck = 0;

int count = 0;
int countLimit =10;  // sets counter upper limit to 10


// recieve MQTT message
void callback(char* topic, byte* payload, unsigned int length) {
    char p[length + 1];
    memcpy(p, payload, length);
    p[length] = NULL;
}

//SYSTEM_THREAD(ENABLED);  // If enabled and fourth value is set to "true" nothing works
byte server[] = { 192,168,1,109 }; //192.168.1.109 is ip of RPi MQTT broker
MQTT client(server, 1883, callback);

const char payloadout_template[] = "{\"count\":%.02f}";
char payloadout[sizeof(payloadout_template) + 32 ];

void setup() {
  Serial1.begin(9600);

  client.connect("sparkclient");  // connect to the server
    if (client.isConnected())  // test publish/subscribe
   {
        client.publish("outTopic/message","hello world");  // Send "hello world" to broker
        client.subscribe("inTopic/message");
    }

}  //  END SETUP

void loop() {

 if (lastTempCheck + TEMP_CHECK_INTERVAL < millis()) {
   lastTempCheck = millis();

    if (client.isConnected())
        {
  	updateMQTT();  //  Updates MQTT broker with values
        }
    }

}  // END LOOP

   
void updateMQTT()
{
  count++; // increment counter
  
if(count >= countLimit)  // check to see if counter is => countLimit (10)
  {
    count = 0;  // if it is then reset counter to 0
  }
  
  snprintf(payloadout, sizeof(payloadout), payloadout_template, count);   // put count value in template          
  client.publish("sensor/count", payloadout);  // publish in JSON format              
}

This successfully connects to the MQTT broker and publishes the “hello world” under the outTopic/messge topic, followed 10 seconds later by a series of identical MQTT publishes under the sensor/count topic. The count variable does not appear to increment and only publishes the 0.00 value over and over every ten seconds.

6/28/2021, 11:17:03 AMnode: 3b9858ce.4f8758
sensor/count : msg.payload : string[14]
"{"count":0.00}"

Note: I discovered that if I un-remark the SYSTEM_THREAD(ENABLED) statement and include the “true” value in “MQTT client(server, 1883, callback, true)”
the code does not work.

My assumption is that I’m not formatting the JSON payload correctly. Any thoughts? After I get this working I’ll scale up to my previous challenge of publishing sensor values.
Thanks in advance.

you going to get 0.00 as your placeholder is float/double type "{\"count\":%.02f}" so either you gonna change this to "{\"count\":%d}" either change your

int count = 0;
int countLimit =10;

to float or double type

also try to put SYSTEM_THREAD(ENABLED); at the very top (right after #include <MQTT.h>)
also you are missing a forwarded declaration of MQTT callback
void callback(char* topic, byte* payload, unsigned int length);
and move your callback after client initialization something like this:

// This #include statement was automatically added by the Particle IDE.
#include <MQTT.h>
SYSTEM_THREAD(ENABLED);

//  Code Description
//  Tests MQTT communication with rPI

#define TEMP_CHECK_INTERVAL 10000    // do something every X ms
unsigned long lastTempCheck = 0;
int count = 0;
int countLimit =10;  // sets counter upper limit to 10

const char payloadout_template[] = "{\"count\":%d}";
char payloadout[sizeof(payloadout_template) + 32 ];


void callback(char* topic, byte* payload, unsigned int length); // forvard declaration of callback
byte server[] = { 192,168,1,109 }; //192.168.1.109 is ip of RPi MQTT broker
MQTT client(server, 1883, callback, true); //initialize client

// recieve MQTT message
void callback(char* topic, byte* payload, unsigned int length) {
    char p[length + 1];
    memcpy(p, payload, length);
    p[length] = NULL;
}

void setup() {
  Serial1.begin(9600);

  client.connect("sparkclient");  // connect to the server
    if (client.isConnected())  // test publish/subscribe
   {
        client.publish("outTopic/message","hello world");  // Send "hello world" to broker
        client.subscribe("inTopic/message");
    }

}  //  END SETUP

void loop() {

 if (lastTempCheck + TEMP_CHECK_INTERVAL < millis()) {
   lastTempCheck = millis();

    if (client.isConnected())
        {
  	updateMQTT();  //  Updates MQTT broker with values
        }
    }

}  // END LOOP

   
void updateMQTT()
{
  count++; // increment counter
  
if(count >= countLimit)  // check to see if counter is => countLimit (10)
  {
    count = 0;  // if it is then reset counter to 0
  }
  
  snprintf(payloadout, sizeof(payloadout), payloadout_template, count);   // put count value in template          
  client.publish("sensor/count", payloadout);  // publish in JSON format              
}

Can you try this instead

const char payloadout_template[] = "{\"count\":%.2f}";

But as @dreamER indicated, if your variable isn't a floating point you shouldn't use %f anyhow :wink:

Also why would you do this

but never use p?

And finally, you do check for if (client.isConnected()) but never try to reconnect if this should happen to be not true.

BTW, the safer way to do the time check would be like this

if (millis() - lastTempCheck >= TEMP_CHECK_INTERVAL)

for future usage/functionality I guess :slight_smile:

IMHO it's not going to work :wink:

That's why I added this just before your post showed up :wink:

However, when using floating points the extra 0 isn't of any use there.

1 Like

Thanks all, I made the changes you both suggested:

  1. changed the %.2f to %d. :white_check_mark:
  2. played around with double and float var types. Ended up with int :white_check_mark:
  3. forward declared the callback and removed the actions from the function (I guess I could have deleted it… :white_check_mark:
  4. modified the time check :white_check_mark:
    and

Up until this point everything works as expected. When I made this next change everything broke…

  1. moved SYSTEM_THREAD(ENABLED) to the very top and added “true” to the initialize client statement.
MQTT client(server, 1883, callback, true); //initialize client

The reason I started this post originally was that my MQTT publish would get stuck if my publish intervals were less than ~15 sec. And I’m not sure how reliable it was in any case. I’ll continue to monitor this sketch for the next day and see if things hold up.
Any thoughts on #5?
Thanks again, I’m learning a lot.