14#include <unordered_set>
16#include <wx/datetime.h>
39 auto result = statement->Prepare().Run();
41 std::vector<std::string> attachedDBs;
43 for (
auto row : result)
47 if (!row.Get(1, dbName))
50 if (dbName ==
"main" || dbName ==
"temp")
53 attachedDBs.push_back(std::move(dbName));
63 : mSnapshotDBName {
std::string(
"s_") + projectInfo.Id }
64 , mProjectInfo {
std::move(projectInfo) }
65 , mSnapshotInfo {
std::move(snapshotInfo) }
66 , mPath {
std::move(path) }
67 , mCallback {
std::move(callback) }
68 , mDownloadDetached { downloadDetached }
128 [
this](
auto response) { OnProjectBlobDownloaded(response); }));
132 if (knownBlocks.find(
ToUpper(block.Hash)) != knownBlocks.end())
136 block.Url, [
this, hash =
ToUpper(block.Hash)](
auto response)
137 { OnBlockDownloaded(std::move(hash), response); }));
164 detachStmt->Prepare(dbName).Run();
172 auto snapshot = std::make_shared<RemoteProjectSnapshot>(
173 Tag {}, std::move(projectInfo), std::move(snapshotInfo), std::move(path),
174 std::move(callback), downloadDetached);
176 if (snapshot->mAttachedDBNames.empty())
181 XO(
"Failed to attach to the Cloud project database")
190 if (snapshot->mNothingToDo)
208 const auto duration =
218 std::chrono::duration_cast<TransferStats::Duration>(duration));
230 const auto projectData =
241 auto result = attachStmt->Prepare(projectData->LocalPath, dbName).Run();
252 const std::string& dbName, std::unordered_set<std::string> blocks)
263 [
this, dbName = dbName, blocks = std::move(blocks)]()
265 const auto queryString =
270 ".sampleblocks WHERE blockid IN (SELECT block_id FROM block_hashes WHERE hash = ?)";
274 for (
const auto& block : blocks)
283 if (!copyBlocksStatement)
293 auto result = copyBlocksStatement->Prepare(block).Run();
305 const auto rowsUpdated = result.GetModifiedRowsCount();
306 mCopiedBlocks.fetch_add(rowsUpdated, std::memory_order_acq_rel);
316 const std::string& attachedDbName)
const
318 std::unordered_set<std::string> remoteBlocks;
321 remoteBlocks.insert(
ToUpper(block.Hash));
325 auto fn = db->CreateScalarFunction(
326 "inRemoteBlocks", [&remoteBlocks](
const std::string& hash)
327 {
return remoteBlocks.find(hash) != remoteBlocks.end(); });
329 auto statement = db->CreateStatement(
330 "SELECT hash FROM block_hashes WHERE project_id = ? AND inRemoteBlocks(hash) AND block_id IN (SELECT blockid FROM " +
331 attachedDbName +
".sampleblocks)");
338 std::unordered_set<std::string> knownBlocks;
340 for (
auto row : result)
344 if (!row.Get(0, hash))
347 knownBlocks.insert(hash);
383 response->setRequestFinishedCallback(
384 [
this, self = weak_from_this(), onSuccess = std::move(onSuccess), retries, response](
auto)
386 auto strong = self.lock();
391 response->getBytesAvailable(), std::memory_order_acq_rel);
404 OnFailure(std::move(responseResult));
412 OnFailure(std::move(responseResult));
417 response->getRequest().getURL(), std::move(onSuccess),
435 return response.
readAll<std::vector<uint8_t>>();
437 std::vector<uint8_t> data(
size);
445void RemoteProjectSnapshot::OnProjectBlobDownloaded(
449 uint64_t dictSize = 0;
451 if (data.size() <
sizeof(uint64_t))
457 std::memcpy(&dictSize, data.data(),
sizeof(uint64_t));
462 if (data.size() <
sizeof(uint64_t) + dictSize)
469 auto transaction = db->BeginTransaction(
"p_" + mProjectInfo.Id);
471 auto updateProjectStatement = db->CreateStatement(
472 "INSERT INTO " + mSnapshotDBName +
473 ".project (id, dict, doc) VALUES (1, ?1, ?2) "
474 "ON CONFLICT(id) DO UPDATE SET dict = ?1, doc = ?2");
476 if (!updateProjectStatement)
485 auto& preparedUpdateProjectStatement = updateProjectStatement->Prepare();
487 preparedUpdateProjectStatement.Bind(
488 1, data.data() +
sizeof(uint64_t), dictSize,
false);
490 preparedUpdateProjectStatement.Bind(
491 2, data.data() +
sizeof(uint64_t) + dictSize,
492 data.size() -
sizeof(uint64_t) - dictSize,
false);
494 auto result = preparedUpdateProjectStatement.Run();
501 result.GetErrors().front().GetErrorString().Translation()) });
506 auto deleteAutosaveStatement = db->CreateStatement(
507 "DELETE FROM " + mSnapshotDBName +
".autosave WHERE id = 1");
509 if (!deleteAutosaveStatement)
518 result = deleteAutosaveStatement->Prepare().Run();
525 result.GetErrors().front().GetErrorString().Translation()) });
529 if (
auto error = transaction.Commit(); error.IsError())
536 mProjectDownloaded.store(
true, std::memory_order_release);
540void RemoteProjectSnapshot::OnBlockDownloaded(
545 const auto blockData =
558 auto transaction = db->BeginTransaction(
"b_" + blockHash);
560 auto hashesStatement = db->CreateStatement(
561 "INSERT INTO block_hashes (project_id, block_id, hash) VALUES (?1, ?2, ?3) "
562 "ON CONFLICT(project_id, block_id) DO UPDATE SET hash = ?3");
565 hashesStatement->Prepare(mProjectInfo.Id, blockData->BlockId, blockHash)
573 result.GetErrors().front().GetErrorString().Translation()) });
577 auto blockStatement = db->CreateStatement(
578 "INSERT INTO " + mSnapshotDBName +
579 ".sampleblocks (blockid, sampleformat, summin, summax, sumrms, summary256, summary64k, samples) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) "
580 "ON CONFLICT(blockid) DO UPDATE SET sampleformat = ?2, summin = ?3, summax = ?4, sumrms = ?5, summary256 = ?6, summary64k = ?7, samples = ?8");
587 blockStatement.GetError().GetErrorString().Translation()) });
591 auto& preparedStatement = blockStatement->Prepare();
593 preparedStatement.Bind(1, blockData->BlockId);
594 preparedStatement.Bind(2,
static_cast<int64_t
>(blockData->Format));
595 preparedStatement.Bind(3, blockData->BlockMinMaxRMS.Min);
596 preparedStatement.Bind(4, blockData->BlockMinMaxRMS.Max);
597 preparedStatement.Bind(5, blockData->BlockMinMaxRMS.RMS);
598 preparedStatement.Bind(
599 6, blockData->Summary256.data(),
600 blockData->Summary256.size() *
sizeof(
MinMaxRMS),
false);
601 preparedStatement.Bind(
602 7, blockData->Summary64k.data(),
603 blockData->Summary64k.size() *
sizeof(
MinMaxRMS),
false);
604 preparedStatement.Bind(
605 8, blockData->Data.data(), blockData->Data.size(),
false);
607 result = preparedStatement.Run();
614 result.GetErrors().front().GetErrorString().Translation()) });
618 if (
auto error = transaction.Commit(); error.IsError())
625 mDownloadedBlocks.fetch_add(1, std::memory_order_acq_rel);
632 SetState(State::Failed);
634 mDownloadedBlocks.load(std::memory_order_acquire) +
635 mCopiedBlocks.load(std::memory_order_acquire),
637 mProjectDownloaded.load(std::memory_order_acquire) });
640void RemoteProjectSnapshot::RemoveResponse(
644 auto lock = std::lock_guard { mResponsesMutex };
647 mResponses.begin(), mResponses.end(),
648 [response](
auto& r) { return r.get() == response; }),
651 if (mResponses.empty())
652 mResponsesEmptyCV.notify_all();
655 auto lock = std::lock_guard { mRequestsMutex };
656 mRequestsInProgress--;
657 mRequestsCV.notify_one();
661void RemoteProjectSnapshot::MarkProjectInDB(
bool successfulDownload)
663 if (mDownloadDetached)
667 auto currentData = db.GetProjectData(mProjectInfo.Id);
672 data.SnapshotId = mSnapshotInfo.Id;
673 data.SyncStatus = successfulDownload ? DBProjectData::SyncStatusSynced :
674 DBProjectData::SyncStatusDownloading;
675 data.LastRead = wxDateTime::Now().GetTicks();
676 data.LocalPath = mPath;
678 if (data.SavesCount == 0)
682 data.FirstSyncDialogShown =
true;
684 db.UpdateProjectData(data);
686 if (successfulDownload)
687 db.SetProjectUserSlug(mProjectInfo.Id, mProjectInfo.Username);
690void RemoteProjectSnapshot::ReportProgress()
692 if (
mState.load(std::memory_order_acquire) != State::Downloading)
695 const auto projectDownloaded =
696 mProjectDownloaded.load(std::memory_order_acquire);
697 const auto blocksDownloaded =
698 mDownloadedBlocks.load(std::memory_order_acquire);
700 const auto blockCopied = mCopiedBlocks.load(std::memory_order_acquire);
702 const auto processedBlocks = blocksDownloaded + blockCopied;
704 const auto completed =
705 processedBlocks == mMissingBlocks && projectDownloaded;
709 CleanupOrphanBlocks();
710 SetState(State::Succeeded);
711 MarkProjectInDB(
true);
714 mCallback({ {}, processedBlocks, mMissingBlocks, projectDownloaded });
717bool RemoteProjectSnapshot::InProgress()
const
719 return mState.load(std::memory_order_acquire) == State::Downloading;
722void RemoteProjectSnapshot::RequestsThread()
724 constexpr auto MAX_CONCURRENT_REQUESTS = 6;
728 std::pair<std::string, SuccessHandler> request;
731 auto lock = std::unique_lock { mRequestsMutex };
733 if (mRequestsInProgress >= MAX_CONCURRENT_REQUESTS)
737 [
this, MAX_CONCURRENT_REQUESTS] {
738 return mRequestsInProgress < MAX_CONCURRENT_REQUESTS ||
746 if (mNextRequestIndex >= mRequests.size())
749 request = mRequests[mNextRequestIndex++];
750 mRequestsInProgress++;
753 DownloadBlob(std::move(request.first), std::move(request.second), 3);
756 std::this_thread::sleep_for(std::chrono::milliseconds(50));
760void RemoteProjectSnapshot::SetState(
State state)
762 if (state != State::Downloading)
763 mEndTime = Clock::now();
772 auto transaction = db->BeginTransaction(
"d_" + mProjectInfo.Id);
774 std::unordered_set<std::string> snaphotBlockHashes;
776 for (
const auto& block : mSnapshotInfo.Blocks)
777 snaphotBlockHashes.insert(
ToUpper(block.Hash));
779 auto inSnaphotFunction = db->CreateScalarFunction(
780 "inSnapshot", [&snaphotBlockHashes](
const std::string& hash)
781 {
return snaphotBlockHashes.find(hash) != snaphotBlockHashes.end(); });
784 auto deleteBlocksStatement = db->CreateStatement(
785 "DELETE FROM " + mSnapshotDBName +
786 ".sampleblocks WHERE blockid NOT IN (SELECT block_id FROM block_hashes WHERE project_id = ? AND inSnapshot(hash))");
788 if (!deleteBlocksStatement)
791 auto result = deleteBlocksStatement->Prepare(mProjectInfo.Id).Run();
796 auto deleteHashesStatement = db->CreateStatement(
797 "DELETE FROM block_hashes WHERE project_id = ? AND NOT inSnapshot(hash)");
799 if (!deleteHashesStatement)
802 result = deleteHashesStatement->Prepare(mProjectInfo.Id).Run();
807 transaction.Commit();
810bool RemoteProjectSnapshotState::IsComplete() const noexcept
812 return (BlocksDownloaded == BlocksTotal && ProjectDownloaded) ||
Declare functions to perform UTF-8 to std::wstring conversions.
Declare an interface for HTTP response.
constexpr IntType SwapIntBytes(IntType value) noexcept
Swap bytes in an integer.
bool IsLittleEndian() noexcept
Check that the machine is little-endian.
Declare a class for performing HTTP requests.
Declare a class for constructing HTTP requests.
std::string ToUpper(const std::string &str)
static CloudProjectsDatabase & Get()
std::optional< DBProjectData > GetProjectData(std::string_view projectId) const
sqlite::SafeConnection::Lock GetConnection()
std::unordered_set< std::string > CalculateKnownBlocks(const std::string &attachedDbName) const
std::atomic< State > mState
const SnapshotInfo mSnapshotInfo
void DownloadBlob(std::string url, SuccessHandler onSuccess, int retries=3)
void OnFailure(ResponseResult result)
std::atomic< int64_t > mDownloadedBlocks
std::thread mRequestsThread
std::condition_variable mRequestsCV
std::optional< std::future< bool > > mCopyBlocksFuture
std::atomic< bool > mProjectDownloaded
std::atomic< int64_t > mDownloadedBytes
RemoteProjectSnapshotStateCallback mCallback
void RemoveResponse(audacity::network_manager::IResponse *response)
TransferStats GetTransferStats() const
static std::shared_ptr< RemoteProjectSnapshot > Sync(ProjectInfo projectInfo, SnapshotInfo snapshotInfo, std::string path, RemoteProjectSnapshotStateCallback callback, bool downloadDetached)
RemoteProjectSnapshot(Tag, ProjectInfo projectInfo, SnapshotInfo snapshotInfo, std::string path, RemoteProjectSnapshotStateCallback callback, bool downloadDetached)
std::vector< std::string > mAttachedDBNames
std::vector< std::shared_ptr< audacity::network_manager::IResponse > > mResponses
std::atomic< int64_t > mCopiedBlocks
std::mutex mResponsesMutex
std::function< void(audacity::network_manager::ResponsePtr)> SuccessHandler
const std::string mSnapshotDBName
void CleanupOrphanBlocks()
const bool mDownloadDetached
std::string_view GetProjectId() const
std::vector< std::pair< std::string, SuccessHandler > > mRequests
const ProjectInfo mProjectInfo
void SetupBlocksCopy(const std::string &dbName, std::unordered_set< std::string > blocks)
void SetState(State state)
std::condition_variable mResponsesEmptyCV
void MarkProjectInDB(bool successfulDownload)
std::string AttachOriginalDB()
Interface that provides access to the data from the HTTP response.
virtual uint64_t getBytesAvailable() const noexcept=0
virtual uint64_t readData(void *buffer, uint64_t maxBytesCount)=0
static NetworkManager & GetInstance()
ResponsePtr doGet(const Request &request)
Result< Statement > CreateStatement(std::string_view sql) const
Prepares the given SQL statement for execution.
Services * Get()
Fetch the global instance, or nullptr if none is yet installed.
std::vector< std::string > ListAttachedDatabases()
std::vector< uint8_t > ReadResponseData(audacity::network_manager::IResponse &response)
std::optional< DecompressedBlock > DecompressBlock(const void *data, const std::size_t size)
std::function< void(RemoteProjectSnapshotState)> RemoteProjectSnapshotStateCallback
ResponseResult GetResponseResult(IResponse &response, bool readBody)
std::shared_ptr< IResponse > ResponsePtr
std::string ToUTF8(const std::wstring &wstr)
TransferStats & SetBytesTransferred(int64_t bytesTransferred)
TransferStats & SetProjectFilesTransferred(int64_t projectFilesTransferred)
TransferStats & SetBlocksTransferred(int64_t blocksTransferred)
TransferStats & SetTransferDuration(Duration transferDuration)
std::vector< SnapshotBlockInfo > Blocks