Audacity 3.2.0
MissingBlocksUploader.cpp
Go to the documentation of this file.
1/* SPDX-License-Identifier: GPL-2.0-or-later */
2/*!********************************************************************
3
4 Audacity: A Digital Audio Editor
5
6 MissingBlocksUploader.cpp
7
8 Dmitry Vedenko
9
10**********************************************************************/
11
13
14#include "DataUploader.h"
15
16#include "WavPackCompressor.h"
17
19{
20
22 Tag, const ServiceConfig& serviceConfig)
23 : mServiceConfig { serviceConfig }
24{
25}
26
27std::shared_ptr<MissingBlocksUploader> MissingBlocksUploader::Create(
28 CancellationContextPtr cancellationContex, const ServiceConfig& serviceConfig,
29 std::vector<BlockUploadTask> uploadTasks,
31{
32 auto uploader =
33 std::make_shared<MissingBlocksUploader>(Tag {}, serviceConfig);
34
35 if (!cancellationContex)
36 cancellationContex = concurrency::CancellationContext::Create();
37
38 cancellationContex->OnCancelled(uploader);
39
40 uploader->Start(
41 std::move(cancellationContex), std::move(uploadTasks),
42 std::move(progressCallback));
43
44 return uploader;
45}
46
48{
49 Cancel();
50}
51
53 CancellationContextPtr cancellationContex,
54 std::vector<BlockUploadTask> uploadTasks,
56{
57 mCancellationContext = std::move(cancellationContex);
58 mUploadTasks = std::move(uploadTasks);
59 mProgressCallback = std::move(progressCallback);
61 mProgressCallback = [](auto...) {};
62
64
65 for (auto& thread : mProducerThread)
66 thread = std::thread([this] { ProducerThread(); });
67
68 mConsumerThread = std::thread([this] { ConsumerThread(); });
69}
70
72{
73 if (!mIsRunning.exchange(false))
74 return;
75
76 mRingBufferNotEmpty.notify_all();
77 mRingBufferNotFull.notify_all();
78 mUploadsNotFull.notify_all();
79
80 for (auto& thread : mProducerThread)
81 thread.join();
82
83 mConsumerThread.join();
84
85 std::lock_guard lock(mProgressDataMutex);
86}
87
89{
90 const auto index = mFirstUnprocessedBlockIndex++;
91 const auto& task = mUploadTasks[index];
92
93 return { task, CompressBlock(task.Block) };
94}
95
97{
98 {
99 std::unique_lock<std::mutex> lock(mUploadsMutex);
100 mUploadsNotFull.wait(
101 lock,
102 [this]
103 {
105 !mIsRunning.load(std::memory_order_consume);
106 });
107
108 if (!mIsRunning.load(std::memory_order_relaxed))
109 return;
110
112 }
113
116 std::move(item.CompressedData),
117 [this, task = item.Task,
118 weakThis = weak_from_this()](ResponseResult result)
119 {
120 auto lock = weakThis.lock();
121
122 if (!lock)
123 return;
124
125 if (result.Code != SyncResultCode::Success)
126 HandleFailedBlock(result, task);
127 else
128 ConfirmBlock(task);
129 });
130}
131
133{
134 std::unique_lock<std::mutex> lock(mRingBufferMutex);
136 lock,
137 [this]
138 {
139 return ((mRingBufferWriteIndex + 1) % RING_BUFFER_SIZE) !=
141 !mIsRunning.load(std::memory_order_consume);
142 });
143
144 if (!mIsRunning.load(std::memory_order_relaxed))
145 return;
146
147 mRingBuffer[mRingBufferWriteIndex] = std::move(item);
149
150 mRingBufferNotEmpty.notify_one();
151}
152
154{
155 std::unique_lock<std::mutex> lock(mRingBufferMutex);
157 lock,
158 [this]
159 {
161 !mIsRunning.load(std::memory_order_consume);
162 });
163
164 if (!mIsRunning.load(std::memory_order_relaxed))
165 return {};
166
167 auto item = std::move(mRingBuffer[mRingBufferReadIndex]);
169
170 mRingBufferNotFull.notify_one();
171
172 return std::move(item);
173}
174
176{
177 MissingBlocksUploadProgress progressData;
178 {
179 std::lock_guard<std::mutex> lock(mProgressDataMutex);
181 progressData = mProgressData;
182 }
183
184 mProgressCallback(progressData, item.Block, {});
185
186 {
187 std::lock_guard<std::mutex> lock(mUploadsMutex);
189 mUploadsNotFull.notify_one();
190 }
191}
192
194 const ResponseResult& result, BlockUploadTask task)
195{
196 MissingBlocksUploadProgress progressData;
197 {
198 std::lock_guard<std::mutex> lock(mProgressDataMutex);
199
201 mProgressData.UploadErrors.push_back(result);
202 progressData = mProgressData;
203 }
204
205 mProgressCallback(progressData, task.Block, result);
206
207 {
208 std::lock_guard<std::mutex> lock(mUploadsMutex);
210 mUploadsNotFull.notify_one();
211 }
212
213 mCancellationContext->Cancel();
214}
215
217{
218 while (mIsRunning.load(std::memory_order_consume))
219 {
220 BlockUploadTask task;
221
222 {
223 std::lock_guard<std::mutex> lock(mBlocksMutex);
224
226 return;
227
228 const auto index = mFirstUnprocessedBlockIndex++;
229 task = std::move(mUploadTasks[index]);
230 }
231
232 auto compressedData = CompressBlock(task.Block);
233
234 if (compressedData.empty())
235 {
236 MissingBlocksUploadProgress progressData;
237 {
238 std::lock_guard<std::mutex> lock(mProgressDataMutex);
240 progressData = mProgressData;
241 }
242
244 progressData, task.Block,
245 { SyncResultCode::InternalClientError, {} });
246 }
247 else
248 {
250 ProducedItem { std::move(task), std::move(compressedData) });
251 }
252 }
253}
254
255void MissingBlocksUploader::ConsumerThread()
256{
257 while (mIsRunning.load(std::memory_order_consume))
258 {
259 auto item = PopBlockFromQueue();
260 ConsumeBlock(std::move(item));
261 }
262}
263
264} // namespace audacity::cloud::audiocom::sync
Configuration for the audio.com service.
Definition: ServiceConfig.h:23
void Upload(CancellationContextPtr cancellationContex, const ServiceConfig &config, const UploadUrls &target, std::vector< uint8_t > data, std::function< void(ResponseResult)> callback, std::function< void(double)> progressCallback={})
void HandleFailedBlock(const ResponseResult &result, BlockUploadTask task)
std::array< ProducedItem, RING_BUFFER_SIZE > mRingBuffer
MissingBlocksUploader(Tag, const ServiceConfig &serviceConfig)
static std::shared_ptr< MissingBlocksUploader > Create(CancellationContextPtr cancellationContex, const ServiceConfig &serviceConfig, std::vector< BlockUploadTask > uploadTasks, MissingBlocksUploadProgressCallback progress)
void Start(CancellationContextPtr cancellationContex, std::vector< BlockUploadTask > uploadTasks, MissingBlocksUploadProgressCallback progress)
std::function< void(const MissingBlocksUploadProgress &, const LockedBlock &, ResponseResult blockResponseResult)> MissingBlocksUploadProgressCallback
std::vector< uint8_t > CompressBlock(const LockedBlock &block)
std::shared_ptr< CancellationContext > CancellationContextPtr