14#include <unordered_set>
16#include <wx/datetime.h>
39 auto result = statement->Prepare().Run();
41 std::vector<std::string> attachedDBs;
43 for (
auto row : result)
47 if (!row.Get(1, dbName))
50 if (dbName ==
"main" || dbName ==
"temp")
53 attachedDBs.push_back(std::move(dbName));
63 : mSnapshotDBName {
std::string(
"s_") + projectInfo.Id }
64 , mProjectInfo {
std::move(projectInfo) }
65 , mSnapshotInfo {
std::move(snapshotInfo) }
66 , mPath {
std::move(path) }
67 , mCallback {
std::move(callback) }
68 , mDownloadDetached { downloadDetached }
128 [
this](
auto response) { OnProjectBlobDownloaded(response); }));
132 if (knownBlocks.find(
ToUpper(block.Hash)) != knownBlocks.end())
136 block.Url, [
this, hash =
ToUpper(block.Hash)](
auto response)
137 { OnBlockDownloaded(std::move(hash), response); }));
164 detachStmt->Prepare(dbName).Run();
172 auto snapshot = std::make_shared<RemoteProjectSnapshot>(
173 Tag {}, std::move(projectInfo), std::move(snapshotInfo), std::move(path),
174 std::move(callback), downloadDetached);
176 if (snapshot->mAttachedDBNames.empty())
181 XO(
"Failed to attach to the Cloud project database")
190 if (snapshot->mNothingToDo)
208 const auto duration =
218 std::chrono::duration_cast<TransferStats::Duration>(duration));
230 const auto projectData =
241 auto result = attachStmt->Prepare(projectData->LocalPath, dbName).Run();
252 const std::string& dbName, std::unordered_set<std::string> blocks)
263 [
this, dbName = dbName, blocks = std::move(blocks)]()
265 const auto queryString =
270 ".sampleblocks WHERE blockid IN (SELECT block_id FROM block_hashes WHERE hash = ?)";
274 for (
const auto& block : blocks)
283 if (!copyBlocksStatement)
293 auto result = copyBlocksStatement->Prepare(block).Run();
305 const auto rowsUpdated = result.GetModifiedRowsCount();
306 mCopiedBlocks.fetch_add(rowsUpdated, std::memory_order_acq_rel);
316 const std::string& attachedDbName)
const
318 std::unordered_set<std::string> remoteBlocks;
321 remoteBlocks.insert(
ToUpper(block.Hash));
325 auto fn = db->CreateScalarFunction(
326 "inRemoteBlocks", [&remoteBlocks](
const std::string& hash)
327 {
return remoteBlocks.find(hash) != remoteBlocks.end(); });
329 auto statement = db->CreateStatement(
330 "SELECT hash FROM block_hashes WHERE project_id = ? AND inRemoteBlocks(hash) AND block_id IN (SELECT blockid FROM " +
331 attachedDbName +
".sampleblocks)");
338 std::unordered_set<std::string> knownBlocks;
340 for (
auto row : result)
344 if (!row.Get(0, hash))
347 knownBlocks.insert(hash);
383 response->setRequestFinishedCallback(
384 [
this, onSuccess = std::move(onSuccess), retries, response](
auto)
387 response->getBytesAvailable(), std::memory_order_acq_rel);
400 OnFailure(std::move(responseResult));
408 OnFailure(std::move(responseResult));
413 response->getRequest().getURL(), std::move(onSuccess),
431 return response.
readAll<std::vector<uint8_t>>();
433 std::vector<uint8_t> data(
size);
441void RemoteProjectSnapshot::OnProjectBlobDownloaded(
445 uint64_t dictSize = 0;
447 if (data.size() <
sizeof(uint64_t))
453 std::memcpy(&dictSize, data.data(),
sizeof(uint64_t));
458 if (data.size() <
sizeof(uint64_t) + dictSize)
465 auto transaction = db->BeginTransaction(
"p_" + mProjectInfo.Id);
467 auto updateProjectStatement = db->CreateStatement(
468 "INSERT INTO " + mSnapshotDBName +
469 ".project (id, dict, doc) VALUES (1, ?1, ?2) "
470 "ON CONFLICT(id) DO UPDATE SET dict = ?1, doc = ?2");
472 if (!updateProjectStatement)
481 auto& preparedUpdateProjectStatement = updateProjectStatement->Prepare();
483 preparedUpdateProjectStatement.Bind(
484 1, data.data() +
sizeof(uint64_t), dictSize,
false);
486 preparedUpdateProjectStatement.Bind(
487 2, data.data() +
sizeof(uint64_t) + dictSize,
488 data.size() -
sizeof(uint64_t) - dictSize,
false);
490 auto result = preparedUpdateProjectStatement.Run();
497 result.GetErrors().front().GetErrorString().Translation()) });
502 auto deleteAutosaveStatement = db->CreateStatement(
503 "DELETE FROM " + mSnapshotDBName +
".autosave WHERE id = 1");
505 if (!deleteAutosaveStatement)
514 result = deleteAutosaveStatement->Prepare().Run();
521 result.GetErrors().front().GetErrorString().Translation()) });
525 if (
auto error = transaction.Commit(); error.IsError())
532 mProjectDownloaded.store(
true, std::memory_order_release);
536void RemoteProjectSnapshot::OnBlockDownloaded(
541 const auto blockData =
554 auto transaction = db->BeginTransaction(
"b_" + blockHash);
556 auto hashesStatement = db->CreateStatement(
557 "INSERT INTO block_hashes (project_id, block_id, hash) VALUES (?1, ?2, ?3) "
558 "ON CONFLICT(project_id, block_id) DO UPDATE SET hash = ?3");
561 hashesStatement->Prepare(mProjectInfo.Id, blockData->BlockId, blockHash)
569 result.GetErrors().front().GetErrorString().Translation()) });
573 auto blockStatement = db->CreateStatement(
574 "INSERT INTO " + mSnapshotDBName +
575 ".sampleblocks (blockid, sampleformat, summin, summax, sumrms, summary256, summary64k, samples) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) "
576 "ON CONFLICT(blockid) DO UPDATE SET sampleformat = ?2, summin = ?3, summax = ?4, sumrms = ?5, summary256 = ?6, summary64k = ?7, samples = ?8");
583 blockStatement.GetError().GetErrorString().Translation()) });
587 auto& preparedStatement = blockStatement->Prepare();
589 preparedStatement.Bind(1, blockData->BlockId);
590 preparedStatement.Bind(2,
static_cast<int64_t
>(blockData->Format));
591 preparedStatement.Bind(3, blockData->BlockMinMaxRMS.Min);
592 preparedStatement.Bind(4, blockData->BlockMinMaxRMS.Max);
593 preparedStatement.Bind(5, blockData->BlockMinMaxRMS.RMS);
594 preparedStatement.Bind(
595 6, blockData->Summary256.data(),
596 blockData->Summary256.size() *
sizeof(
MinMaxRMS),
false);
597 preparedStatement.Bind(
598 7, blockData->Summary64k.data(),
599 blockData->Summary64k.size() *
sizeof(
MinMaxRMS),
false);
600 preparedStatement.Bind(
601 8, blockData->Data.data(), blockData->Data.size(),
false);
603 result = preparedStatement.Run();
610 result.GetErrors().front().GetErrorString().Translation()) });
614 if (
auto error = transaction.Commit(); error.IsError())
621 mDownloadedBlocks.fetch_add(1, std::memory_order_acq_rel);
628 SetState(State::Failed);
630 mDownloadedBlocks.load(std::memory_order_acquire) +
631 mCopiedBlocks.load(std::memory_order_acquire),
633 mProjectDownloaded.load(std::memory_order_acquire) });
636void RemoteProjectSnapshot::RemoveResponse(
640 auto lock = std::lock_guard { mResponsesMutex };
643 mResponses.begin(), mResponses.end(),
644 [response](
auto& r) { return r.get() == response; }),
647 if (mResponses.empty())
648 mResponsesEmptyCV.notify_all();
651 auto lock = std::lock_guard { mRequestsMutex };
652 mRequestsInProgress--;
653 mRequestsCV.notify_one();
657void RemoteProjectSnapshot::MarkProjectInDB(
bool successfulDownload)
659 if (mDownloadDetached)
663 auto currentData = db.GetProjectData(mProjectInfo.Id);
668 data.SnapshotId = mSnapshotInfo.Id;
669 data.SyncStatus = successfulDownload ? DBProjectData::SyncStatusSynced :
670 DBProjectData::SyncStatusDownloading;
671 data.LastRead = wxDateTime::Now().GetTicks();
672 data.LocalPath = mPath;
674 if (data.SavesCount == 0)
678 data.FirstSyncDialogShown =
true;
680 db.UpdateProjectData(data);
682 if (successfulDownload)
683 db.SetProjectUserSlug(mProjectInfo.Id, mProjectInfo.Username);
686void RemoteProjectSnapshot::ReportProgress()
688 if (
mState.load(std::memory_order_acquire) != State::Downloading)
691 const auto projectDownloaded =
692 mProjectDownloaded.load(std::memory_order_acquire);
693 const auto blocksDownloaded =
694 mDownloadedBlocks.load(std::memory_order_acquire);
696 const auto blockCopied = mCopiedBlocks.load(std::memory_order_acquire);
698 const auto processedBlocks = blocksDownloaded + blockCopied;
700 const auto completed =
701 processedBlocks == mMissingBlocks && projectDownloaded;
705 CleanupOrphanBlocks();
706 SetState(State::Succeeded);
707 MarkProjectInDB(
true);
710 mCallback({ {}, processedBlocks, mMissingBlocks, projectDownloaded });
713bool RemoteProjectSnapshot::InProgress()
const
715 return mState.load(std::memory_order_acquire) == State::Downloading;
718void RemoteProjectSnapshot::RequestsThread()
720 constexpr auto MAX_CONCURRENT_REQUESTS = 6;
724 std::pair<std::string, SuccessHandler> request;
727 auto lock = std::unique_lock { mRequestsMutex };
729 if (mRequestsInProgress >= MAX_CONCURRENT_REQUESTS)
733 [
this, MAX_CONCURRENT_REQUESTS] {
734 return mRequestsInProgress < MAX_CONCURRENT_REQUESTS ||
742 if (mNextRequestIndex >= mRequests.size())
745 request = mRequests[mNextRequestIndex++];
746 mRequestsInProgress++;
749 DownloadBlob(std::move(request.first), std::move(request.second), 3);
752 std::this_thread::sleep_for(std::chrono::milliseconds(50));
756void RemoteProjectSnapshot::SetState(
State state)
758 if (state != State::Downloading)
759 mEndTime = Clock::now();
768 auto transaction = db->BeginTransaction(
"d_" + mProjectInfo.Id);
770 std::unordered_set<std::string> snaphotBlockHashes;
772 for (
const auto& block : mSnapshotInfo.Blocks)
773 snaphotBlockHashes.insert(
ToUpper(block.Hash));
775 auto inSnaphotFunction = db->CreateScalarFunction(
776 "inSnapshot", [&snaphotBlockHashes](
const std::string& hash)
777 {
return snaphotBlockHashes.find(hash) != snaphotBlockHashes.end(); });
780 auto deleteBlocksStatement = db->CreateStatement(
781 "DELETE FROM " + mSnapshotDBName +
782 ".sampleblocks WHERE blockid NOT IN (SELECT block_id FROM block_hashes WHERE project_id = ? AND inSnapshot(hash))");
784 if (!deleteBlocksStatement)
787 auto result = deleteBlocksStatement->Prepare(mProjectInfo.Id).Run();
792 auto deleteHashesStatement = db->CreateStatement(
793 "DELETE FROM block_hashes WHERE project_id = ? AND NOT inSnapshot(hash)");
795 if (!deleteHashesStatement)
798 result = deleteHashesStatement->Prepare(mProjectInfo.Id).Run();
803 transaction.Commit();
806bool RemoteProjectSnapshotState::IsComplete() const noexcept
808 return (BlocksDownloaded == BlocksTotal && ProjectDownloaded) ||
Declare functions to perform UTF-8 to std::wstring conversions.
const TranslatableString name
Declare an interface for HTTP response.
constexpr IntType SwapIntBytes(IntType value) noexcept
Swap bytes in an integer.
bool IsLittleEndian() noexcept
Check that the machine is little-endian.
Declare a class for performing HTTP requests.
Declare a class for constructing HTTP requests.
std::string ToUpper(const std::string &str)
static CloudProjectsDatabase & Get()
std::optional< DBProjectData > GetProjectData(std::string_view projectId) const
sqlite::SafeConnection::Lock GetConnection()
std::unordered_set< std::string > CalculateKnownBlocks(const std::string &attachedDbName) const
std::atomic< State > mState
const SnapshotInfo mSnapshotInfo
void DownloadBlob(std::string url, SuccessHandler onSuccess, int retries=3)
void OnFailure(ResponseResult result)
std::atomic< int64_t > mDownloadedBlocks
std::thread mRequestsThread
std::condition_variable mRequestsCV
std::optional< std::future< bool > > mCopyBlocksFuture
std::atomic< bool > mProjectDownloaded
std::atomic< int64_t > mDownloadedBytes
RemoteProjectSnapshotStateCallback mCallback
void RemoveResponse(audacity::network_manager::IResponse *response)
TransferStats GetTransferStats() const
static std::shared_ptr< RemoteProjectSnapshot > Sync(ProjectInfo projectInfo, SnapshotInfo snapshotInfo, std::string path, RemoteProjectSnapshotStateCallback callback, bool downloadDetached)
RemoteProjectSnapshot(Tag, ProjectInfo projectInfo, SnapshotInfo snapshotInfo, std::string path, RemoteProjectSnapshotStateCallback callback, bool downloadDetached)
std::vector< std::string > mAttachedDBNames
std::vector< std::shared_ptr< audacity::network_manager::IResponse > > mResponses
std::atomic< int64_t > mCopiedBlocks
std::mutex mResponsesMutex
std::function< void(audacity::network_manager::ResponsePtr)> SuccessHandler
const std::string mSnapshotDBName
void CleanupOrphanBlocks()
const bool mDownloadDetached
std::string_view GetProjectId() const
std::vector< std::pair< std::string, SuccessHandler > > mRequests
const ProjectInfo mProjectInfo
void SetupBlocksCopy(const std::string &dbName, std::unordered_set< std::string > blocks)
void SetState(State state)
std::condition_variable mResponsesEmptyCV
void MarkProjectInDB(bool successfulDownload)
std::string AttachOriginalDB()
Interface that provides access to the data from the HTTP response.
virtual uint64_t getBytesAvailable() const noexcept=0
virtual uint64_t readData(void *buffer, uint64_t maxBytesCount)=0
static NetworkManager & GetInstance()
ResponsePtr doGet(const Request &request)
Result< Statement > CreateStatement(std::string_view sql) const
Prepares the given SQL statement for execution.
Services * Get()
Fetch the global instance, or nullptr if none is yet installed.
std::vector< std::string > ListAttachedDatabases()
std::vector< uint8_t > ReadResponseData(audacity::network_manager::IResponse &response)
std::optional< DecompressedBlock > DecompressBlock(const void *data, const std::size_t size)
std::function< void(RemoteProjectSnapshotState)> RemoteProjectSnapshotStateCallback
ResponseResult GetResponseResult(IResponse &response, bool readBody)
std::shared_ptr< IResponse > ResponsePtr
std::string ToUTF8(const std::wstring &wstr)
TransferStats & SetBytesTransferred(int64_t bytesTransferred)
TransferStats & SetProjectFilesTransferred(int64_t projectFilesTransferred)
TransferStats & SetBlocksTransferred(int64_t blocksTransferred)
TransferStats & SetTransferDuration(Duration transferDuration)
std::vector< SnapshotBlockInfo > Blocks