Mqtt + particle functions + particle variables seem to conflict

I had an application running perfectly with Particle Functions and Variables accessible via the device dashboard. I then needed to add MQTT to the application so that I could report the state of the device to my broker. But after adding this feature, the online functions keep disappearing from the dashboard and sometimes the device is completely unreachable and needs a hard reset to recover or (that one time) a factory reset.

Am I doing it wrong?

We cannot tell without seeing your code.

I do exactly this and can confirm it can work well - so as @ScruffR says - show us some code?

Thank you @shanevanj and @ScruffR for responding, Here is the code, all variables are declared in a separate header file

// This #include statement was automatically added by the Particle IDE.
#include <MQTT.h>


// This #include statement was automatically added by the Particle IDE.
#include <neopixel.h>
#include "clo2_config.h"

// IMPORTANT: Set pixel COUNT, PIN and TYPE
Adafruit_NeoPixel strip(PIXEL_COUNT, PIXEL_PIN, PIXEL_TYPE);
void mqtt_callback(char* topic, byte* payload, unsigned int length);
MQTT client(ip_address, port, mqtt_callback);

enum {
  MQTT_IDLE,
  MQTT_PROCESS_COMMAND,
  MQTT_TOGGLE_SYSTEM,
  MQTT_PREPARATION_TIME,
  MQTT_CHLORINATION_TIME,
  MQTT_RESET_SYSTEM
};

int mqtt_state = MQTT_IDLE;
char mqtt_command[16];


int led_heartbeat() {
    digitalWrite(onboard_led, switch_flag);
}

void get_device_ID() {
    device_ID = System.deviceID();
}

void setup_variables() {
    Particle.variable("trg_src", trigger_source);
    Particle.variable("pixel_color", pixel_color_string);
    Particle.variable("fan_state", fan_state);
    Particle.variable("buzzer_state", buzzer_state);
    Particle.variable("current_state", triggered_event);
    Particle.variable("prep_time", prep_time);
    Particle.variable("chlorination_time", chlorination_time);
}

void setup_neopixel()
{
  strip.begin();
  strip.show(); // Initialize all pixels to 'off'
}

void convert_color_to_string() {
    switch (pixel_color) {
        case RED:
            pixel_color_string = "RED";
            break;
        case GREEN:
            pixel_color_string = "GREEN";
            break;
        case BLUE:
            pixel_color_string = "BLUE";
            break;
        case YELLOW:
            pixel_color_string = "YELLOW";
            break;
        default:
            pixel_color_string = "UNKNOWN";
            break;
    }
}

void set_pixel_color(uint32_t c) {
  for(int i=0; i<strip.numPixels(); i++) {
    strip.setPixelColor(i, c);
  }
  strip.show();
}

void flash_neopixel(unsigned long _pixel_period, uint32_t color) {
    if((millis() - pixel_timer) > _pixel_period) {
        pixel_flag = !pixel_flag;
        if(_pixel_period == ALWAYS_ON) {
            pixel_flag = true;
        }
        if(pixel_flag) {
            set_pixel_color(color);
        }
        else{
            set_pixel_color(0);
        }
        pixel_timer = millis();
    }
}

void toggle_fans(bool _state) {
    digitalWrite(fan_1, _state);
    digitalWrite(fan_2, _state);
}

void toggle_buzzer(bool _state) {
    if(!buzzing_state) {
        digitalWrite(buzzer, !_state);
    }
}

void process() {
      /*
   * start
   * Check power
   * check start button press


   * flash yellow light in preparation
   * wait 2 hours
   * start both fans
   * flash green for chlorination
   * trigger buzzer
   * wait 2 hours
   * flash blue to show end of process and trigger buzzer
   * check for power button press
   * end
   */

   if( switch_flag ) { // start system only if button is pressed

       unsigned long _system_timer_local = millis() - systme_timer;

       if( _system_timer_local <= prep_time ) { // system is off, pulsing every two seconds
            buzzer_state = false;
            fan_state = false;
            pixel_color = GREEN;
            pixel_period = TWO_SECONDS;
            triggered_event = "Preparation start";
            if(publish_counter = 0) {
                ready_to_send_data = true;
                publish_counter = publish_counter + 1;
            }
       }

       else if( (_system_timer_local <= (chlorination_time + prep_time)) && (_system_timer_local > prep_time) ) { // system is on blinks every half a second
            buzzer_state = true;
            fan_state = true;
            pixel_color = GREEN;
            pixel_period = ALWAYS_ON;
            triggered_event = "Chlorination start";
            if(publish_counter = 1) {
                ready_to_send_data = true;
                publish_counter = publish_counter + 1;
            }
       }
       else if( (_system_timer_local <= (chlorination_time + ONE_SECOND)) && (_system_timer_local > chlorination_time) ) { // system is off at the end  blinks blue
            buzzer_state = false;
            fan_state = false;
            pixel_color = GREEN;
            pixel_period = HALF_A_SECOND;
            triggered_event = "Process conclusion";
            if(publish_counter = 2) {
                ready_to_send_data = true;
                publish_counter = publish_counter + 1;
            }
            // systme_timer = millis();
       }
       else {
        //   systme_timer = millis();
            switch_flag = false;
            triggered_event = "Process complete";
            if(publish_counter = 3) {
                ready_to_send_data = true;
                publish_counter = publish_counter + 1;
            }
       }
   }

   else { // keep all systems off and pixel flashing red
            buzzer_state = false;
            fan_state = false;
            pixel_color = GREEN;
            pixel_period = HALF_A_SECOND;
            publish_counter = 0;
            systme_timer = millis();
   }

}

/**** Online Functions ***/
int toggle_system_remotely(String _state) {
    _state.toUpperCase();
    if( _state == "ON" ) {
        if(!switch_flag) { // only turn on if system is not runing
            switch_flag = true;
            systme_timer = millis();
            triggered_event = "System remote turn on";
            ready_to_send_data = true;
            buzzer_counter = 4;
            buzzing_state = true;
        }
    }
    else if( _state == "OFF" ) {
        if(switch_flag) { // only turn off if system is runing
            switch_flag = false;
            triggered_event = "System remote turned off";
            ready_to_send_data = true;
            buzzer_counter = 4;
            buzzing_state = true;
        }
    }
    trigger_source = PARTICLE_TRIG;
    return switch_flag; // return the switch_state.
}

int fan_test(String _state) {
  _state.toUpperCase();
  if( _state == "ON" ) {
    fan_state = 1;
  }
  else {
    fan_state = 0;
  }
  ready_to_send_data = true;
  return fan_state;
}

int ptcl_tgl_sys(String _tgl) {
  trigger_source = PARTICLE_TRIG;
  return toggle_system_remotely(_tgl);
}

int mqtt_tgl_sys() {
  String _tgl = read_command_payload(mqtt_command);
  trigger_source = MQTT_TRIG;
  return toggle_system_remotely(_tgl);
}



int change_preparation_time(String _prep_time) { // this will stop a process that's running
    switch_flag = false; // switch off system first
    int _num_of_mins = _prep_time.toInt();
    prep_time =  ONE_MIN * _num_of_mins;// change the time;
    systme_timer = millis(); // reset the system time
    triggered_event = "prep time updated";
    ready_to_send_data = true;

    return prep_time;
}

int ptcl_prep_t(String _prep) {
  trigger_source = PARTICLE_TRIG;
  return change_preparation_time(_prep);
}

int mqtt_prep_t() {
  String _prep = read_command_payload(mqtt_command);
  trigger_source = MQTT_TRIG;
  return change_preparation_time(_prep);
}


int change_chlorination_time(String _cl_time) { // this will stop a process that's running
    switch_flag = false; // switch off system first
    int _num_of_mins = _cl_time.toInt();
    chlorination_time =  ONE_MIN * _num_of_mins;// change the time;
    systme_timer = millis(); // reset the system time
    triggered_event = "Chlorination time updated";
    ready_to_send_data = true;

    return chlorination_time;
}

int ptcl_chnge_cl(String _cl) {
  trigger_source = PARTICLE_TRIG;
  return change_chlorination_time(_cl);
}

int mqtt_chnge_cl() {
  String _cl = read_command_payload(mqtt_command);
  trigger_source = MQTT_TRIG;
  return change_chlorination_time(_cl);
}



int reset_system(String _reset) { // reset system to defaults
    _reset.toUpperCase();
    if( (_reset == "RESET") || (_reset == "R") ) {
        switch_flag = false; // switch off system first
        prep_time =  ONE_MIN;
        chlorination_time =  ONE_MIN;
        triggered_event = "System Reset";
        ready_to_send_data = true;
    }

    return 1;
}

int ptcl_rst_sys(String _reset) {
  trigger_source = PARTICLE_TRIG;
  return reset_system(_reset);
}

int mqtt_rst_sys() {
  String _reset = read_command_payload(mqtt_command);
  trigger_source = MQTT_TRIG;
  return reset_system(_reset);
}

void publish_data() {
    if(ready_to_send_data) {
    //   publish_to_particle();
      pulish_to_mqtt();

      ready_to_send_data = false;
    }
}

void publish_to_particle() {
  Particle.publish("system_event", triggered_event, PRIVATE);
}

void pulish_to_mqtt() {
    String _payload_;

    String system_event = "\"system_event\":";
    String id = "\"id\":\"";
    String publish_counter_str = "\",\"publish_counter\":\"";
    String trig = "\",\"trigger_source\":\"";
    String state = "\",\"state\":\"";
    String led = "\",\"led\":\"";
    String fan = "\",\"fan\":";
    String buzzer = ",\"buzzer\":";
    String _prep_time = ",\"prep_time\":";
    String _chlorination_time = ",\"chorination_time\":";

    String trig_source_str;

    switch (trigger_source) {
        case    PARTICLE_TRIG:
            trig_source_str = "PARTICLE";
            break;
        case    BUTTON_TRIG:
            trig_source_str = "BUTTON";
            break;
        case    MQTT_TRIG:
            trig_source_str = "MQTT";
            break;
        default:
            trig_source_str = "-";
            break;
    }

    String fan_state_str;
    fan_state_str = fan_state ? "ON" : "OFF";

    String buzzer_state_str;
    buzzer_state_str = buzzer_state ? "ON" : "OFF";

    _payload_.concat('{');_payload_.concat(system_event);
    _payload_.concat('{');

    _payload_.concat(id);_payload_.concat(device_ID);
    _payload_.concat(trig);_payload_.concat(trig_source_str);
    _payload_.concat(publish_counter_str);_payload_.concat(publish_counter);
    _payload_.concat(state);_payload_.concat(triggered_event);
    _payload_.concat(led);_payload_.concat(pixel_color_string);
    _payload_.concat(fan);_payload_.concat(fan_state_str);
    _payload_.concat(buzzer);_payload_.concat(buzzer_state_str);
    _payload_.concat(_prep_time);_payload_.concat(prep_time);
    _payload_.concat(_chlorination_time);_payload_.concat(chlorination_time);

    _payload_.concat('}');_payload_.concat('}');

    mqtt_connect();

    for(int i = 0; i < 10; i++) {
        if (client.isConnected()) {
            break;
        }
        delay(100); //wait 100mS
    }

    if (!client.isConnected()) {
      return;
    }
    client.publish(topic, _payload_);
    mqtt_disconnect();
}

void init_online_functions() {
    Particle.function("toggle_system_remotely", ptcl_tgl_sys);
    Particle.function("change_preparation_time", ptcl_prep_t);
    Particle.function("change_chlorination_time", ptcl_chnge_cl);
    Particle.function("reset_system", ptcl_rst_sys);
    Particle.function("test_fans", fan_test);
}

/* MQTT functions */
void mqtt_state_machine() {
  switch (mqtt_state) {
    case MQTT_IDLE:
      //get new commands
      client.subscribe(topic);
      break;
    case MQTT_PROCESS_COMMAND:
      mqtt_state = read_mqtt_state(mqtt_command);
      break;
    case MQTT_TOGGLE_SYSTEM:
      mqtt_tgl_sys();
      clear_mqtt_buffer();
      break;
    case MQTT_PREPARATION_TIME:
      mqtt_prep_t();
      clear_mqtt_buffer();
      break;
    case MQTT_CHLORINATION_TIME:
      mqtt_chnge_cl();
      clear_mqtt_buffer();
      break;
    case MQTT_RESET_SYSTEM:
      mqtt_rst_sys();
      clear_mqtt_buffer();
      break;
    default:
      //unknown state
      mqtt_state = MQTT_IDLE;
      break;
  }
  mqtt_state = MQTT_IDLE;
}

void mqtt_callback(char* topic, byte* payload, unsigned int length) {
    if (length <= sizeof(mqtt_command)) {
      memcpy(mqtt_command, payload, length);
      mqtt_state = MQTT_PROCESS_COMMAND;
    }
    else {
      //ignore everything too large to handle
      mqtt_state = MQTT_IDLE;
    }
}

void clear_mqtt_buffer() {
  memset(mqtt_command, 0, sizeof(mqtt_command));
}

int read_mqtt_state(char * _mqtt_command) {
  return _mqtt_command[0] - '0';
}

String read_command_payload(char * _mqtt_command) {
  String _cmd = String(_mqtt_command);
  _cmd.trim();
  return _cmd.substring(2); // cut off the state and comma
}


void mqtt_subscribe_only_when_idle() {
  if(mqtt_state != MQTT_IDLE) {
    client.unsubscribe(topic);
  }
}

void mqtt_connect() {
    // int id_len = device_ID.length();
    // char _id[id_len];
    // sprintf(_id, "%s", device_ID);
    client.connect(device_ID); //connect to device ID
}

void mqtt_disconnect() {
    client.disconnect();
}

void keep_mqtt_alive() {
  if (!client.isConnected()) {
      mqtt_connect();
  }
}

void setup_mqtt(void)
{
    mqtt_connect();
    if (client.isConnected())
    {
        client.publish(topic,"DEVICE RESTART");
        client.subscribe(topic);
    }
}

void mqtt_loop() {
//   keep_mqtt_alive();
//   mqtt_state_machine();
//   mqtt_subscribe_only_when_idle();
}



void button_poll() {
  if((millis() - button_timer) > button_poll_interval) { // check button state every half a second
    if(!digitalRead(_button)) { //if the button is pressed, increase button counter
      button_counter++;
      actionable_button_counter = button_counter;
    }
    else {
      button_counter = 0; // reset to zero
    }
    button_timer = millis(); //reset_timer;
  }
}

void action_button_press() {
  if(button_counter == 0) { //check if there has been a button press not yet actioned
    if(actionable_button_counter < 1) {
      actionable_button_counter = 0; //not held long enough, reset
    }
    else if(actionable_button_counter >= 1) {
      switch_flag = !switch_flag;
      trigger_source = BUTTON_TRIG;

      buzzer_counter = 4;
      buzzing_state = true;
      actionable_button_counter = 0; // reset actionable counter to zero
    }
  }
}

void buzzer_handle() {
    if(buzzing_state) {
        if((millis() - buzzer_timer) > buzzer_period) {
            digitalWrite(buzzer, !digitalRead(buzzer));
            buzzer_counter--;
            buzzer_timer = millis();
        }
        if(buzzer_counter <= 0) {
            digitalWrite(buzzer, HIGH);
            buzzing_state = false;
        }
    }
}

/**** System functions ****/

void setup() {
    // Get the DEvice ID
    get_device_ID();

    // Setup online variables
    setup_variables();

    //init remote functions
    init_online_functions();

    //init the neopixel
    setup_neopixel();

    //init mqtt
    setup_mqtt();

    //init the onboard led
    pinMode(onboard_led, OUTPUT);

    //init fan relay pins
    pinMode(fan_1, OUTPUT);
    pinMode(fan_2, OUTPUT);

    //init buzzer pin
    pinMode(buzzer, OUTPUT);
    digitalWrite(buzzer, HIGH);

    //init button pin
    pinMode(_button, INPUT_PULLUP);
}

void loop() {
    //publish data
    publish_data();

    // delay(10000);
    // ready_to_send_data = true;


    // flash the onboard led
    led_heartbeat();

    // flash neopixel
    flash_neopixel(pixel_period, pixel_color);

    // toggle fan states
    toggle_fans(fan_state);

    // toggle buzzer
    toggle_buzzer(buzzer_state);

    // Run process
    process();

    // Update color string
    convert_color_to_string();

    // monitor button press
    button_poll();

    // Action button press
    action_button_press();

    // buzzer handler
    buzzer_handle();

    // Run through the mqtt mqtt_loop
    mqtt_loop();
}

Okay - lots of stuff to wade through - however -

In this part of the docs -

  • If the user loop blocks for more than about 20 seconds, the connection to the Cloud will be lost. To prevent this from happening, the user can call Particle.process() manually.

I am concerned that this may be happening due to single threading and delays in the overall flow, so can you add
SYSTEM_THREAD(ENABLED); at the top after your includes, so that the particle communications can manage itself and not be affected by the rest of the app.

I tried my best to avoid using blocking code,
Let me try this, and get back to you on the results of it. Thank you

This topic was automatically closed 182 days after the last reply. New replies are no longer allowed.