diff --git a/miniaudio.h b/miniaudio.h index 89da9f4a..a6ae80b2 100644 --- a/miniaudio.h +++ b/miniaudio.h @@ -1604,8 +1604,8 @@ Another major feature of the resource manager is the ability to asynchronously d This relieves the audio thread of time-consuming decoding which can negatively affect scalability due to the audio thread needing to complete it's work extremely quickly to avoid glitching. Asynchronous decoding is achieved through a job system. There is a central multi-producer, -multi-consumer, lock-free, fixed-capacity job queue. When some asynchronous work needs to be done, -a job is posted to the queue which is then read by a job thread. The number of job threads can be +multi-consumer, fixed-capacity job queue. When some asynchronous work needs to be done, a job is +posted to the queue which is then read by a job thread. The number of job threads can be configured for improved scalability, and job threads can all run in parallel without needing to worry about the order of execution (how this is achieved is explained below). @@ -1695,7 +1695,7 @@ returned when trying to read frames. When `ma_resource_manager_data_source_read_pcm_frames()` results in a page getting fully consumed a job is posted to load the next page. This will be posted from the same thread that called -`ma_resource_manager_data_source_read_pcm_frames()` which should be lock-free. +`ma_resource_manager_data_source_read_pcm_frames()`. Data streams are uninitialized by posting a job to the queue, but the function won't return until that job has been processed. The reason for this is that the caller owns the data stream object and @@ -1706,13 +1706,11 @@ complete before destroying any underlying object and the job system handles this 6.1.3. Job Queue ---------------- -The resource manager uses a job queue which is multi-producer, multi-consumer, lock-free and -fixed-capacity. The lock-free property of the queue is achieved using the algorithm described by -Michael and Scott: Nonblocking Algorithms and Preemption-Safe Locking on Multiprogrammed Shared -Memory Multiprocessors. In order for this to work, only a fixed number of jobs can be allocated and -inserted into the queue which is done through a lock-free data structure for allocating an index -into a fixed sized array, with reference counting for mitigation of the ABA problem. The reference -count is 32-bit. +The resource manager uses a job queue which is multi-producer, multi-consumer, and fixed-capacity. +This job queue is not currently lock-free, and instead uses a spinlock to achieve thread-safety. +Only a fixed number of jobs can be allocated and inserted into the queue which is done through a +lock-free data structure for allocating an index into a fixed sized array, with reference counting +for mitigation of the ABA problem. The reference count is 32-bit. For many types of jobs it's important that they execute in a specific order. In these cases, jobs are executed serially. For the resource manager, serial execution of jobs is only required on a @@ -9030,6 +9028,9 @@ typedef struct #endif ma_slot_allocator allocator; ma_resource_manager_job* pJobs; +#ifndef MA_USE_EXPERIMENTAL_LOCK_FREE_JOB_QUEUE + ma_spinlock lock; +#endif /* Memory management. */ void* _pHeap; @@ -9211,7 +9212,7 @@ struct ma_resource_manager ma_mutex dataBufferBSTLock; /* For synchronizing access to the data buffer binary tree. */ ma_thread jobThreads[MA_RESOURCE_MANAGER_MAX_JOB_THREAD_COUNT]; /* The threads for executing jobs. */ #endif - ma_resource_manager_job_queue jobQueue; /* Lock-free multi-consumer, multi-producer job queue for managing jobs for asynchronous decoding and streaming. */ + ma_resource_manager_job_queue jobQueue; /* Multi-consumer, multi-producer job queue for managing jobs for asynchronous decoding and streaming. */ ma_default_vfs defaultVFS; /* Only used if a custom VFS is not specified. */ ma_log log; /* Only used if no log was specified in the config. */ }; @@ -61829,22 +61830,30 @@ MA_API ma_result ma_resource_manager_job_queue_post(ma_resource_manager_job_queu pQueue->pJobs[ma_resource_manager_job_extract_slot(slot)].toc.breakup.code = pJob->toc.breakup.code; /* The job code needs to be applied again because the line above overwrote it. */ pQueue->pJobs[ma_resource_manager_job_extract_slot(slot)].next = MA_RESOURCE_MANAGER_JOB_ID_NONE; /* Reset for safety. */ - /* The job is stored in memory so now we need to add it to our linked list. We only ever add items to the end of the list. */ - for (;;) { - tail = c89atomic_load_64(&pQueue->tail); - next = c89atomic_load_64(&pQueue->pJobs[ma_resource_manager_job_extract_slot(tail)].next); + #ifndef MA_USE_EXPERIMENTAL_LOCK_FREE_JOB_QUEUE + ma_spinlock_lock(&pQueue->lock); + #endif + { + /* The job is stored in memory so now we need to add it to our linked list. We only ever add items to the end of the list. */ + for (;;) { + tail = c89atomic_load_64(&pQueue->tail); + next = c89atomic_load_64(&pQueue->pJobs[ma_resource_manager_job_extract_slot(tail)].next); - if (ma_resource_manager_job_toc_to_allocation(tail) == ma_resource_manager_job_toc_to_allocation(c89atomic_load_64(&pQueue->tail))) { - if (ma_resource_manager_job_extract_slot(next) == 0xFFFF) { - if (ma_resource_manager_job_queue_cas(&pQueue->pJobs[ma_resource_manager_job_extract_slot(tail)].next, next, slot)) { - break; + if (ma_resource_manager_job_toc_to_allocation(tail) == ma_resource_manager_job_toc_to_allocation(c89atomic_load_64(&pQueue->tail))) { + if (ma_resource_manager_job_extract_slot(next) == 0xFFFF) { + if (ma_resource_manager_job_queue_cas(&pQueue->pJobs[ma_resource_manager_job_extract_slot(tail)].next, next, slot)) { + break; + } + } else { + ma_resource_manager_job_queue_cas(&pQueue->tail, tail, ma_resource_manager_job_extract_slot(next)); } - } else { - ma_resource_manager_job_queue_cas(&pQueue->tail, tail, ma_resource_manager_job_extract_slot(next)); } } + ma_resource_manager_job_queue_cas(&pQueue->tail, tail, slot); } - ma_resource_manager_job_queue_cas(&pQueue->tail, tail, slot); + #ifndef MA_USE_EXPERIMENTAL_LOCK_FREE_JOB_QUEUE + ma_spinlock_unlock(&pQueue->lock); + #endif /* Signal the semaphore as the last step if we're using synchronous mode. */ @@ -61886,26 +61895,46 @@ MA_API ma_result ma_resource_manager_job_queue_next(ma_resource_manager_job_queu #endif } - /* Now we need to remove the root item from the list. This must be done without locking. */ - for (;;) { - head = c89atomic_load_64(&pQueue->head); - tail = c89atomic_load_64(&pQueue->tail); - next = c89atomic_load_64(&pQueue->pJobs[ma_resource_manager_job_extract_slot(head)].next); + #ifndef MA_USE_EXPERIMENTAL_LOCK_FREE_JOB_QUEUE + ma_spinlock_lock(&pQueue->lock); + #endif + { + /* + BUG: In lock-free mode, multiple threads can be in this section of code. The "head" variable in the loop below + is stored. One thread can fall through to the freeing of this item while another is still using "head" for the + retrieval of the "next" variable. - if (ma_resource_manager_job_toc_to_allocation(head) == ma_resource_manager_job_toc_to_allocation(c89atomic_load_64(&pQueue->head))) { - if (ma_resource_manager_job_extract_slot(head) == ma_resource_manager_job_extract_slot(tail)) { - if (ma_resource_manager_job_extract_slot(next) == 0xFFFF) { - return MA_NO_DATA_AVAILABLE; - } - ma_resource_manager_job_queue_cas(&pQueue->tail, tail, ma_resource_manager_job_extract_slot(next)); - } else { - *pJob = pQueue->pJobs[ma_resource_manager_job_extract_slot(next)]; - if (ma_resource_manager_job_queue_cas(&pQueue->head, head, ma_resource_manager_job_extract_slot(next))) { - break; + The slot allocator might need to make use of some reference counting to ensure it's only truely freed when + there are no more references to the item. This must be fixed before removing these locks. + */ + + /* Now we need to remove the root item from the list. */ + for (;;) { + head = c89atomic_load_64(&pQueue->head); + tail = c89atomic_load_64(&pQueue->tail); + next = c89atomic_load_64(&pQueue->pJobs[ma_resource_manager_job_extract_slot(head)].next); + + if (ma_resource_manager_job_toc_to_allocation(head) == ma_resource_manager_job_toc_to_allocation(c89atomic_load_64(&pQueue->head))) { + if (ma_resource_manager_job_extract_slot(head) == ma_resource_manager_job_extract_slot(tail)) { + if (ma_resource_manager_job_extract_slot(next) == 0xFFFF) { + #ifndef MA_USE_EXPERIMENTAL_LOCK_FREE_JOB_QUEUE + ma_spinlock_unlock(&pQueue->lock); + #endif + return MA_NO_DATA_AVAILABLE; + } + ma_resource_manager_job_queue_cas(&pQueue->tail, tail, ma_resource_manager_job_extract_slot(next)); + } else { + *pJob = pQueue->pJobs[ma_resource_manager_job_extract_slot(next)]; + if (ma_resource_manager_job_queue_cas(&pQueue->head, head, ma_resource_manager_job_extract_slot(next))) { + break; + } } } } } + #ifndef MA_USE_EXPERIMENTAL_LOCK_FREE_JOB_QUEUE + ma_spinlock_unlock(&pQueue->lock); + #endif ma_slot_allocator_free(&pQueue->allocator, head);