From c92c7764de31a6756a7ace8bd072ca0492c287ae Mon Sep 17 00:00:00 2001
From: David Reid
Date: Tue, 16 Jun 2020 21:06:04 +1000
Subject: [PATCH] Experimental work on some lock free queues.

These are completely untested. Nothing here is wired up to the main system -
they're just for experimenting with some ideas.
---
 research/ma_engine.h | 421 ++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 418 insertions(+), 3 deletions(-)

diff --git a/research/ma_engine.h b/research/ma_engine.h
index 1e393d7d..41693db5 100644
--- a/research/ma_engine.h
+++ b/research/ma_engine.h
@@ -121,6 +121,7 @@ typedef struct ma_resource_manager_data_source ma_resource_manager_data_source;
 
 
 /* TODO: May need to do some stress testing and tweak this. */
+/* The job queue capacity must be a multiple of 32. */
 #ifndef MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY
 #define MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY  1024
 #endif
@@ -136,9 +137,70 @@ typedef struct ma_resource_manager_data_source ma_resource_manager_data_source;
 #define MA_MESSAGE_DECODE_STREAM_PAGE   0x00000008
 #define MA_MESSAGE_SEEK_DATA_STREAM     0x00000009
 
+
+/*
+The idea of the slot allocator is for it to be used in conjunction with a fixed-size buffer. You use the slot allocator to allocate an index that can be used
+as the insertion point for an object. This is lock-free.
+*/
 typedef struct
 {
-    ma_uint32 code;
+    struct
+    {
+        ma_uint32 bitfield;
+        /*ma_uint32 refcount;*/ /* When greater than 0 it means something already has a hold on this group. */
+    } groups[MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY/32];
+    ma_uint32 counter;
+} ma_slot_allocator;
+
+MA_API ma_result ma_slot_allocator_init(ma_slot_allocator* pAllocator);
+MA_API ma_result ma_slot_allocator_alloc(ma_slot_allocator* pAllocator, ma_uint32* pSlot);
+MA_API ma_result ma_slot_allocator_alloc_16(ma_slot_allocator* pAllocator, ma_uint16* pSlot);
+MA_API ma_result ma_slot_allocator_free(ma_slot_allocator* pAllocator, ma_uint32 slot);
+
+
+typedef struct
+{
+    ma_uint16 code;
+    ma_uint16 slot;     /* Internal use only. */
+    ma_uint16 next;     /* Internal use only. The slot of the next job in the list. Set to 0xFFFF if this is the last item. */
+    ma_uint16 padding;
+    union
+    {
+        /* Resource Manager Jobs */
+        struct
+        {
+            ma_resource_manager_data_buffer* pDataBuffer;
+            char* pFilePath;
+            ma_event* pEvent;
+        } loadDataBuffer;
+    };
+} ma_job;
+
+MA_API ma_job ma_job_init(ma_uint16 code);
+
+
+#define MA_JOB_QUEUE_ASYNC  0x00000001  /* When set, ma_job_queue_next() will not wait and no semaphore will be signaled in ma_job_queue_post(). ma_job_queue_next() will return MA_NO_DATA_AVAILABLE if nothing is available. */
+
+typedef struct
+{
+    ma_uint32 flags;                /* Flags passed in at initialization time. */
+    ma_uint16 head;                 /* The first item in the list. Required for removing from the top of the list. */
+    ma_uint16 tail;                 /* The last item in the list. Required for appending to the end of the list. */
+    ma_semaphore sem;               /* Only used when MA_JOB_QUEUE_ASYNC is unset. */
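+    /* The allocator below hands out slots into the fixed-size jobs array; a posted job occupies its slot until the queue has finished with it. */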
+    ma_slot_allocator allocator;
+    ma_job jobs[MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY];
+} ma_job_queue;
+
+MA_API ma_result ma_job_queue_init(ma_uint32 flags, ma_job_queue* pQueue);
+MA_API ma_result ma_job_queue_uninit(ma_job_queue* pQueue);
+MA_API ma_result ma_job_queue_post(ma_job_queue* pQueue, const ma_job* pJob);
+MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob);
+
+
+typedef struct
+{
+    ma_uint16 code;
+    ma_uint16 slot;
     union
     {
         struct
@@ -194,7 +256,7 @@ typedef struct
     };
 } ma_resource_manager_message;
 
-MA_API ma_resource_manager_message ma_resource_manager_message_init(ma_uint32 code);
+MA_API ma_resource_manager_message ma_resource_manager_message_init(ma_uint16 code);
 
 
 typedef struct
@@ -607,6 +669,359 @@ MA_API ma_result ma_engine_listener_set_rotation(ma_engine* pEngine, ma_quat rot
 #endif
 
 
+static ma_uint32 ma_ffs_32(ma_uint32 x)
+{
+    ma_uint32 i;
+
+    /* Just a naive implementation to get things working for now. Will optimize this later. */
+    for (i = 0; i < 32; i += 1) {
+        if ((x & (1U << i)) != 0) {
+            return i;
+        }
+    }
+
+    return i;
+}
+
+
+MA_API ma_result ma_slot_allocator_init(ma_slot_allocator* pAllocator)
+{
+    if (pAllocator == NULL) {
+        return MA_INVALID_ARGS;
+    }
+
+    MA_ZERO_OBJECT(pAllocator);
+
+    return MA_SUCCESS;
+}
+
+MA_API ma_result ma_slot_allocator_alloc(ma_slot_allocator* pAllocator, ma_uint32* pSlot)
+{
+    ma_uint32 capacity;
+
+    if (pAllocator == NULL || pSlot == NULL) {
+        return MA_INVALID_ARGS;
+    }
+
+    capacity = ma_countof(pAllocator->groups) * 32;
+
+    for (;;) {
+        /* We need to acquire a suitable bitfield first. This is a bitfield that's got an available slot within it. */
+        ma_uint32 iGroup;
+        for (iGroup = 0; iGroup < ma_countof(pAllocator->groups); iGroup += 1) {
+#if 1
+            /* A CAS implementation which would rid us of the refcount requirement? */
+            for (;;) {
+                ma_uint32 newBitfield;
+                ma_uint32 oldBitfield;
+                ma_uint32 bitOffset;
+
+                oldBitfield = pAllocator->groups[iGroup].bitfield;
+
+                bitOffset = ma_ffs_32(~oldBitfield);
+                if (bitOffset == 32) {
+                    break;  /* No available bits in this bitfield. */
+                }
+
+                newBitfield = oldBitfield | (1U << bitOffset);
+
+                if ((ma_uint32)ma_compare_and_swap_32(&pAllocator->groups[iGroup].bitfield, newBitfield, oldBitfield) == oldBitfield) {
+                    *pSlot = iGroup*32 + bitOffset;
+                    ma_atomic_increment_32(&pAllocator->counter);
+                    return MA_SUCCESS;
+                }
+            }
+#else
+            /* A ref counted implementation which is a bit simpler to understand, but has the expense of an extra 32 bits per group for the ref count. */
+            ma_uint32 refcount;
+            refcount = ma_atomic_increment_32(&pAllocator->groups[iGroup].refcount);   /* <-- Grab a hold on the bitfield. */
+            if (refcount == 1) {
+                if (pAllocator->groups[iGroup].bitfield != 0xFFFFFFFF) {
+                    /* We have an available bit. Now we just find the first unset bit. */
+                    ma_uint32 bitOffset;
+
+                    bitOffset = ma_ffs_32(~pAllocator->groups[iGroup].bitfield);    /* ffs = find first set. We just invert the bits to find the first unset. */
+                    MA_ASSERT(bitOffset < 32);
+
+                    pAllocator->groups[iGroup].bitfield = pAllocator->groups[iGroup].bitfield | (1U << bitOffset);
+
+                    /* Before releasing the group we need to ensure the write operation above has completed so we'll throw a memory barrier in here for safety. */
+                    ma_memory_barrier();
+                    ma_atomic_increment_32(&pAllocator->counter);   /* Incrementing the counter should happen before releasing the group's ref count to ensure we don't waste loop iterations in out-of-memory scenarios. */
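+                    /* The bit is now committed. All that's left is to release the hold on the group and report the slot index back to the caller. */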
+                    ma_atomic_decrement_32(&pAllocator->groups[iGroup].refcount);   /* Release the hold as soon as possible to allow other things to access the bitfield. */
+
+                    *pSlot = iGroup*32 + bitOffset;
+
+                    return MA_SUCCESS;
+                } else {
+                    /* Every slot within this group has been consumed so we'll need to move on to the next one. */
+                }
+            } else {
+                /* This group is being held by another thread for its own allocation. Skip this group and move on to the next one. */
+                MA_ASSERT(refcount > 1);
+            }
+
+            /* Getting here means we didn't find a slot in this group. We need to release the hold on this group and move to the next one. */
+            ma_atomic_decrement_32(&pAllocator->groups[iGroup].refcount);
+#endif
+        }
+
+        /* We weren't able to find a slot. If it's because we've reached our capacity we need to return MA_OUT_OF_MEMORY. Otherwise we need to do another iteration and try again. */
+        if (pAllocator->counter < capacity) {
+            ma_yield();
+        } else {
+            return MA_OUT_OF_MEMORY;
+        }
+    }
+
+    /* Unreachable. */
+}
+
+MA_API ma_result ma_slot_allocator_alloc_16(ma_slot_allocator* pAllocator, ma_uint16* pSlot)
+{
+    ma_uint32 slot32;
+    ma_result result = ma_slot_allocator_alloc(pAllocator, &slot32);
+    if (result != MA_SUCCESS) {
+        return result;
+    }
+
+    if (slot32 > 65535) {
+        return MA_OUT_OF_RANGE;
+    }
+
+    if (pSlot != NULL) {
+        *pSlot = (ma_uint16)slot32;
+    }
+
+    return MA_SUCCESS;
+}
+
+MA_API ma_result ma_slot_allocator_free(ma_slot_allocator* pAllocator, ma_uint32 slot)
+{
+    ma_uint32 iGroup;
+    ma_uint32 iBit;
+
+    if (pAllocator == NULL) {
+        return MA_INVALID_ARGS;
+    }
+
+    iGroup = slot >> 5;   /* slot / 32 */
+    iBit   = slot & 31;   /* slot % 32 */
+
+    if (iGroup >= ma_countof(pAllocator->groups)) {
+        return MA_INVALID_ARGS;
+    }
+
+    MA_ASSERT(iBit < 32);   /* This must be true due to the logic we used to calculate it. */
+
+    while (pAllocator->counter > 0) {
+#if 1
+        /* CAS loop implementation. */
+        ma_uint32 newBitfield;
+        ma_uint32 oldBitfield;
+
+        oldBitfield = pAllocator->groups[iGroup].bitfield;
+        newBitfield = oldBitfield & ~(1U << iBit);
+
+        if ((ma_uint32)ma_compare_and_swap_32(&pAllocator->groups[iGroup].bitfield, newBitfield, oldBitfield) == oldBitfield) {
+            ma_atomic_decrement_32(&pAllocator->counter);
+            return MA_SUCCESS;
+        }
+#else
+        /* Ref counted implementation. */
+
+        /* We need to get a hold on the group. We may need to spin for a few iterations, but this should complete in a reasonable amount of time. */
+        ma_uint32 refcount = ma_atomic_increment_32(&pAllocator->groups[iGroup].refcount);
+        if (refcount == 1) {
+            pAllocator->groups[iGroup].bitfield = pAllocator->groups[iGroup].bitfield & ~(1U << iBit);  /* Unset the bit. */
+
+            /* Before releasing the group we need to ensure the write operation above has completed so we'll throw a memory barrier in here for safety. */
+            ma_memory_barrier();
+            ma_atomic_decrement_32(&pAllocator->counter);    /* Decrementing the counter should happen before releasing the group's ref count to ensure we don't waste loop iterations in out-of-memory scenarios. */
+            ma_atomic_decrement_32(&pAllocator->groups[iGroup].refcount);   /* Release the hold as soon as possible to allow other things to access the bitfield. */
+
+            return MA_SUCCESS;
+        } else {
+            /* Something else is holding the group. We need to spin for a bit. */
+            MA_ASSERT(refcount > 1);
+        }
+
+        /* Getting here means something is holding our lock. We need to release and spin. */
+        ma_atomic_decrement_32(&pAllocator->groups[iGroup].refcount);
+        ma_yield();
+#endif
+    }
+
+    /* Getting here means there are no allocations available for freeing. */
+    return MA_INVALID_OPERATION;
+}
+
+
+MA_API ma_job ma_job_init(ma_uint16 code)
+{
+    ma_job job;
+
+    MA_ZERO_OBJECT(&job);
+    job.code = code;
+    job.slot = 0xFFFF;
+    job.next = 0xFFFF;
+
+    return job;
+}
+
+
+/*
+Lock free queue implementation based on the paper by Michael and Scott: Nonblocking Algorithms and Preemption-Safe Locking on Multiprogrammed Shared Memory Multiprocessors
+
+TODO:
+    - Figure out ABA protection.
+*/
+MA_API ma_result ma_job_queue_init(ma_uint32 flags, ma_job_queue* pQueue)
+{
+    if (pQueue == NULL) {
+        return MA_INVALID_ARGS;
+    }
+
+    MA_ZERO_OBJECT(pQueue);
+    pQueue->flags = flags;
+
+    ma_slot_allocator_init(&pQueue->allocator); /* Will not fail. */
+
+    /* We need a semaphore if we're running in synchronous mode. */
+    if ((pQueue->flags & MA_JOB_QUEUE_ASYNC) == 0) {
+        ma_semaphore_init(0, &pQueue->sem);
+    }
+
+    /*
+    Our queue needs to be initialized with a free-standing node. This should always be slot 0. Required for the lock free algorithm. The first job in the queue is
+    just a dummy item whose "next" member points to the first real item in the list.
+    */
+    ma_slot_allocator_alloc_16(&pQueue->allocator, &pQueue->head); /* Will never fail. */
+    pQueue->tail = pQueue->head;
+
+    pQueue->jobs[pQueue->head].next = 0xFFFF;
+
+    return MA_SUCCESS;
+}
+
+MA_API ma_result ma_job_queue_uninit(ma_job_queue* pQueue)
+{
+    if (pQueue == NULL) {
+        return MA_INVALID_ARGS;
+    }
+
+    /* All we need to do is uninitialize the semaphore. */
+    if ((pQueue->flags & MA_JOB_QUEUE_ASYNC) == 0) {
+        ma_semaphore_uninit(&pQueue->sem);
+    }
+
+    return MA_SUCCESS;
+}
+
+MA_API ma_result ma_job_queue_post(ma_job_queue* pQueue, const ma_job* pJob)
+{
+    ma_result result;
+    ma_uint16 slot;
+    ma_uint16 tail;
+    ma_uint16 next;
+
+    if (pQueue == NULL || pJob == NULL) {
+        return MA_INVALID_ARGS;
+    }
+
+    /* We need a new slot. */
+    result = ma_slot_allocator_alloc_16(&pQueue->allocator, &slot);
+    if (result != MA_SUCCESS) {
+        return result;  /* Probably ran out of slots. If so, MA_OUT_OF_MEMORY will be returned. */
+    }
+
+    /* At this point we should have a slot to place the job. */
+    MA_ASSERT(slot < MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY);
+
+    /* We need to put the job into memory before we do anything. */
+    pQueue->jobs[slot] = *pJob;
+    pQueue->jobs[slot].slot = slot;     /* Safe as our maximum slot is <= 65535. */
+    pQueue->jobs[slot].next = 0xFFFF;   /* Reset for safety. */
+
+    /* The job is stored in memory so now we need to add it to our linked list. We only ever add items to the end of the list. */
+    for (;;) {
+        tail = pQueue->tail;
+        next = pQueue->jobs[tail].next;
+
+        if (tail == pQueue->tail) {
+            if (next == 0xFFFF) {
+                if (ma_compare_and_swap_16(&pQueue->jobs[tail].next, slot, next) == next) {
+                    break;
+                }
+            } else {
+                ma_compare_and_swap_16(&pQueue->tail, next, tail);
+            }
+        }
+    }
+    ma_compare_and_swap_16(&pQueue->tail, slot, tail);
+
+
+    /* Signal the semaphore as the last step if we're using synchronous mode. */
+    if ((pQueue->flags & MA_JOB_QUEUE_ASYNC) == 0) {
+        ma_semaphore_release(&pQueue->sem);
+    }
+
+    return MA_SUCCESS;
+}
+
+MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob)
+{
+    ma_uint16 head;
+    ma_uint16 tail;
+    ma_uint16 next;
+
+    if (pQueue == NULL || pJob == NULL) {
+        return MA_INVALID_ARGS;
+    }
+
+    /* If we're running in synchronous mode we'll need to wait on a semaphore. */
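+    /* In asynchronous mode there is no semaphore to wait on, so the loop below will simply return MA_NO_DATA_AVAILABLE when the queue is empty. */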
+    if ((pQueue->flags & MA_JOB_QUEUE_ASYNC) == 0) {
+        ma_semaphore_wait(&pQueue->sem);
+    }
+
+    /* Now we need to remove the root item from the list. This must be done without locking. */
+    for (;;) {
+        head = pQueue->head;
+        tail = pQueue->tail;
+        next = pQueue->jobs[head].next;
+
+        if (head == pQueue->head) {
+            if (head == tail) {
+                if (next == 0xFFFF) {
+                    return MA_NO_DATA_AVAILABLE;
+                }
+                ma_compare_and_swap_16(&pQueue->tail, next, tail);
+            } else {
+                *pJob = pQueue->jobs[next];
+                if (ma_compare_and_swap_16(&pQueue->head, next, head) == head) {
+                    break;
+                }
+            }
+        }
+    }
+
+    ma_slot_allocator_free(&pQueue->allocator, head);
+
+    return MA_SUCCESS;
+}
+
+MA_API ma_result ma_job_queue_free(ma_job_queue* pQueue, ma_job* pJob)
+{
+    if (pQueue == NULL || pJob == NULL) {
+        return MA_INVALID_ARGS;
+    }
+
+    return ma_slot_allocator_free(&pQueue->allocator, pJob->slot);
+}
+
+
+
+
 #ifndef MA_DEFAULT_HASH_SEED
 #define MA_DEFAULT_HASH_SEED    42
@@ -690,7 +1105,7 @@ static ma_uint32 ma_hash_string_32(const char* str)
 
 
 
-MA_API ma_resource_manager_message ma_resource_manager_message_init(ma_uint32 code)
+MA_API ma_resource_manager_message ma_resource_manager_message_init(ma_uint16 code)
 {
     ma_resource_manager_message message;