Particle publish and blocking

Ok thanks! Do you know if particle.publish is thread safe?

@rickkas7 @BDub I was able to remove some features in my product to include your 0.0.3 ver AsyncPublish library. I am still finding that on a lossy connection that the device gets blocked for upto 10 sec. This is a case where the connection dropped out after this or just prior. Do you have any thoughts why the device is still getting blocked in this case of a dropped connection?

@BDub Could this be related to the fixes included in 1.0.1 and made the recovery much faster?

This is where it blocked:

 102.958 AT send     109 "\x17\xfe\xfd\x00\x01\x00\x00\x00\x00\x00\x1c\x00`\x00\x01\x00\x00\x00\x00\x00\x1c\xaf?\xd8\xf1\x80\xc3\xba\xdc\xfdL_\xd5\x8a\xae\xae\x04C\xa4O\x17\xcf\xd3\x86V~\xd2b\xd6\xa7I,\x015\xc4\xd55\xb4*I\xcc\xc6\x1b\x06\x10\xd4\xd9\xfeY\x7f\xcc\xc3\xeb\xa7N\xf0\xff\xf1~b\xca\xdb\xa0\xe2\x0fZ\x0f\x1fy\xa4\x19d\t?\x14 c\x84,\x9a\x81\x9d\xa7\x1e\xc6\xc7\xf8\x97\xab"
0000103285 [app.pubq] INFO: queueing eventName=s data=02100001010000001000.1553024929.4305.-6.1420:100.81111111111111.413.1.0 ttl=60 flags1=1 flags2=8 size=84
   112.967 AT send      36 "AT+USOST=0,\"18.213.151.3\",5684,109\r\n"

Below are the logs:

[ Modem::getSignalStrength ] = = = = = = = = = =
    97.538 AT send       8 "AT+CSQ\r\n"
    97.544 AT read  +   15 "\r\n+CSQ: 20,99\r\n"
    97.545 AT read OK    6 "\r\nOK\r\n"
0000097996 [app.pubq] INFO: queueing eventName=s data=02101001010000001000.1553024923.4305.-6.1420:100.81111111111111.413.1.0 ttl=60 flags1=1 flags2=8 size=84
0000097997 [app.pubq] INFO: publishing s 02101001010000001000.1553024923.4305.-6.1420:100.81111111111111.413.1.0 ttl=60 flags=9
0000097998 [comm.coap] TRACE: sending message id=16
0000097998 [system] TRACE: send 109
socketSendTo(0,18.213.151.3,5684,,109)
    97.995 AT send      11 "AT+CEREG?\r\n"
    98.003 AT read  +   34 "\r\n+CEREG: 2,1,\"4608\",\"4E62510\",8\r\n"
    98.005 AT read OK    6 "\r\nOK\r\n"
    98.005 AT send      36 "AT+USOST=0,\"18.213.151.3\",5684,109\r\n"
    98.012 AT read  >    3 "\r\n@"
    98.012 AT send     109 "\x17\xfe\xfd\x00\x01\x00\x00\x00\x00\x00\x1b\x00`\x00\x01\x00\x00\x00\x00\x00\x1b\x89\x8a\x06\xf2\xb4#\x8b\x01=\xaf\xadjE>P\xcb\x1coo\xf0\xea\xab\xbe:\xeb\xbe\xecC\x86\xfeCLT\\\x0f;\x93\xe7\xcd\xb4\xa3\x1e\x1f\xf7]\x05\r\xb0\xb4\xa4a\xbd\xf9\xea0\xb3>\xd5\\\a\xbfg4\xf4\x92\x8ae\xa7\xafp\x98\xb2}\x8fC>#\xfeu%y\xca\x15\x94C'\xc3\xa3"
    98.032 AT read  +   17 "\r\n+USOST: 0,109\r\n"
    98.033 AT read OK    6 "\r\nOK\r\n"
    98.033 AT send      14 "AT+USORF=0,0\r\n"
    98.039 AT read UNK   1 "\r"
    98.040 AT read UNK   5 "\r\n\r\r\n"
    98.041 AT read UNK  11 "+USORF: 0,0"
    98.042 AT read OK    6 "\r\nOK\r\n"
0000098874 [app.pubq] INFO: queueing eventName=s data=02100001010000001000.1553024924.4305.-6.1420:100.81111111111111.413.1.0 ttl=60 flags1=1 flags2=8 size=84

[ Modem::getSignalStrength ] = = = = = = = = = =
   100.051 AT send       8 "AT+CSQ\r\n"
   100.057 AT read  +   15 "\r\n+CSQ: 20,99\r\n"
   100.058 AT read OK    6 "\r\nOK\r\n"

[ Modem::getSignalStrength ] = = = = = = = = = =
   102.560 AT send       8 "AT+CSQ\r\n"
   102.566 AT read  +   15 "\r\n+CSQ: 20,99\r\n"
   102.567 AT read OK    6 "\r\nOK\r\n"
0000102632 [app.pubq] INFO: queueing eventName=s data=02101001010000001000.1553024928.4305.-6.1420:100.81111111111111.413.1.0 ttl=60 flags1=1 flags2=8 size=84
0000102941 [system] TRACE: send 109
socketSendTo(0,18.213.151.3,5684,,109)
   102.937 AT send      11 "AT+CEREG?\r\n"
   102.946 AT read  +   34 "\r\n+CEREG: 2,1,\"4608\",\"4E62510\",8\r\n"
   102.949 AT read OK    6 "\r\nOK\r\n"
   102.949 AT send      36 "AT+USOST=0,\"18.213.151.3\",5684,109\r\n"
   102.958 AT read  >    3 "\r\n@"
   102.958 AT send     109 "\x17\xfe\xfd\x00\x01\x00\x00\x00\x00\x00\x1c\x00`\x00\x01\x00\x00\x00\x00\x00\x1c\xaf?\xd8\xf1\x80\xc3\xba\xdc\xfdL_\xd5\x8a\xae\xae\x04C\xa4O\x17\xcf\xd3\x86V~\xd2b\xd6\xa7I,\x015\xc4\xd55\xb4*I\xcc\xc6\x1b\x06\x10\xd4\xd9\xfeY\x7f\xcc\xc3\xeb\xa7N\xf0\xff\xf1~b\xca\xdb\xa0\xe2\x0fZ\x0f\x1fy\xa4\x19d\t?\x14 c\x84,\x9a\x81\x9d\xa7\x1e\xc6\xc7\xf8\x97\xab"
0000103285 [app.pubq] INFO: queueing eventName=s data=02100001010000001000.1553024929.4305.-6.1420:100.81111111111111.413.1.0 ttl=60 flags1=1 flags2=8 size=84
   112.967 AT send      36 "AT+USOST=0,\"18.213.151.3\",5684,109\r\n"
mbedtls/library/ssl_tls.c:2971: mbedtls_ssl_flush_output() returned -1 (-0x0001)
mbedtls/library/ssl_tls.c:7165: mbedtls_ssl_write_record() returned -1 (-0x0001)

[ Modem::getSignalStrength ] = = = = = = = = = =
   122.970 AT send       8 "AT+CSQ\r\n"
0000123074 [comm.protocol] ERROR: Event loop error 3
0000123074 [system] WARN: Communication loop error, closing cloud socket
0000123074 [system] INFO: Cloud: disconnecting
0000123075 [system] INFO: Cloud: disconnected
0000123075 [app.pubq] INFO: published failed, will retry in 30000 ms
0000123175 [system] INFO: Cloud: connecting
0000123175 [system] TRACE: sparkSocket Now =0
0000123175 [system] TRACE: Close Attempt
socketClose(0)
   123.973 AT send      11 "AT+CEREG?\r\n"
socketFree(0)
0000124978 [system] TRACE: socket_close()=success
0000124980 [system] INFO: Read Server Address = type:1,domain:$id.udp.particle.io

Are you calling Cellular.RSSI() from your code? If you are, it will block for up to several minutes if the cloud connection is lost.

I do call that every 2.5 seconds. I also check for particle connected and cellular ready before I check the RSSI.

From the log it was called just before the sending of data but retuend ok it blocked at this line

 102.958 AT send     109 "\x17\xfe\xfd\x

RSSI read just before, but returned

[ Modem::getSignalStrength ] = = = = = = = = = =
   102.560 AT send       8 "AT+CSQ\r\n"
   102.566 AT read  +   15 "\r\n+CSQ: 20,99\r\n"
   102.567 AT read OK    6 "\r\nOK\r\n"
0000102632 [app.pubq] INFO: queueing eventName=s data=02101001010000001000.1553024928.4305.-6.1420:100.81111111111111.413.1.0 ttl=60 flags1=1 flags2=8 size=84
0000102941 [system] TRACE: send 109
socketSendTo(0,18.213.151.3,5684,,109)
   102.937 AT send      11 "AT+CEREG?\r\n"
   102.946 AT read  +   34 "\r\n+CEREG: 2,1,\"4608\",\"4E62510\",8\r\n"
   102.949 AT read OK    6 "\r\nOK\r\n"
   102.949 AT send      36 "AT+USOST=0,\"18.213.151.3\",5684,109\r\n"
   102.958 AT read  >    3 "\r\n@"
   102.958 AT send     109 "\x17\xfe\xfd\x00\x01\x00\x00\x00\x00\x00\x1c\x00`\x00\x01\x00\x00\x00\x00\x00\x1c\xaf?\xd8\xf1\x80\xc3\xba\xdc\xfdL_\xd5\x8a\xae\xae\x04C\xa4O\x17\xcf\xd3\x86V~\xd2b\xd6\xa7I,\x015\xc4\xd55\xb4*I\xcc\xc6\x1b\x06\x10\xd4\xd9\xfeY\x7f\xcc\xc3\xeb\xa7N\xf0\xff\xf1~b\xca\xdb\xa0\xe2\x0fZ\x0f\x1fy\xa4\x19d\t?\x14 c\x84,\x9a\x81\x9d\xa7\x1e\xc6\xc7\xf8\x97\xab"
0000103285 [app.pubq] INFO: queueing eventName=s data=02100001010000001000.1553024929.4305.-6.1420:100.81111111111111.413.1.0 ttl=60 flags1=1 flags2=8 size=84
   112.967 AT send      36 "AT+USOST=0,\"18.213.151.3\",5684,109\r\n"
mbedtls/library/ssl_tls.c:2971: mbedtls_ssl_flush_output() returned -1 (-0x0001)
mbedtls/library/ssl_tls.c:7165: mbedtls_ssl_write_record() returned -1 (-0x0001)

The following sequence (from your trace) corresponds to what we have seen when LTE modem locks up due to cellular connection dropping from low signal strength during a transmit.

   102.949 AT send      36 "AT+USOST=0,\"18.213.151.3\",5684,109\r\n"
   102.958 AT read  >    3 "\r\n@"
   102.958 AT send     109 "\x17\xfe\xfd\x00\x01\x00\x00\x00\x00\x00\x1c\x00`\x00\x01\x00\x00\x00\x00\x00\x1c\xaf?\xd8\xf1\x80\xc3\xba\xdc\xfdL_\xd5\x8a\xae\xae\x04C\xa4O\x17\xcf\xd3\x86V~\xd2b\xd6\xa7I,\x015\xc4\xd55\xb4*I\xcc\xc6\x1b\x06\x10\xd4\xd9\xfeY\x7f\xcc\xc3\xeb\xa7N\xf0\xff\xf1~b\xca\xdb\xa0\xe2\x0fZ\x0f\x1fy\xa4\x19d\t?\x14 c\x84,\x9a\x81\x9d\xa7\x1e\xc6\xc7\xf8\x97\xab"
0000103285 [app.pubq] INFO: queueing eventName=s data=02100001010000001000.1553024929.4305.-6.1420:100.81111111111111.413.1.0 ttl=60 flags1=1 flags2=8 size=84
   112.967 AT send      36 "AT+USOST=0,\"18.213.151.3\",5684,109\r\n"
mbedtls/library/ssl_tls.c:2971: mbedtls_ssl_flush_output() returned -1 (-0x0001)
mbedtls/library/ssl_tls.c:7165: mbedtls_ssl_write_record() returned -1 (-0x0001)

[ Modem::getSignalStrength ] = = = = = = = = = =
   122.970 AT send       8 "AT+CSQ\r\n"

The USOST are transmit commands that each have a 10-second timeout and block your RSSI check (the CSQ command). Further, when your RSSI check finally gets a turn the modem is locked up and will hit its own 10-second time.

1.0.1 recovers more quickly by detecting the modem lockup and going through targeted recovery steps. But the underlying failure is coming out of the LTE modem and will result in some delay.

You can move your RSSI checks to a background thread so as to not block your main application thread if the cellular modem is currently busy or non-responsive.

2 Likes

I’ll try creating a new thread for checking RSSI as that also blocked too as you and @rickkas7 said. This case was for about 7 seconds.

[ Modem::getSignalStrength ] = = = = = = = = = =
   122.970 AT send       8 "AT+CSQ\r\n"
0000123074 [comm.protocol] ERROR: Event loop error 3
0000123074 [system] WARN: Communication loop error, closing cloud socket

@rickkas7 do you think I would be able to check the RSSI in the AsyncPublish thread instead of creating another thread?

I’ve been contemplating that. I think so. If RSSI is stuck, you wouldn’t be able to publish anyway because they’re both waiting on the same resource.

@rickkas7 Could it be checked at the below? Or would there be a better spot to check it at?

void PublishQueueAsync::threadFunction() {
	// Call the stateHandler forever
	while(true) {
		stateHandler(*this);
            ****Check RSSI HERE?
		os_thread_yield();
	}
}
void PublishQueueAsync::checkQueueState() {
	// Is there data waiting to go out?
	volatile RetainedBufHeader *hdr = reinterpret_cast<RetainedBufHeader *>(retainedBuffer);

	bool haveEvent = false;
	SINGLE_THREADED_BLOCK() {
		haveEvent = (hdr->numEvents > 0);
	}

	if (haveEvent && Particle.connected() && millis() - lastPublish >= 1010) {
		// We have an event and can probably publish
		isSending = true;

		EventData *data = reinterpret_cas

Yes, that’s a reasonable place to put it. You probably want to surround it with a millis check so you don’t do it on every loop. I may add this as an optional feature, since it would be useful.

2 Likes

@rickkas7 after using your library i’ve noticed that when I change screens when updating the TFT screen that now there are times certain things now do not render. If I encase the rendering in SINGLE_THREADED_BLOCK() { } they dont get messed with.

Is this a byproduct of using multithreads? Where on thread can iterrupt the other one at anytime?
Can one be deprioritized?

Also does it hurt if i slow down the thread by using?

os_thread_delay_until(&lastThreadTime, 100);
Can this be called: thread = new Thread("PublishQueueAsync", threadFunctionStatic, this, 1, 2048);
vs: thread = new Thread("PublishQueueAsync", threadFunctionStatic, this, OS_THREAD_PRIORITY_DEFAULT, 2048);

or are the only 2 priorities as per below:


const os_thread_prio_t OS_THREAD_PRIORITY_DEFAULT = 2;
const os_thread_prio_t OS_THREAD_PRIORITY_NETWORK = OS_THREAD_PRIORITY_DEFAULT;
const os_thread_prio_t OS_THREAD_PRIORITY_CRITICAL = 9;

This is what I was thinking to get the RSSI from the thread.

void getRSSI() {
	#if PLATFORM_ID == PLATFORM_ELECTRON_PRODUCTION
	static uint32_t prevRSSIMillisB = 0;
	if (Particle.connected() && Cellular.ready()) {
		if (millis() > 5000 && celluarTurnedOn && millis() - prevRSSIMillisB >= 2500) {
			CellularSignal sig = Cellular.RSSI();
			wirelessRSSI = sig.rssi;
			prevRSSIMillisB = millis();
		}
	#endif
}

void PublishQueueAsync::threadFunction() {
	// Call the stateHandler forever
	while(true) {
		stateHandler(*this);
		getRSSI();
		os_thread_delay_until(&lastThreadTime, 100); // Delay so we're called every 100 milliseconds (10 times per second)
		//os_thread_yield();
	}
}

@rickkas7, I’ve had much better success when the RSSI is on a separate thread such as including it into the Async Library. I am no longer getting it to lock up as before. Even in the exact scenario as above, now the device doesnt lock up at the AT send when it has data to send but the connection was lost!

102.958 AT send     109 "\x17\xfe\xfd\x00\x01\x00\x00\x00\x00\x00\x1c\x00`\x00\x01\x00\x00\x00\x00\x00\x1c\xaf?\xd8\xf1\x80\xc3\xba\xdc\xfdL_\xd5\x8a\xae\xae\x04C\xa4O\x17\xcf\xd3\x86V~\xd2b\xd6\xa7I,\x015\xc4\xd55\xb4*I\xcc\xc6\x1b\x06\x10\xd4\xd9\xfeY\x7f\xcc\xc3\xeb\xa7N\xf0\xff\xf1~b\xca\xdb\xa0\xe2\x0fZ\x0f\x1fy\xa4\x19d\t?\x14 c\x84,\x9a\x81\x9d\xa7\x1e\xc6\xc7\xf8\x97\xab"
0000103285 [app.pubq] INFO: queueing eventName=s data=02100001010000001000.1553024929.4305.-6.1420:100.81111111111111.413.1.0 ttl=60 flags1=1 flags2=8 size=84
   112.967 AT send      36 "AT+USOST=0,\"18.213.151.3\",5684,109\r\n"

Added reliability and simplicity to my app. I was self discovering much of the complexity that your post demonstrated which led me here. I'm so happy to find this library. Thank you!

I’m trying to use os_thread_delay_until in the main loop… apparently it only works in threads… I always assumed the main loop was just another thread… anyone have any experience with this call?
thanks

There is actually a lot more going on on the application thread then just executing the code you place in your loop() implementation.
With halting that thread you may cause other side effects.

Have a look at this (with newer releases things get a bit more scattered, but this should illustrate the gist)

hmmm… point talen… thanks!

This is what I updated and have been testing it for a while with no blocking of user thread

void PublishQueueAsync::getRSSI() {
	#if PLATFORM_ID == PLATFORM_ELECTRON_PRODUCTION
	static uint32_t prevRSSIMillisB = 0;
	if (Particle.connected() && Cellular.ready()) {
		if (millis() > 5000 && celluarTurnedOn && millis() - prevRSSIMillisB >= 2500) {
			CellularSignal sig = Cellular.RSSI();
			wirelessRSSI = sig.rssi;
			prevRSSIMillisB = millis();
		}
	}
	#endif
}

void PublishQueueAsync::threadFunction() {
	// Call the stateHandler forever
	while(true) {
		stateHandler(*this);
		getRSSI();
		//os_thread_delay_until(&lastThreadTime, 100); // Delay so we're called every 100 milliseconds (10 times per second)
		os_thread_yield();
	}
}

Noob question - if I setup the following in the file.ino

retained uint8_t publishQueueRetainedBuffer[2048];
PublishQueueAsync publishQueue(publishQueueRetainedBuffer, sizeof(publishQueueRetainedBuffer));

and I use

publishQueue.publish("msg", tempMsg, PRIVATE, WITH_ACK);

in another file.cpp

I get the error

'publishQueue' was not declared in this scope

How do I declare it in scope for the other .cpp files where it will be used?

How do I declare it in scope for the other .cpp files where it will be used?

#include "PublishQueueAsyncRK.h"

extern PublishQueueAsync publishQueue;
2 Likes