From 77057895a81d3969d87cb39f5958335a4e5eb8bb Mon Sep 17 00:00:00 2001 From: David Reid Date: Mon, 20 Jul 2020 19:07:41 +1000 Subject: [PATCH] Set up some infrastructure for enabling multiple job threads. --- research/ma_engine.h | 76 ++++++++++++++++++++++++++++++-------------- 1 file changed, 53 insertions(+), 23 deletions(-) diff --git a/research/ma_engine.h b/research/ma_engine.h index 176ef9bc..2c7b749d 100644 --- a/research/ma_engine.h +++ b/research/ma_engine.h @@ -238,7 +238,7 @@ typedef struct MA_API ma_job ma_job_init(ma_uint16 code); -#define MA_JOB_QUEUE_NON_BLOCKING 0x00000001 /* When set, ma_job_queue_next() will not wait and no semaphore will be signaled in ma_job_queue_post(). ma_job_queue_next() will return MA_NO_DATA_AVAILABLE if nothing is available. */ +#define MA_JOB_QUEUE_FLAG_NON_BLOCKING 0x00000001 /* When set, ma_job_queue_next() will not wait and no semaphore will be signaled in ma_job_queue_post(). ma_job_queue_next() will return MA_NO_DATA_AVAILABLE if nothing is available. */ typedef struct { @@ -253,11 +253,15 @@ typedef struct MA_API ma_result ma_job_queue_init(ma_uint32 flags, ma_job_queue* pQueue); MA_API ma_result ma_job_queue_uninit(ma_job_queue* pQueue); MA_API ma_result ma_job_queue_post(ma_job_queue* pQueue, const ma_job* pJob); -MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob); +MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob); /* Returns MA_CANCELLED if the next job is a quit job. */ -#define MA_RESOURCE_MANAGER_NO_JOB_THREAD 0x00000001 /* Enable this to manage the job thread yourself. */ -#define MA_RESOURCE_MANAGER_NON_BLOCKING 0x00000002 /* Indicates ma_resource_manager_next_job() should not block. Only valid with MA_RESOURCE_MANAGER_NO_JOB_THREAD. */ +/* Maximum job thread count will be restricted to this, but this may be removed later and replaced with a heap allocation thereby removing any limitation. */ +#ifndef MA_RESOURCE_MANAGER_MAX_JOB_THREAD_COUNT +#define MA_RESOURCE_MANAGER_MAX_JOB_THREAD_COUNT 64 +#endif + +#define MA_RESOURCE_MANAGER_FLAG_NON_BLOCKING 0x00000001 /* Indicates ma_resource_manager_next_job() should not block. Only valid with MA_RESOURCE_MANAGER_NO_JOB_THREAD. */ typedef struct { @@ -358,6 +362,7 @@ typedef struct ma_format decodedFormat; ma_uint32 decodedChannels; ma_uint32 decodedSampleRate; + ma_uint32 jobThreadCount; /* Set to 0 if you want to self-manage your job threads. Defaults to 1. */ ma_uint32 flags; ma_vfs* pVFS; /* Can be NULL in which case defaults will be used. */ } ma_resource_manager_config; @@ -367,13 +372,14 @@ MA_API ma_resource_manager_config ma_resource_manager_config_init(ma_format deco struct ma_resource_manager { ma_resource_manager_config config; - ma_resource_manager_data_buffer* pRootDataBuffer; /* The root buffer in the binary tree. */ - ma_mutex dataBufferLock; /* For synchronizing access to the data buffer binary tree. */ - ma_thread jobThread; /* The thread for executing jobs. Will probably turn this into an array. */ - ma_job_queue jobQueue; /* Lock-free multi-consumer, multi-producer job queue for managing jobs for asynchronous decoding and streaming. */ - ma_default_vfs defaultVFS; /* Only used if a custom VFS is not specified. */ + ma_resource_manager_data_buffer* pRootDataBuffer; /* The root buffer in the binary tree. */ + ma_mutex dataBufferLock; /* For synchronizing access to the data buffer binary tree. */ + ma_thread jobThreads[MA_RESOURCE_MANAGER_MAX_JOB_THREAD_COUNT]; /* The thread for executing jobs. Will probably turn this into an array. */ + ma_job_queue jobQueue; /* Lock-free multi-consumer, multi-producer job queue for managing jobs for asynchronous decoding and streaming. */ + ma_default_vfs defaultVFS; /* Only used if a custom VFS is not specified. */ }; +/* Init. */ MA_API ma_result ma_resource_manager_init(const ma_resource_manager_config* pConfig, ma_resource_manager* pResourceManager); MA_API void ma_resource_manager_uninit(ma_resource_manager* pResourceManager); @@ -833,7 +839,7 @@ MA_API ma_result ma_job_queue_init(ma_uint32 flags, ma_job_queue* pQueue) ma_slot_allocator_init(&pQueue->allocator); /* Will not fail. */ /* We need a semaphore if we're running in synchronous mode. */ - if ((pQueue->flags & MA_JOB_QUEUE_NON_BLOCKING) == 0) { + if ((pQueue->flags & MA_JOB_QUEUE_FLAG_NON_BLOCKING) == 0) { ma_semaphore_init(0, &pQueue->sem); } @@ -856,7 +862,7 @@ MA_API ma_result ma_job_queue_uninit(ma_job_queue* pQueue) } /* All we need to do is uninitialize the semaphore. */ - if ((pQueue->flags & MA_JOB_QUEUE_NON_BLOCKING) == 0) { + if ((pQueue->flags & MA_JOB_QUEUE_FLAG_NON_BLOCKING) == 0) { ma_semaphore_uninit(&pQueue->sem); } @@ -908,7 +914,7 @@ MA_API ma_result ma_job_queue_post(ma_job_queue* pQueue, const ma_job* pJob) /* Signal the semaphore as the last step if we're using synchronous mode. */ - if ((pQueue->flags & MA_JOB_QUEUE_NON_BLOCKING) == 0) { + if ((pQueue->flags & MA_JOB_QUEUE_FLAG_NON_BLOCKING) == 0) { ma_semaphore_release(&pQueue->sem); } @@ -926,7 +932,7 @@ MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob) } /* If we're running in synchronous mode we'll need to wait on a semaphore. */ - if ((pQueue->flags & MA_JOB_QUEUE_NON_BLOCKING) == 0) { + if ((pQueue->flags & MA_JOB_QUEUE_FLAG_NON_BLOCKING) == 0) { ma_semaphore_wait(&pQueue->sem); } @@ -953,6 +959,16 @@ MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob) ma_slot_allocator_free(&pQueue->allocator, head); + /* + If it's a quit job make sure it's put back on the queue to ensure other threads have an opportunity to detect it and terminate naturally. We + could instead just leave it on the queue, but that would involve fiddling with the lock-free code above and I want to keep that as simple as + possible. + */ + if (pJob->toc.code == MA_JOB_QUIT) { + ma_job_queue_post(pQueue, pJob); + return MA_CANCELLED; /* Return a cancelled status just in case the thread is checking return codes and not properly checking for a quit job. */ + } + return MA_SUCCESS; } @@ -1433,6 +1449,7 @@ MA_API ma_resource_manager_config ma_resource_manager_config_init(ma_format deco config.decodedFormat = decodedFormat; config.decodedChannels = decodedChannels; config.decodedSampleRate = decodedSampleRate; + config.jobThreadCount = 1; /* A single miniaudio-managed job thread by default. */ if (pAllocationCallbacks != NULL) { config.allocationCallbacks = *pAllocationCallbacks; @@ -1446,6 +1463,7 @@ MA_API ma_result ma_resource_manager_init(const ma_resource_manager_config* pCon { ma_result result; ma_uint32 jobQueueFlags; + ma_uint32 iJobThread; if (pResourceManager == NULL) { return MA_INVALID_ARGS; @@ -1457,6 +1475,10 @@ MA_API ma_result ma_resource_manager_init(const ma_resource_manager_config* pCon return MA_INVALID_ARGS; } + if (pConfig->jobThreadCount > ma_countof(pResourceManager->jobThreads)) { + return MA_INVALID_ARGS; /* Requesting too many job threads. */ + } + pResourceManager->config = *pConfig; ma_allocation_callbacks_init_copy(&pResourceManager->config.allocationCallbacks, &pConfig->allocationCallbacks); @@ -1471,12 +1493,12 @@ MA_API ma_result ma_resource_manager_init(const ma_resource_manager_config* pCon /* Job queue. */ jobQueueFlags = 0; - if ((pConfig->flags & MA_RESOURCE_MANAGER_NON_BLOCKING) != 0) { - if ((pConfig->flags & MA_RESOURCE_MANAGER_NO_JOB_THREAD) != 0) { + if ((pConfig->flags & MA_RESOURCE_MANAGER_FLAG_NON_BLOCKING) != 0) { + if (pConfig->jobThreadCount > 0) { return MA_INVALID_ARGS; /* Non-blocking mode is only valid for self-managed job threads. */ } - jobQueueFlags |= MA_JOB_QUEUE_NON_BLOCKING; + jobQueueFlags |= MA_JOB_QUEUE_FLAG_NON_BLOCKING; } result = ma_job_queue_init(jobQueueFlags, &pResourceManager->jobQueue); @@ -1485,22 +1507,22 @@ MA_API ma_result ma_resource_manager_init(const ma_resource_manager_config* pCon return result; } + /* Data buffer lock. */ result = ma_mutex_init(&pResourceManager->dataBufferLock); if (result != MA_SUCCESS) { return result; } - /* Create the job thread last to ensure the new thread has access to valid data. */ - if ((pConfig->flags & MA_RESOURCE_MANAGER_NO_JOB_THREAD) == 0) { - result = ma_thread_create(&pResourceManager->jobThread, ma_thread_priority_normal, 0, ma_resource_manager_job_thread, pResourceManager); + + /* Create the job threads last to ensure the threads has access to valid data. */ + for (iJobThread = 0; iJobThread < pConfig->jobThreadCount; iJobThread += 1) { + result = ma_thread_create(&pResourceManager->jobThreads[iJobThread], ma_thread_priority_normal, 0, ma_resource_manager_job_thread, pResourceManager); if (result != MA_SUCCESS) { ma_mutex_uninit(&pResourceManager->dataBufferLock); ma_job_queue_uninit(&pResourceManager->jobQueue); return result; } - } else { - pResourceManager->jobThread = NULL; } return MA_SUCCESS; @@ -1524,15 +1546,23 @@ static void ma_resource_manager_delete_all_data_buffers(ma_resource_manager* pRe MA_API void ma_resource_manager_uninit(ma_resource_manager* pResourceManager) { ma_job quitJob; + ma_uint32 iJobThread; if (pResourceManager == NULL) { return; } - /* The job thread need to be killed first. To do this we need to post a quit message to the message queue and then wait for the thread. */ + /* + Job threads need to be killed first. To do this we need to post a quit message to the message queue and then wait for the thread. The quit message will never be removed from the + queue which means it will never not be returned after being encounted for the first time which means all threads will eventually receive it. + */ quitJob = ma_job_init(MA_JOB_QUIT); ma_resource_manager_post_job(pResourceManager, &quitJob); - ma_thread_wait(&pResourceManager->jobThread); + + /* Wait for every job to finish before continuing to ensure nothing is sill trying to access any of our objects below. */ + for (iJobThread = 0; iJobThread < pResourceManager->config.jobThreadCount; iJobThread += 1) { + ma_thread_wait(&pResourceManager->jobThreads[iJobThread]); + } /* At this point the thread should have returned and no other thread should be accessing our data. We can now delete all data buffers. */ ma_resource_manager_delete_all_data_buffers(pResourceManager);