diff --git a/research/ma_engine.h b/research/ma_engine.h
index d79d9caa..cd434e05 100644
--- a/research/ma_engine.h
+++ b/research/ma_engine.h
@@ -140,30 +140,49 @@ typedef struct ma_resource_manager_data_source ma_resource_manager_data_source;
 
 /*
 The idea of the slot allocator is for it to be used in conjunction with a fixed sized buffer. You use the slot allocator to allocate an index that can be used
-as the insertion point for an object. This is lock-free.
+as the insertion point for an object.
+
+Slots are reference counted to help mitigate the ABA problem in the lock-free queue we use for tracking jobs.
+
+The slot index is stored in the low 32 bits. The reference counter is stored in the high 32 bits:
+
+    +-----------------+-----------------+
+    | 32 Bits         | 32 Bits         |
+    +-----------------+-----------------+
+    | Reference Count | Slot Index      |
+    +-----------------+-----------------+
+
 */
 typedef struct
 {
-    struct
+    volatile struct
     {
         ma_uint32 bitfield;
-        /*ma_uint32 refcount;*/ /* When greater than 0 it means something already has a hold on this group. */
     } groups[MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY/32];
-    ma_uint32 counter;
+    ma_uint32 slots[MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY];    /* 1 bit to indicate if the slot is allocated, 31 bits for reference counting. */
+    ma_uint32 count;    /* Allocation count. */
 } ma_slot_allocator;
 
 MA_API ma_result ma_slot_allocator_init(ma_slot_allocator* pAllocator);
-MA_API ma_result ma_slot_allocator_alloc(ma_slot_allocator* pAllocator, ma_uint32* pSlot);
-MA_API ma_result ma_slot_allocator_alloc_16(ma_slot_allocator* pAllocator, ma_uint16* pSlot);
-MA_API ma_result ma_slot_allocator_free(ma_slot_allocator* pAllocator, ma_uint32 slot);
+MA_API ma_result ma_slot_allocator_alloc(ma_slot_allocator* pAllocator, ma_uint64* pSlot);
+MA_API ma_result ma_slot_allocator_free(ma_slot_allocator* pAllocator, ma_uint64 slot);
 
 
 typedef struct
 {
-    ma_uint16 code;
-    ma_uint16 slot;     /* Internal use only. */
-    ma_uint16 next;     /* Internal use only. The slot of the next job in the list. Set to 0xFFFF if this is the last item. */
-    ma_uint16 padding;
+    union
+    {
+        struct
+        {
+            ma_uint16 code;
+            ma_uint16 slot;
+            ma_uint32 refcount;
+        };
+        ma_uint64 allocation;
+    } toc;  /* 8 bytes. We encode the job code into the slot allocation data to save space. */
+    ma_uint64 next;     /* refcount + slot for the next item. Does not include the job code. */
+
     union
     {
         /* Resource Manager Jobs */
@@ -184,8 +203,8 @@ MA_API ma_job ma_job_init(ma_uint16 code);
 typedef struct
 {
     ma_uint32 flags;    /* Flags passed in at initialization time. */
-    ma_uint16 head;     /* The first item in the list. Required for removing from the top of the list. */
-    ma_uint16 tail;     /* The last item in the list. Required for appending to the end of the list. */
+    ma_uint64 head;     /* The first item in the list. Required for removing from the top of the list. */
+    ma_uint64 tail;     /* The last item in the list. Required for appending to the end of the list. */
     ma_semaphore sem;   /* Only used when MA_JOB_QUEUE_ASYNC is unset. */
     ma_slot_allocator allocator;
     ma_job jobs[MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY];
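/*
A minimal sketch (not part of the patch) of how the 64-bit allocation value described above packs
together. These helper names are illustrative only; the patch itself extracts the fields inline or
via the ma_job_extract_*() helpers added further down.
*/
static ma_uint64 example_make_allocation(ma_uint32 refcount, ma_uint32 slotIndex)
{
    return ((ma_uint64)refcount << 32) | (ma_uint64)slotIndex;  /* High 32 bits = reference count, low 32 bits = slot index. */
}

static ma_uint32 example_extract_refcount(ma_uint64 allocation)
{
    return (ma_uint32)(allocation >> 32);
}

static ma_uint32 example_extract_slot_index(ma_uint64 allocation)
{
    return (ma_uint32)(allocation & 0xFFFFFFFF);
}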
@@ -695,7 +714,7 @@ MA_API ma_result ma_slot_allocator_init(ma_slot_allocator* pAllocator)
     return MA_SUCCESS;
 }
 
-MA_API ma_result ma_slot_allocator_alloc(ma_slot_allocator* pAllocator, ma_uint32* pSlot)
+MA_API ma_result ma_slot_allocator_alloc(ma_slot_allocator* pAllocator, ma_uint64* pSlot)
 {
     ma_uint32 capacity;
@@ -709,8 +728,7 @@ MA_API ma_result ma_slot_allocator_alloc(ma_slot_allocator* pAllocator, ma_uint3
         /* We need to acquire a suitable bitfield first. This is a bitfield that's got an available slot within it. */
         ma_uint32 iGroup;
         for (iGroup = 0; iGroup < ma_countof(pAllocator->groups); iGroup += 1) {
-#if 1
-            /* A CAS implementation which would rid us of the refcount requirement? */
+            /* CAS */
             for (;;) {
                 ma_uint32 newBitfield;
                 ma_uint32 oldBitfield;
@@ -718,56 +736,38 @@ MA_API ma_result ma_slot_allocator_alloc(ma_slot_allocator* pAllocator, ma_uint3
 
                 oldBitfield = pAllocator->groups[iGroup].bitfield;
 
-                bitOffset = ma_ffs_32(~oldBitfield);
-                if (bitOffset == 32) {
+                /* Fast check to see if anything is available. */
+                if (oldBitfield == 0xFFFFFFFF) {
                     break;  /* No available bits in this bitfield. */
                 }
 
+                bitOffset = ma_ffs_32(~oldBitfield);
+                MA_ASSERT(bitOffset < 32);
+
                 newBitfield = oldBitfield | (1 << bitOffset);
 
-                if ((ma_uint32)c89atomic_compare_and_swap_32(&pAllocator->groups[iGroup].bitfield, oldBitfield, newBitfield) == oldBitfield) {
-                    *pSlot = iGroup*32 + bitOffset;
-                    c89atomic_fetch_add_32(&pAllocator->counter, 1);
+                if (c89atomic_compare_and_swap_32(&pAllocator->groups[iGroup].bitfield, oldBitfield, newBitfield) == oldBitfield) {
+                    ma_uint32 slotIndex;
+
+                    /* Increment the counter as soon as possible to have other threads report out-of-memory sooner than later. */
+                    c89atomic_fetch_add_32(&pAllocator->count, 1);
+
+                    /* The slot index is required for constructing the output value. */
+                    slotIndex = (iGroup << 5) + bitOffset;  /* iGroup << 5 = iGroup * 32 */
+
+                    /* Increment the reference count before constructing the output value. */
+                    pAllocator->slots[slotIndex] += 1;
+
+                    /* Construct the output value. */
+                    *pSlot = ((ma_uint64)pAllocator->slots[slotIndex] << 32 | slotIndex);
+
                    return MA_SUCCESS;
                 }
             }
-#else
-            /* A ref counted implementation which is a bit simpler to understand what's going on, but has the expense of an extra 32-bits for the group ref count. */
-            ma_uint32 refcount;
-            refcount = ma_atomic_increment_32(&pAllocator->groups[iGroup].refcount);    /* <-- Grab a hold on the bitfield. */
-            if (refcount == 1) {
-                if (pAllocator->groups[iGroup].bitfield != 0xFFFFFFFF) {
-                    /* We have an available bit. Now we just find the first unset bit. */
-                    ma_uint32 bitOffset;
-
-                    bitOffset = ma_ffs_32(~pAllocator->groups[iGroup].bitfield);    /* ffs = find first set. We just invert the bits to find the first unset. */
-                    MA_ASSERT(bitOffset < 32);
-
-                    pAllocator->groups[iGroup].bitfield = pAllocator->groups[iGroup].bitfield | (1 << bitOffset);
-
-                    /* Before releasing the group we need to ensure the write operation above has completed so we'll throw a memory barrier in here for safety. */
-                    ma_memory_barrier();
-                    ma_atomic_increment_32(&pAllocator->counter);   /* Incrementing the counter should happen before releasing the group's ref count to ensure we don't waste loop iterations in out-of-memory scenarios. */
-                    ma_atomic_decrement_32(&pAllocator->groups[iGroup].refcount);   /* Release the hold as soon as possible to allow other things to access the bitfield. */
-
-                    *pSlot = iGroup*32 + bitOffset;
-
-                    return MA_SUCCESS;
-                } else {
-                    /* Every slot within this group has been consumed so we'll need to move on to the next one. */
-                }
-            } else {
-                /* This group is being held by another thread for it's own allocation. Skip this group and move on to the next one. */
-                MA_ASSERT(refcount > 1);
-            }
-
-            /* Getting here means we didn't find a slot in this group. We need to release the hold on this group and move to the next one. */
-            ma_atomic_decrement_32(&pAllocator->groups[iGroup].refcount);
-#endif
         }
 
         /* We weren't able to find a slot. If it's because we've reached our capacity we need to return MA_OUT_OF_MEMORY. Otherwise we need to do another iteration and try again. */
-        if (pAllocator->counter < capacity) {
+        if (pAllocator->count < capacity) {
             ma_yield();
         } else {
             return MA_OUT_OF_MEMORY;
         }
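/*
The CAS loop above, reduced to a single 32-bit group for illustration. This is a sketch, not part
of the patch. ma_ffs_32() returns the index of the first set bit, so inverting the bitfield finds
the first *unset* (i.e. free) bit. If another thread claims a bit between the read and the CAS, the
CAS fails and we simply retry against the updated bitfield.
*/
static int example_claim_first_free_bit(volatile ma_uint32* pBitfield)
{
    for (;;) {
        ma_uint32 oldBitfield = *pBitfield;
        ma_uint32 bitOffset;

        if (oldBitfield == 0xFFFFFFFF) {
            return -1;  /* All 32 slots in this group are taken. */
        }

        bitOffset = ma_ffs_32(~oldBitfield);

        if (c89atomic_compare_and_swap_32(pBitfield, oldBitfield, oldBitfield | (1 << bitOffset)) == oldBitfield) {
            return (int)bitOffset;  /* Successfully published the claim. */
        }
    }
}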
@@ -777,26 +777,7 @@ MA_API ma_result ma_slot_allocator_alloc(ma_slot_allocator* pAllocator, ma_uint3
     /* Unreachable. */
 }
 
-MA_API ma_result ma_slot_allocator_alloc_16(ma_slot_allocator* pAllocator, ma_uint16* pSlot)
-{
-    ma_uint32 slot32;
-    ma_result result = ma_slot_allocator_alloc(pAllocator, &slot32);
-    if (result != MA_SUCCESS) {
-        return result;
-    }
-
-    if (slot32 > 65535) {
-        return MA_OUT_OF_RANGE;
-    }
-
-    if (pSlot != NULL) {
-        *pSlot = (ma_uint16)slot32;
-    }
-
-    return MA_SUCCESS;
-}
-
-MA_API ma_result ma_slot_allocator_free(ma_slot_allocator* pAllocator, ma_uint32 slot)
+MA_API ma_result ma_slot_allocator_free(ma_slot_allocator* pAllocator, ma_uint64 slot)
 {
     ma_uint32 iGroup;
     ma_uint32 iBit;
@@ -805,8 +786,8 @@ MA_API ma_result ma_slot_allocator_free(ma_slot_allocator* pAllocator, ma_uint32
         return MA_INVALID_ARGS;
     }
 
-    iGroup = slot >> 5;   /* slot / 32 */
-    iBit   = slot & 31;   /* slot % 32 */
+    iGroup = (slot & 0xFFFFFFFF) >> 5;   /* slot / 32 */
+    iBit   = (slot & 0xFFFFFFFF) & 31;   /* slot % 32 */
 
     if (iGroup >= ma_countof(pAllocator->groups)) {
         return MA_INVALID_ARGS;
@@ -814,42 +795,18 @@ MA_API ma_result ma_slot_allocator_free(ma_slot_allocator* pAllocator, ma_uint32
 
     MA_ASSERT(iBit < 32);   /* This must be true due to the logic we used to actually calculate it. */
 
-    while (pAllocator->counter > 0) {
-#if 1
-        /* CAS loop implementation. */
+    while (pAllocator->count > 0) {
+        /* CAS */
         ma_uint32 newBitfield;
         ma_uint32 oldBitfield;
 
         oldBitfield = pAllocator->groups[iGroup].bitfield;
         newBitfield = oldBitfield & ~(1 << iBit);
 
-        if ((ma_uint32)c89atomic_compare_and_swap_32(&pAllocator->groups[iGroup].bitfield, oldBitfield, newBitfield) == oldBitfield) {
-            c89atomic_fetch_sub_32(&pAllocator->counter, 1);
+        if (c89atomic_compare_and_swap_32(&pAllocator->groups[iGroup].bitfield, oldBitfield, newBitfield) == oldBitfield) {
+            c89atomic_fetch_sub_32(&pAllocator->count, 1);
             return MA_SUCCESS;
         }
-#else
-        /* Ref counted implementation. */
-
-        /* We need to get a hold on the group. We may need to spin for a few iterations, but this should complete in a reasonable amount of time. */
-        ma_uint32 refcount = ma_atomic_increment_32(&pAllocator->groups[iGroup]);
-        if (refcount == 1) {
-            pAllocator->groups[iGroup].bitfield = pAllocator->groups[iGroup].bitfield & ~(1 << iBit);   /* Unset the bit. */
-
-            /* Before releasing the group we need to ensure the write operation above has completed so we'll throw a memory barrier in here for safety. */
-            ma_memory_barrier();
-            c89atomic_fetch_sub_32(&pAllocator->counter, 1);    /* Incrementing the counter should happen before releasing the group's ref count to ensure we don't waste loop iterations in out-of-memory scenarios. */
-            c89atomic_fetch_sub_32(&pAllocator->groups[iGroup].refcount, 1);    /* Release the hold as soon as possible to allow other things to access the bitfield. */
-
-            return MA_SUCCESS;
-        } else {
-            /* Something else is holding the group. We need to spin for a bit. */
-            MA_ASSERT(refcount > 1);
-        }
-
-        /* Getting here means something is holding our lock. We need to release and spin. */
-        c89atomic_fetch_sub_32(&pAllocator->groups[iGroup], 1);
-        ma_yield();
-#endif
     }
 
     /* Getting here means there are no allocations available for freeing. */
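/*
A sketch (not part of the patch) of the ABA scenario the reference count mitigates. On a freshly
initialized allocator, with no other threads running, freeing and re-allocating yields the same
slot index but a bumped reference count, so the two 64-bit values differ and a stale CAS against
the first value cannot succeed against the second.
*/
static void example_aba_mitigation(ma_slot_allocator* pAllocator)
{
    ma_uint64 first;
    ma_uint64 second;

    ma_slot_allocator_alloc(pAllocator, &first);    /* refcount=1, index=0 */
    ma_slot_allocator_free(pAllocator, first);
    ma_slot_allocator_alloc(pAllocator, &second);   /* refcount=2, index=0 (the first-fit rescan picks the same bit) */

    MA_ASSERT((first & 0xFFFFFFFF) == (second & 0xFFFFFFFF));   /* Same slot index... */
    MA_ASSERT(first != second);                                 /* ...but a different 64-bit identity. */
}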
@@ -857,14 +814,37 @@
 }
 
 
+#define MA_JOB_ID_NONE  ~((ma_uint64)0)
+
+static MA_INLINE ma_uint32 ma_job_extract_refcount(ma_uint64 toc)
+{
+    return (ma_uint32)(toc >> 32);
+}
+
+static MA_INLINE ma_uint16 ma_job_extract_slot(ma_uint64 toc)
+{
+    return (ma_uint16)(toc & 0x0000FFFF);
+}
+
+static MA_INLINE ma_uint16 ma_job_extract_code(ma_uint64 toc)
+{
+    return (ma_uint16)((toc & 0xFFFF0000) >> 16);
+}
+
+static MA_INLINE ma_uint64 ma_job_toc_to_allocation(ma_uint64 toc)
+{
+    return ((ma_uint64)ma_job_extract_refcount(toc) << 32) | (ma_uint64)ma_job_extract_slot(toc);
+}
+
+
 MA_API ma_job ma_job_init(ma_uint16 code)
 {
     ma_job job;
 
     MA_ZERO_OBJECT(&job);
-    job.code = code;
-    job.slot = 0xFFFF;
-    job.next = 0xFFFF;
+    job.toc.code = code;
+    job.toc.slot = MA_JOB_ID_NONE;  /* Temp value. Will be allocated when posted to a queue. */
+    job.next = MA_JOB_ID_NONE;
 
     return job;
 }
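/*
A quick numeric check of the helpers above; a sketch, not part of the patch. The bit layout they
decode is: bits 0-15 = slot, bits 16-31 = code, bits 32-63 = reference count.
*/
static void example_toc_roundtrip(void)
{
    ma_uint64 toc = ((ma_uint64)3 << 32) | ((ma_uint64)7 << 16) | (ma_uint64)42;    /* refcount=3, code=7, slot=42 */

    MA_ASSERT(ma_job_extract_refcount(toc) == 3);
    MA_ASSERT(ma_job_extract_code(toc)     == 7);
    MA_ASSERT(ma_job_extract_slot(toc)     == 42);

    /* ma_job_toc_to_allocation() strips out the code, leaving the refcount+slot pair understood by the slot allocator. */
    MA_ASSERT(ma_job_toc_to_allocation(toc) == (((ma_uint64)3 << 32) | 42));
}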
@@ -872,9 +852,6 @@ MA_API ma_job ma_job_init(ma_uint16 code)
 /*
 Lock free queue implementation based on the paper by Michael and Scott: Nonblocking Algorithms
 and Preemption-Safe Locking on Multiprogrammed Shared Memory Multiprocessors
-
-TODO:
-  - Figure out ABA protection.
 */
 MA_API ma_result ma_job_queue_init(ma_uint32 flags, ma_job_queue* pQueue)
 {
@@ -896,10 +873,10 @@ MA_API ma_result ma_job_queue_init(ma_uint32 flags, ma_job_queue* pQueue)
     Our queue needs to be initialized with a free standing node. This should always be slot 0. Required for the lock free algorithm. The first job in the queue is
     just a dummy item for giving us the first item in the list which is stored in the "next" member.
     */
-    ma_slot_allocator_alloc_16(&pQueue->allocator, &pQueue->head); /* Will never fail. */
+    ma_slot_allocator_alloc(&pQueue->allocator, &pQueue->head); /* Will never fail. */
 
     pQueue->tail = pQueue->head;
-    pQueue->jobs[pQueue->head].next = 0xFFFF;
+    pQueue->jobs[ma_job_extract_slot(pQueue->head)].next = MA_JOB_ID_NONE;
 
     return MA_SUCCESS;
 }
@@ -921,44 +898,45 @@ MA_API ma_result ma_job_queue_uninit(ma_job_queue* pQueue)
 MA_API ma_result ma_job_queue_post(ma_job_queue* pQueue, const ma_job* pJob)
 {
     ma_result result;
-    ma_uint16 slot;
-    ma_uint16 tail;
-    ma_uint16 next;
+    ma_uint64 slot;
+    ma_uint64 tail;
+    ma_uint64 next;
 
     if (pQueue == NULL || pJob == NULL) {
         return MA_INVALID_ARGS;
     }
 
     /* We need a new slot. */
-    result = ma_slot_allocator_alloc_16(&pQueue->allocator, &slot);
+    result = ma_slot_allocator_alloc(&pQueue->allocator, &slot);
     if (result != MA_SUCCESS) {
         return result;  /* Probably ran out of slots. If so, MA_OUT_OF_MEMORY will be returned. */
     }
 
     /* At this point we should have a slot to place the job. */
-    MA_ASSERT(slot < MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY);
+    MA_ASSERT(ma_job_extract_slot(slot) < MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY);
 
     /* We need to put the job into memory before we do anything. */
-    pQueue->jobs[slot] = *pJob;
-    pQueue->jobs[slot].slot = slot;     /* Safe cast as our maximum slot is <= 65535. */
-    pQueue->jobs[slot].next = 0xFFFF;   /* Reset for safety. */
+    pQueue->jobs[ma_job_extract_slot(slot)] = *pJob;
+    pQueue->jobs[ma_job_extract_slot(slot)].toc.allocation = slot;      /* This will overwrite the job code. */
+    pQueue->jobs[ma_job_extract_slot(slot)].toc.code = pJob->toc.code;  /* The job code needs to be applied again because the line above overwrote it. */
+    pQueue->jobs[ma_job_extract_slot(slot)].next = MA_JOB_ID_NONE;      /* Reset for safety. */
 
     /* The job is stored in memory so now we need to add it to our linked list. We only ever add items to the end of the list. */
     for (;;) {
         tail = pQueue->tail;
-        next = pQueue->jobs[tail].next;
+        next = pQueue->jobs[ma_job_extract_slot(tail)].next;
 
         if (tail == pQueue->tail) {
-            if (next == 0xFFFF) {
-                if (c89atomic_compare_and_swap_16(&pQueue->jobs[tail].next, next, slot) == next) {
+            if (next == MA_JOB_ID_NONE) {
+                if (c89atomic_compare_and_swap_64(&pQueue->jobs[ma_job_extract_slot(tail)].next, next, slot) == next) {
                     break;
                 }
             } else {
-                c89atomic_compare_and_swap_16(&pQueue->tail, tail, next);
+                c89atomic_compare_and_swap_64(&pQueue->tail, tail, next);
             }
         }
     }
 
-    c89atomic_compare_and_swap_16(&pQueue->tail, tail, slot);
+    c89atomic_compare_and_swap_64(&pQueue->tail, tail, slot);
 
 
     /* Signal the semaphore as the last step if we're using synchronous mode. */
@@ -971,9 +949,9 @@ MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob)
 {
-    ma_uint16 head;
-    ma_uint16 tail;
-    ma_uint16 next;
+    ma_uint64 head;
+    ma_uint64 tail;
+    ma_uint64 next;
 
     if (pQueue == NULL || pJob == NULL) {
         return MA_INVALID_ARGS;
     }
@@ -988,17 +966,17 @@ MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob)
     for (;;) {
         head = pQueue->head;
         tail = pQueue->tail;
-        next = pQueue->jobs[head].next;
+        next = pQueue->jobs[ma_job_extract_slot(head)].next;
 
         if (head == pQueue->head) {
             if (head == tail) {
-                if (next == 0xFFFF) {
+                if (next == MA_JOB_ID_NONE) {
                     return MA_NO_DATA_AVAILABLE;
                 }
-                c89atomic_compare_and_swap_16(&pQueue->tail, tail, next);
+                c89atomic_compare_and_swap_64(&pQueue->tail, tail, next);
             } else {
-                *pJob = pQueue->jobs[next];
-                if (c89atomic_compare_and_swap_16(&pQueue->head, head, next) == head) {
+                *pJob = pQueue->jobs[ma_job_extract_slot(next)];
+                if (c89atomic_compare_and_swap_64(&pQueue->head, head, next) == head) {
                     break;
                 }
             }
@@ -1016,7 +994,7 @@ MA_API ma_result ma_job_queue_free(ma_job_queue* pQueue, ma_job* pJob)
         return MA_INVALID_ARGS;
     }
 
-    return ma_slot_allocator_free(&pQueue->allocator, pJob->slot);
+    return ma_slot_allocator_free(&pQueue->allocator, ma_job_toc_to_allocation(pJob->toc.allocation));
 }
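/*
End-to-end usage sketch, not part of the patch. MY_JOB_CODE is a hypothetical job code used purely
for illustration; real codes are defined by the resource manager. Note that ma_job_queue_post()
signals the queue's semaphore as its last step in synchronous mode, so a consumer can block until
work arrives.
*/
#define MY_JOB_CODE 0x0001  /* Hypothetical. */

static ma_result example_post_and_process(ma_job_queue* pQueue)
{
    ma_result result;
    ma_job job;

    /* Producer: stage the job and post it. Posting allocates a slot and links it at the tail. */
    job = ma_job_init(MY_JOB_CODE);
    result = ma_job_queue_post(pQueue, &job);
    if (result != MA_SUCCESS) {
        return result;  /* MA_OUT_OF_MEMORY if every slot is in use. */
    }

    /* Consumer: pop from the head, process, then release the slot for reuse. */
    result = ma_job_queue_next(pQueue, &job);
    if (result != MA_SUCCESS) {
        return result;  /* MA_NO_DATA_AVAILABLE if the queue was empty. */
    }

    /* ... dispatch on job.toc.code here ... */

    return ma_job_queue_free(pQueue, &job);
}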