PublishQueueAsyncRK driven by HTTP response status code

Using the excellent PublishQueueAsyncRK by @rickkas7, I need to modify it to wait for a status code from the end service, such as “200 OK” (from an HTTP POST with JSON), before discarding a publish from the queue.

I am thinking about setting a flag (cleared, success, failed) in the function that receives the HTTP response back on the device. That flag could be waited on and cleared by PublishQueueAsyncRK.

With PublishQueueAsyncRK running in its own thread, how is a variable safely declared and shared between threads?
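In standard C++ the usual tool for a small flag shared between threads is std::atomic. Below is a minimal sketch under assumptions: HttpStatus, httpStatus, reportResult() and waitForResult() are hypothetical names modeling the (cleared, success, failed) flag with an added Waiting state, and std::thread stands in for the two Device OS threads. std::atomic is standard C++, but check that it is available on your Device OS target. The response side only moves Waiting to a result via compare_exchange_strong, and the publishing side reads and clears the flag in one exchange(), so a late response cannot overwrite a flag that was already cleared.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical tri-state flag (plus Waiting) for the HTTP round trip.
enum class HttpStatus { Cleared, Waiting, Success, Failed };

// Shared between the thread that receives the response and the publishing thread.
// std::atomic makes every read and write indivisible, so no extra mutex is needed.
std::atomic<HttpStatus> httpStatus{HttpStatus::Cleared};

// Response side: record the result only if a publish is actually waiting for it,
// so a late response cannot overwrite a flag that has already been cleared.
void reportResult(bool ok) {
    HttpStatus expected = HttpStatus::Waiting;
    httpStatus.compare_exchange_strong(expected, ok ? HttpStatus::Success : HttpStatus::Failed);
}

// Publishing side: poll until a result arrives or the timeout expires, then read
// and clear the flag in a single atomic exchange. A returned Waiting means timeout.
HttpStatus waitForResult(std::chrono::milliseconds timeout) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (httpStatus.load() == HttpStatus::Waiting &&
           std::chrono::steady_clock::now() < deadline) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    return httpStatus.exchange(HttpStatus::Cleared);
}
```

The publishing side would store HttpStatus::Waiting right after publishing, then call waitForResult() with its timeout and treat a returned Waiting as "no response".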

Is there a webhook response template that returns only the HTTP status code, or limits the response to 15 characters, to save cellular data?

Is there another way to achieve this?

Any help is greatly appreciated.

I tried implementing this, and it’s not particularly difficult to insert into PublishQueueAsyncRK. However, I ran into a problem I have not been able to work around.

It appears that if you publish from a worker thread and immediately receive a hook-response event on a subscription, Device OS reports an event loop error and resets the cloud connection. I suspect it’s thread-related.

0000090403 [app.pubq] INFO: publishing testHookResponse 0 ttl=60 flags=9
0000090693 [comm.protocol] TRACE: Reply recieved: type=2, code=0
0000090694 [comm.protocol] TRACE: message id 60 complete with code 0.00
0000090695 [comm.protocol] TRACE: rcv'd message type=13
0000090696 [app.pubq] INFO: published successfully, waiting for hook-response
0000091000 [comm.protocol] TRACE: rcv'd message type=8
0000091002 [comm.protocol] ERROR: Event loop error 5
0000091003 [system] WARN: Communication loop error, closing cloud socket
0000091004 [system] INFO: Cloud: disconnecting
0000091005 [system] INFO: Cloud: disconnected
0000091006 [system] INFO: Cloud: connecting

If I turn off the server so there’s no response, it’s fine.
If I turn off the event subscription on-device, it’s fine.
With both enabled, I get Event loop error 5 every time, and the subscription handler is never called.

In any case, it will require some more investigation to figure out why this happens. Tested with 1.5.2 and 2.0.0-rc.1 on a Boron.


Thanks! I started on this as well (1.5.2, Argon), but the event subscription handler was never called, and I had not yet figured out why.

Did you publish with NO_ACK?

I tried WITH_ACK, NO_ACK, and not waiting for the future to resolve. I also tried it on a Photon with 1.5.2.

The Tracker Edge firmware has a reliable publish scheme that uses a background thread to publish. However, it uses function calls, not subscriptions, to acknowledge receipt of data. The reason is that Tracker Edge is designed to work with unclaimed product devices, and unclaimed product devices cannot receive event subscriptions but can receive function calls. Of course, a plain webhook can’t make a function call, but that’s one reason why publishing from a worker thread while receiving subscription events is not exercised much.
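For comparison, a function-call acknowledgment could look roughly like the sketch below. It is hypothetical and is not the Tracker Edge implementation: it assumes the server calls a device function (registered with Particle.function()) with the sequence number of the event it received. handleAck and pendingSeq are illustrative names, and the handler takes a plain const char* so it can be tested off-device.

```cpp
#include <cstdint>
#include <cstdlib>

// Hypothetical: sequence number of the event awaiting acknowledgment (0 = none pending).
uint32_t pendingSeq = 0;

// Hypothetical acknowledgment handler. On a device it could be wrapped as
//   int ackHandler(String arg) { return handleAck(arg.c_str()); }
// and registered in setup() with Particle.function("ack", ackHandler).
// Returns 1 if the ack matches the pending event, 0 if it is stale, -1 if malformed.
int handleAck(const char *arg) {
    char *end = nullptr;
    unsigned long seq = std::strtoul(arg, &end, 10);
    if (end == arg || *end != '\0') {
        return -1;  // not a plain decimal number
    }
    if (pendingSeq != 0 && seq == pendingSeq) {
        pendingSeq = 0;  // acknowledged: the publisher may now discard the event
        return 1;
    }
    return 0;  // ack for an older or unknown event
}
```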


Thank you for the info. Here the end service is an Azure Service Bus, so acknowledging with a function call is not an option.

Perhaps a workaround would be a service that monitors the SSE stream and makes device-specific function calls to acknowledge. But that would add another point of failure.

I would hate to give up PublishQueueAsyncRK, so I will look into “unthreading” it until this is hopefully fixed in Device OS one day.

Running with NO_ACK and SYSTEM_THREAD(ENABLED), publishing should not block much, as far as I can tell from the docs.

After “unthreading” the library, I can receive events again. It seems to run fine but needs more testing.

To answer my own question about shortening responses: when the response from the end server has no body, the device does not get the hook-response/ event at all. So I put “X” in the response template to make sure the event is received.

A hook-error/ event contains a shortened message such as “error status 404 from [URL]”.
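If the numeric status is useful on the device (for example, to log it or to treat some codes differently), it can be pulled out of the hook-error payload. A minimal sketch, assuming the payload always contains the text "error status <code>" as in the message quoted above; parseHookErrorStatus is a hypothetical helper name.

```cpp
#include <cstdio>
#include <cstring>

// Hypothetical helper: extract the HTTP status code from a hook-error payload.
// Assumes the payload contains "error status <code>"; returns -1 otherwise.
int parseHookErrorStatus(const char *data) {
    static const char kPrefix[] = "error status ";
    const char *p = std::strstr(data, kPrefix);
    int status = -1;
    // sizeof(kPrefix) - 1 skips the prefix text without its terminating '\0'.
    if (p == nullptr || std::sscanf(p + sizeof(kPrefix) - 1, "%d", &status) != 1) {
        return -1;
    }
    return status;
}
```

In an error handler this could be used as Log.info("HTTP error %d", parseHookErrorStatus(data));.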

Next is to make discarding a publish from the queue depend on the hook response and a timeout.
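The decision itself can be kept separate from the publishing code as a small pure function. This is a sketch with hypothetical names (Resp, Action, decide): discard only on a positive hook-response, retry on a hook-error or when no response arrives within the timeout. Checking the response before the clock means a response that arrives just as the timeout expires still counts, and the unsigned subtraction keeps the elapsed-time test correct when millis() rolls over.

```cpp
#include <cstdint>

// Hypothetical types for the outcome of one publish and the resulting action.
enum class Resp { Waiting, Ok, Error };
enum class Action { Wait, Discard, Retry };

// Decide what to do with the oldest queued event after it was published at publishedAtMs.
Action decide(Resp resp, uint32_t nowMs, uint32_t publishedAtMs, uint32_t timeoutMs) {
    if (resp == Resp::Ok) return Action::Discard;    // confirmed by hook-response
    if (resp == Resp::Error) return Action::Retry;   // hook-error received
    // Unsigned subtraction wraps correctly across a 32-bit millisecond rollover.
    if (nowMs - publishedAtMs >= timeoutMs) return Action::Retry;
    return Action::Wait;
}
```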

I could “unthread” the library, but I was not proficient enough with C++ class definitions to pass the HTTP response information back and forth.

So instead I am using the library as-is, calling publishQueue.setPausePublishing(true) and doing the publishing logic on the side.

Below is what I am testing with now.

Instead of blocking, I always call Particle.publish() with NO_ACK, followed by Particle.process(), but I am not sure whether I can always discard with publishQueue.discardOldEvent(false).

// This #include statement was automatically added by the Particle IDE.
#include <PublishQueueAsyncRK.h>


SYSTEM_THREAD(ENABLED);
SYSTEM_MODE(SEMI_AUTOMATIC); // https://docs.particle.io/reference/firmware/electron/#optimizing-cellular-data-use-with-cloud-connectivity-on-the-electron


SerialLogHandler logHandler;

retained uint8_t publishQueueRetainedBuffer[2048];
PublishQueueAsync publishQueue(publishQueueRetainedBuffer, sizeof(publishQueueRetainedBuffer));

const unsigned long PUBLISH_PERIOD_MS = 30000;
unsigned long lastPublish = 8000 - PUBLISH_PERIOD_MS;
int counter = 0;


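// PublishAsync state machine
// Forward declarations: the state table below refers to these functions before they are defined
void STARTstate();
void CHECKstate();
void ACKstate();
void RETRYstate();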
enum MyStates {START/*startState*/,CHECK /*checkQueueState*/, ACK /*(new)*/, RETRY/*waitRetryState*/};
struct StateMachineType { MyStates State; void (*func) (void); char stateName[6]; };
const StateMachineType StateMachine[] = { {START, STARTstate, "START"}, {CHECK, CHECKstate, "CHECK"}, {ACK, ACKstate, "ACK"}, {RETRY, RETRYstate, "RETRY"}};
MyStates mainState = START;
MyStates lastState = mainState;

bool pausePublishing = false;
bool isSending = false;
unsigned long failureRetryMs = 15000;


// Used for HTTP response handling 
enum responseStatus {CLEARED, WAITING, TIMEOUT, OK, ERROR};
responseStatus HttpResp = CLEARED;
const char* responseStatusNames[] = {"CLEARED","WAITING","TIMEOUT", "OK", "ERROR"};



void setup() {

	Serial.begin();
	publishQueue.setup();
	publishQueue.setPausePublishing(true); // Publish from here instead: Device OS errors when publishing from a different thread than the one receiving the event response
	
    Particle.subscribe(String("hook-response/PipeDream_" + System.deviceID()), responseHandler, MY_DEVICES);
    Particle.subscribe(String("hook-error/PipeDream_" + System.deviceID()), errorHandler, MY_DEVICES);

	Particle.process();
    Particle.connect();
}

void loop() {
    
    static unsigned long lastPublishQueue = 8000 - PUBLISH_PERIOD_MS;
    if (millis() - lastPublishQueue >= PUBLISH_PERIOD_MS) {
    	lastPublishQueue = millis();

    	Log.info("publishing");
        char buf[60]; // Particle.publish data (up to 255 characters (prior to 0.8.0), 622 characters (since 0.8.0))
        snprintf(buf, sizeof(buf), "{\"R\":%i,\"I\":%i,\"D\":\"{3720:-11.2, 3800:10.4}\"}", -65, 60);
        publishQueue.publish("PipeDream", buf, 60, PRIVATE, /*NO_ACK*/ WITH_ACK);
        Particle.process();	
    }

    (*StateMachine[mainState].func)(); // Runs the current statemachine state function
    if (mainState != lastState) Log.info("%s state",  StateMachine[mainState].stateName);

}



void responseHandler(const char *event, const char *data) {
    Log.info("Response: OK");
    if(HttpResp == WAITING) HttpResp = OK;
}

void errorHandler(const char *event, const char *data) {
    Log.info("Response Error: %s", data); // pass data as an argument, never as the format string
    if(HttpResp == WAITING) HttpResp = ERROR;
}



void STARTstate() {
    if (mainState != lastState) {/*state changed action:*/ lastState = mainState;}

    if (Particle.connected()) mainState = CHECK;
}


void CHECKstate() { /*checkQueueState*/
    // The NO_ACK / WITH_ACK flag is hijacked to decide whether an HTTP response must confirm transmission.
    // WITH_ACK requires Particle.subscribe() to the hook-response and hook-error events, and
    // their handler functions must contain "if(HttpResp == WAITING) HttpResp = OK;" or "= ERROR;"
    // The actual Particle publish is always done with PRIVATE, NO_ACK
    if (mainState != lastState) {/*state changed action:*/ lastState = mainState;}
    
	if (!pausePublishing && Particle.connected() && millis() - lastPublish >= 1010) {

		PublishQueueEventData *data = publishQueue.getOldestEvent();
		if (data) {
			// We have an event and can probably publish
			isSending = true;

			const char *buf = reinterpret_cast<const char *>(data);
			const char *eventName = &buf[sizeof(PublishQueueEventData)];
			const char *eventData = eventName;
			eventData += strlen(eventData) + 1;

			PublishFlags flags(PublishFlag(data->flags));

			Log.info("publishing %s %s ttl=%d flags=%x", eventName, eventData, data->ttl, flags.value());

			///*auto request = */Particle.publish(eventName, eventData, data->ttl, flags);
			Particle.publish(eventName, eventData, data->ttl, PRIVATE, NO_ACK);

            // If the NO_ACK flag is set, ignore HTTP responses; otherwise wait for a response before discarding
            if((flags.value() & NO_ACK.value()) == NO_ACK.value()){
                HttpResp = OK;
                Log.info("NO_ACK, ignoring any HTTP response");
            }
            else{
                HttpResp = WAITING;
                Log.info("Waiting for HTTP response");              
            }
            Particle.process(); // get the publish going, so we do not discard it too soon
            lastPublish = millis();
            mainState = ACK;
		}
		else {
			// No event
		}
	}
	else {
		// Not cloud connected or can't publish yet (not connected or published too recently)
	}

}


void ACKstate() { // Acknowledge back from the webhook 

    if (mainState != lastState) {/*state changed action:*/ lastState = mainState;}

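    // Only time out while still waiting, so a response that has already arrived is not overwritten
    if (HttpResp == WAITING && millis() - lastPublish >= 5000) {
        HttpResp = TIMEOUT;
    }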
    
	if (HttpResp != WAITING) {
	    
	    if(HttpResp == OK){
		    // Successfully published
		    Log.info("Published, no retry");
		    publishQueue.discardOldEvent(false);
		    mainState = CHECK;
	    }
	    else {
	    	// Did not successfully transmit, try again after retry time
	    	Log.info("Publish failed, reason HTTP %s, might retry later", responseStatusNames[HttpResp]);
			mainState = RETRY;
	    }
	    isSending = false;
	    lastPublish = millis();
	    HttpResp = CLEARED;
	}
	
}


void RETRYstate() { /*waitRetryState*/

    if (mainState != lastState) {/*state changed action:*/ lastState = mainState;}

	if (millis() - lastPublish >= failureRetryMs) {
		mainState = CHECK;
	}
}

An observation.

I ran it for a couple of hours last night (30 s period) with a serial log on an Argon, and the functionality itself worked well. However, for more than 3% of the publishes the webhook response event was never delivered to the device's subscription, even though the cloud had received the HTTP response. In those cases the publish was retried successfully.

A rate above 3% is drastic compared with my experience on other platforms.

I pointed the webhook at pipedream.com, where all traffic was logged, and I was also logging SSEs. On two occasions I was watching the console as well: both times it showed that the cloud had received the HTTP response, but according to the serial log no webhook response event reached the device.

@rickkas7 Regarding publishQueue.discardOldEvent(false): as far as I can tell from the excellent docs, it is all I can do. But it may be a problem when the queue is stored in retained memory and the buffer becomes full while the oldest event is being sent.

The library's isSending flag is not accessible from outside. It is used internally, when the queue is in retained memory, to decide whether making room in a full buffer should discard the oldest or the second-oldest event.

If more people are using PublishQueueAsyncRK just for its queue, perhaps a setIsSending() function could be added in the next version of the library. Again, thank you for an excellent library!
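To make the discard behavior concrete, here is a simplified model; it is not the library's code. The real PublishQueueAsyncRK stores events in a byte buffer (retained memory here), and its exact logic may differ. EventQueueModel and its setIsSending() are hypothetical, with the setter corresponding to the suggestion above. When the queue is full, the model drops the oldest event, or the second-oldest when the oldest is in flight.

```cpp
#include <cstddef>
#include <deque>
#include <string>

// Simplified, hypothetical model of a fixed-capacity queue whose oldest event may be in flight.
class EventQueueModel {
public:
    explicit EventQueueModel(std::size_t capacity) : capacity_(capacity) {}

    // Hypothetical setter corresponding to the setIsSending() idea.
    void setIsSending(bool sending) { isSending_ = sending; }

    // Add an event, making room first if the queue is full.
    // Returns false if no room could be made (e.g. only the in-flight event is queued).
    bool push(const std::string &event) {
        if (events_.size() >= capacity_ && !discardOld(isSending_)) {
            return false;
        }
        events_.push_back(event);
        return true;
    }

    // Drop the oldest event, or the second-oldest when secondEvent is true.
    bool discardOld(bool secondEvent) {
        if (events_.size() < (secondEvent ? 2u : 1u)) return false;
        if (secondEvent) {
            events_.erase(events_.begin() + 1);  // keep the in-flight oldest event
        } else {
            events_.pop_front();
        }
        return true;
    }

    const std::deque<std::string> &events() const { return events_; }

private:
    std::size_t capacity_;
    bool isSending_ = false;
    std::deque<std::string> events_;
};
```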