Audacity 3.2.0
Classes | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes | List of all members
audacity::cloud::audiocom::sync::MissingBlocksUploader Class Referencefinal

#include <MissingBlocksUploader.h>

Inheritance diagram for audacity::cloud::audiocom::sync::MissingBlocksUploader:
[legend]
Collaboration diagram for audacity::cloud::audiocom::sync::MissingBlocksUploader:
[legend]

Classes

struct  ProducedItem
 
struct  Tag
 

Public Member Functions

 MissingBlocksUploader (Tag, const ServiceConfig &serviceConfig)
 
 ~MissingBlocksUploader ()
 
- Public Member Functions inherited from audacity::concurrency::ICancellable
virtual ~ICancellable ()=default
 
virtual void Cancel ()=0
 

Static Public Member Functions

static std::shared_ptr< MissingBlocksUploaderCreate (CancellationContextPtr cancellationContex, const ServiceConfig &serviceConfig, std::vector< BlockUploadTask > uploadTasks, MissingBlocksUploadProgressCallback progress)
 

Static Public Attributes

static constexpr auto NUM_PRODUCERS = 3
 
static constexpr auto NUM_UPLOADERS = 6
 
static constexpr auto RING_BUFFER_SIZE = 16
 

Private Member Functions

void Start (CancellationContextPtr cancellationContex, std::vector< BlockUploadTask > uploadTasks, MissingBlocksUploadProgressCallback progress)
 
void Cancel () override
 
ProducedItem ProduceBlock ()
 
void ConsumeBlock (ProducedItem item)
 
void PushBlockToQueue (ProducedItem item)
 
ProducedItem PopBlockFromQueue ()
 
void ConfirmBlock (BlockUploadTask task)
 
void HandleFailedBlock (const ResponseResult &result, BlockUploadTask task)
 
void ProducerThread ()
 
void ConsumerThread ()
 

Private Attributes

const ServiceConfigmServiceConfig
 
std::vector< BlockUploadTaskmUploadTasks
 
MissingBlocksUploadProgressCallback mProgressCallback
 
std::atomic_bool mIsRunning { true }
 
std::thread mProducerThread [NUM_PRODUCERS]
 
std::thread mConsumerThread
 
std::mutex mBlocksMutex
 
size_t mFirstUnprocessedBlockIndex { 0 }
 
std::mutex mUploadsMutex
 
std::condition_variable mUploadsNotFull
 
size_t mConcurrentUploads { 0 }
 
std::mutex mRingBufferMutex
 
std::condition_variable mRingBufferNotEmpty
 
std::condition_variable mRingBufferNotFull
 
std::array< ProducedItem, RING_BUFFER_SIZEmRingBuffer
 
size_t mRingBufferWriteIndex { 0 }
 
size_t mRingBufferReadIndex { 0 }
 
std::mutex mProgressDataMutex
 
MissingBlocksUploadProgress mProgressData
 
CancellationContextPtr mCancellationContext
 

Detailed Description

Definition at line 63 of file MissingBlocksUploader.h.

Constructor & Destructor Documentation

◆ MissingBlocksUploader()

audacity::cloud::audiocom::sync::MissingBlocksUploader::MissingBlocksUploader ( Tag  ,
const ServiceConfig serviceConfig 
)

Definition at line 21 of file MissingBlocksUploader.cpp.

23 : mServiceConfig { serviceConfig }
24{
25}

◆ ~MissingBlocksUploader()

audacity::cloud::audiocom::sync::MissingBlocksUploader::~MissingBlocksUploader ( )

Definition at line 47 of file MissingBlocksUploader.cpp.

References Cancel().

Here is the call graph for this function:

Member Function Documentation

◆ Cancel()

void audacity::cloud::audiocom::sync::MissingBlocksUploader::Cancel ( )
overrideprivatevirtual

Implements audacity::concurrency::ICancellable.

Definition at line 71 of file MissingBlocksUploader.cpp.

72{
73 if (!mIsRunning.exchange(false))
74 return;
75
76 mRingBufferNotEmpty.notify_all();
77 mRingBufferNotFull.notify_all();
78 mUploadsNotFull.notify_all();
79
80 for (auto& thread : mProducerThread)
81 thread.join();
82
83 mConsumerThread.join();
84
85 std::lock_guard lock(mProgressDataMutex);
86}

References mConsumerThread, mIsRunning, mProducerThread, mProgressDataMutex, mRingBufferNotEmpty, mRingBufferNotFull, and mUploadsNotFull.

Referenced by ~MissingBlocksUploader().

Here is the caller graph for this function:

◆ ConfirmBlock()

void audacity::cloud::audiocom::sync::MissingBlocksUploader::ConfirmBlock ( BlockUploadTask  task)
private

Definition at line 175 of file MissingBlocksUploader.cpp.

176{
177 MissingBlocksUploadProgress progressData;
178 {
179 std::lock_guard<std::mutex> lock(mProgressDataMutex);
181 progressData = mProgressData;
182 }
183
184 mProgressCallback(progressData, item.Block, {});
185
186 {
187 std::lock_guard<std::mutex> lock(mUploadsMutex);
189 mUploadsNotFull.notify_one();
190 }
191}

References audacity::cloud::audiocom::sync::BlockUploadTask::Block, mConcurrentUploads, mProgressCallback, mProgressData, mProgressDataMutex, mUploadsMutex, mUploadsNotFull, and audacity::cloud::audiocom::sync::MissingBlocksUploadProgress::UploadedBlocks.

Referenced by ConsumeBlock().

Here is the caller graph for this function:

◆ ConsumeBlock()

void audacity::cloud::audiocom::sync::MissingBlocksUploader::ConsumeBlock ( ProducedItem  item)
private

Definition at line 96 of file MissingBlocksUploader.cpp.

97{
98 {
99 std::unique_lock<std::mutex> lock(mUploadsMutex);
100 mUploadsNotFull.wait(
101 lock,
102 [this]
103 {
105 !mIsRunning.load(std::memory_order_consume);
106 });
107
108 if (!mIsRunning.load(std::memory_order_relaxed))
109 return;
110
112 }
113
115 mCancellationContext, mServiceConfig, item.Task.BlockUrls,
116 std::move(item.CompressedData),
117 [this, task = item.Task,
118 weakThis = weak_from_this()](ResponseResult result)
119 {
120 auto lock = weakThis.lock();
121
122 if (!lock)
123 return;
124
125 if (result.Code != SyncResultCode::Success)
126 HandleFailedBlock(result, task);
127 else
128 ConfirmBlock(task);
129 });
130}
void Upload(CancellationContextPtr cancellationContex, const ServiceConfig &config, const UploadUrls &target, std::vector< uint8_t > data, std::function< void(ResponseResult)> callback, std::function< void(double)> progressCallback={})
void HandleFailedBlock(const ResponseResult &result, BlockUploadTask task)

References audacity::cloud::audiocom::sync::BlockUploadTask::BlockUrls, audacity::cloud::audiocom::ResponseResult::Code, audacity::cloud::audiocom::sync::MissingBlocksUploader::ProducedItem::CompressedData, ConfirmBlock(), audacity::cloud::audiocom::sync::DataUploader::Get(), HandleFailedBlock(), mCancellationContext, mConcurrentUploads, mIsRunning, mServiceConfig, mUploadsMutex, mUploadsNotFull, NUM_UPLOADERS, audacity::cloud::audiocom::Success, audacity::cloud::audiocom::sync::MissingBlocksUploader::ProducedItem::Task, and audacity::cloud::audiocom::sync::DataUploader::Upload().

Here is the call graph for this function:

◆ ConsumerThread()

void audacity::cloud::audiocom::sync::MissingBlocksUploader::ConsumerThread ( )
private

Definition at line 255 of file MissingBlocksUploader.cpp.

256{
257 while (mIsRunning.load(std::memory_order_consume))
258 {
259 auto item = PopBlockFromQueue();
260 ConsumeBlock(std::move(item));
261 }
262}

Referenced by Start().

Here is the caller graph for this function:

◆ Create()

std::shared_ptr< MissingBlocksUploader > audacity::cloud::audiocom::sync::MissingBlocksUploader::Create ( CancellationContextPtr  cancellationContex,
const ServiceConfig serviceConfig,
std::vector< BlockUploadTask uploadTasks,
MissingBlocksUploadProgressCallback  progress 
)
static

Definition at line 27 of file MissingBlocksUploader.cpp.

31{
32 auto uploader =
33 std::make_shared<MissingBlocksUploader>(Tag {}, serviceConfig);
34
35 if (!cancellationContex)
36 cancellationContex = concurrency::CancellationContext::Create();
37
38 cancellationContex->OnCancelled(uploader);
39
40 uploader->Start(
41 std::move(cancellationContex), std::move(uploadTasks),
42 std::move(progressCallback));
43
44 return uploader;
45}

References audacity::concurrency::CancellationContext::Create().

Referenced by audacity::cloud::audiocom::sync::LocalProjectSnapshot::OnSnapshotCreated(), and audacity::cloud::audiocom::sync::anonymous_namespace{ResumedSnaphotUploadOperation.cpp}::ResumedSnaphotUploadOperation::UploadBlocks().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ HandleFailedBlock()

void audacity::cloud::audiocom::sync::MissingBlocksUploader::HandleFailedBlock ( const ResponseResult result,
BlockUploadTask  task 
)
private

Definition at line 193 of file MissingBlocksUploader.cpp.

195{
196 MissingBlocksUploadProgress progressData;
197 {
198 std::lock_guard<std::mutex> lock(mProgressDataMutex);
199
201 mProgressData.UploadErrors.push_back(result);
202 progressData = mProgressData;
203 }
204
205 mProgressCallback(progressData, task.Block, result);
206
207 {
208 std::lock_guard<std::mutex> lock(mUploadsMutex);
210 mUploadsNotFull.notify_one();
211 }
212
213 mCancellationContext->Cancel();
214}

References audacity::cloud::audiocom::sync::BlockUploadTask::Block, audacity::cloud::audiocom::sync::MissingBlocksUploadProgress::FailedBlocks, mCancellationContext, mConcurrentUploads, mProgressCallback, mProgressData, mProgressDataMutex, mUploadsMutex, mUploadsNotFull, and audacity::cloud::audiocom::sync::MissingBlocksUploadProgress::UploadErrors.

Referenced by ConsumeBlock().

Here is the caller graph for this function:

◆ PopBlockFromQueue()

MissingBlocksUploader::ProducedItem audacity::cloud::audiocom::sync::MissingBlocksUploader::PopBlockFromQueue ( )
private

Definition at line 153 of file MissingBlocksUploader.cpp.

154{
155 std::unique_lock<std::mutex> lock(mRingBufferMutex);
157 lock,
158 [this]
159 {
161 !mIsRunning.load(std::memory_order_consume);
162 });
163
164 if (!mIsRunning.load(std::memory_order_relaxed))
165 return {};
166
167 auto item = std::move(mRingBuffer[mRingBufferReadIndex]);
169
170 mRingBufferNotFull.notify_one();
171
172 return std::move(item);
173}
std::array< ProducedItem, RING_BUFFER_SIZE > mRingBuffer

References mIsRunning, mRingBuffer, mRingBufferMutex, mRingBufferNotEmpty, mRingBufferNotFull, mRingBufferReadIndex, mRingBufferWriteIndex, and RING_BUFFER_SIZE.

◆ ProduceBlock()

MissingBlocksUploader::ProducedItem audacity::cloud::audiocom::sync::MissingBlocksUploader::ProduceBlock ( )
private

Definition at line 88 of file MissingBlocksUploader.cpp.

89{
90 const auto index = mFirstUnprocessedBlockIndex++;
91 const auto& task = mUploadTasks[index];
92
93 return { task, CompressBlock(task.Block) };
94}
std::vector< uint8_t > CompressBlock(const LockedBlock &block)

References audacity::cloud::audiocom::sync::CompressBlock(), mFirstUnprocessedBlockIndex, and mUploadTasks.

Here is the call graph for this function:

◆ ProducerThread()

void audacity::cloud::audiocom::sync::MissingBlocksUploader::ProducerThread ( )
private

Definition at line 216 of file MissingBlocksUploader.cpp.

217{
218 while (mIsRunning.load(std::memory_order_consume))
219 {
220 BlockUploadTask task;
221
222 {
223 std::lock_guard<std::mutex> lock(mBlocksMutex);
224
226 return;
227
228 const auto index = mFirstUnprocessedBlockIndex++;
229 task = std::move(mUploadTasks[index]);
230 }
231
232 auto compressedData = CompressBlock(task.Block);
233
234 if (compressedData.empty())
235 {
236 MissingBlocksUploadProgress progressData;
237 {
238 std::lock_guard<std::mutex> lock(mProgressDataMutex);
240 progressData = mProgressData;
241 }
242
244 progressData, task.Block,
245 { SyncResultCode::InternalClientError, {} });
246 }
247 else
248 {
250 ProducedItem { std::move(task), std::move(compressedData) });
251 }
252 }
253}

References audacity::cloud::audiocom::sync::BlockUploadTask::Block, audacity::cloud::audiocom::sync::CompressBlock(), audacity::cloud::audiocom::sync::MissingBlocksUploadProgress::FailedBlocks, mBlocksMutex, mFirstUnprocessedBlockIndex, mIsRunning, mProgressCallback, mProgressData, mProgressDataMutex, mUploadTasks, and PushBlockToQueue().

Referenced by Start().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ PushBlockToQueue()

void audacity::cloud::audiocom::sync::MissingBlocksUploader::PushBlockToQueue ( ProducedItem  item)
private

Definition at line 132 of file MissingBlocksUploader.cpp.

133{
134 std::unique_lock<std::mutex> lock(mRingBufferMutex);
136 lock,
137 [this]
138 {
139 return ((mRingBufferWriteIndex + 1) % RING_BUFFER_SIZE) !=
141 !mIsRunning.load(std::memory_order_consume);
142 });
143
144 if (!mIsRunning.load(std::memory_order_relaxed))
145 return;
146
147 mRingBuffer[mRingBufferWriteIndex] = std::move(item);
149
150 mRingBufferNotEmpty.notify_one();
151}

References mIsRunning, mRingBuffer, mRingBufferMutex, mRingBufferNotEmpty, mRingBufferNotFull, mRingBufferReadIndex, mRingBufferWriteIndex, and RING_BUFFER_SIZE.

Referenced by ProducerThread().

Here is the caller graph for this function:

◆ Start()

void audacity::cloud::audiocom::sync::MissingBlocksUploader::Start ( CancellationContextPtr  cancellationContex,
std::vector< BlockUploadTask uploadTasks,
MissingBlocksUploadProgressCallback  progress 
)
private

Definition at line 52 of file MissingBlocksUploader.cpp.

56{
57 mCancellationContext = std::move(cancellationContex);
58 mUploadTasks = std::move(uploadTasks);
59 mProgressCallback = std::move(progressCallback);
61 mProgressCallback = [](auto...) {};
62
64
65 for (auto& thread : mProducerThread)
66 thread = std::thread([this] { ProducerThread(); });
67
68 mConsumerThread = std::thread([this] { ConsumerThread(); });
69}

References ConsumerThread(), mCancellationContext, mConsumerThread, mProducerThread, mProgressCallback, mProgressData, mUploadTasks, ProducerThread(), and audacity::cloud::audiocom::sync::MissingBlocksUploadProgress::TotalBlocks.

Here is the call graph for this function:

Member Data Documentation

◆ mBlocksMutex

std::mutex audacity::cloud::audiocom::sync::MissingBlocksUploader::mBlocksMutex
private

Definition at line 121 of file MissingBlocksUploader.h.

Referenced by ProducerThread().

◆ mCancellationContext

CancellationContextPtr audacity::cloud::audiocom::sync::MissingBlocksUploader::mCancellationContext
private

Definition at line 140 of file MissingBlocksUploader.h.

Referenced by ConsumeBlock(), HandleFailedBlock(), and Start().

◆ mConcurrentUploads

size_t audacity::cloud::audiocom::sync::MissingBlocksUploader::mConcurrentUploads { 0 }
private

Definition at line 126 of file MissingBlocksUploader.h.

Referenced by ConfirmBlock(), ConsumeBlock(), and HandleFailedBlock().

◆ mConsumerThread

std::thread audacity::cloud::audiocom::sync::MissingBlocksUploader::mConsumerThread
private

Definition at line 119 of file MissingBlocksUploader.h.

Referenced by Cancel(), and Start().

◆ mFirstUnprocessedBlockIndex

size_t audacity::cloud::audiocom::sync::MissingBlocksUploader::mFirstUnprocessedBlockIndex { 0 }
private

Definition at line 122 of file MissingBlocksUploader.h.

Referenced by ProduceBlock(), and ProducerThread().

◆ mIsRunning

std::atomic_bool audacity::cloud::audiocom::sync::MissingBlocksUploader::mIsRunning { true }
private

◆ mProducerThread

std::thread audacity::cloud::audiocom::sync::MissingBlocksUploader::mProducerThread[NUM_PRODUCERS]
private

Definition at line 118 of file MissingBlocksUploader.h.

Referenced by Cancel(), and Start().

◆ mProgressCallback

MissingBlocksUploadProgressCallback audacity::cloud::audiocom::sync::MissingBlocksUploader::mProgressCallback
private

Definition at line 114 of file MissingBlocksUploader.h.

Referenced by ConfirmBlock(), HandleFailedBlock(), ProducerThread(), and Start().

◆ mProgressData

MissingBlocksUploadProgress audacity::cloud::audiocom::sync::MissingBlocksUploader::mProgressData
private

Definition at line 138 of file MissingBlocksUploader.h.

Referenced by ConfirmBlock(), HandleFailedBlock(), ProducerThread(), and Start().

◆ mProgressDataMutex

std::mutex audacity::cloud::audiocom::sync::MissingBlocksUploader::mProgressDataMutex
private

Definition at line 137 of file MissingBlocksUploader.h.

Referenced by Cancel(), ConfirmBlock(), HandleFailedBlock(), and ProducerThread().

◆ mRingBuffer

std::array<ProducedItem, RING_BUFFER_SIZE> audacity::cloud::audiocom::sync::MissingBlocksUploader::mRingBuffer
private

Definition at line 133 of file MissingBlocksUploader.h.

Referenced by PopBlockFromQueue(), and PushBlockToQueue().

◆ mRingBufferMutex

std::mutex audacity::cloud::audiocom::sync::MissingBlocksUploader::mRingBufferMutex
private

Definition at line 128 of file MissingBlocksUploader.h.

Referenced by PopBlockFromQueue(), and PushBlockToQueue().

◆ mRingBufferNotEmpty

std::condition_variable audacity::cloud::audiocom::sync::MissingBlocksUploader::mRingBufferNotEmpty
private

Definition at line 130 of file MissingBlocksUploader.h.

Referenced by Cancel(), PopBlockFromQueue(), and PushBlockToQueue().

◆ mRingBufferNotFull

std::condition_variable audacity::cloud::audiocom::sync::MissingBlocksUploader::mRingBufferNotFull
private

Definition at line 131 of file MissingBlocksUploader.h.

Referenced by Cancel(), PopBlockFromQueue(), and PushBlockToQueue().

◆ mRingBufferReadIndex

size_t audacity::cloud::audiocom::sync::MissingBlocksUploader::mRingBufferReadIndex { 0 }
private

Definition at line 135 of file MissingBlocksUploader.h.

Referenced by PopBlockFromQueue(), and PushBlockToQueue().

◆ mRingBufferWriteIndex

size_t audacity::cloud::audiocom::sync::MissingBlocksUploader::mRingBufferWriteIndex { 0 }
private

Definition at line 134 of file MissingBlocksUploader.h.

Referenced by PopBlockFromQueue(), and PushBlockToQueue().

◆ mServiceConfig

const ServiceConfig& audacity::cloud::audiocom::sync::MissingBlocksUploader::mServiceConfig
private

Definition at line 111 of file MissingBlocksUploader.h.

Referenced by ConsumeBlock().

◆ mUploadsMutex

std::mutex audacity::cloud::audiocom::sync::MissingBlocksUploader::mUploadsMutex
private

Definition at line 124 of file MissingBlocksUploader.h.

Referenced by ConfirmBlock(), ConsumeBlock(), and HandleFailedBlock().

◆ mUploadsNotFull

std::condition_variable audacity::cloud::audiocom::sync::MissingBlocksUploader::mUploadsNotFull
private

Definition at line 125 of file MissingBlocksUploader.h.

Referenced by Cancel(), ConfirmBlock(), ConsumeBlock(), and HandleFailedBlock().

◆ mUploadTasks

std::vector<BlockUploadTask> audacity::cloud::audiocom::sync::MissingBlocksUploader::mUploadTasks
private

Definition at line 113 of file MissingBlocksUploader.h.

Referenced by ProduceBlock(), ProducerThread(), and Start().

◆ NUM_PRODUCERS

constexpr auto audacity::cloud::audiocom::sync::MissingBlocksUploader::NUM_PRODUCERS = 3
staticconstexpr

Definition at line 72 of file MissingBlocksUploader.h.

◆ NUM_UPLOADERS

constexpr auto audacity::cloud::audiocom::sync::MissingBlocksUploader::NUM_UPLOADERS = 6
staticconstexpr

Definition at line 73 of file MissingBlocksUploader.h.

Referenced by ConsumeBlock().

◆ RING_BUFFER_SIZE

constexpr auto audacity::cloud::audiocom::sync::MissingBlocksUploader::RING_BUFFER_SIZE = 16
staticconstexpr

Definition at line 74 of file MissingBlocksUploader.h.

Referenced by PopBlockFromQueue(), and PushBlockToQueue().


The documentation for this class was generated from the following files: