From 861f6a23c83475721962579eeba0bf39f6ed9f75 Mon Sep 17 00:00:00 2001 From: David Reid Date: Sun, 19 Jul 2020 21:56:12 +1000 Subject: [PATCH] Initial work on a new job system for the high level API. This new system is used for asynchronous decoding of sound data. The main improvement with this one over the old one is the ability to do multi-producer, multi-consumer lock-free posting of messages which means multiple threads can be used to process jobs simultaneously rather than a single thread processing all jobs serially. Decoding is inherently serial which means multiple job threads is only useful when decoding multiple sounds. Each individual sound will be decoded serially. Another change with this commit is the ability for applications to control whether or not the resource manager manages it's own job threads. This is useful if an application wants to manage the job queue themselves if, for example, they want to integrate it more closely with their existing job system. --- research/ma_engine.c | 6 +- research/ma_engine.h | 1403 +++++++++++++++++++----------------------- 2 files changed, 629 insertions(+), 780 deletions(-) diff --git a/research/ma_engine.c b/research/ma_engine.c index 974ffa03..53f81713 100644 --- a/research/ma_engine.c +++ b/research/ma_engine.c @@ -26,7 +26,7 @@ int main(int argc, char** argv) #if 1 - result = ma_engine_sound_init_from_file(&engine, argv[1], MA_DATA_SOURCE_FLAG_DECODE /*| MA_DATA_SOURCE_FLAG_ASYNC | MA_DATA_SOURCE_FLAG_STREAM*/, NULL, &sound); + result = ma_engine_sound_init_from_file(&engine, argv[1], MA_DATA_SOURCE_FLAG_DECODE | MA_DATA_SOURCE_FLAG_ASYNC | MA_DATA_SOURCE_FLAG_STREAM, NULL, &sound); if (result != MA_SUCCESS) { printf("Failed to load sound: %s\n", argv[1]); ma_engine_uninit(&engine); @@ -35,7 +35,7 @@ int main(int argc, char** argv) /*ma_data_source_seek_to_pcm_frame(sound.pDataSource, 5000000);*/ - ma_engine_sound_set_volume(&engine, &sound, 0.25f); + /*ma_engine_sound_set_volume(&engine, &sound, 0.25f);*/ ma_engine_sound_set_pitch(&engine, &sound, 1.0f); ma_engine_sound_set_pan(&engine, &sound, 0.0f); ma_engine_sound_set_looping(&engine, &sound, MA_TRUE); @@ -48,7 +48,7 @@ int main(int argc, char** argv) ma_engine_play_sound(&engine, argv[3], NULL);*/ #endif -#if 1 +#if 0 float pitch = 1; float pitchStep = 0.01f; float pitchMin = 0.125f; diff --git a/research/ma_engine.h b/research/ma_engine.h index cd434e05..176ef9bc 100644 --- a/research/ma_engine.h +++ b/research/ma_engine.h @@ -95,7 +95,6 @@ The flags below are used for controlling how the resource manager should handle #define MA_DATA_SOURCE_FLAG_ASYNC 0x00000004 /* When set, the resource manager will load the data source asynchronously. */ - typedef enum { ma_resource_manager_data_buffer_encoding_encoded, @@ -118,24 +117,20 @@ typedef struct ma_resource_manager_data_source ma_resource_manager_data_source; - - -/* TODO: May need to do some stress testing and tweak this. */ -/* The job queue capacity must be a multiple of 32. */ -#ifndef MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY -#define MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY 1024 +#ifndef MA_RESOURCE_MANAGER_JOB_QUEUE_CAPACITY +#define MA_RESOURCE_MANAGER_JOB_QUEUE_CAPACITY 1024 #endif -#define MA_MESSAGE_TERMINATE 0x00000000 -#define MA_MESSAGE_LOAD_DATA_BUFFER 0x00000001 -#define MA_MESSAGE_FREE_DATA_BUFFER 0x00000002 -#define MA_MESSAGE_LOAD_DATA_STREAM 0x00000003 -#define MA_MESSAGE_FREE_DATA_STREAM 0x00000004 -#define MA_MESSAGE_LOAD_DATA_SOURCE 0x00000005 -/*#define MA_MESSAGE_FREE_DATA_SOURCE 0x00000006*/ -#define MA_MESSAGE_DECODE_BUFFER_PAGE 0x00000007 -#define MA_MESSAGE_DECODE_STREAM_PAGE 0x00000008 -#define MA_MESSAGE_SEEK_DATA_STREAM 0x00000009 +#define MA_JOB_QUIT 0x00000000 +#define MA_JOB_LOAD_DATA_BUFFER 0x00000001 +#define MA_JOB_FREE_DATA_BUFFER 0x00000002 +#define MA_JOB_PAGE_DATA_BUFFER 0x00000003 +#define MA_JOB_LOAD_DATA_STREAM 0x00000004 +#define MA_JOB_FREE_DATA_STREAM 0x00000005 +#define MA_JOB_PAGE_DATA_STREAM 0x00000006 +#define MA_JOB_SEEK_DATA_STREAM 0x00000007 +#define MA_JOB_LOAD_DATA_SOURCE 0x00000008 +#define MA_JOB_FREE_DATA_SOURCE 0x00000009 /* @@ -151,16 +146,14 @@ The slot index is stored in the low 32 bits. The reference counter is stored in +-----------------+-----------------+ | Reference Count | Slot Index | +-----------------+-----------------+ - - */ typedef struct { volatile struct { ma_uint32 bitfield; - } groups[MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY/32]; - ma_uint32 slots[MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY]; /* 1 bit to indicate if the slot is allocated, 31 bits for reference counting. */ + } groups[MA_RESOURCE_MANAGER_JOB_QUEUE_CAPACITY/32]; + ma_uint32 slots[MA_RESOURCE_MANAGER_JOB_QUEUE_CAPACITY]; /* 32 bits for reference counting for ABA mitigation. */ ma_uint32 count; /* Allocation count. */ } ma_slot_allocator; @@ -182,6 +175,7 @@ typedef struct ma_uint64 allocation; } toc; /* 8 bytes. We encode the job code into the slot allocation data to save space. */ ma_uint64 next; /* refcount + slot for the next item. Does not include the job code. */ + ma_uint32 order; /* Execution order. Used to create a data dependency and ensure a job is executed in order. Usage is contextual depending on the job type. */ union { @@ -192,67 +186,11 @@ typedef struct char* pFilePath; ma_event* pEvent; } loadDataBuffer; - }; -} ma_job; - -MA_API ma_job ma_job_init(ma_uint16 code); - - -#define MA_JOB_QUEUE_ASYNC 0x00000001 /* When set, ma_job_queue_next() will not wait and no semaphore will be signaled in ma_job_queue_post(). ma_job_queue_next() will return MA_NO_DATA_AVAILABLE if nothing is available. */ - -typedef struct -{ - ma_uint32 flags; /* Flags passed in at initialization time. */ - ma_uint64 head; /* The first item in the list. Required for removing from the top of the list. */ - ma_uint64 tail; /* The last item in the list. Required for appending to the end of the list. */ - ma_semaphore sem; /* Only used when MA_JOB_QUEUE_ASYNC is unset. */ - ma_slot_allocator allocator; - ma_job jobs[MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY]; -} ma_job_queue; - -MA_API ma_result ma_job_queue_init(ma_uint32 flags, ma_job_queue* pQueue); -MA_API ma_result ma_job_queue_uninit(ma_job_queue* pQueue); -MA_API ma_result ma_job_queue_post(ma_job_queue* pQueue, const ma_job* pJob); -MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob); - - -typedef struct -{ - ma_uint16 code; - ma_uint16 slot; - union - { - struct - { - ma_resource_manager_data_buffer* pDataBuffer; - char* pFilePath; /* Allocated when the message is posted, freed by the async thread after loading. */ - ma_event* pEvent; - } loadDataBuffer; struct { ma_resource_manager_data_buffer* pDataBuffer; } freeDataBuffer; struct - { - ma_resource_manager_data_stream* pDataStream; - char* pFilePath; /* Allocated when the message is posted, freed by the async thread after loading. */ - ma_event* pEvent; - } loadDataStream; - struct - { - ma_resource_manager_data_stream* pDataStream; - ma_event* pEvent; - } freeDataStream; - struct - { - ma_resource_manager_data_source* pDataSource; - ma_event* pEvent; - } loadDataSource; - struct - { - ma_resource_manager_data_source* pDataSource; - } freeDataSource; - struct { ma_resource_manager_data_buffer* pDataBuffer; ma_decoder* pDecoder; @@ -261,40 +199,66 @@ typedef struct size_t dataSizeInBytes; ma_uint64 decodedFrameCount; ma_bool32 isUnknownLength; /* When set to true does not update the running frame count of the data buffer nor the data pointer until the last page has been decoded. */ - } decodeBufferPage; + } pageDataBuffer; + struct { ma_resource_manager_data_stream* pDataStream; - ma_uint32 pageIndex; /* The index of the page to decode into. */ - } decodeStreamPage; + char* pFilePath; /* Allocated when the job is posted, freed by the job thread after loading. */ + ma_event* pEvent; + } loadDataStream; + struct + { + ma_resource_manager_data_stream* pDataStream; + ma_event* pEvent; + } freeDataStream; + struct + { + ma_resource_manager_data_stream* pDataStream; + ma_uint32 pageIndex; /* The index of the page to decode into. */ + } pageDataStream; struct { ma_resource_manager_data_stream* pDataStream; ma_uint64 frameIndex; } seekDataStream; + + struct + { + ma_resource_manager_data_source* pDataSource; + ma_event* pEvent; + } loadDataSource; + struct + { + ma_resource_manager_data_source* pDataSource; + } freeDataSource; }; -} ma_resource_manager_message; +} ma_job; -MA_API ma_resource_manager_message ma_resource_manager_message_init(ma_uint16 code); +MA_API ma_job ma_job_init(ma_uint16 code); +#define MA_JOB_QUEUE_NON_BLOCKING 0x00000001 /* When set, ma_job_queue_next() will not wait and no semaphore will be signaled in ma_job_queue_post(). ma_job_queue_next() will return MA_NO_DATA_AVAILABLE if nothing is available. */ + typedef struct { - ma_resource_manager_message messages[MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY]; - volatile ma_uint32 getCursor; /* For reading. */ - volatile ma_uint32 putCursor; /* For writing. */ - ma_semaphore sem; /* Semaphore for only freeing */ - ma_mutex lock; /* For thread-safe access to the message queue. */ -} ma_resource_manager_message_queue; + ma_uint32 flags; /* Flags passed in at initialization time. */ + ma_uint64 head; /* The first item in the list. Required for removing from the top of the list. */ + ma_uint64 tail; /* The last item in the list. Required for appending to the end of the list. */ + ma_semaphore sem; /* Only used when MA_JOB_QUEUE_ASYNC is unset. */ + ma_slot_allocator allocator; + ma_job jobs[MA_RESOURCE_MANAGER_JOB_QUEUE_CAPACITY]; +} ma_job_queue; -MA_API ma_result ma_resource_manager_message_queue_init(ma_resource_manager* pResourceManager, ma_resource_manager_message_queue* pQueue); -MA_API void ma_resource_manager_message_queue_uninit(ma_resource_manager_message_queue* pQueue); -MA_API ma_result ma_resource_manager_message_queue_post(ma_resource_manager_message_queue* pQueue, const ma_resource_manager_message* pMessage); -MA_API ma_result ma_resource_manager_message_queue_next(ma_resource_manager_message_queue* pQueue, ma_resource_manager_message* pMessage); /* Blocking */ -MA_API ma_result ma_resource_manager_message_queue_peek(ma_resource_manager_message_queue* pQueue, ma_resource_manager_message* pMessage); /* Non-Blocking */ -MA_API ma_result ma_resource_manager_message_queue_post_terminate(ma_resource_manager_message_queue* pQueue); +MA_API ma_result ma_job_queue_init(ma_uint32 flags, ma_job_queue* pQueue); +MA_API ma_result ma_job_queue_uninit(ma_job_queue* pQueue); +MA_API ma_result ma_job_queue_post(ma_job_queue* pQueue, const ma_job* pJob); +MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob); +#define MA_RESOURCE_MANAGER_NO_JOB_THREAD 0x00000001 /* Enable this to manage the job thread yourself. */ +#define MA_RESOURCE_MANAGER_NON_BLOCKING 0x00000002 /* Indicates ma_resource_manager_next_job() should not block. Only valid with MA_RESOURCE_MANAGER_NO_JOB_THREAD. */ + typedef struct { const void* pData; @@ -323,9 +287,11 @@ typedef struct struct ma_resource_manager_data_buffer { - ma_uint32 hashedName32; /* The hashed name. This is the key. */ + ma_uint32 hashedName32; /* The hashed name. This is the key. */ ma_uint32 refCount; - ma_result result; /* Result from asynchronous loading. When loading set to MA_BUSY. When fully loaded set to MA_SUCCESS. When deleting set to MA_UNAVAILABLE. */ + ma_result result; /* Result from asynchronous loading. When loading set to MA_BUSY. When fully loaded set to MA_SUCCESS. When deleting set to MA_UNAVAILABLE. */ + ma_uint32 executionCounter; /* For allocating execution orders for jobs. */ + ma_uint32 executionPointer; /* For managing the order of execution for asynchronous jobs relating to this object. Incremented as jobs complete processing. */ ma_bool32 isDataOwnedByResourceManager; ma_resource_manager_memory_buffer data; ma_resource_manager_data_buffer* pParent; @@ -335,22 +301,24 @@ struct ma_resource_manager_data_buffer struct ma_resource_manager_data_stream { - ma_decoder decoder; /* Used for filling pages with data. This is only ever accessed by the async thread. The public API should never touch this. */ - ma_bool32 isDecoderInitialized; /* Required for determining whether or not the decoder should be uninitialized in MA_MESSAGE_FREE_DATA_STREAM. */ - ma_uint32 relativeCursor; /* The playback cursor, relative to the current page. Only ever accessed by the public API. Never accessed by the async thread. */ - ma_uint32 currentPageIndex; /* Toggles between 0 and 1. Index 0 is the first half of pPageData. Index 1 is the second half. Only ever accessed by the public API. Never accessed by the async thread. */ + ma_decoder decoder; /* Used for filling pages with data. This is only ever accessed by the job thread. The public API should never touch this. */ + ma_bool32 isDecoderInitialized; /* Required for determining whether or not the decoder should be uninitialized in MA_JOB_FREE_DATA_STREAM. */ + ma_uint32 relativeCursor; /* The playback cursor, relative to the current page. Only ever accessed by the public API. Never accessed by the job thread. */ + ma_uint32 currentPageIndex; /* Toggles between 0 and 1. Index 0 is the first half of pPageData. Index 1 is the second half. Only ever accessed by the public API. Never accessed by the job thread. */ + ma_uint32 executionCounter; /* For allocating execution orders for jobs. */ + ma_uint32 executionPointer; /* For managing the order of execution for asynchronous jobs relating to this object. Incremented as jobs complete processing. */ - /* Written by the public API, read by the async thread. */ + /* Written by the public API, read by the job thread. */ ma_bool32 isLooping; /* Whether or not the stream is looping. It's important to set the looping flag at the data stream level for smooth loop transitions. */ - /* Written by the async thread, read by the public API. */ + /* Written by the job thread, read by the public API. */ void* pPageData; /* Buffer containing the decoded data of each page. Allocated once at initialization time. */ ma_uint32 pageFrameCount[2]; /* The number of valid PCM frames in each page. Used to determine the last valid frame. */ - /* Written and read by both the public API and the async thread. */ + /* Written and read by both the public API and the job thread. */ ma_result result; /* Result from asynchronous loading. When loading set to MA_BUSY. When initialized set to MA_SUCCESS. When deleting set to MA_UNAVAILABLE. If an error occurs when loading, set to an error code. */ ma_bool32 isDecoderAtEnd; /* Whether or not the decoder has reached the end. */ - ma_bool32 isPageValid[2]; /* Booleans to indicate whether or not a page is valid. Set to false by the public API, set to true by the async thread. Set to false as the pages are consumed, true when they are filled. */ + ma_bool32 isPageValid[2]; /* Booleans to indicate whether or not a page is valid. Set to false by the public API, set to true by the job thread. Set to false as the pages are consumed, true when they are filled. */ ma_bool32 seekCounter; /* When 0, no seeking is being performed. When > 0, a seek is being performed and reading should be delayed with MA_BUSY. */ }; @@ -358,14 +326,16 @@ struct ma_resource_manager_data_source { ma_data_source_callbacks ds; ma_resource_manager* pResourceManager; - ma_result result; /* Result from asynchronous loading. When loading set to MA_BUSY. When fully loaded set to MA_SUCCESS. When deleting set to MA_UNAVAILABLE. */ - ma_uint32 flags; /* The flags that were passed in to ma_resource_manager_data_source_init(). */ + ma_result result; /* Result from asynchronous loading. When loading set to MA_BUSY. When fully loaded set to MA_SUCCESS. When deleting set to MA_UNAVAILABLE. */ + ma_uint32 flags; /* The flags that were passed in to ma_resource_manager_data_source_init(). */ + ma_uint32 executionCounter; /* For allocating execution orders for jobs. */ + ma_uint32 executionPointer; /* For managing the order of execution for asynchronous jobs relating to this object. Incremented as jobs complete processing. */ union { struct { ma_resource_manager_data_buffer* pDataBuffer; - ma_uint64 cursor; /* Only used with data buffers (the cursor is drawn from the internal decoder for data streams). Only updated by the public API. Never written nor read from the async thread. */ + ma_uint64 cursor; /* Only used with data buffers (the cursor is drawn from the internal decoder for data streams). Only updated by the public API. Never written nor read from the job thread. */ ma_bool32 seekToCursorOnNextRead; /* On the next read we need to seek to the frame cursor. */ ma_bool32 isLooping; ma_resource_manager_data_buffer_connector connectorType; @@ -388,6 +358,7 @@ typedef struct ma_format decodedFormat; ma_uint32 decodedChannels; ma_uint32 decodedSampleRate; + ma_uint32 flags; ma_vfs* pVFS; /* Can be NULL in which case defaults will be used. */ } ma_resource_manager_config; @@ -398,8 +369,8 @@ struct ma_resource_manager ma_resource_manager_config config; ma_resource_manager_data_buffer* pRootDataBuffer; /* The root buffer in the binary tree. */ ma_mutex dataBufferLock; /* For synchronizing access to the data buffer binary tree. */ - ma_thread asyncThread; /* Thread for running asynchronous operations. */ - ma_resource_manager_message_queue messageQueue; + ma_thread jobThread; /* The thread for executing jobs. Will probably turn this into an array. */ + ma_job_queue jobQueue; /* Lock-free multi-consumer, multi-producer job queue for managing jobs for asynchronous decoding and streaming. */ ma_default_vfs defaultVFS; /* Only used if a custom VFS is not specified. */ }; @@ -414,7 +385,6 @@ MA_API ma_result ma_resource_manager_register_decoded_data(ma_resource_manager* MA_API ma_result ma_resource_manager_register_encoded_data(ma_resource_manager* pResourceManager, const char* pName, const void* pData, size_t sizeInBytes); /* Does not copy. Increments the reference count if already exists and returns MA_SUCCESS. */ MA_API ma_result ma_resource_manager_unregister_data(ma_resource_manager* pResourceManager, const char* pName); - /* Data Streams. */ MA_API ma_result ma_resource_manager_create_data_stream(ma_resource_manager* pResourceManager, const char* pFilePath, ma_event* pEvent, ma_resource_manager_data_stream* pDataStream); MA_API ma_result ma_resource_manager_delete_data_stream(ma_resource_manager* pResourceManager, ma_resource_manager_data_stream* pDataStream); @@ -434,12 +404,10 @@ MA_API ma_result ma_resource_manager_data_source_result(ma_resource_manager* pRe MA_API ma_result ma_resource_manager_data_source_set_looping(ma_resource_manager* pResourceManager, ma_resource_manager_data_source* pDataSource, ma_bool32 isLooping); MA_API ma_result ma_resource_manager_data_source_get_looping(ma_resource_manager* pResourceManager, const ma_resource_manager_data_source* pDataSource, ma_bool32* pIsLooping); -/* Message handling. */ -MA_API ma_result ma_resource_manager_handle_message(ma_resource_manager* pResourceManager, const ma_resource_manager_message* pMessage); -MA_API ma_result ma_resource_manager_post_message(ma_resource_manager* pResourceManager, const ma_resource_manager_message* pMessage); /* Message will be copied. */ -MA_API ma_result ma_resource_manager_next_message(ma_resource_manager* pResourceManager, ma_resource_manager_message* pMessage); -MA_API ma_result ma_resource_manager_peek_message(ma_resource_manager* pResourceManager, ma_resource_manager_message* pMessage); - +/* Job management. */ +MA_API ma_result ma_resource_manager_post_job(ma_resource_manager* pResourceManager, const ma_job* pJob); +MA_API ma_result ma_resource_manager_next_job(ma_resource_manager* pResourceManager, ma_job* pJob); +MA_API ma_result ma_resource_manager_process_job(ma_resource_manager* pResourceManager, ma_job* pJob); /* Engine @@ -865,7 +833,7 @@ MA_API ma_result ma_job_queue_init(ma_uint32 flags, ma_job_queue* pQueue) ma_slot_allocator_init(&pQueue->allocator); /* Will not fail. */ /* We need a semaphore if we're running in synchronous mode. */ - if ((pQueue->flags & MA_JOB_QUEUE_ASYNC) == 0) { + if ((pQueue->flags & MA_JOB_QUEUE_NON_BLOCKING) == 0) { ma_semaphore_init(0, &pQueue->sem); } @@ -888,7 +856,7 @@ MA_API ma_result ma_job_queue_uninit(ma_job_queue* pQueue) } /* All we need to do is uninitialize the semaphore. */ - if ((pQueue->flags & MA_JOB_QUEUE_ASYNC) == 0) { + if ((pQueue->flags & MA_JOB_QUEUE_NON_BLOCKING) == 0) { ma_semaphore_uninit(&pQueue->sem); } @@ -913,7 +881,7 @@ MA_API ma_result ma_job_queue_post(ma_job_queue* pQueue, const ma_job* pJob) } /* At this point we should have a slot to place the job. */ - MA_ASSERT(ma_job_extract_slot(slot) < MA_RESOURCE_MANAGER_MESSAGE_QUEUE_CAPACITY); + MA_ASSERT(ma_job_extract_slot(slot) < MA_RESOURCE_MANAGER_JOB_QUEUE_CAPACITY); /* We need to put the job into memory before we do anything. */ pQueue->jobs[ma_job_extract_slot(slot)] = *pJob; @@ -940,7 +908,7 @@ MA_API ma_result ma_job_queue_post(ma_job_queue* pQueue, const ma_job* pJob) /* Signal the semaphore as the last step if we're using synchronous mode. */ - if ((pQueue->flags & MA_JOB_QUEUE_ASYNC) == 0) { + if ((pQueue->flags & MA_JOB_QUEUE_NON_BLOCKING) == 0) { ma_semaphore_release(&pQueue->sem); } @@ -958,7 +926,7 @@ MA_API ma_result ma_job_queue_next(ma_job_queue* pQueue, ma_job* pJob) } /* If we're running in synchronous mode we'll need to wait on a semaphore. */ - if ((pQueue->flags & MA_JOB_QUEUE_ASYNC) == 0) { + if ((pQueue->flags & MA_JOB_QUEUE_NON_BLOCKING) == 0) { ma_semaphore_wait(&pQueue->sem); } @@ -1083,215 +1051,6 @@ static ma_uint32 ma_hash_string_32(const char* str) -MA_API ma_resource_manager_message ma_resource_manager_message_init(ma_uint16 code) -{ - ma_resource_manager_message message; - - MA_ZERO_OBJECT(&message); - message.code = code; - - return message; -} - -MA_API ma_result ma_resource_manager_message_queue_init(ma_resource_manager* pResourceManager, ma_resource_manager_message_queue* pQueue) -{ - ma_result result; - - if (pQueue == NULL) { - return MA_INVALID_ARGS; - } - - MA_ZERO_OBJECT(pQueue); - - if (pResourceManager == NULL) { - return MA_INVALID_ARGS; - } - - /* We need a semaphore for blocking while there are no messages available. */ - result = ma_semaphore_init(0, &pQueue->sem); - if (result != MA_SUCCESS) { - return result; /* Failed to initialize semaphore. */ - } - - /* Currently we're naively locking access to the queue using a mutex. It will be nice to make this lock-free later on. */ - result = ma_mutex_init(&pQueue->lock); - if (result != MA_SUCCESS) { - ma_semaphore_uninit(&pQueue->sem); - return result; - } - - return MA_SUCCESS; -} - -MA_API void ma_resource_manager_message_queue_uninit(ma_resource_manager_message_queue* pQueue) -{ - if (pQueue == NULL) { - return; - } - - ma_mutex_uninit(&pQueue->lock); - ma_semaphore_uninit(&pQueue->sem); -} - - -static ma_uint32 ma_resource_manager_message_queue_get_count(ma_resource_manager_message_queue* pQueue) -{ - ma_uint32 getCursor; - ma_uint32 getIndex; - ma_uint32 getLoopFlag; - ma_uint32 putCursor; - ma_uint32 putIndex; - ma_uint32 putLoopFlag; - - MA_ASSERT(pQueue != NULL); - - getCursor = pQueue->getCursor; - putCursor = pQueue->putCursor; - - ma_rb__deconstruct_offset(getCursor, &getIndex, &getLoopFlag); - ma_rb__deconstruct_offset(putCursor, &putIndex, &putLoopFlag); - - if (getLoopFlag == putLoopFlag) { - return putIndex - getIndex; - } else { - return putIndex + (ma_countof(pQueue->messages) - getIndex); - } -} - -static ma_result ma_resource_manager_message_queue_post_nolock(ma_resource_manager_message_queue* pQueue, const ma_resource_manager_message* pMessage) -{ - ma_uint32 putIndex; - ma_uint32 putLoopFlag; - - MA_ASSERT(pQueue != NULL); - - /* - Here is where we can do some synchronized operations before inserting into the queue. This is useful for setting some state of an object - or for cancelling an event based on the state of an object. - */ - - /* We cannot be decoding anything if the data buffer is set to any status other than MA_BUSY. */ - if (pMessage->code == MA_MESSAGE_DECODE_BUFFER_PAGE) { - MA_ASSERT(pMessage->decodeBufferPage.pDataBuffer != NULL); - - if (pMessage->decodeBufferPage.pDataBuffer->result != MA_BUSY) { - return MA_INVALID_OPERATION; /* Cannot decode after the data buffer has been marked as unavailable. Abort. */ - } - } - - - if (ma_resource_manager_message_queue_get_count(pQueue) == ma_countof(pQueue->messages)) { - return MA_OUT_OF_MEMORY; /* The queue is already full. */ - } - - ma_rb__deconstruct_offset(pQueue->putCursor, &putIndex, &putLoopFlag); - - pQueue->messages[putIndex] = *pMessage; - - /* Move the cursor forward. */ - putIndex += 1; - if (putIndex > ma_countof(pQueue->messages)) { - putIndex = 0; - putLoopFlag ^= 0x80000000; - } - - c89atomic_exchange_32(&pQueue->putCursor, ma_rb__construct_offset(putIndex, putLoopFlag)); - - /* Now that the message is in the queue we can let the consumer thread know about it by releasing the semaphore. */ - ma_semaphore_release(&pQueue->sem); - - return MA_SUCCESS; -} - -MA_API ma_result ma_resource_manager_message_queue_post(ma_resource_manager_message_queue* pQueue, const ma_resource_manager_message* pMessage) -{ - ma_result result; - - if (pQueue == NULL || pMessage == NULL) { - return MA_INVALID_ARGS; - } - - /* This is the producer. There can be many producer threads, so a typical single-producer, single-consumer ring buffer will not work here. */ - if (ma_resource_manager_message_queue_get_count(pQueue) == ma_countof(pQueue->messages)) { - return MA_OUT_OF_MEMORY; /* The queue is already full. */ - } - - ma_mutex_lock(&pQueue->lock); - { - result = ma_resource_manager_message_queue_post_nolock(pQueue, pMessage); - } - ma_mutex_unlock(&pQueue->lock); - - return result; -} - -MA_API ma_result ma_resource_manager_message_queue_next(ma_resource_manager_message_queue* pQueue, ma_resource_manager_message* pMessage) -{ - ma_result result; - ma_uint32 getIndex; - ma_uint32 getLoopFlag; - - if (pQueue == NULL) { - return MA_INVALID_ARGS; - } - - /* This is the consumer. There is only ever a single consumer thread which means we have simplified lock-free requirements. */ - - /* We first need to wait for a message. */ - result = ma_semaphore_wait(&pQueue->sem); - if (result != MA_SUCCESS) { - return result; /* Failed to retrieve a message. */ - } - - MA_ASSERT(ma_resource_manager_message_queue_get_count(pQueue) > 0); - - /* We have a message so now we need to copy it to the output buffer and increment the cursor. */ - ma_rb__deconstruct_offset(pQueue->getCursor, &getIndex, &getLoopFlag); - - *pMessage = pQueue->messages[getIndex]; - - /* The cursor needs to be moved forward. */ - getIndex += 1; - if (getIndex == ma_countof(pQueue->messages)) { - getIndex = 0; - getLoopFlag ^= 0x80000000; - } - - c89atomic_exchange_32(&pQueue->getCursor, ma_rb__construct_offset(getIndex, getLoopFlag)); - - return MA_SUCCESS; -} - -MA_API ma_result ma_resource_manager_message_queue_peek(ma_resource_manager_message_queue* pQueue, ma_resource_manager_message* pMessage) -{ - ma_uint32 readIndex; - ma_uint32 loopFlag; - - if (pQueue == NULL || pMessage == NULL) { - return MA_INVALID_ARGS; - } - - /* This should only ever be called by the consumer thread. If the count is greater than zero it won't ever be reduced which means it's safe to read the message. */ - if (ma_resource_manager_message_queue_get_count(pQueue) == 0) { - MA_ZERO_OBJECT(pMessage); - return MA_NO_DATA_AVAILABLE; - } - - ma_rb__deconstruct_offset(pQueue->getCursor, &readIndex, &loopFlag); - - *pMessage = pQueue->messages[readIndex]; - - return MA_SUCCESS; -} - -MA_API ma_result ma_resource_manager_message_queue_post_terminate(ma_resource_manager_message_queue* pQueue) -{ - ma_resource_manager_message message = ma_resource_manager_message_init(MA_MESSAGE_TERMINATE); - return ma_resource_manager_message_queue_post(pQueue, &message); -} - - - /* Basic BST Functions */ @@ -1640,35 +1399,32 @@ static void ma_resource_manager_data_buffer_free(ma_resource_manager* pResourceM - - -static ma_thread_result MA_THREADCALL ma_resource_manager_resource_thread(void* pUserData) +static ma_thread_result MA_THREADCALL ma_resource_manager_job_thread(void* pUserData) { ma_resource_manager* pResourceManager = (ma_resource_manager*)pUserData; MA_ASSERT(pResourceManager != NULL); for (;;) { ma_result result; - ma_resource_manager_message message; + ma_job job; - result = ma_resource_manager_next_message(pResourceManager, &message); + result = ma_resource_manager_next_job(pResourceManager, &job); if (result != MA_SUCCESS) { break; } - /* Terminate if we got a termination message. */ - if (message.code == MA_MESSAGE_TERMINATE) { + /* Terminate if we got a quit message. */ + if (job.toc.code == MA_JOB_QUIT) { break; } - ma_resource_manager_handle_message(pResourceManager, &message); + ma_resource_manager_process_job(pResourceManager, &job); } return (ma_thread_result)0; } - MA_API ma_resource_manager_config ma_resource_manager_config_init(ma_format decodedFormat, ma_uint32 decodedChannels, ma_uint32 decodedSampleRate, const ma_allocation_callbacks* pAllocationCallbacks) { ma_resource_manager_config config; @@ -1689,6 +1445,7 @@ MA_API ma_resource_manager_config ma_resource_manager_config_init(ma_format deco MA_API ma_result ma_resource_manager_init(const ma_resource_manager_config* pConfig, ma_resource_manager* pResourceManager) { ma_result result; + ma_uint32 jobQueueFlags; if (pResourceManager == NULL) { return MA_INVALID_ARGS; @@ -1712,26 +1469,38 @@ MA_API ma_result ma_resource_manager_init(const ma_resource_manager_config* pCon pResourceManager->config.pVFS = &pResourceManager->defaultVFS; } + /* Job queue. */ + jobQueueFlags = 0; + if ((pConfig->flags & MA_RESOURCE_MANAGER_NON_BLOCKING) != 0) { + if ((pConfig->flags & MA_RESOURCE_MANAGER_NO_JOB_THREAD) != 0) { + return MA_INVALID_ARGS; /* Non-blocking mode is only valid for self-managed job threads. */ + } + + jobQueueFlags |= MA_JOB_QUEUE_NON_BLOCKING; + } + + result = ma_job_queue_init(jobQueueFlags, &pResourceManager->jobQueue); + if (result != MA_SUCCESS) { + ma_mutex_uninit(&pResourceManager->dataBufferLock); + return result; + } + /* Data buffer lock. */ result = ma_mutex_init(&pResourceManager->dataBufferLock); if (result != MA_SUCCESS) { return result; } - /* We need a message queue. */ - result = ma_resource_manager_message_queue_init(pResourceManager, &pResourceManager->messageQueue); - if (result != MA_SUCCESS) { - ma_mutex_uninit(&pResourceManager->dataBufferLock); - return result; - } - - - /* Create the resource thread last to ensure the new thread has access to valid data. */ - result = ma_thread_create(&pResourceManager->asyncThread, ma_thread_priority_normal, 0, ma_resource_manager_resource_thread, pResourceManager); - if (result != MA_SUCCESS) { - ma_mutex_uninit(&pResourceManager->dataBufferLock); - ma_resource_manager_message_queue_uninit(&pResourceManager->messageQueue); - return result; + /* Create the job thread last to ensure the new thread has access to valid data. */ + if ((pConfig->flags & MA_RESOURCE_MANAGER_NO_JOB_THREAD) == 0) { + result = ma_thread_create(&pResourceManager->jobThread, ma_thread_priority_normal, 0, ma_resource_manager_job_thread, pResourceManager); + if (result != MA_SUCCESS) { + ma_mutex_uninit(&pResourceManager->dataBufferLock); + ma_job_queue_uninit(&pResourceManager->jobQueue); + return result; + } + } else { + pResourceManager->jobThread = NULL; } return MA_SUCCESS; @@ -1754,31 +1523,40 @@ static void ma_resource_manager_delete_all_data_buffers(ma_resource_manager* pRe MA_API void ma_resource_manager_uninit(ma_resource_manager* pResourceManager) { + ma_job quitJob; + if (pResourceManager == NULL) { return; } - /* The async threads need to be killed first. To do this we need to post a termination message to the message queue and then wait for the thread. */ - ma_resource_manager_message_queue_post_terminate(&pResourceManager->messageQueue); - ma_thread_wait(&pResourceManager->asyncThread); + /* The job thread need to be killed first. To do this we need to post a quit message to the message queue and then wait for the thread. */ + quitJob = ma_job_init(MA_JOB_QUIT); + ma_resource_manager_post_job(pResourceManager, &quitJob); + ma_thread_wait(&pResourceManager->jobThread); /* At this point the thread should have returned and no other thread should be accessing our data. We can now delete all data buffers. */ ma_resource_manager_delete_all_data_buffers(pResourceManager); - /* The message queue is no longer needed. */ - ma_resource_manager_message_queue_uninit(&pResourceManager->messageQueue); + /* The job queue is no longer needed. */ + ma_job_queue_uninit(&pResourceManager->jobQueue); /* We're no longer doing anything with data buffers so the lock can now be uninitialized. */ ma_mutex_uninit(&pResourceManager->dataBufferLock); } +static ma_uint32 ma_resource_manager_data_buffer_next_execution_order(ma_resource_manager_data_buffer* pDataBuffer) +{ + MA_ASSERT(pDataBuffer != NULL); + return c89atomic_fetch_add_32(&pDataBuffer->executionCounter, 1); +} + static ma_result ma_resource_manager_create_data_buffer_nolock(ma_resource_manager* pResourceManager, const char* pFilePath, ma_uint32 hashedName32, ma_resource_manager_data_buffer_encoding type, ma_resource_manager_memory_buffer* pExistingData, ma_event* pEvent, ma_resource_manager_data_buffer** ppDataBuffer) { ma_result result; ma_resource_manager_data_buffer* pDataBuffer; ma_resource_manager_data_buffer* pInsertPoint; - char* pFilePathCopy; /* Allocated here, freed in the resource thread. */ + char* pFilePathCopy; /* Allocated here, freed in the job thread. */ MA_ASSERT(pResourceManager != NULL); MA_ASSERT(pFilePath != NULL); @@ -1831,8 +1609,8 @@ static ma_result ma_resource_manager_create_data_buffer_nolock(ma_resource_manag pDataBuffer->data = *pExistingData; pDataBuffer->result = MA_SUCCESS; } else { - /* The data needs to be loaded. We do this by posting an event to the resource thread. */ - ma_resource_manager_message message; + /* The data needs to be loaded. We do this by posting a job to the job thread. */ + ma_job job; pDataBuffer->isDataOwnedByResourceManager = MA_TRUE; pDataBuffer->result = MA_BUSY; @@ -1849,12 +1627,13 @@ static ma_result ma_resource_manager_create_data_buffer_nolock(ma_resource_manag return MA_OUT_OF_MEMORY; } - /* We now have everything we need to post the message to the resource thread. This is the last thing we need to do from here. The rest will be done by the resource thread. */ - message = ma_resource_manager_message_init(MA_MESSAGE_LOAD_DATA_BUFFER); - message.loadDataBuffer.pDataBuffer = pDataBuffer; - message.loadDataBuffer.pFilePath = pFilePathCopy; - message.loadDataBuffer.pEvent = pEvent; - result = ma_resource_manager_post_message(pResourceManager, &message); + /* We now have everything we need to post the job to the job thread. */ + job = ma_job_init(MA_JOB_LOAD_DATA_BUFFER); + job.order = ma_resource_manager_data_buffer_next_execution_order(pDataBuffer); + job.loadDataBuffer.pDataBuffer = pDataBuffer; + job.loadDataBuffer.pFilePath = pFilePathCopy; + job.loadDataBuffer.pEvent = pEvent; + result = ma_resource_manager_post_job(pResourceManager, &job); if (result != MA_SUCCESS) { if (pEvent != NULL) { ma_event_signal(pEvent); @@ -1933,12 +1712,13 @@ static ma_result ma_resource_manager_delete_data_buffer_nolock(ma_resource_manag /* Don't delete any underlying data if it's not owned by the resource manager. */ if (pDataBuffer->isDataOwnedByResourceManager) { - ma_resource_manager_message message = ma_resource_manager_message_init(MA_MESSAGE_FREE_DATA_BUFFER); - message.freeDataBuffer.pDataBuffer = pDataBuffer; + ma_job job = ma_job_init(MA_JOB_FREE_DATA_BUFFER); + job.order = ma_resource_manager_data_buffer_next_execution_order(pDataBuffer); + job.freeDataBuffer.pDataBuffer = pDataBuffer; - result = ma_resource_manager_post_message(pResourceManager, &message); + result = ma_resource_manager_post_job(pResourceManager, &job); if (result != MA_SUCCESS) { - return result; /* Failed to post the message for some reason. Probably due to too many messages in the queue. */ + return result; } } } @@ -2042,12 +1822,17 @@ MA_API ma_result ma_resource_manager_unregister_data(ma_resource_manager* pResou } +static ma_uint32 ma_resource_manager_data_stream_next_execution_order(ma_resource_manager_data_stream* pDataStream) +{ + MA_ASSERT(pDataStream != NULL); + return c89atomic_fetch_add_32(&pDataStream->executionCounter, 1); +} MA_API ma_result ma_resource_manager_create_data_stream(ma_resource_manager* pResourceManager, const char* pFilePath, ma_event* pEvent, ma_resource_manager_data_stream* pDataStream) { ma_result result; char* pFilePathCopy; - ma_resource_manager_message message; + ma_job job; if (pDataStream == NULL) { if (pEvent != NULL) { @@ -2068,7 +1853,7 @@ MA_API ma_result ma_resource_manager_create_data_stream(ma_resource_manager* pRe return MA_INVALID_ARGS; } - /* We want all access to the VFS and the internal decoder to happen on the async thread just to keep things easier to manage for the VFS. */ + /* We want all access to the VFS and the internal decoder to happen on the job thread just to keep things easier to manage for the VFS. */ /* We need a copy of the file path. We should probably make this more efficient, but for now we'll do a transient memory allocation. */ pFilePathCopy = ma_copy_string(pFilePath, &pResourceManager->config.allocationCallbacks/*, MA_ALLOCATION_TYPE_TRANSIENT_STRING*/); @@ -2080,12 +1865,13 @@ MA_API ma_result ma_resource_manager_create_data_stream(ma_resource_manager* pRe return MA_OUT_OF_MEMORY; } - /* We now have everything we need to post the message to the resource thread. This is the last thing we need to do from here. The rest will be done by the resource thread. */ - message = ma_resource_manager_message_init(MA_MESSAGE_LOAD_DATA_STREAM); - message.loadDataStream.pDataStream = pDataStream; - message.loadDataStream.pFilePath = pFilePathCopy; - message.loadDataStream.pEvent = pEvent; - result = ma_resource_manager_post_message(pResourceManager, &message); + /* We now have everything we need to post the job. This is the last thing we need to do from here. The rest will be done by the job thread. */ + job = ma_job_init(MA_JOB_LOAD_DATA_STREAM); + job.order = ma_resource_manager_data_stream_next_execution_order(pDataStream); + job.loadDataStream.pDataStream = pDataStream; + job.loadDataStream.pFilePath = pFilePathCopy; + job.loadDataStream.pEvent = pEvent; + result = ma_resource_manager_post_job(pResourceManager, &job); if (result != MA_SUCCESS) { if (pEvent != NULL) { ma_event_signal(pEvent); @@ -2101,7 +1887,7 @@ MA_API ma_result ma_resource_manager_create_data_stream(ma_resource_manager* pRe MA_API ma_result ma_resource_manager_delete_data_stream(ma_resource_manager* pResourceManager, ma_resource_manager_data_stream* pDataStream) { ma_event freeEvent; - ma_resource_manager_message message; + ma_job job; if (pResourceManager == NULL || pDataStream == NULL) { return MA_INVALID_ARGS; @@ -2111,17 +1897,18 @@ MA_API ma_result ma_resource_manager_delete_data_stream(ma_resource_manager* pRe c89atomic_exchange_32(&pDataStream->result, MA_UNAVAILABLE); /* - We need to post a message to ensure we're not in the middle or decoding or anything. Because the object is owned by the caller, we'll need + We need to post a job to ensure we're not in the middle or decoding or anything. Because the object is owned by the caller, we'll need to wait for it to complete before returning which means we need an event. */ ma_event_init(&freeEvent); - message = ma_resource_manager_message_init(MA_MESSAGE_FREE_DATA_STREAM); - message.freeDataStream.pDataStream = pDataStream; - message.freeDataStream.pEvent = &freeEvent; - ma_resource_manager_post_message(pResourceManager, &message); + job = ma_job_init(MA_JOB_FREE_DATA_STREAM); + job.order = ma_resource_manager_data_stream_next_execution_order(pDataStream); + job.freeDataStream.pDataStream = pDataStream; + job.freeDataStream.pEvent = &freeEvent; + ma_resource_manager_post_job(pResourceManager, &job); - /* We need to wait for the message before we return. */ + /* We need to wait for the job to finish processing before we return. */ ma_event_wait(&freeEvent); ma_event_uninit(&freeEvent); @@ -2177,6 +1964,60 @@ static void* ma_resource_manager_data_stream_get_page_data_pointer(ma_resource_m return ma_offset_ptr(pDataStream->pPageData, ((ma_resource_manager_data_stream_get_page_size_in_frames(pDataStream) * pageIndex) + relativeCursor) * ma_get_bytes_per_frame(pDataStream->decoder.outputFormat, pDataStream->decoder.outputChannels)); } +static void ma_resource_manager_data_stream_fill_page(ma_resource_manager* pResourceManager, ma_resource_manager_data_stream* pDataStream, ma_uint32 pageIndex) +{ + ma_uint64 pageSizeInFrames; + ma_uint64 totalFramesReadForThisPage = 0; + void* pPageData = ma_resource_manager_data_stream_get_page_data_pointer(pDataStream, pageIndex, 0); + + pageSizeInFrames = ma_resource_manager_data_stream_get_page_size_in_frames(pDataStream); + + if (pDataStream->isLooping) { + while (totalFramesReadForThisPage < pageSizeInFrames) { + ma_uint64 framesRemaining; + ma_uint64 framesRead; + + framesRemaining = pageSizeInFrames - totalFramesReadForThisPage; + framesRead = ma_decoder_read_pcm_frames(&pDataStream->decoder, ma_offset_pcm_frames_ptr(pPageData, totalFramesReadForThisPage, pDataStream->decoder.outputFormat, pDataStream->decoder.outputChannels), framesRemaining); + totalFramesReadForThisPage += framesRead; + + /* Loop back to the start if we reached the end. */ + if (framesRead < framesRemaining) { + ma_decoder_seek_to_pcm_frame(&pDataStream->decoder, 0); + } + } + } else { + totalFramesReadForThisPage = ma_decoder_read_pcm_frames(&pDataStream->decoder, pPageData, pageSizeInFrames); + } + + if (totalFramesReadForThisPage < pageSizeInFrames) { + c89atomic_exchange_32(&pDataStream->isDecoderAtEnd, MA_TRUE); + } + + c89atomic_exchange_32(&pDataStream->pageFrameCount[pageIndex], (ma_uint32)totalFramesReadForThisPage); + c89atomic_exchange_32(&pDataStream->isPageValid[pageIndex], MA_TRUE); + + (void)pResourceManager; +} + +static void ma_resource_manager_data_stream_fill_pages(ma_resource_manager* pResourceManager, ma_resource_manager_data_stream* pDataStream) +{ + ma_uint32 iPage; + + MA_ASSERT(pResourceManager != NULL); + MA_ASSERT(pDataStream != NULL); + + /* For each page... */ + for (iPage = 0; iPage < 2; iPage += 1) { + ma_resource_manager_data_stream_fill_page(pResourceManager, pDataStream, iPage); + + /* If we reached the end make sure we get out of the loop to prevent us from trying to load the second page. */ + if (pDataStream->isDecoderAtEnd) { + break; + } + } +} + MA_API ma_result ma_resource_manager_data_stream_read_paged_pcm_frames(ma_resource_manager* pResourceManager, ma_resource_manager_data_stream* pDataStream, void* pFramesOut, ma_uint64 frameCount, ma_uint64* pFramesRead) @@ -2222,7 +2063,7 @@ MA_API ma_result ma_resource_manager_data_stream_read_paged_pcm_frames(ma_resour result = ma_resource_manager_data_stream_unmap_paged_pcm_frames(pResourceManager, pDataStream, mappedFrameCount); if (result != MA_SUCCESS) { - break; /* This is really bad - will only get an error here if we failed to post a message to the queue for loading the next page. */ + break; /* This is really bad - will only get an error here if we failed to post a job to the queue for loading the next page. */ } } @@ -2235,7 +2076,7 @@ MA_API ma_result ma_resource_manager_data_stream_read_paged_pcm_frames(ma_resour MA_API ma_result ma_resource_manager_data_stream_seek_to_pcm_frame(ma_resource_manager* pResourceManager, ma_resource_manager_data_stream* pDataStream, ma_uint64 frameIndex) { - ma_resource_manager_message message; + ma_job job; if (pResourceManager == NULL || pDataStream == NULL) { return MA_INVALID_ARGS; @@ -2250,8 +2091,8 @@ MA_API ma_result ma_resource_manager_data_stream_seek_to_pcm_frame(ma_resource_m /* We need to clear our currently loaded pages so that the stream starts playback from the new seek point as soon as possible. These are for the purpose of the public - API and will be ignored by the seek message. The seek message on the async thread will operate on the assumption that both pages have been marked as invalid and - the cursor is at the start of the first page. + API and will be ignored by the seek job. The seek job will operate on the assumption that both pages have been marked as invalid and the cursor is at the start of + the first page. */ pDataStream->relativeCursor = 0; pDataStream->currentPageIndex = 0; @@ -2259,13 +2100,14 @@ MA_API ma_result ma_resource_manager_data_stream_seek_to_pcm_frame(ma_resource_m c89atomic_exchange_32(&pDataStream->isPageValid[1], MA_FALSE); /* - The public API is not allowed to touch the internal decoder so we need to use a message to perform the seek. When seeking, the async thread will assume both pages + The public API is not allowed to touch the internal decoder so we need to use a job to perform the seek. When seeking, the job thread will assume both pages are invalid and any content contained within them will be discarded and replaced with newly decoded data. */ - message = ma_resource_manager_message_init(MA_MESSAGE_SEEK_DATA_STREAM); - message.seekDataStream.pDataStream = pDataStream; - message.seekDataStream.frameIndex = frameIndex; - return ma_resource_manager_post_message(pResourceManager, &message); + job = ma_job_init(MA_JOB_SEEK_DATA_STREAM); + job.order = ma_resource_manager_data_stream_next_execution_order(pDataStream); + job.seekDataStream.pDataStream = pDataStream; + job.seekDataStream.frameIndex = frameIndex; + return ma_resource_manager_post_job(pResourceManager, &job); } MA_API ma_result ma_resource_manager_data_stream_map_paged_pcm_frames(ma_resource_manager* pResourceManager, ma_resource_manager_data_stream* pDataStream, void** ppFramesOut, ma_uint64* pFrameCount) @@ -2294,7 +2136,7 @@ MA_API ma_result ma_resource_manager_data_stream_map_paged_pcm_frames(ma_resourc return MA_BUSY; } - /* If the page we're on is invalid it means we've caught up to the async thread. */ + /* If the page we're on is invalid it means we've caught up to the job thread. */ if (pDataStream->isPageValid[pDataStream->currentPageIndex] == MA_FALSE) { framesAvailable = 0; } else { @@ -2311,7 +2153,7 @@ MA_API ma_result ma_resource_manager_data_stream_map_paged_pcm_frames(ma_resourc if (pDataStream->isDecoderAtEnd) { return MA_AT_END; } else { - return MA_BUSY; /* There are no frames available, but we're not marked as EOF so we might have caught up to the async thread. Need to return MA_BUSY and wait for more data. */ + return MA_BUSY; /* There are no frames available, but we're not marked as EOF so we might have caught up to the job thread. Need to return MA_BUSY and wait for more data. */ } } @@ -2331,7 +2173,7 @@ MA_API ma_result ma_resource_manager_data_stream_unmap_paged_pcm_frames(ma_resou { ma_uint32 newRelativeCursor; ma_uint32 pageSizeInFrames; - ma_resource_manager_message message; + ma_job job; if (pResourceManager == NULL || pDataStream == NULL) { return MA_INVALID_ARGS; @@ -2348,26 +2190,26 @@ MA_API ma_result ma_resource_manager_data_stream_unmap_paged_pcm_frames(ma_resou pageSizeInFrames = ma_resource_manager_data_stream_get_page_size_in_frames(pDataStream); - /* Here is where we need to check if we need to load a new page, and if so, post a message to the async thread to load it. */ + /* Here is where we need to check if we need to load a new page, and if so, post a job to load it. */ newRelativeCursor = pDataStream->relativeCursor + (ma_uint32)frameCount; /* If the new cursor has flowed over to the next page we need to mark the old one as invalid and post an event for it. */ if (newRelativeCursor >= pageSizeInFrames) { - /*newRelativeCursor -= (pageSizeInFrames*(pDataStream->currentPageIndex+1));*/ newRelativeCursor -= pageSizeInFrames; - /* Here is where we post the message to the async thread to start decoding. */ - message = ma_resource_manager_message_init(MA_MESSAGE_DECODE_STREAM_PAGE); - message.decodeStreamPage.pDataStream = pDataStream; - message.decodeStreamPage.pageIndex = pDataStream->currentPageIndex; + /* Here is where we post the job start decoding. */ + job = ma_job_init(MA_JOB_PAGE_DATA_STREAM); + job.order = ma_resource_manager_data_stream_next_execution_order(pDataStream); + job.pageDataStream.pDataStream = pDataStream; + job.pageDataStream.pageIndex = pDataStream->currentPageIndex; /* The page needs to be marked as invalid so that the public API doesn't try reading from it. */ c89atomic_exchange_32(&pDataStream->isPageValid[pDataStream->currentPageIndex], MA_FALSE); - /* Before sending the message we need to make sure we set some state. */ + /* Before posting the job we need to make sure we set some state. */ pDataStream->relativeCursor = newRelativeCursor; pDataStream->currentPageIndex = (pDataStream->currentPageIndex + 1) & 0x01; - return ma_resource_manager_post_message(pResourceManager, &message); + return ma_resource_manager_post_job(pResourceManager, &job); } else { /* We haven't moved into a new page so we can just move the cursor forward. */ pDataStream->relativeCursor = newRelativeCursor; @@ -2393,6 +2235,14 @@ MA_API ma_result ma_resource_manager_data_stream_get_data_format(ma_resource_man } + +static ma_uint32 ma_resource_manager_data_source_next_execution_order(ma_resource_manager_data_source* pDataSource) +{ + MA_ASSERT(pDataSource != NULL); + return c89atomic_fetch_add_32(&pDataSource->executionCounter, 1); +} + + static ma_result ma_resource_manager_data_source_read__stream(ma_data_source* pDataSource, void* pFramesOut, ma_uint64 frameCount, ma_uint64* pFramesRead) { ma_resource_manager_data_source* pRMDataSource = (ma_resource_manager_data_source*)pDataSource; @@ -2454,7 +2304,7 @@ static ma_result ma_resource_manager_data_source_get_data_format__stream(ma_data static ma_result ma_resource_manager_data_source_init_stream(ma_resource_manager* pResourceManager, const char* pName, ma_uint32 flags, ma_resource_manager_data_source* pDataSource) { ma_result result; - ma_resource_manager_message message; + ma_job job; ma_event waitEvent; MA_ASSERT(pResourceManager != NULL); @@ -2476,9 +2326,10 @@ static ma_result ma_resource_manager_data_source_init_stream(ma_resource_manager pDataSource->ds.onGetDataFormat = ma_resource_manager_data_source_get_data_format__stream; pDataSource->result = MA_BUSY; - /* We need to post a message just like we do with data buffers because the caller may be wanting to run this asynchronously. */ - message = ma_resource_manager_message_init(MA_MESSAGE_LOAD_DATA_SOURCE); - message.loadDataSource.pDataSource = pDataSource; + /* We need to post a job just like we do with data buffers because the caller may be wanting to run this asynchronously. */ + job = ma_job_init(MA_JOB_LOAD_DATA_SOURCE); + job.order = ma_resource_manager_data_source_next_execution_order(pDataSource); + job.loadDataSource.pDataSource = pDataSource; if ((flags & MA_DATA_SOURCE_FLAG_ASYNC) == 0) { result = ma_event_init(&waitEvent); @@ -2487,28 +2338,28 @@ static ma_result ma_resource_manager_data_source_init_stream(ma_resource_manager return result; } - message.loadDataSource.pEvent = &waitEvent; + job.loadDataSource.pEvent = &waitEvent; } else { - message.loadDataSource.pEvent = NULL; + job.loadDataSource.pEvent = NULL; } - result = ma_resource_manager_post_message(pResourceManager, &message); + result = ma_resource_manager_post_job(pResourceManager, &job); if (result != MA_SUCCESS) { ma_resource_manager_delete_data_stream(pResourceManager, &pDataSource->dataStream.stream); - if (message.loadDataSource.pEvent != NULL) { - ma_event_uninit(message.loadDataSource.pEvent); + if (job.loadDataSource.pEvent != NULL) { + ma_event_uninit(job.loadDataSource.pEvent); } return result; } - /* The message has been posted. We now need to wait for the event to get signalled if we're in synchronous mode. */ - if (message.loadDataSource.pEvent != NULL) { + /* The job has been posted. We now need to wait for the event to get signalled if we're in synchronous mode. */ + if (job.loadDataSource.pEvent != NULL) { ma_result streamResult; - ma_event_wait(message.loadDataSource.pEvent); - ma_event_uninit(message.loadDataSource.pEvent); - message.loadDataSource.pEvent = NULL; + ma_event_wait(job.loadDataSource.pEvent); + ma_event_uninit(job.loadDataSource.pEvent); + job.loadDataSource.pEvent = NULL; /* If the data stream or data source have errors we need to return an error. */ streamResult = ma_resource_manager_data_stream_result(pResourceManager, &pDataSource->dataStream.stream); @@ -2719,7 +2570,8 @@ static ma_result ma_resource_manager_data_source_set_result_and_signal(ma_resour return result; } -static ma_result ma_resource_manager_data_source_init_backend_buffer(ma_resource_manager* pResourceManager, ma_resource_manager_data_source* pDataSource) + +static ma_result ma_resource_manager_data_source_init_connector(ma_resource_manager* pResourceManager, ma_resource_manager_data_source* pDataSource) { ma_result result; ma_resource_manager_data_buffer* pDataBuffer; @@ -2796,7 +2648,7 @@ static ma_result ma_resource_manager_data_source_init_backend_buffer(ma_resource return result; } -static ma_result ma_resource_manager_data_source_uninit_backend_buffer(ma_resource_manager* pResourceManager, ma_resource_manager_data_source* pDataSource) +static ma_result ma_resource_manager_data_source_uninit_connector(ma_resource_manager* pResourceManager, ma_resource_manager_data_source* pDataSource) { MA_ASSERT(pResourceManager != NULL); MA_ASSERT(pDataSource != NULL); @@ -2842,19 +2694,20 @@ static ma_result ma_resource_manager_data_source_init_buffer(ma_resource_manager pDataSource->ds.onUnmap = ma_resource_manager_data_source_unmap; pDataSource->ds.onGetDataFormat = ma_resource_manager_data_source_get_data_format; pDataSource->dataBuffer.pDataBuffer = pDataBuffer; - pDataSource->dataBuffer.connectorType = ma_resource_manager_data_buffer_connector_unknown; /* The backend type hasn't been determine yet - that happens when it's initialized properly by the resource thread. */ + pDataSource->dataBuffer.connectorType = ma_resource_manager_data_buffer_connector_unknown; /* The backend type hasn't been determined yet - that happens when it's initialized properly by the job thread. */ pDataSource->result = MA_BUSY; /* - If the data buffer has been fully initialized we can complete initialization of the data source now. Otherwise we need to post an event to the resource thread to complete + If the data buffer has been fully initialized we can complete initialization of the data source now. Otherwise we need to post an event to the job thread to complete initialization to ensure it's done after the data buffer. */ dataBufferResult = ma_resource_manager_data_buffer_result(pResourceManager, pDataBuffer); if (dataBufferResult == MA_BUSY) { - /* The data buffer is in the middle of loading. We need to post an event to the resource thread. */ - ma_resource_manager_message message; - message = ma_resource_manager_message_init(MA_MESSAGE_LOAD_DATA_SOURCE); - message.loadDataSource.pDataSource = pDataSource; + /* The data buffer is in the middle of loading. We need to post an event to the job thread. */ + ma_job job; + job = ma_job_init(MA_JOB_LOAD_DATA_SOURCE); + job.order = ma_resource_manager_data_source_next_execution_order(pDataSource); + job.loadDataSource.pDataSource = pDataSource; if ((flags & MA_DATA_SOURCE_FLAG_ASYNC) == 0) { result = ma_event_init(&waitEvent); @@ -2863,26 +2716,26 @@ static ma_result ma_resource_manager_data_source_init_buffer(ma_resource_manager return result; } - message.loadDataSource.pEvent = &waitEvent; + job.loadDataSource.pEvent = &waitEvent; } else { - message.loadDataSource.pEvent = NULL; + job.loadDataSource.pEvent = NULL; } - result = ma_resource_manager_post_message(pResourceManager, &message); + result = ma_resource_manager_post_job(pResourceManager, &job); if (result != MA_SUCCESS) { - if (message.loadDataSource.pEvent != NULL) { - ma_event_uninit(message.loadDataSource.pEvent); + if (job.loadDataSource.pEvent != NULL) { + ma_event_uninit(job.loadDataSource.pEvent); } ma_resource_manager_delete_data_buffer(pResourceManager, pDataBuffer); return result; } - /* The message has been posted. We now need to wait for the event to get signalled if we're in synchronous mode. */ - if (message.loadDataSource.pEvent != NULL) { - ma_event_wait(message.loadDataSource.pEvent); - ma_event_uninit(message.loadDataSource.pEvent); - message.loadDataSource.pEvent = NULL; + /* The job has been posted. We now need to wait for the event to get signalled if we're in synchronous mode. */ + if (job.loadDataSource.pEvent != NULL) { + ma_event_wait(job.loadDataSource.pEvent); + ma_event_uninit(job.loadDataSource.pEvent); + job.loadDataSource.pEvent = NULL; /* Check the status of the data buffer for any errors. Even in the event of an error, the data source will not be deleted. */ if (pDataBuffer->result != MA_SUCCESS) { @@ -2895,7 +2748,7 @@ static ma_result ma_resource_manager_data_source_init_buffer(ma_resource_manager return MA_SUCCESS; } else if (dataBufferResult == MA_SUCCESS) { /* The underlying data buffer has already been initialized so we can just complete initialization of the data source right now. */ - result = ma_resource_manager_data_source_init_backend_buffer(pResourceManager, pDataSource); + result = ma_resource_manager_data_source_init_connector(pResourceManager, pDataSource); if (result != MA_SUCCESS) { ma_resource_manager_delete_data_buffer(pResourceManager, pDataBuffer); return result; @@ -2947,8 +2800,8 @@ static ma_result ma_resource_manager_data_source_uninit_buffer(ma_resource_manag MA_ASSERT(pResourceManager != NULL); MA_ASSERT(pDataSource != NULL); - /* We should uninitialize the data source's backend before deleting the data buffer just to keep the order of operations clean. */ - ma_resource_manager_data_source_uninit_backend_buffer(pResourceManager, pDataSource); + /* We should uninitialize the data source's connector before deleting the data buffer just to keep the order of operations clean. */ + ma_resource_manager_data_source_uninit_connector(pResourceManager, pDataSource); pDataSource->dataBuffer.connectorType = ma_resource_manager_data_buffer_connector_unknown; /* The data buffer needs to be deleted. */ @@ -2968,7 +2821,7 @@ MA_API ma_result ma_resource_manager_data_source_uninit(ma_resource_manager* pRe /* We need to run this synchronously because the caller owns the data source and we can't return before it's been fully uninitialized. We do, however, need to do - the actual uninitialization on the resource thread for order-of-operations reasons. + the actual uninitialization on the job thread for order-of-operations reasons. */ /* We need to wait to finish loading before we try uninitializing. */ @@ -3002,8 +2855,8 @@ MA_API ma_result ma_resource_manager_data_source_set_looping(ma_resource_manager } /* - The loop flag is set at the level of the underlying data source. This is require for streams in particular because page loading logic performed on the - async thread will ensure the loop transition is smooth. + The loop flag is set at the level of the underlying data source. This is required for streams in particular because page loading logic performed on the + job thread will ensure the loop transition is smooth. */ if ((pDataSource->flags & MA_DATA_SOURCE_FLAG_STREAM) != 0) { return ma_resource_manager_data_stream_set_looping(pResourceManager, &pDataSource->dataStream.stream, isLooping); @@ -3028,27 +2881,55 @@ MA_API ma_result ma_resource_manager_data_source_get_looping(ma_resource_manager } +MA_API ma_result ma_resource_manager_post_job(ma_resource_manager* pResourceManager, const ma_job* pJob) +{ + if (pResourceManager == NULL) { + return MA_INVALID_ARGS; + } -static ma_result ma_resource_manager_handle_message__load_data_buffer(ma_resource_manager* pResourceManager, ma_resource_manager_data_buffer* pDataBuffer, char* pFilePath, ma_event* pEvent) + return ma_job_queue_post(&pResourceManager->jobQueue, pJob); +} + +MA_API ma_result ma_resource_manager_next_job(ma_resource_manager* pResourceManager, ma_job* pJob) +{ + if (pResourceManager == NULL) { + return MA_INVALID_ARGS; + } + + return ma_job_queue_next(&pResourceManager->jobQueue, pJob); +} + + +static ma_result ma_resource_manager_process_job__load_data_buffer(ma_resource_manager* pResourceManager, ma_job* pJob) { ma_result result = MA_SUCCESS; + ma_resource_manager_data_buffer* pDataBuffer; MA_ASSERT(pResourceManager != NULL); - MA_ASSERT(pDataBuffer != NULL); - MA_ASSERT(pFilePath != NULL); - MA_ASSERT(pDataBuffer->isDataOwnedByResourceManager == MA_TRUE); /* The data should always be owned by the resource manager. */ + MA_ASSERT(pJob != NULL); + MA_ASSERT(pJob->loadDataBuffer.pDataBuffer != NULL); + MA_ASSERT(pJob->loadDataBuffer.pFilePath != NULL); + MA_ASSERT(pJob->loadDataBuffer.pDataBuffer->isDataOwnedByResourceManager == MA_TRUE); /* The data should always be owned by the resource manager. */ + pDataBuffer = pJob->loadDataBuffer.pDataBuffer; + + /* First thing we need to do is check whether or not the data buffer is getting deleted. If so we just abort. */ if (pDataBuffer->result != MA_BUSY) { result = MA_INVALID_OPERATION; /* The data buffer may be getting deleted before it's even been loaded. */ goto done; } + /* The data buffer is not getting deleted, but we may be getting executed out of order. If so, we need to push the job back onto the queue and return. */ + if (pJob->order != pDataBuffer->executionPointer) { + return ma_resource_manager_post_job(pResourceManager, pJob); /* Attempting to execute out of order. Probably interleaved with a MA_JOB_FREE_DATA_BUFFER job. */ + } + if (pDataBuffer->data.type == ma_resource_manager_data_buffer_encoding_encoded) { /* No decoding. Just store the file contents in memory. */ void* pData; size_t sizeInBytes; - result = ma_vfs_open_and_read_file_ex(pResourceManager->config.pVFS, pFilePath, &pData, &sizeInBytes, &pResourceManager->config.allocationCallbacks, MA_ALLOCATION_TYPE_ENCODED_BUFFER); + result = ma_vfs_open_and_read_file_ex(pResourceManager->config.pVFS, pJob->loadDataBuffer.pFilePath, &pData, &sizeInBytes, &pResourceManager->config.allocationCallbacks, MA_ALLOCATION_TYPE_ENCODED_BUFFER); if (result == MA_SUCCESS) { pDataBuffer->data.encoded.pData = pData; pDataBuffer->data.encoded.sizeInBytes = sizeInBytes; @@ -3063,10 +2944,10 @@ static ma_result ma_resource_manager_handle_message__load_data_buffer(ma_resourc ma_uint64 dataSizeInFrames; ma_uint64 pageSizeInFrames; ma_uint64 framesRead; /* <-- Keeps track of how many frames we read for the first page. */ - ma_resource_manager_message decodeBufferPageMessage; + ma_job pageDataBufferJob; /* - With the file initialized we now need to initialize the decoder. We need to pass this decoder around on the message queue so we'll need to + With the file initialized we now need to initialize the decoder. We need to pass this decoder around on the job queue so we'll need to allocate memory for this dynamically. */ pDecoder = (ma_decoder*)ma__malloc_from_callbacks(sizeof(*pDecoder), &pResourceManager->config.allocationCallbacks/*, MA_ALLOCATION_TYPE_DECODER*/); @@ -3078,7 +2959,7 @@ static ma_result ma_resource_manager_handle_message__load_data_buffer(ma_resourc config = ma_decoder_config_init(pResourceManager->config.decodedFormat, 0, pResourceManager->config.decodedSampleRate); /* Need to keep the native channel count because we'll be using that for spatialization. */ config.allocationCallbacks = pResourceManager->config.allocationCallbacks; - result = ma_decoder_init_vfs(pResourceManager->config.pVFS, pFilePath, &config, pDecoder); + result = ma_decoder_init_vfs(pResourceManager->config.pVFS, pJob->loadDataBuffer.pFilePath, &config, pDecoder); /* Make sure we never set the result code to MA_BUSY or else we'll get everything confused. */ if (result == MA_BUSY) { @@ -3126,7 +3007,7 @@ static ma_result ma_resource_manager_handle_message__load_data_buffer(ma_resourc goto done; } - /* The buffer needs to be initialized to silence in case the caller reads from it which they may decide to do. */ + /* The buffer needs to be initialized to silence in case the caller reads from it. */ ma_silence_pcm_frames(pData, dataSizeInFrames, pDecoder->outputFormat, pDecoder->outputChannels); @@ -3149,17 +3030,18 @@ static ma_result ma_resource_manager_handle_message__load_data_buffer(ma_resourc result = MA_SUCCESS; goto done; } else { - /* We've still got more to decode. We need to post a message to keep decoding the rest. */ - decodeBufferPageMessage = ma_resource_manager_message_init(MA_MESSAGE_DECODE_BUFFER_PAGE); - decodeBufferPageMessage.decodeBufferPage.pDataBuffer = pDataBuffer; - decodeBufferPageMessage.decodeBufferPage.pDecoder = pDecoder; - decodeBufferPageMessage.decodeBufferPage.pCompletedEvent = pEvent; - decodeBufferPageMessage.decodeBufferPage.pData = pData; - decodeBufferPageMessage.decodeBufferPage.dataSizeInBytes = (size_t)dataSizeInBytes; /* Safe cast. Was checked for > MA_SIZE_MAX earlier. */ - decodeBufferPageMessage.decodeBufferPage.decodedFrameCount = framesRead; + /* We've still got more to decode. We need to post a job to continue decoding. */ + pageDataBufferJob = ma_job_init(MA_JOB_PAGE_DATA_BUFFER); + pageDataBufferJob.order = ma_resource_manager_data_buffer_next_execution_order(pDataBuffer); + pageDataBufferJob.pageDataBuffer.pDataBuffer = pDataBuffer; + pageDataBufferJob.pageDataBuffer.pDecoder = pDecoder; + pageDataBufferJob.pageDataBuffer.pCompletedEvent = pJob->loadDataBuffer.pEvent; + pageDataBufferJob.pageDataBuffer.pData = pData; + pageDataBufferJob.pageDataBuffer.dataSizeInBytes = (size_t)dataSizeInBytes; /* Safe cast. Was checked for > MA_SIZE_MAX earlier. */ + pageDataBufferJob.pageDataBuffer.decodedFrameCount = framesRead; if (totalFrameCount > 0) { - decodeBufferPageMessage.decodeBufferPage.isUnknownLength = MA_FALSE; + pageDataBufferJob.pageDataBuffer.isUnknownLength = MA_FALSE; pDataBuffer->data.decoded.pData = pData; pDataBuffer->data.decoded.frameCount = totalFrameCount; @@ -3172,7 +3054,7 @@ static ma_result ma_resource_manager_handle_message__load_data_buffer(ma_resourc c89atomic_thread_fence(c89atomic_memory_order_acquire); pDataBuffer->data.decoded.decodedFrameCount = framesRead; } else { - decodeBufferPageMessage.decodeBufferPage.isUnknownLength = MA_TRUE; + pageDataBufferJob.pageDataBuffer.isUnknownLength = MA_TRUE; /* These members are all set after the last page has been decoded. The reason for this is that the application should not be attempting to @@ -3184,8 +3066,8 @@ static ma_result ma_resource_manager_handle_message__load_data_buffer(ma_resourc pDataBuffer->data.decoded.decodedFrameCount = 0; } - /* The message has been set up so it can now be posted. */ - result = ma_resource_manager_post_message(pResourceManager, &decodeBufferPageMessage); + /* The job has been set up so it can now be posted. */ + result = ma_resource_manager_post_job(pResourceManager, &pageDataBufferJob); /* The result needs to be set to MA_BUSY to ensure the status of the data buffer is set properly in the next section. */ if (result == MA_SUCCESS) { @@ -3193,12 +3075,12 @@ static ma_result ma_resource_manager_handle_message__load_data_buffer(ma_resourc } /* We want to make sure we don't signal the event here. It needs to be delayed until the last page. */ - pEvent = NULL; + pJob->loadDataBuffer.pEvent = NULL; } } done: - ma__free_from_callbacks(pFilePath, &pResourceManager->config.allocationCallbacks/*, MA_ALLOCATION_TYPE_TRANSIENT_STRING*/); + ma__free_from_callbacks(pJob->loadDataBuffer.pFilePath, &pResourceManager->config.allocationCallbacks/*, MA_ALLOCATION_TYPE_TRANSIENT_STRING*/); /* We need to set the result to at the very end to ensure no other threads try reading the data before we've fully initialized the object. Other threads @@ -3210,101 +3092,181 @@ done: c89atomic_compare_and_swap_32(&pDataBuffer->result, MA_BUSY, result); /* Only signal the other threads after the result has been set just for cleanliness sake. */ - if (pEvent != NULL) { - ma_event_signal(pEvent); + if (pJob->loadDataBuffer.pEvent != NULL) { + ma_event_signal(pJob->loadDataBuffer.pEvent); } + c89atomic_fetch_add_32(&pDataBuffer->executionPointer, 1); return result; } -static ma_result ma_resource_manager_handle_message__free_data_buffer(ma_resource_manager* pResourceManager, ma_resource_manager_data_buffer* pDataBuffer) +static ma_result ma_resource_manager_process_job__free_data_buffer(ma_resource_manager* pResourceManager, ma_job* pJob) { - if (pDataBuffer == NULL) { - return MA_INVALID_ARGS; + MA_ASSERT(pResourceManager != NULL); + MA_ASSERT(pJob != NULL); + MA_ASSERT(pJob->freeDataBuffer.pDataBuffer != NULL); + MA_ASSERT(pJob->freeDataBuffer.pDataBuffer->result == MA_UNAVAILABLE); + + if (pJob->order != pJob->freeDataBuffer.pDataBuffer->executionPointer) { + return ma_resource_manager_post_job(pResourceManager, pJob); /* Out of order. */ } - MA_ASSERT(pDataBuffer->result == MA_UNAVAILABLE); - - ma_resource_manager_data_buffer_free(pResourceManager, pDataBuffer); + ma_resource_manager_data_buffer_free(pResourceManager, pJob->freeDataBuffer.pDataBuffer); + c89atomic_fetch_add_32(&pJob->freeDataBuffer.pDataBuffer->executionPointer, 1); return MA_SUCCESS; } - -static void ma_resource_manager_data_stream_fill_page(ma_resource_manager* pResourceManager, ma_resource_manager_data_stream* pDataStream, ma_uint32 pageIndex) +static ma_result ma_resource_manager_process_job__page_data_buffer(ma_resource_manager* pResourceManager, ma_job* pJob) { - (void)pResourceManager; - + ma_result result = MA_SUCCESS; ma_uint64 pageSizeInFrames; - ma_uint64 totalFramesReadForThisPage = 0; - void* pPageData = ma_resource_manager_data_stream_get_page_data_pointer(pDataStream, pageIndex, 0); - - pageSizeInFrames = ma_resource_manager_data_stream_get_page_size_in_frames(pDataStream); - - if (pDataStream->isLooping) { - while (totalFramesReadForThisPage < pageSizeInFrames) { - ma_uint64 framesRemaining; - ma_uint64 framesRead; - - framesRemaining = pageSizeInFrames - totalFramesReadForThisPage; - framesRead = ma_decoder_read_pcm_frames(&pDataStream->decoder, ma_offset_pcm_frames_ptr(pPageData, totalFramesReadForThisPage, pDataStream->decoder.outputFormat, pDataStream->decoder.outputChannels), framesRemaining); - totalFramesReadForThisPage += framesRead; - - /* Loop back to the start if we reached the end. */ - if (framesRead < framesRemaining) { - ma_decoder_seek_to_pcm_frame(&pDataStream->decoder, 0); - } - } - } else { - totalFramesReadForThisPage = ma_decoder_read_pcm_frames(&pDataStream->decoder, pPageData, pageSizeInFrames); - } - - if (totalFramesReadForThisPage < pageSizeInFrames) { - c89atomic_exchange_32(&pDataStream->isDecoderAtEnd, MA_TRUE); - } - - c89atomic_exchange_32(&pDataStream->pageFrameCount[pageIndex], (ma_uint32)totalFramesReadForThisPage); - c89atomic_exchange_32(&pDataStream->isPageValid[pageIndex], MA_TRUE); -} - -static void ma_resource_manager_data_stream_fill_pages(ma_resource_manager* pResourceManager, ma_resource_manager_data_stream* pDataStream) -{ - ma_uint32 iPage; + ma_uint64 framesRead; + void* pRunningData; + ma_job jobCopy; + ma_resource_manager_data_buffer* pDataBuffer; MA_ASSERT(pResourceManager != NULL); - MA_ASSERT(pDataStream != NULL); + MA_ASSERT(pJob != NULL); - /* For each page... */ - for (iPage = 0; iPage < 2; iPage += 1) { - ma_resource_manager_data_stream_fill_page(pResourceManager, pDataStream, iPage); + pDataBuffer = pJob->pageDataBuffer.pDataBuffer; - /* If we reached the end make sure we get out of the loop to prevent us from trying to load the second page. */ - if (pDataStream->isDecoderAtEnd) { - break; + /* Don't do any more decoding if the data buffer has started the uninitialization process. */ + if (pDataBuffer->result != MA_BUSY) { + return MA_INVALID_OPERATION; + } + + if (pJob->order != pDataBuffer->executionPointer) { + return ma_resource_manager_post_job(pResourceManager, pJob); /* Out of order. */ + } + + /* We're going to base everything off the original job. */ + jobCopy = *pJob; + + /* We need to know the size of a page in frames to know how many frames to decode. */ + pageSizeInFrames = MA_RESOURCE_MANAGER_PAGE_SIZE_IN_MILLISECONDS * (jobCopy.pageDataBuffer.pDecoder->outputSampleRate/1000); + + /* If the total length is unknown we may need to expand the size of the buffer. */ + if (jobCopy.pageDataBuffer.isUnknownLength == MA_TRUE) { + ma_uint64 requiredSize = (jobCopy.pageDataBuffer.decodedFrameCount + pageSizeInFrames) * ma_get_bytes_per_frame(jobCopy.pageDataBuffer.pDecoder->outputFormat, jobCopy.pageDataBuffer.pDecoder->outputChannels); + if (requiredSize <= MA_SIZE_MAX) { + if (requiredSize > jobCopy.pageDataBuffer.dataSizeInBytes) { + size_t newSize = (size_t)ma_max(requiredSize, jobCopy.pageDataBuffer.dataSizeInBytes * 2); + void *pNewData = ma__realloc_from_callbacks(jobCopy.pageDataBuffer.pData, newSize, jobCopy.pageDataBuffer.dataSizeInBytes, &pResourceManager->config.allocationCallbacks/*, MA_ALLOCATION_TYPE_DECODED_BUFFER*/); + if (pNewData != NULL) { + jobCopy.pageDataBuffer.pData = pNewData; + jobCopy.pageDataBuffer.dataSizeInBytes = newSize; + } else { + result = MA_OUT_OF_MEMORY; + } + } + } else { + result = MA_TOO_BIG; } } + + /* We should have the memory set up so now we can decode the next page. */ + if (result == MA_SUCCESS) { + pRunningData = ma_offset_ptr(jobCopy.pageDataBuffer.pData, jobCopy.pageDataBuffer.decodedFrameCount * ma_get_bytes_per_frame(jobCopy.pageDataBuffer.pDecoder->outputFormat, jobCopy.pageDataBuffer.pDecoder->outputChannels)); + + framesRead = ma_decoder_read_pcm_frames(jobCopy.pageDataBuffer.pDecoder, pRunningData, pageSizeInFrames); + if (framesRead < pageSizeInFrames) { + result = MA_AT_END; + } + + /* If the total length is known we can increment out decoded frame count. Otherwise it needs to be left at 0 until the last page is decoded. */ + if (jobCopy.pageDataBuffer.isUnknownLength == MA_FALSE) { + pDataBuffer->data.decoded.decodedFrameCount += framesRead; + } + + /* If there's more to decode, post a job to keep decoding. */ + if (result != MA_AT_END) { + jobCopy.pageDataBuffer.decodedFrameCount += framesRead; + jobCopy.order = ma_resource_manager_data_buffer_next_execution_order(pDataBuffer); /* We need a fresh execution order. */ + + result = ma_resource_manager_post_job(pResourceManager, &jobCopy); + } + } + + /* + The result will be MA_SUCCESS if another page is in the queue for decoding. Otherwise it will be set to MA_AT_END if the end has been reached or + any other result code if some other error occurred. If we are not decoding another page we need to free the decoder and close the file. + */ + if (result != MA_SUCCESS) { + ma_decoder_uninit(jobCopy.pageDataBuffer.pDecoder); + ma__free_from_callbacks(jobCopy.pageDataBuffer.pDecoder, &pResourceManager->config.allocationCallbacks/*, MA_ALLOCATION_TYPE_DECODER*/); + + /* When the length is unknown we were doubling the size of the buffer each time we needed more data. Let's try reducing this by doing a final realloc(). */ + if (jobCopy.pageDataBuffer.isUnknownLength) { + ma_uint64 newSizeInBytes = jobCopy.pageDataBuffer.decodedFrameCount * ma_get_bytes_per_frame(pDataBuffer->data.decoded.format, pDataBuffer->data.decoded.channels); + void* pNewData = ma__realloc_from_callbacks(jobCopy.pageDataBuffer.pData, (size_t)newSizeInBytes, jobCopy.pageDataBuffer.dataSizeInBytes, &pResourceManager->config.allocationCallbacks); + if (pNewData != NULL) { + jobCopy.pageDataBuffer.pData = pNewData; + jobCopy.pageDataBuffer.dataSizeInBytes = (size_t)newSizeInBytes; /* <-- Don't really need to set this, but I think it's good practice. */ + } + } + + /* + We can now set the frame counts appropriately. We want to set the frame count regardless of whether or not it had a known length just in case we have + a weird situation where the frame count an opening time was different to the final count we got after reading. + */ + pDataBuffer->data.decoded.pData = jobCopy.pageDataBuffer.pData; + pDataBuffer->data.decoded.frameCount = jobCopy.pageDataBuffer.decodedFrameCount; + + /* + decodedFrameCount is what other threads will use to determine whether or not data is available. We must ensure pData and frameCount + is set *before* setting the number of available frames. This way, the other thread need only check if decodedFrameCount > 0, in + which case it can assume pData and frameCount are valid. + */ + c89atomic_thread_fence(c89atomic_memory_order_seq_cst); + pDataBuffer->data.decoded.decodedFrameCount = jobCopy.pageDataBuffer.decodedFrameCount; + + + /* If we reached the end we need to treat it as successful. */ + if (result == MA_AT_END) { + result = MA_SUCCESS; + } + + /* We need to set the status of the page so other things can know about it. We can only change the status away from MA_BUSY. If it's anything else it cannot be changed. */ + c89atomic_compare_and_swap_32(&pDataBuffer->result, MA_BUSY, result); + + /* We need to signal an event to indicate that we're done. */ + if (jobCopy.pageDataBuffer.pCompletedEvent != NULL) { + ma_event_signal(jobCopy.pageDataBuffer.pCompletedEvent); + } + } + + c89atomic_fetch_add_32(&pDataBuffer->executionPointer, 1); + return result; } -static ma_result ma_resource_manager_handle_message__load_data_stream(ma_resource_manager* pResourceManager, ma_resource_manager_data_stream* pDataStream, char* pFilePath, ma_event* pEvent) + +static ma_result ma_resource_manager_process_job__load_data_stream(ma_resource_manager* pResourceManager, ma_job* pJob) { ma_result result = MA_SUCCESS; ma_decoder_config decoderConfig; ma_uint32 pageBufferSizeInBytes; + ma_resource_manager_data_stream* pDataStream; MA_ASSERT(pResourceManager != NULL); - MA_ASSERT(pDataStream != NULL); - MA_ASSERT(pFilePath != NULL); + MA_ASSERT(pJob != NULL); + + pDataStream = pJob->loadDataStream.pDataStream; if (pDataStream->result != MA_BUSY) { result = MA_INVALID_OPERATION; /* Most likely the data stream is being uninitialized. */ goto done; } + if (pJob->order != pDataStream->executionPointer) { + return ma_resource_manager_post_job(pResourceManager, pJob); /* Out of order. */ + } + /* We need to initialize the decoder first so we can determine the size of the pages. */ decoderConfig = ma_decoder_config_init(pResourceManager->config.decodedFormat, 0, pResourceManager->config.decodedSampleRate); /* Need to keep the native channel count because we'll be using that for spatialization. */ decoderConfig.allocationCallbacks = pResourceManager->config.allocationCallbacks; - result = ma_decoder_init_vfs(pResourceManager->config.pVFS, pFilePath, &decoderConfig, &pDataStream->decoder); + result = ma_decoder_init_vfs(pResourceManager->config.pVFS, pJob->loadDataStream.pFilePath, &decoderConfig, &pDataStream->decoder); if (result != MA_SUCCESS) { goto done; } @@ -3328,27 +3290,37 @@ static ma_result ma_resource_manager_handle_message__load_data_stream(ma_resourc result = MA_SUCCESS; done: - ma__free_from_callbacks(pFilePath, &pResourceManager->config.allocationCallbacks/*, MA_ALLOCATION_TYPE_TRANSIENT_STRING*/); + ma__free_from_callbacks(pJob->loadDataStream.pFilePath, &pResourceManager->config.allocationCallbacks/*, MA_ALLOCATION_TYPE_TRANSIENT_STRING*/); /* We can only change the status away from MA_BUSY. If it's set to anything else it means an error has occurred somewhere or the uninitialization process has started (most likely). */ c89atomic_compare_and_swap_32(&pDataStream->result, MA_BUSY, result); /* Only signal the other threads after the result has been set just for cleanliness sake. */ - if (pEvent != NULL) { - ma_event_signal(pEvent); + if (pJob->loadDataStream.pEvent != NULL) { + ma_event_signal(pJob->loadDataStream.pEvent); } + c89atomic_fetch_add_32(&pDataStream->executionPointer, 1); return result; } -static ma_result ma_resource_manager_handle_message__free_data_stream(ma_resource_manager* pResourceManager, ma_resource_manager_data_stream* pDataStream, ma_event* pEvent) +static ma_result ma_resource_manager_process_job__free_data_stream(ma_resource_manager* pResourceManager, ma_job* pJob) { + ma_resource_manager_data_stream* pDataStream; + MA_ASSERT(pResourceManager != NULL); - MA_ASSERT(pDataStream != NULL); + MA_ASSERT(pJob != NULL); + + pDataStream = pJob->freeDataStream.pDataStream; + MA_ASSERT(pDataStream != NULL); /* If our status is not MA_UNAVAILABLE we have a bug somewhere. */ MA_ASSERT(pDataStream->result == MA_UNAVAILABLE); + if (pJob->order != pDataStream->executionPointer) { + return ma_resource_manager_post_job(pResourceManager, pJob); /* Out of order. */ + } + if (pDataStream->isDecoderInitialized) { ma_decoder_uninit(&pDataStream->decoder); } @@ -3359,15 +3331,82 @@ static ma_result ma_resource_manager_handle_message__free_data_stream(ma_resourc } /* The event needs to be signalled last. */ - if (pEvent != NULL) { - ma_event_signal(pEvent); + if (pJob->freeDataStream.pEvent != NULL) { + ma_event_signal(pJob->freeDataStream.pEvent); } + c89atomic_fetch_add_32(&pDataStream->executionPointer, 1); return MA_SUCCESS; } +static ma_result ma_resource_manager_process_job__page_data_stream(ma_resource_manager* pResourceManager, ma_job* pJob) +{ + ma_result result = MA_SUCCESS; + ma_resource_manager_data_stream* pDataStream; -static ma_result ma_resource_manager_handle_message__load_data_source__buffer(ma_resource_manager* pResourceManager, ma_resource_manager_data_source* pDataSource, ma_event* pEvent) + MA_ASSERT(pResourceManager != NULL); + MA_ASSERT(pJob != NULL); + + pDataStream = pJob->pageDataStream.pDataStream; + MA_ASSERT(pDataStream != NULL); + + /* For streams, the status should be MA_SUCCESS. */ + if (pDataStream->result != MA_SUCCESS) { + result = MA_INVALID_OPERATION; + goto done; + } + + if (pJob->order != pDataStream->executionPointer) { + return ma_resource_manager_post_job(pResourceManager, pJob); /* Out of order. */ + } + + ma_resource_manager_data_stream_fill_page(pResourceManager, pDataStream, pJob->pageDataStream.pageIndex); + +done: + c89atomic_fetch_add_32(&pDataStream->executionPointer, 1); + return result; +} + +static ma_result ma_resource_manager_process_job__seek_data_stream(ma_resource_manager* pResourceManager, ma_job* pJob) +{ + ma_result result = MA_SUCCESS; + ma_resource_manager_data_stream* pDataStream; + + MA_ASSERT(pResourceManager != NULL); + MA_ASSERT(pJob != NULL); + + pDataStream = pJob->seekDataStream.pDataStream; + MA_ASSERT(pDataStream != NULL); + + /* For streams the status should be MA_SUCCESS for this to do anything. */ + if (pDataStream->result != MA_SUCCESS || pDataStream->isDecoderInitialized == MA_FALSE) { + result = MA_INVALID_OPERATION; + goto done; + } + + if (pJob->order != pDataStream->executionPointer) { + return ma_resource_manager_post_job(pResourceManager, pJob); /* Out of order. */ + } + + /* + With seeking we just assume both pages are invalid and the relative frame cursor at at position 0. This is basically exactly the same as loading, except + instead of initializing the decoder, we seek to a frame. + */ + ma_decoder_seek_to_pcm_frame(&pDataStream->decoder, pJob->seekDataStream.frameIndex); + + /* After seeking we'll need to reload the pages. */ + ma_resource_manager_data_stream_fill_pages(pResourceManager, pDataStream); + + /* We need to let the public API know that we're done seeking. */ + c89atomic_fetch_sub_32(&pDataStream->seekCounter, 1); + +done: + c89atomic_fetch_add_32(&pDataStream->executionPointer, 1); + return result; +} + + +static ma_result ma_resource_manager_process_job__load_data_source__buffer(ma_resource_manager* pResourceManager, ma_resource_manager_data_source* pDataSource, ma_event* pEvent) { ma_result dataBufferResult; @@ -3384,7 +3423,7 @@ static ma_result ma_resource_manager_handle_message__load_data_source__buffer(ma return ma_resource_manager_data_source_set_result_and_signal(pResourceManager, pDataSource, MA_NO_DATA_AVAILABLE, pEvent); } - return ma_resource_manager_data_source_set_result_and_signal(pResourceManager, pDataSource, ma_resource_manager_data_source_init_backend_buffer(pResourceManager, pDataSource), pEvent); + return ma_resource_manager_data_source_set_result_and_signal(pResourceManager, pDataSource, ma_resource_manager_data_source_init_connector(pResourceManager, pDataSource), pEvent); } else { /* We can initialize the data source if there is a non-zero decoded frame count. If the sound is being loaded synchronously or there are no frames available we need to re-insert @@ -3415,18 +3454,19 @@ static ma_result ma_resource_manager_handle_message__load_data_source__buffer(ma } if (canInitialize) { - return ma_resource_manager_data_source_set_result_and_signal(pResourceManager, pDataSource, ma_resource_manager_data_source_init_backend_buffer(pResourceManager, pDataSource), pEvent); + return ma_resource_manager_data_source_set_result_and_signal(pResourceManager, pDataSource, ma_resource_manager_data_source_init_connector(pResourceManager, pDataSource), pEvent); } else { - /* We can't initialize just yet so we need to just post the message again. */ - ma_resource_manager_message message = ma_resource_manager_message_init(MA_MESSAGE_LOAD_DATA_SOURCE); - message.loadDataSource.pDataSource = pDataSource; - message.loadDataSource.pEvent = pEvent; - return ma_resource_manager_post_message(pResourceManager, &message); + /* We can't initialize just yet so we need to just post the job again. */ + ma_job job = ma_job_init(MA_JOB_LOAD_DATA_SOURCE); + job.order = ma_resource_manager_data_source_next_execution_order(pDataSource); + job.loadDataSource.pDataSource = pDataSource; + job.loadDataSource.pEvent = pEvent; + return ma_resource_manager_post_job(pResourceManager, &job); } } } -static ma_result ma_resource_manager_handle_message__load_data_source__stream(ma_resource_manager* pResourceManager, ma_resource_manager_data_source* pDataSource, ma_event* pEvent) +static ma_result ma_resource_manager_process_job__load_data_source__stream(ma_resource_manager* pResourceManager, ma_resource_manager_data_source* pDataSource, ma_event* pEvent) { ma_result dataStreamResult; @@ -3436,281 +3476,90 @@ static ma_result ma_resource_manager_handle_message__load_data_source__stream(ma return ma_resource_manager_data_source_set_result_and_signal(pResourceManager, pDataSource, dataStreamResult, pEvent); } - /* We don't need to do anything other than set the result. The decoder will have been initialized from the MA_MESSAGE_LOAD_DATA_STREAM message. */ + /* We don't need to do anything other than set the result. The decoder will have been initialized from the MA_JOB_LOAD_DATA_STREAM job. */ return ma_resource_manager_data_source_set_result_and_signal(pResourceManager, pDataSource, MA_SUCCESS, pEvent); } -static ma_result ma_resource_manager_handle_message__load_data_source(ma_resource_manager* pResourceManager, ma_resource_manager_data_source* pDataSource, ma_event* pEvent) -{ - MA_ASSERT(pResourceManager != NULL); - MA_ASSERT(pDataSource != NULL); - MA_ASSERT(pDataSource->result == MA_BUSY || pDataSource->result == MA_UNAVAILABLE); - if (pDataSource->result == MA_UNAVAILABLE) { - /* - The data source is getting deleted before it's even been loaded. We want to continue loading in this case because in the queue we'll have a - corresponding MA_MESSAGE_FREE_DATA_SOURCE which will be doing the opposite. By letting it continue we can simplify the implementation because - otherwise we'd need to keep track of a separate bit of state to track whether or not the backend has been initialized or not. - */ +static ma_result ma_resource_manager_process_job__load_data_source(ma_resource_manager* pResourceManager, ma_job* pJob) +{ + ma_result result = MA_SUCCESS; + ma_resource_manager_data_source* pDataSource; + + MA_ASSERT(pResourceManager != NULL); + MA_ASSERT(pJob != NULL); + + pDataSource = pJob->loadDataSource.pDataSource; + MA_ASSERT(pDataSource != NULL); + MA_ASSERT(pDataSource->result == MA_BUSY); + MA_ASSERT(pDataSource->result != MA_UNAVAILABLE); /* We should never be getting here when the status is set to unavailable because ma_resource_manager_data_source_uninit() will wait for loading to complete before doing anything. */ + + if (pJob->order != pDataSource->executionPointer) { + return ma_resource_manager_post_job(pResourceManager, pJob); /* Out of order. */ } if ((pDataSource->flags & MA_DATA_SOURCE_FLAG_STREAM) != 0) { - return ma_resource_manager_handle_message__load_data_source__stream(pResourceManager, pDataSource, pEvent); + result = ma_resource_manager_process_job__load_data_source__stream(pResourceManager, pDataSource, pJob->loadDataSource.pEvent); } else { - return ma_resource_manager_handle_message__load_data_source__buffer(pResourceManager, pDataSource, pEvent); - } -} - -static ma_result ma_resource_manager_handle_message__decode_buffer_page(ma_resource_manager* pResourceManager, const ma_resource_manager_message* pMessage) -{ - ma_result result = MA_SUCCESS; - ma_uint64 pageSizeInFrames; - ma_uint64 framesRead; - void* pRunningData; - ma_resource_manager_message messageCopy; - - MA_ASSERT(pResourceManager != NULL); - MA_ASSERT(pMessage != NULL); - - /* Don't do any more decoding if the data buffer has started the uninitialization process. */ - if (pMessage->decodeBufferPage.pDataBuffer->result != MA_BUSY) { - return MA_INVALID_OPERATION; - } - - /* We're going to base everything off the original message. */ - messageCopy = *pMessage; - - /* We need to know the size of a page in frames to know how many frames to decode. */ - pageSizeInFrames = MA_RESOURCE_MANAGER_PAGE_SIZE_IN_MILLISECONDS * (messageCopy.decodeBufferPage.pDecoder->outputSampleRate/1000); - - /* If the total length is unknown we may need to expand the size of the buffer. */ - if (messageCopy.decodeBufferPage.isUnknownLength == MA_TRUE) { - ma_uint64 requiredSize = (messageCopy.decodeBufferPage.decodedFrameCount + pageSizeInFrames) * ma_get_bytes_per_frame(messageCopy.decodeBufferPage.pDecoder->outputFormat, messageCopy.decodeBufferPage.pDecoder->outputChannels); - if (requiredSize <= MA_SIZE_MAX) { - if (requiredSize > messageCopy.decodeBufferPage.dataSizeInBytes) { - size_t newSize = (size_t)ma_max(requiredSize, messageCopy.decodeBufferPage.dataSizeInBytes * 2); - void *pNewData = ma__realloc_from_callbacks(messageCopy.decodeBufferPage.pData, newSize, messageCopy.decodeBufferPage.dataSizeInBytes, &pResourceManager->config.allocationCallbacks/*, MA_ALLOCATION_TYPE_DECODED_BUFFER*/); - if (pNewData != NULL) { - messageCopy.decodeBufferPage.pData = pNewData; - messageCopy.decodeBufferPage.dataSizeInBytes = newSize; - } else { - result = MA_OUT_OF_MEMORY; - } - } - } else { - result = MA_TOO_BIG; - } - } - - /* We should have the memory set up so now we can decode the next page. */ - if (result == MA_SUCCESS) { - pRunningData = ma_offset_ptr(messageCopy.decodeBufferPage.pData, messageCopy.decodeBufferPage.decodedFrameCount * ma_get_bytes_per_frame(messageCopy.decodeBufferPage.pDecoder->outputFormat, messageCopy.decodeBufferPage.pDecoder->outputChannels)); - - framesRead = ma_decoder_read_pcm_frames(messageCopy.decodeBufferPage.pDecoder, pRunningData, pageSizeInFrames); - if (framesRead < pageSizeInFrames) { - result = MA_AT_END; - } - - /* If the total length is known we can increment out decoded frame count. Otherwise it needs to be left at 0 until the last page is decoded. */ - if (messageCopy.decodeBufferPage.isUnknownLength == MA_FALSE) { - messageCopy.decodeBufferPage.pDataBuffer->data.decoded.decodedFrameCount += framesRead; - } - - /* If there's more to decode, post a message to keep decoding. */ - if (result != MA_AT_END) { - messageCopy.decodeBufferPage.decodedFrameCount += framesRead; - - result = ma_resource_manager_post_message(pResourceManager, &messageCopy); - } - } - - /* - The result will be MA_SUCCESS if another page is in the queue for decoding. Otherwise it will be set to MA_AT_END if the end has been reached or - any other result code if some other error occurred. If we are not decoding another page we need to free the decoder and close the file. - */ - if (result != MA_SUCCESS) { - ma_decoder_uninit(messageCopy.decodeBufferPage.pDecoder); - ma__free_from_callbacks(messageCopy.decodeBufferPage.pDecoder, &pResourceManager->config.allocationCallbacks/*, MA_ALLOCATION_TYPE_DECODER*/); - - /* When the length is unknown we were doubling the size of the buffer each time we needed more data. Let's try reducing this by doing a final realloc(). */ - if (messageCopy.decodeBufferPage.isUnknownLength) { - ma_uint64 newSizeInBytes = messageCopy.decodeBufferPage.decodedFrameCount * ma_get_bytes_per_frame(messageCopy.decodeBufferPage.pDataBuffer->data.decoded.format, messageCopy.decodeBufferPage.pDataBuffer->data.decoded.channels); - void* pNewData = ma__realloc_from_callbacks(messageCopy.decodeBufferPage.pData, (size_t)newSizeInBytes, messageCopy.decodeBufferPage.dataSizeInBytes, &pResourceManager->config.allocationCallbacks); - if (pNewData != NULL) { - messageCopy.decodeBufferPage.pData = pNewData; - messageCopy.decodeBufferPage.dataSizeInBytes = (size_t)newSizeInBytes; /* <-- Don't really need to set this, but I think it's good practice. */ - } - } - - /* - We can now set the frame counts appropriately. We want to set the frame count regardless of whether or not it had a known length just in case we have - a weird situation where the frame count an opening time was different to the final count we got after reading. - */ - messageCopy.decodeBufferPage.pDataBuffer->data.decoded.pData = messageCopy.decodeBufferPage.pData; - messageCopy.decodeBufferPage.pDataBuffer->data.decoded.frameCount = messageCopy.decodeBufferPage.decodedFrameCount; - - /* - decodedFrameCount is what other threads will use to determine whether or not data is available. We must ensure pData and frameCount - is set *before* setting the number of available frames. This way, the other thread need only check if decodedFrameCount > 0, in - which case it can assume pData and frameCount are valid. - */ - c89atomic_thread_fence(c89atomic_memory_order_seq_cst); - messageCopy.decodeBufferPage.pDataBuffer->data.decoded.decodedFrameCount = messageCopy.decodeBufferPage.decodedFrameCount; - - - /* If we reached the end we need to treat it as successful. */ - if (result == MA_AT_END) { - result = MA_SUCCESS; - } - - /* We need to set the status of the page so other things can know about it. We can only change the status away from MA_BUSY. If it's anything else it cannot be changed. */ - c89atomic_compare_and_swap_32(&messageCopy.decodeBufferPage.pDataBuffer->result, MA_BUSY, result); - - /* We need to signal an event to indicate that we're done. */ - if (messageCopy.decodeBufferPage.pCompletedEvent != NULL) { - ma_event_signal(messageCopy.decodeBufferPage.pCompletedEvent); - } + result = ma_resource_manager_process_job__load_data_source__buffer(pResourceManager, pDataSource, pJob->loadDataSource.pEvent); } + c89atomic_fetch_add_32(&pDataSource->executionPointer, 1); return result; } -static ma_result ma_resource_manager_handle_message__decode_stream_page(ma_resource_manager* pResourceManager, const ma_resource_manager_message* pMessage) +static ma_result ma_resource_manager_process_job__free_data_source(ma_resource_manager* pResourceManager, ma_job* pJob) { - ma_resource_manager_data_stream* pDataStream; + ma_resource_manager_data_source* pDataSource; MA_ASSERT(pResourceManager != NULL); - MA_ASSERT(pMessage != NULL); + MA_ASSERT(pJob != NULL); - pDataStream = pMessage->decodeStreamPage.pDataStream; - MA_ASSERT(pDataStream != NULL); + pDataSource = pJob->loadDataSource.pDataSource; + MA_ASSERT(pDataSource != NULL); + MA_ASSERT(pDataSource->result == MA_UNAVAILABLE); /* The status of a data source should be set to unavailable before this event is fired. */ - /* For streams, the status should be MA_SUCCESS. */ - if (pMessage->decodeStreamPage.pDataStream->result != MA_SUCCESS) { - return MA_INVALID_OPERATION; + if (pJob->order != pDataSource->executionPointer) { + return ma_resource_manager_post_job(pResourceManager, pJob); /* Out of order. */ } - ma_resource_manager_data_stream_fill_page(pResourceManager, pMessage->decodeStreamPage.pDataStream, pMessage->decodeStreamPage.pageIndex); - - return MA_SUCCESS; -} - -static ma_result ma_resource_manager_handle_message__seek_data_stream(ma_resource_manager* pResourceManager, ma_resource_manager_data_stream* pDataStream, ma_uint64 frameIndex) -{ - MA_ASSERT(pResourceManager != NULL); - MA_ASSERT(pDataStream != NULL); - - /* For streams the status should be MA_SUCCESS for this to do anything. */ - if (pDataStream->result != MA_SUCCESS || pDataStream->isDecoderInitialized == MA_FALSE) { - return MA_INVALID_OPERATION; - } - - /* - With seeking we just assume both pages are invalid and the relative frame cursor at at position 0. This is basically exactly the same as loading, except - instead of initializing the decoder, we seek to a frame. - */ - ma_decoder_seek_to_pcm_frame(&pDataStream->decoder, frameIndex); - - /* After seeking we'll need to reload the pages. */ - ma_resource_manager_data_stream_fill_pages(pResourceManager, pDataStream); - - /* We need to let the public API know that we're done seeking. */ - c89atomic_fetch_sub_32(&pDataStream->seekCounter, 1); + /* Not doing anything at the moment. This event is not currently used. Everything here is just template code in preparation for something to perhaps go here later if need be. */ + c89atomic_fetch_add_32(&pDataSource->executionPointer, 1); return MA_SUCCESS; } -MA_API ma_result ma_resource_manager_handle_message(ma_resource_manager* pResourceManager, const ma_resource_manager_message* pMessage) +MA_API ma_result ma_resource_manager_process_job(ma_resource_manager* pResourceManager, ma_job* pJob) { - if (pResourceManager == NULL || pMessage == NULL) { + if (pResourceManager == NULL || pJob == NULL) { return MA_INVALID_ARGS; } - switch (pMessage->code) + switch (pJob->toc.code) { - case MA_MESSAGE_LOAD_DATA_BUFFER: - { - return ma_resource_manager_handle_message__load_data_buffer(pResourceManager, pMessage->loadDataBuffer.pDataBuffer, pMessage->loadDataBuffer.pFilePath, pMessage->loadDataBuffer.pEvent); - } break; + /* Data Buffer */ + case MA_JOB_LOAD_DATA_BUFFER: return ma_resource_manager_process_job__load_data_buffer(pResourceManager, pJob); + case MA_JOB_FREE_DATA_BUFFER: return ma_resource_manager_process_job__free_data_buffer(pResourceManager, pJob); + case MA_JOB_PAGE_DATA_BUFFER: return ma_resource_manager_process_job__page_data_buffer(pResourceManager, pJob); - case MA_MESSAGE_FREE_DATA_BUFFER: - { - return ma_resource_manager_handle_message__free_data_buffer(pResourceManager, pMessage->freeDataBuffer.pDataBuffer); - } break; + /* Data Stream */ + case MA_JOB_LOAD_DATA_STREAM: return ma_resource_manager_process_job__load_data_stream(pResourceManager, pJob); + case MA_JOB_FREE_DATA_STREAM: return ma_resource_manager_process_job__free_data_stream(pResourceManager, pJob); + case MA_JOB_PAGE_DATA_STREAM: return ma_resource_manager_process_job__page_data_stream(pResourceManager, pJob); + case MA_JOB_SEEK_DATA_STREAM: return ma_resource_manager_process_job__seek_data_stream(pResourceManager, pJob); - - case MA_MESSAGE_LOAD_DATA_STREAM: - { - return ma_resource_manager_handle_message__load_data_stream(pResourceManager, pMessage->loadDataStream.pDataStream, pMessage->loadDataStream.pFilePath, pMessage->loadDataStream.pEvent); - } break; - - case MA_MESSAGE_FREE_DATA_STREAM: - { - return ma_resource_manager_handle_message__free_data_stream(pResourceManager, pMessage->freeDataStream.pDataStream, pMessage->freeDataStream.pEvent); - } break; - - - case MA_MESSAGE_LOAD_DATA_SOURCE: - { - return ma_resource_manager_handle_message__load_data_source(pResourceManager, pMessage->loadDataSource.pDataSource, pMessage->loadDataSource.pEvent); - } break; - - #if 0 - case MA_MESSAGE_FREE_DATA_SOURCE: - { - return ma_resource_manager_handle_message__free_data_source(pResourceManager, pMessage->freeDataSource.pDataSource); - } break; - #endif - - - case MA_MESSAGE_DECODE_BUFFER_PAGE: - { - return ma_resource_manager_handle_message__decode_buffer_page(pResourceManager, pMessage); - } break; - - case MA_MESSAGE_DECODE_STREAM_PAGE: - { - return ma_resource_manager_handle_message__decode_stream_page(pResourceManager, pMessage); - } break; - - case MA_MESSAGE_SEEK_DATA_STREAM: - { - return ma_resource_manager_handle_message__seek_data_stream(pResourceManager, pMessage->seekDataStream.pDataStream, pMessage->seekDataStream.frameIndex); - } break; + /* Data Source */ + case MA_JOB_LOAD_DATA_SOURCE: return ma_resource_manager_process_job__load_data_source(pResourceManager, pJob); + case MA_JOB_FREE_DATA_SOURCE: return ma_resource_manager_process_job__free_data_source(pResourceManager, pJob); /* No used at the moment. Data sources are freed by the caller directly in ma_resource_manager_data_source_uninit(). */ default: break; } - return MA_SUCCESS; + /* Getting here means we don't know what the job code is and cannot do anything with it. */ + return MA_INVALID_OPERATION; } -MA_API ma_result ma_resource_manager_post_message(ma_resource_manager* pResourceManager, const ma_resource_manager_message* pMessage) -{ - if (pResourceManager == NULL) { - return MA_INVALID_ARGS; - } - - return ma_resource_manager_message_queue_post(&pResourceManager->messageQueue, pMessage); -} - -MA_API ma_result ma_resource_manager_next_message(ma_resource_manager* pResourceManager, ma_resource_manager_message* pMessage) -{ - if (pResourceManager == NULL) { - return MA_INVALID_ARGS; - } - - return ma_resource_manager_message_queue_next(&pResourceManager->messageQueue, pMessage); -} - -MA_API ma_result ma_resource_manager_peek_message(ma_resource_manager* pResourceManager, ma_resource_manager_message* pMessage) -{ - if (pResourceManager == NULL) { - return MA_INVALID_ARGS; - } - - return ma_resource_manager_message_queue_peek(&pResourceManager->messageQueue, pMessage); -}