Audacity 3.2.0
ResumedSnaphotUploadOperation.cpp
Go to the documentation of this file.
1/* SPDX-License-Identifier: GPL-2.0-or-later */
2/*!********************************************************************
3
4 Audacity: A Digital Audio Editor
5
6 ResumedSnaphotUploadOperation.cpp
7
8 Dmitry Vedenko
9
10**********************************************************************/
11
13
14#include <algorithm>
15#include <memory>
16#include <string>
17#include <string_view>
18
20
22#include "DataUploader.h"
26#include "ServiceConfig.h"
27
28#include "SampleBlock.h"
29#include "WaveTrack.h"
30
31#include "CodeConversions.h"
32#include "DateTimeConversions.h"
33#include "FromChars.h"
34#include "IResponse.h"
35#include "NetworkManager.h"
36#include "Request.h"
37#include "UriParser.h"
38
40{
41namespace
42{
43
44bool IsUrlExpired(const std::string& url)
45{
46 const auto parsedUri = ParseUri(url);
47 const auto parsedQuery = ParseUriQuery(parsedUri.Query);
48
49 const auto amzDateIt = parsedQuery.find("X-Amz-Date");
50
51 if (amzDateIt == parsedQuery.end())
52 return false;
53
54 SystemTime time;
55
56 if (!ParseISO8601Date(std::string(amzDateIt->second), &time))
57 return false;
58
59 const auto amzExpiresIt = parsedQuery.find("X-Amz-Expires");
60
61 if (amzExpiresIt == parsedQuery.end())
62 return false;
63
64 int64_t expiresSeconds;
65
66 auto expiresParseResult = FromChars(
67 amzExpiresIt->second.data(),
68 amzExpiresIt->second.data() + amzExpiresIt->second.size(),
69 expiresSeconds);
70
71 if (expiresParseResult.ec != std::errc {})
72 return false;
73
74 return (time + std::chrono::seconds { expiresSeconds }) <
75 std::chrono::system_clock::now();
76}
77
80 public std::enable_shared_from_this<ResumedSnaphotUploadOperation>
81{
82 struct Tag
83 {
84 };
85
86public:
88 Tag, ProjectCloudExtension& projectCloudExtension,
89 std::string_view snapshotId, std::string_view confirmationUrl)
90 : mProjectCloudExtension { projectCloudExtension }
91 , mProjectId { mProjectCloudExtension.GetCloudProjectId() }
92 , mSnapshotId { snapshotId }
93 , mConfirmationUrl { confirmationUrl }
94 , mCancellationContext { concurrency::CancellationContext::Create() }
95 {
96 }
97
99 {
100 }
101
102 static void Perform(
103 ProjectCloudExtension& projectCloudExtension, std::string_view snapshotId,
104 std::string_view confirmationUrl)
105 {
106 auto& cloudProjectsDatabase = CloudProjectsDatabase::Get();
107
108 auto operation = std::make_shared<ResumedSnaphotUploadOperation>(
109 Tag {}, projectCloudExtension, snapshotId, confirmationUrl);
110
111 const auto projectId = projectCloudExtension.GetCloudProjectId();
112
113 operation->mPendingProjectBlobData =
114 cloudProjectsDatabase.GetPendingProjectBlob(projectId, snapshotId);
115
116 operation->mPendingProjectBlocks =
117 cloudProjectsDatabase.GetPendingProjectBlocks(projectId, snapshotId);
118
119 const int64_t totalBlocks = operation->mPendingProjectBlocks.size();
120
121 if (operation->mPendingProjectBlobData)
122 {
123 operation->mHasExpiredUrls =
124 IsUrlExpired(operation->mPendingProjectBlobData->UploadUrl);
125 }
126
127 for (const auto& block : operation->mPendingProjectBlocks)
128 {
129 if (operation->mHasExpiredUrls)
130 break;
131
132 operation->mHasExpiredUrls = IsUrlExpired(block.UploadUrl);
133 }
134
135 projectCloudExtension.OnSyncResumed(
136 operation, totalBlocks,
137 operation->mPendingProjectBlobData.has_value());
138 }
139
140private:
142 {
143 const auto urls = UploadUrls { {},
144 mPendingProjectBlobData->UploadUrl,
145 mPendingProjectBlobData->ConfirmUrl,
146 mPendingProjectBlobData->FailUrl };
147
149 mCancellationContext, GetServiceConfig(), urls,
150 mPendingProjectBlobData->BlobData,
151 [this, weakThis = weak_from_this()](auto result)
152 {
153 auto strongThis = weakThis.lock();
154 if (!strongThis)
155 return;
156
157 if (!IsUploadRecoverable(result.Code))
159 mProjectId, mSnapshotId);
160
161 if (result.Code == SyncResultCode::Success)
162 {
163 mProjectCloudExtension.OnProjectDataUploaded(*this);
164 UploadBlocks();
165 }
166 else
167 FailSync(std::move(result));
168 });
169 }
170
172 {
173 if (!mCompleted.exchange(true))
174 mProjectCloudExtension.OnSyncCompleted(this, {});
175 }
176
178 {
179 if (!mCompleted.exchange(true))
180 mProjectCloudExtension.OnSyncCompleted(this, error);
181 }
182
184 {
185 FailSync(CloudSyncError { DeduceError(result.Code), result.Content });
186 }
187
189 {
190 if (mPendingProjectBlocks.empty())
191 MarkSnapshotSynced();
192
193 auto project = mProjectCloudExtension.GetProject().lock();
194
195 if (!project)
196 {
198 return;
199 }
200
201 auto& waveTrackFactory = WaveTrackFactory::Get(*project);
202 auto& sampleBlockFactory = waveTrackFactory.GetSampleBlockFactory();
203
204 std::vector<BlockUploadTask> blockTasks;
205 blockTasks.reserve(mPendingProjectBlocks.size());
206
207 for (const auto& pendingBlock : mPendingProjectBlocks)
208 {
209 BlockUploadTask task;
210
211 task.BlockUrls = { {},
212 pendingBlock.UploadUrl,
213 pendingBlock.ConfirmUrl,
214 pendingBlock.FailUrl };
215
216 task.Block.Format =
217 static_cast<sampleFormat>(pendingBlock.BlockSampleFormat);
218 task.Block.Hash = pendingBlock.BlockHash;
219 task.Block.Id = pendingBlock.BlockId;
220 try
221 {
222 task.Block.Block = sampleBlockFactory->CreateFromId(
223 task.Block.Format, pendingBlock.BlockId);
224
225 blockTasks.push_back(std::move(task));
226 }
227 catch (const FileException& e)
228 {
229 // We have failed to resume the upload, local data is missing
231 mProjectId, mSnapshotId);
232 FailSync(
234 ToUTF8(
235 XO("Local project data was removed before the sync has completed")
236 .Translation()) });
237 return;
238 }
239 }
240
241 mMissingBlocksUploader = MissingBlocksUploader::Create(
242 mCancellationContext, GetServiceConfig(), std::move(blockTasks),
243 [this, weakThis = weak_from_this()](
244 const MissingBlocksUploadProgress& progress,
245 const LockedBlock& block, ResponseResult blockResponseResult)
246 {
247 auto strongThis = weakThis.lock();
248 if (!strongThis)
249 return;
250
251 if (
252 blockResponseResult.Code != SyncResultCode::ConnectionFailed &&
253 blockResponseResult.Code != SyncResultCode::Cancelled)
255 mProjectId, block.Id);
256
257 mProjectCloudExtension.OnBlockUploaded(
258 *this, block.Hash,
259 blockResponseResult.Code == SyncResultCode::Success);
260
261 const auto completed =
262 progress.UploadedBlocks == progress.TotalBlocks ||
263 progress.FailedBlocks != 0;
264
265 const bool succeeded = completed && progress.FailedBlocks == 0;
266
267 if (!completed)
268 return;
269
270 if (succeeded)
271 MarkSnapshotSynced();
272 else
273 FailSync(std::move(blockResponseResult));
274 });
275 }
276
277 void Start() override
278 {
279 if (mHasExpiredUrls)
280 RefreshUrls();
281 else if (mPendingProjectBlobData.has_value())
282 UploadSnapshot();
283 else
284 UploadBlocks();
285 }
286
288 {
289 using namespace network_manager;
291 mProjectId, mSnapshotId) };
292
293 SetCommonHeaders(request);
294
295 auto response = NetworkManager::GetInstance().doGet(request);
296
297 response->setRequestFinishedCallback(
298 [this, response, weakThis = weak_from_this()](auto)
299 {
300 auto strongThis = weakThis.lock();
301 if (!strongThis)
302 return;
303
304 if (response->getError() != NetworkError::NoError)
305 {
306 FailSync(DeduceUploadError(*response));
307 return;
308 }
309
310 auto syncState =
311 DeserializeProjectSyncState(response->readAll<std::string>());
312
313 if (!syncState)
314 {
315 FailSync(
316 MakeClientFailure(XO("Failed to deserialize the response")));
317 return;
318 }
319
320 UpdateUrls(*syncState);
321
322 if (mPendingProjectBlobData.has_value())
323 UploadSnapshot();
324 else
325 UploadBlocks();
326 });
327 }
328
329 void UpdateUrls(const ProjectSyncState& syncState)
330 {
331 if (mPendingProjectBlobData.has_value())
332 {
333 if (syncState.FileUrls.UploadUrl.empty())
334 {
335 mPendingProjectBlobData = {};
337 mProjectId, mSnapshotId);
338 }
339 else
340 {
341 mPendingProjectBlobData->UploadUrl = syncState.FileUrls.UploadUrl;
342 mPendingProjectBlobData->ConfirmUrl = syncState.FileUrls.SuccessUrl;
343 mPendingProjectBlobData->FailUrl = syncState.FileUrls.SuccessUrl;
344 }
345 }
346
347 std::unordered_map<std::string, UploadUrls> urlsLookup;
348 for (const auto& urls : syncState.MissingBlocks)
349 urlsLookup.emplace(urls.Id, urls);
350
351 for (auto& block : mPendingProjectBlocks)
352 {
353 auto it = urlsLookup.find(block.BlockHash);
354
355 if (it == urlsLookup.end())
356 {
358 mProjectId, block.BlockId);
359
360 continue;
361 }
362
363 block.UploadUrl = urlsLookup[block.BlockHash].UploadUrl;
364 block.ConfirmUrl = urlsLookup[block.BlockHash].SuccessUrl;
365 block.FailUrl = urlsLookup[block.BlockHash].FailUrl;
366 }
367
368 mPendingProjectBlocks.erase(
369 std::remove_if(
370 mPendingProjectBlocks.begin(), mPendingProjectBlocks.end(),
371 [&urlsLookup](auto& block)
372 { return urlsLookup.find(block.BlockHash) == urlsLookup.end(); }),
373 mPendingProjectBlocks.end());
374 }
375
377 {
378 using namespace network_manager;
379 Request request { mConfirmationUrl };
380
381 SetCommonHeaders(request);
382
383 auto response = NetworkManager::GetInstance().doPost(request, nullptr, 0);
384
385 response->setRequestFinishedCallback(
386 [this, response, weakThis = weak_from_this()](auto)
387 {
388 auto strongThis = weakThis.lock();
389 if (!strongThis)
390 return;
391
393 mProjectId, mSnapshotId);
394
395 if (response->getError() != NetworkError::NoError)
396 {
397 FailSync(DeduceUploadError(*response));
398 return;
399 }
400
401 CompleteSync();
402 });
403
404 mCancellationContext->OnCancelled(response);
405 }
406
407 void SetUploadData(const ProjectUploadData& data) override
408 {
409 // This method will never be called for resumed operations
410 }
411
412 bool IsCompleted() const override
413 {
414 return mCompleted.load();
415 }
416
417 void Cancel() override
418 {
419 mCancellationContext->Cancel();
421 }
422
424
425 std::string mProjectId;
426 std::string mSnapshotId;
427 std::string mConfirmationUrl;
428
430
431 std::optional<PendingProjectBlobData> mPendingProjectBlobData;
432 std::vector<PendingProjectBlockData> mPendingProjectBlocks;
433
434 std::shared_ptr<MissingBlocksUploader> mMissingBlocksUploader;
435
436 std::atomic<bool> mCompleted { false };
437
438 bool mHasExpiredUrls { false };
439}; // class ResumedSnaphotUploadOperation
440
441} // namespace
442
444 ProjectCloudExtension& projectCloudExtension,
445 std::function<void()> onBeforeUploadStarts)
446{
447 auto& cloudProjectsDatabase = CloudProjectsDatabase::Get();
448
449 auto pendingSnapshots = cloudProjectsDatabase.GetPendingSnapshots(
450 projectCloudExtension.GetCloudProjectId());
451
452 if (!pendingSnapshots.empty() && onBeforeUploadStarts)
453 onBeforeUploadStarts();
454
455 for (const auto& snapshot : pendingSnapshots)
456 ResumedSnaphotUploadOperation::Perform(
457 projectCloudExtension, snapshot.SnapshotId, snapshot.ConfirmUrl);
458}
459
460} // namespace audacity::cloud::audiocom::sync
Declare functions to perform UTF-8 to std::wstring conversions.
Declare functions to work with date and time string representations.
FromCharsResult FromChars(const char *buffer, const char *last, float &value) noexcept
Parse a string into a single precision floating point value, always uses the dot as decimal.
Definition: FromChars.cpp:153
Declare functions to convert numeric types to string representation.
XO("Cut/Copy/Paste")
Declare an interface for HTTP response.
Declare a class for performing HTTP requests.
Declare a class for constructing HTTP requests.
sampleFormat
The ordering of these values with operator < agrees with the order of increasing bit width.
Definition: SampleFormat.h:30
const auto project
QueryFields ParseUriQuery(std::string_view query, std::string_view delimiter) noexcept
Parses URI query and returns QueryFields structure with parsed fields.
Definition: UriParser.cpp:59
UriFields ParseUri(std::string_view uri) noexcept
Definition: UriParser.cpp:9
Thrown for failure of file or database operations in deeply nested places.
Definition: FileException.h:19
static WaveTrackFactory & Get(AudacityProject &project)
Definition: WaveTrack.cpp:3349
std::string GetSnapshotSyncUrl(std::string_view projectId, std::string_view snapshotId) const
void RemovePendingProjectBlock(std::string_view projectId, int64_t blockId)
void RemovePendingSnapshot(std::string_view projectId, std::string_view snapshotId)
void RemovePendingProjectBlob(std::string_view projectId, std::string_view snapshotId)
void Upload(CancellationContextPtr cancellationContex, const ServiceConfig &config, const UploadUrls &target, std::vector< uint8_t > data, std::function< void(ResponseResult)> callback, std::function< void(double)> progressCallback={})
static std::shared_ptr< MissingBlocksUploader > Create(CancellationContextPtr cancellationContex, const ServiceConfig &serviceConfig, std::vector< BlockUploadTask > uploadTasks, MissingBlocksUploadProgressCallback progress)
void OnSyncResumed(std::shared_ptr< ProjectUploadOperation > uploadOperation, int64_t missingBlocksCount, bool needsProjectUpload)
This method is called from the UI thread.
static void Perform(ProjectCloudExtension &projectCloudExtension, std::string_view snapshotId, std::string_view confirmationUrl)
ResumedSnaphotUploadOperation(Tag, ProjectCloudExtension &projectCloudExtension, std::string_view snapshotId, std::string_view confirmationUrl)
ResponsePtr doPost(const Request &request, const void *data, size_t size)
ResponsePtr doGet(const Request &request)
CloudSyncError::ErrorType DeduceError(SyncResultCode code)
std::optional< ProjectSyncState > DeserializeProjectSyncState(const std::string &data)
void ResumeProjectUpload(ProjectCloudExtension &projectCloudExtension, std::function< void()> onBeforeUploadStarts)
CloudSyncError MakeClientFailure(const TranslatableString &message)
CLOUD_AUDIOCOM_API CloudSyncError DeduceUploadError(audacity::network_manager::IResponse &response)
bool IsUploadRecoverable(SyncResultCode code)
void SetCommonHeaders(Request &request)
const ServiceConfig & GetServiceConfig()
Returns the instance of the ServiceConfig.
std::shared_ptr< CancellationContext > CancellationContextPtr
std::string ToUTF8(const std::wstring &wstr)
bool ParseISO8601Date(const std::string &dateString, SystemTime *time)
std::chrono::system_clock::time_point SystemTime