Using a second instance of BackgroundPublishRK library in parallel with PublishQueuePosixRK intance

I have an application where I have Boron that will be creating its own webhooks that I plan on backing up to non-volatile memory and sending asynchornously using the excellent PublishQueuePosixRK library , which uses the BackgroundPublishRK library.

In my product, the Boron is also the host of its own local rs-485 network in which a few slaves are connected. Each slave manages its own internal FIFO non-volatile webhook buffers that get popped/published/discarded via polling by the Boron.

My idea is to have two threads delivering webhooks: one thread that is caching the Boron-generated webhooks to external SPI using PublishQueuePosix library, and another thread that is delivering the oldest webhook from the rs-485 network in parallel.

Is that possible with the library as-is? I took a look at the BackgroundPublishRK library sourcecode and it seems like it is designed to only have a single instance. It says specifically:

   /**
     * @brief Gets the singleton instance of this class.
     * 
     * The constructor to this class is private; you cannot create an instance of this object
     * on the stack, as a global, or using new. Instead, you must use the static instance()
     * method to get the singleton instance of this class, as there can only be one per
     * application.
     */

Can I just copy/paste the library and do a global find/replace of “BackgroundPublishRK” and change it to “BackgroundPublishRKCopy” and - whamo - have a 2nd instance up-and running in parallel?

Is that a bad idea @rickkas7 ?

That definitely won’t work. Aside from the reason you mentioned, you can only have one publish loop because otherwise you’ll exceed the publish rate limit.

I think the easiest way to do it is fork the background publish library and implement a priority queuing scheme. Basically in the thread instead of only pulling from the background publish queue, you’d implement something that examines both queues and determines which to grab the event from.

@rickkas7 how would you go about doing this in workbench from a repo-management perspective?

I can fork the repo and add it as a “subtree” to my project in the /lib/ folder, and then trick Workbench into thinking that it is an installed library. Doing things this way, I can get things to compile properly (including the PublishQueuePosixRK library which now is compiling using the forked “library”).

Another option I’ve considered is to add that forked repo as a Submodule to my current repo. The problem with that approach is that I can’t locate the submodule in the /lib/ folder, and thus I can’t trick Workbench in to including the library in its build process.

What would you recommend?

I would do it the first way. Just make sure you remove the library from project.properties otherwise cloud compiles will pick up the original one, not your modified copy.

The submodule technique should work as well, as long as you have a project.properties file in the top level of your project, but you do need to populate the submodule, such as with

git submodule update --init --recursive

if you’ve cloned a fresh copy.

1 Like

I figured out a way of getting what I wanted by doing the following:

  • Forked PublishQueuePosixRK into my own repository: PublishQueuePosixRKTM
  • Added that repo as a git subtree of my project, cloned into the /lib directory
  • Changed the following things
    • Added the following class functions to PublishQueuePosix class:
      • sending()* :point_right: true when BackgroundPublishQueue is processing a request from the PublishQueuePosix instance
      • failCounter()* :point_right: returns the number of times that the BackgroundPublishQueue thread returned false to the PublishQueuePosix publish callback
      • deliveredCounter()* :point_right: returns the number of times that the BackgroundPublishQueue thread returned true to the PublishQueuePosix publish callback
      • resetCounters()* :point_right: call this to reset the failed/delivered counters

Then in my code, I have a webhook state machine that makes use of the existing setPausePublishing() and getPausePublishing() PublishPosixQueue class functions to allow my own app’s state machine to use the BackgroundPublishQueue instance/thread while the PublishPosixQueue state machine is in pause state.

See code below:

bool webhooks_update(void)
{
    bool posix_queue_sending = PublishQueuePosix::instance().sending();
    //Is there a boss webhook that was delivered that we ought to inform the boss about?
    //This will trigger after BackgroundPublishQueue boss callback
    if(
        unhandled_boss_webhook_delivery_success 
        && sendingBoss
        && sendingBoss->advanceWebhookFIFO()
    ){
        unhandled_boss_webhook_delivery_success = false;
    }
    //If there is an oldest webhook and there is no boss webhook being sent right now...
    if(!boss_hook_sending && !posix_queue_sending){
        // myLog.info("Determining oldest boss webhook...");
        //Determine which webhook to send, from which queue
        Boss* hookBoss = boss_oldest_hook();
        // size_t boss_queue_sz = boss_hook_count();
        static unsigned int here_count = 0;
        //BossApp webhooks take priority over other webhooks 90% of the time when they are available
        if(
            !hookBoss
            || (here_count++) % 10 == 0
        ){
            PublishQueuePosix::instance().setPausePublishing(false);
        }
        //If there is a boss webhook to send
        else if (hookBoss){
            //Send the oldest boss webhook
            const char* eventName = NULL;
            const char* eventData = NULL;
            if(
                hookBoss->prepareWebhook(eventName, eventData)
                && eventName && eventData
            ){
                PublishQueuePosix::instance().setPausePublishing(true);
                if(BackgroundPublishRK::instance().publish(eventName, eventData, PRIVATE | WITH_ACK, bossPublishCallback, "bossHook")){
                    myLog.info("publishing boss webhook");
                    boss_hook_sending = true;
                    sendingBoss = hookBoss;
                    PublishQueuePosix::instance().setPausePublishing(true);
                }
            }
        }
    }
    // else{
    //     myLog.info(boss_hook_sending?"boss_hook_sending...":"posix_queue_sending...");
    // }

    //Let the PublishQueuePosix library run its loop as normal, having ensured that the correct pauses are in place (see above)
    PublishQueuePosix::instance().loop();

   #ifdef WEBHOOK_UNIT_TEST_MODE
   webhook_unit_test_loop();
   #endif
    return true;
}

2 Likes

This topic was automatically closed 182 days after the last reply. New replies are no longer allowed.