I could “unthread” the library, but I am not proficient enough with C++ class definitions to pass the HTTP response information back and forth.
So instead I am using the library as it is, with publishQueue.setPausePublishing(true);
and handling the publishing logic on the side.
Below is what I am testing with now.
Instead of blocking, I always call Particle.publish()
with NO_ACK,
followed by Particle.process(),
but I am not sure whether I can always discard the event with publishQueue.discardOldEvent(false).
// This #include statement was automatically added by the Particle IDE.
#include <PublishQueueAsyncRK.h>
// ---- Device OS configuration -------------------------------------------------
SYSTEM_THREAD(ENABLED);
SYSTEM_MODE(SEMI_AUTOMATIC) // https://docs.particle.io/reference/firmware/electron/#optimizing-cellular-data-use-with-cloud-connectivity-on-the-electron
// Log handler instance; presumably routes the Log.info() calls in this file to
// the USB serial port -- confirm against the Device OS logging docs.
SerialLogHandler logHandler;
// ---- Publish queue -----------------------------------------------------------
// Backing storage for the publish queue, declared `retained`; its address and
// size are handed to the PublishQueueAsync constructor below.
retained uint8_t publishQueueRetainedBuffer[2048];
PublishQueueAsync publishQueue(publishQueueRetainedBuffer, sizeof(publishQueueRetainedBuffer));
// Interval between the test events that loop() puts on the queue.
const unsigned long PUBLISH_PERIOD_MS = 30000;
// Timestamp (millis()) shared by the state functions for their elapsed-time
// checks. The initial value relies on unsigned wrap-around (8000 - 30000), so
// `millis() - lastPublish` is already large right after boot.
unsigned long lastPublish = 8000 - PUBLISH_PERIOD_MS;
// Not referenced by any code visible in this file.
int counter = 0;
// PublishAsync Statemachine related
// State identifiers. Their numeric values are used as indices into
// StateMachine[] (see loop()), so the enum order and the table order must match.
enum MyStates {START/*startState*/,CHECK /*checkQueueState*/, ACK /*(new)*/, RETRY/*waitRetryState*/};
// One table row per state: the state id, the function that loop() calls while
// in that state, and a printable name (at most 5 characters plus the NUL).
struct StateMachineType { MyStates State; void (*func) (void); char stateName[6]; };
const StateMachineType StateMachine[] = { {START, STARTstate, "START"}, {CHECK, CHECKstate, "CHECK"}, {ACK, ACKstate, "ACK"}, {RETRY, RETRYstate, "RETRY"}};
// mainState is the state loop() dispatches to; every state function copies it
// into lastState when it notices the two differ.
MyStates mainState = START;
MyStates lastState = mainState;
// Checked by CHECKstate() before publishing; nothing visible here sets it to true.
bool pausePublishing = false;
// Set by CHECKstate() when an event is sent and cleared by ACKstate(); not read
// by any code visible in this file.
bool isSending = false;
// Time RETRYstate() waits (measured from lastPublish) before going back to CHECK.
unsigned long failureRetryMs = 15000;
// Used for HTTP response handling
// Outcome of the webhook call for the event currently being sent. The
// subscription handlers move WAITING to OK / ERROR; ACKstate() sets TIMEOUT and
// finally resets the value to CLEARED.
enum responseStatus {CLEARED, WAITING, TIMEOUT, OK, ERROR};
responseStatus HttpResp = CLEARED;
// Printable names, indexed by responseStatus value -- keep in the enum's order.
const char* responseStatusNames[] = {"CLEARED","WAITING","TIMEOUT", "OK", "ERROR"};
void setup() {
Serial.begin();
publishQueue.setup();
publishQueue.setPausePublishing(true); // Handle publishing here, due to OS error when publishing in other thread than event response reeceived
Particle.subscribe(String("hook-response/PipeDream_" + System.deviceID()), responseHandler, MY_DEVICES);
Particle.subscribe(String("hook-error/PipeDream_" + System.deviceID()), errorHandler, MY_DEVICES);
Particle.process();
Particle.connect();
}
// Main loop: every PUBLISH_PERIOD_MS put one test event on the publish queue,
// then run the function belonging to the current state-machine state.
void loop() {
    static unsigned long lastPublishQueue = 8000 - PUBLISH_PERIOD_MS;

    const bool periodElapsed = (millis() - lastPublishQueue) >= PUBLISH_PERIOD_MS;
    if (periodElapsed) {
        lastPublishQueue = millis();
        Log.info("publishing");

        // Particle.publish data (up to 255 characters (prior to 0.8.0), 622 characters (since 0.8.0))
        char payload[60];
        snprintf(payload, sizeof(payload), "{\"R\":%i,\"I\":%i,\"D\":\"{3720:-11.2, 3800:10.4}\"}", -65, 60);

        // WITH_ACK (rather than NO_ACK) asks the state machine to wait for the
        // webhook response before the event is dropped from the queue.
        publishQueue.publish("PipeDream", payload, 60, PRIVATE, WITH_ACK);
        Particle.process();
    }

    // Dispatch to the handler of the current state.
    void (*stateHandler)(void) = StateMachine[mainState].func;
    stateHandler();

    // The handler may have switched state; report the new one.
    if (mainState != lastState) {
        Log.info("%s state", StateMachine[mainState].stateName);
    }
}
// Subscription handler for the webhook's "hook-response" event (registered in
// setup()). Marks the pending publish as confirmed.
//   event, data: event name and payload as delivered by the subscription; unused.
void responseHandler(const char *event, const char *data) {
    Log.info("Response: OK");

    // Only a publish that is still awaiting its answer may be marked OK.
    if (HttpResp != WAITING) {
        return;
    }
    HttpResp = OK;
}
void errorHandler(const char *event, const char *data) {
Log.info("Response Error: " + String(data));
if(HttpResp == WAITING) HttpResp = ERROR;
}
// START state: stay here until the cloud connection is up, then go to CHECK.
void STARTstate() {
    const bool justEntered = (mainState != lastState);
    if (justEntered) {
        // Place for a one-off "state entered" action.
        lastState = mainState;
    }

    if (Particle.connected()) {
        mainState = CHECK;
    }
}
void CHECKstate() { /*checkQueueState*/
// the NO_ACK / WITH_ACK flag is hijacked to decide if a HTTP response will confirm transmition
// WITH_ACK requires particle.subscribe to hook_response and hook_error events and
// their handler functions must contain "if(HttpResp == WAITING) HttpResp = OK;" or "= ERROR;"
// The actual Particle publish is always done with PRIVATE, NO_ACK
if (mainState != lastState) {/*state changed action:*/ lastState = mainState;}
if (!pausePublishing && Particle.connected() && millis() - lastPublish >= 1010) {
PublishQueueEventData *data = publishQueue.getOldestEvent();
if (data) {
// We have an event and can probably publish
isSending = true;
const char *buf = reinterpret_cast<const char *>(data);
const char *eventName = &buf[sizeof(PublishQueueEventData)];
const char *eventData = eventName;
eventData += strlen(eventData) + 1;
PublishFlags flags(PublishFlag(data->flags));
Log.info("publishing %s %s ttl=%d flags=%x", eventName, eventData, data->ttl, flags.value());
///*auto request = */Particle.publish(eventName, eventData, data->ttl, flags);
Particle.publish(eventName, eventData, data->ttl, PRIVATE, NO_ACK);
// If NO_ACK flag is set ignore http reponses, else wait for response before discarding
if((flags.value() & NO_ACK.value()) == NO_ACK.value()){
HttpResp = OK;
Log.info("NO_ACK, ignoring any HTTP response");
}
else{
HttpResp = WAITING;
Log.info("Waiting for HTTP response");
}
Particle.process(); // get the publish going, so we do not discard it too soon
lastPublish = millis();
mainState = ACK;
}
else {
// No event
}
}
else {
// Not cloud connected or can't publish yet (not connected or published too recently)
}
}
// ACK state: wait for the webhook's answer to the event published by
// CHECKstate(), for at most 5000 ms measured from lastPublish.
//   - HttpResp == OK      -> drop the event from the queue, back to CHECK.
//   - anything else final -> keep the event queued and go to RETRY.
// On leaving the state, isSending is cleared, lastPublish is restarted and
// HttpResp is reset to CLEARED.
void ACKstate() { // Acknowledge back from the webhook
    if (mainState != lastState) {
        // Place for a one-off "state entered" action.
        lastState = mainState;
    }

    // Only a response that is still pending can time out. Without the WAITING
    // check, an OK/ERROR that arrived just before the deadline would be
    // overwritten with TIMEOUT, and a successfully delivered event would be
    // published again as a duplicate.
    if (HttpResp == WAITING && millis() - lastPublish >= 5000) {
        HttpResp = TIMEOUT;
    }

    if (HttpResp != WAITING) {
        if (HttpResp == OK) {
            // Successfully published
            Log.info("Published, no retry");
            // NOTE(review): this assumes discardOldEvent(false) removes the
            // oldest queued event and that this is still the event published in
            // CHECKstate(). Confirm against PublishQueueAsyncRK that the library
            // cannot evict the oldest event itself (e.g. when the queue fills
            // up) while this state is waiting; otherwise a different, unsent
            // event would be discarded here.
            publishQueue.discardOldEvent(false);
            mainState = CHECK;
        }
        else {
            // Did not successfully transmit, try again after retry time
            Log.info("Published failed, reason HTTP %s, might retry later", responseStatusNames[HttpResp]);
            mainState = RETRY;
        }
        isSending = false;
        lastPublish = millis();  // start of the rate-limit / retry wait
        HttpResp = CLEARED;
    }
}
// RETRY state (waitRetryState): after a failed publish, wait failureRetryMs
// (measured from lastPublish) and then return to CHECK, which will send the
// still-queued event again.
void RETRYstate() {
    if (mainState != lastState) {
        // Place for a one-off "state entered" action.
        lastState = mainState;
    }

    const bool stillWaiting = (millis() - lastPublish) < failureRetryMs;
    if (stillWaiting) {
        return;
    }
    mainState = CHECK;
}