23 : mServiceConfig { serviceConfig }
29 std::vector<BlockUploadTask> uploadTasks,
33 std::make_shared<MissingBlocksUploader>(
Tag {}, serviceConfig);
35 if (!cancellationContex)
38 cancellationContex->OnCancelled(uploader);
41 std::move(cancellationContex), std::move(uploadTasks),
42 std::move(progressCallback));
54 std::vector<BlockUploadTask> uploadTasks,
108 if (!
mIsRunning.load(std::memory_order_relaxed))
117 [
this, task = item.
Task,
120 auto lock = weakThis.lock();
144 if (!
mIsRunning.load(std::memory_order_relaxed))
164 if (!
mIsRunning.load(std::memory_order_relaxed))
172 return std::move(item);
218 while (
mIsRunning.load(std::memory_order_consume))
234 if (compressedData.empty())
244 progressData, task.
Block,
245 { SyncResultCode::InternalClientError, {} });
250 ProducedItem { std::move(task), std::move(compressedData) });
255void MissingBlocksUploader::ConsumerThread()
257 while (mIsRunning.load(std::memory_order_consume))
259 auto item = PopBlockFromQueue();
260 ConsumeBlock(std::move(item));
Configuration for the audio.com service.
static DataUploader & Get()
void Upload(CancellationContextPtr cancellationContex, const ServiceConfig &config, const UploadUrls &target, std::vector< uint8_t > data, std::function< void(ResponseResult)> callback, std::function< void(double)> progressCallback={})
size_t mConcurrentUploads
ProducedItem PopBlockFromQueue()
MissingBlocksUploadProgressCallback mProgressCallback
void HandleFailedBlock(const ResponseResult &result, BlockUploadTask task)
std::condition_variable mRingBufferNotFull
std::array< ProducedItem, RING_BUFFER_SIZE > mRingBuffer
std::atomic_bool mIsRunning
size_t mFirstUnprocessedBlockIndex
std::vector< BlockUploadTask > mUploadTasks
MissingBlocksUploader(Tag, const ServiceConfig &serviceConfig)
std::mutex mProgressDataMutex
size_t mRingBufferReadIndex
MissingBlocksUploadProgress mProgressData
ProducedItem ProduceBlock()
std::thread mConsumerThread
size_t mRingBufferWriteIndex
std::condition_variable mRingBufferNotEmpty
void PushBlockToQueue(ProducedItem item)
static std::shared_ptr< MissingBlocksUploader > Create(CancellationContextPtr cancellationContex, const ServiceConfig &serviceConfig, std::vector< BlockUploadTask > uploadTasks, MissingBlocksUploadProgressCallback progress)
void Start(CancellationContextPtr cancellationContex, std::vector< BlockUploadTask > uploadTasks, MissingBlocksUploadProgressCallback progress)
static constexpr auto RING_BUFFER_SIZE
const ServiceConfig & mServiceConfig
std::mutex mRingBufferMutex
void ConfirmBlock(BlockUploadTask task)
std::thread mProducerThread[NUM_PRODUCERS]
CancellationContextPtr mCancellationContext
static constexpr auto NUM_UPLOADERS
std::condition_variable mUploadsNotFull
void ConsumeBlock(ProducedItem item)
static CancellationContextPtr Create()
std::function< void(const MissingBlocksUploadProgress &, const LockedBlock &, ResponseResult blockResponseResult)> MissingBlocksUploadProgressCallback
std::vector< uint8_t > CompressBlock(const LockedBlock &block)
std::shared_ptr< CancellationContext > CancellationContextPtr
std::vector< ResponseResult > UploadErrors
std::vector< uint8_t > CompressedData