microbit: Message Bus updates to support URGENT events queue optimisations

Added new URGENT listener type, that ensures immediate, non queuing delivery of events.
Introduced two-pass algorithm into message bus. Only events with an event listener are now
queued at ingress - this reduces unecessary memory usage for 'uninteresting' events.
This commit is contained in:
Joe Finney 2015-09-18 23:20:44 +01:00
parent 058f73ede1
commit 2c9f63902c
4 changed files with 55 additions and 33 deletions

View File

@ -5,7 +5,7 @@
#include "MicroBitEvent.h"
#include "MemberFunctionCallback.h"
// MessageBusListener flags...
// MicroBitListener flags...
#define MESSAGE_BUS_LISTENER_PARAMETERISED 0x0001
#define MESSAGE_BUS_LISTENER_METHOD 0x0002
#define MESSAGE_BUS_LISTENER_BUSY 0x0004
@ -13,6 +13,7 @@
#define MESSAGE_BUS_LISTENER_QUEUE_IF_BUSY 0x0010
#define MESSAGE_BUS_LISTENER_DROP_IF_BUSY 0x0020
#define MESSAGE_BUS_LISTENER_NONBLOCKING 0x0040
#define MESSAGE_BUS_LISTENER_URGENT 0x0080
struct MicroBitListener
{

View File

@ -64,8 +64,10 @@ class MicroBitMessageBus : public MicroBitComponent
* or the constructors provided by MicroBitEvent.
*
* @param evt The event to send.
* @param mask The type of listeners to process (optional). Matches MicroBitListener flags. If not defined, all standard listeners will be processed.
* @return The 1 if all matching listeners were processed, 0 if further processing is required.
*/
void process(MicroBitEvent evt);
int process(MicroBitEvent &evt, uint32_t mask = MESSAGE_BUS_LISTENER_REENTRANT | MESSAGE_BUS_LISTENER_QUEUE_IF_BUSY | MESSAGE_BUS_LISTENER_DROP_IF_BUSY | MESSAGE_BUS_LISTENER_NONBLOCKING);
/**
* Register a listener function.

View File

@ -224,6 +224,10 @@ void scheduler_event(MicroBitEvent evt)
f = t;
}
// Unregister this event, as we've woken up all the fibers with this match.
pc.printf("Scheduler: Deregistering for Event: %d:%d\n", evt.source, evt.value);
uBit.MessageBus.ignore(evt.source, evt.value, scheduler_event);
}
@ -306,6 +310,10 @@ void fiber_wait_for_event(uint16_t id, uint16_t value)
// Add ourselves to the sleep queue. We maintain strict ordering here to reduce lookup times.
queue_fiber(f, &waitQueue);
// Register to receive this event, so we can wake up the fiber when it happens.
pc.printf("Scheduler: Registering for Event: %d:%d\n", id, value);
uBit.MessageBus.listen(id, value, scheduler_event, MESSAGE_BUS_LISTENER_NONBLOCKING);
// Finally, enter the scheduler.
schedule();
}

View File

@ -104,16 +104,27 @@ void async_callback(void *param)
*/
void MicroBitMessageBus::queueEvent(MicroBitEvent &evt)
{
MicroBitEventQueueItem *item = new MicroBitEventQueueItem(evt);
int processingComplete;
__disable_irq();
// Firstly, process all handler regsitered as URGENT. These pre-empt the queue, and are useful for fast, high priority services.
processingComplete = this->process(evt, MESSAGE_BUS_LISTENER_URGENT);
if (evt_queue_tail == NULL)
evt_queue_head = evt_queue_tail = item;
else
evt_queue_tail->next = item;
pc.printf("QueueEvent: Queueing: %d\n", !processingComplete);
__enable_irq();
if (!processingComplete)
{
// We need to queue this event for later processing...
MicroBitEventQueueItem *item = new MicroBitEventQueueItem(evt);
__disable_irq();
if (evt_queue_tail == NULL)
evt_queue_head = evt_queue_tail = item;
else
evt_queue_tail->next = item;
__enable_irq();
}
}
/**
@ -154,7 +165,7 @@ void MicroBitMessageBus::idleTick()
// Whilst there are events to process and we have no useful other work to do, pull them off the queue and process them.
while (item)
{
// send the event.
// send the event to all standard event listeners.
this->process(item->evt);
// Free the queue item.
@ -212,43 +223,43 @@ void MicroBitMessageBus::send(MicroBitEvent evt)
* This will attempt to call the event handler directly, but spawn a fiber should that
* event handler attempt a blocking operation.
* @param evt The event to be delivered.
* @param mask The type of listeners to process (optional). Matches MicroBitListener flags. If not defined, all standard listeners will be processed.
* @return The 1 if all matching listeners were processed, 0 if further processing is required.
*/
void MicroBitMessageBus::process(MicroBitEvent evt)
int MicroBitMessageBus::process(MicroBitEvent &evt, uint32_t mask)
{
MicroBitListener *l;
int complete = 1;
l = listeners;
while (l != NULL)
{
if((l->id == evt.source || l->id == MICROBIT_ID_ANY) && (l->value == evt.value || l->value == MICROBIT_EVT_ANY))
{
l->evt = evt;
if(l->flags & mask)
{
l->evt = evt;
// OK, if this handler has regisitered itself as non-blocking, we just execute it directly...
// This is normally only done for trusted system components.
// Otherwise, we invoke it in a 'fork on block' context, that will automatically create a fiber
// should the event handler attempt a blocking operation, but doesn't have the overhead
// of creating a fiber needlessly. (cool huh?)
if (l->flags & MESSAGE_BUS_LISTENER_NONBLOCKING)
async_callback(l);
// OK, if this handler has regisitered itself as non-blocking, we just execute it directly...
// This is normally only done for trusted system components.
// Otherwise, we invoke it in a 'fork on block' context, that will automatically create a fiber
// should the event handler attempt a blocking operation, but doesn't have the overhead
// of creating a fiber needlessly. (cool huh?)
if (l->flags & MESSAGE_BUS_LISTENER_NONBLOCKING)
async_callback(l);
else
invoke(async_callback, l);
}
else
invoke(async_callback, l);
{
complete = 0;
}
}
l = l->next;
}
// Finally, forward the event to any other internal subsystems that may be interested.
// We *could* do this through the message bus of course, but this saves additional RAM,
// and procssor time (as we know these are non-blocking calls).
// Wake up any fibers that are blocked on this event
if (uBit.flags & MICROBIT_FLAG_SCHEDULER_RUNNING)
scheduler_event(evt);
// See if this event needs to be propogated through our BLE interface
if (uBit.ble_event_service)
uBit.ble_event_service->onMicroBitEvent(evt);
return complete;
}
/**