Audacity 3.2.0
BufferedIPCChannel.cpp
Go to the documentation of this file.
1/**********************************************************************
2
3 Audacity: A Digital Audio Editor
4
5 @file BufferedIPCChannel.cpp
6
7 @author Vitaly Sverchinsky
8
9 Part of lib-ipc library
10
11**********************************************************************/
12
13#include "BufferedIPCChannel.h"
14
15#include <cassert>
16#include <cstring>
17#include <mutex>
18#include "MemoryX.h"
19#include "socket_guard.h"
20
22{
24}
25
// NOTE(review): the header of this definition (listing line 26) is missing from
// this extract. Judging by the body (shutdown + join of both worker threads) this
// is presumably BufferedIPCChannel::~BufferedIPCChannel() - confirm against the
// repository.
27{
   // NOTE(review): listing line 28 is missing from this extract; it presumably
   // guards the scope below (e.g. a check that mSocket was ever assigned) - confirm.
29 {
30 //Shut down connection and wake up select
31 //No need to check possible error codes set by this call
32#ifdef _WIN32
33 shutdown(mSocket, SD_BOTH);
34#else
35 shutdown(mSocket, SHUT_RDWR);
36#endif
37 //Make sure all socket IO operations complete before close
   // Each worker thread is held through a pointer that may be empty, so a
   // thread is joined only if it has actually been created.
38 if(mSendRoutine)
39 mSendRoutine->join();
40 if(mRecvRoutine)
41 mRecvRoutine->join();
42
   // NOTE(review): listing line 43 is missing from this extract. Given the
   // "before close" comment above, the socket is presumably closed here - confirm.
44 }
45}
46
47void BufferedIPCChannel::Send(const void* bytes, size_t length)
48{
49 assert(length > 0);
50 if(length == 0)
51 return;
52
53 {
54 std::lock_guard lck(mSocketSync);
55
56 auto offset = mOutputBuffer.size();
57 mOutputBuffer.resize(offset + length);
58 std::memcpy(mOutputBuffer.data() + offset, bytes, length);
59 }
60 mSendCondition.notify_one();
61}
62
// NOTE(review): the header of this definition (listing line 63) is missing from
// this extract. The body uses `socket` and `callback`, so this is presumably
//    void BufferedIPCChannel::StartConversation(SOCKET socket, IPCChannelStatusCallback& callback)
// - confirm against the repository.
//
// Stores the socket in mSocket and starts two threads:
//  * mSendRoutine waits on mSendCondition and writes whatever has been
//    accumulated in mOutputBuffer to the socket;
//  * mRecvRoutine calls callback.OnConnect, then reads from the socket and
//    forwards every received chunk to callback.OnDataAvailable until the
//    connection ends, at which point it clears mAlive and calls
//    callback.OnDisconnect.
// Within this function mAlive is only read or written while mSocketSync is held.
64{
65 assert(socket != INVALID_SOCKET);
   // NOTE(review): listing line 66 is missing from this extract.
67 mSocket = socket;
68
69 //create "sending" thread first, it should be blocked
70 //on condition until both IPCChannelStatusCallback::OnConnect
71 //and IPCChannel::Send are called (in that sequence)
72 mSendRoutine = std::make_unique<std::thread>([this]
73 {
   // Thread-local buffer that is swapped with mOutputBuffer, so that the
   // socket write below runs without holding mSocketSync.
74 std::vector<char> secondaryOutputBuffer;
75 secondaryOutputBuffer.reserve(DefaultOutputBufferCapacity);
76
77 while(true)
78 {
79 std::unique_lock lck(mSocketSync);
   // Sleep until there is data to send or mAlive has been cleared
   // (the receiving thread below clears it when it finishes).
80 mSendCondition.wait(lck, [this]{ return !mAlive || !mOutputBuffer.empty(); });
81
82 if(!mAlive)
83 return;
84
   // After the swap mOutputBuffer holds the bytes of the previous round;
   // clear() discards them so that new data is appended to an empty buffer.
85 std::swap(secondaryOutputBuffer, mOutputBuffer);
86 mOutputBuffer.clear();
87
88 lck.unlock();
89
90 {
   // Number of bytes of secondaryOutputBuffer already accepted by send().
91 int offset = 0;
92 while(offset < static_cast<int>(secondaryOutputBuffer.size()))
93 {
94 fd_set writefds, exceptfds;
95 FD_ZERO(&writefds);
96 FD_ZERO(&exceptfds);
97 FD_SET(mSocket, &writefds);
98 FD_SET(mSocket, &exceptfds);
   // Blocks without a timeout. Any result other than exactly one ready
   // descriptor takes the error branch below.
99 auto ret = select(NFDS(mSocket), nullptr, &writefds, &exceptfds, nullptr);
100 if(ret == 1)
101 {
102 //try to send data...
103 ret = send(
104 mSocket,
105 secondaryOutputBuffer.data() + offset,
106 static_cast<int>(secondaryOutputBuffer.size()) - offset,
107 0);
108 if(ret > 0)
109 offset += ret;
110 else if (ret == SOCKET_ERROR)
111 break;//error
112 }
113 else
   // Both "break" statements leave only the inner loop: the unsent rest
   // of this chunk is dropped and the thread waits on mSendCondition again.
114 break;//error
115 }
116 }
117 }
118 });
119
   // callback is captured by reference, so the referenced object has to stay
   // valid for as long as this thread is running.
120 mRecvRoutine = std::make_unique<std::thread>([this, &callback]{
121 //such order guarantees that IPCStatusCallback::OnConnect will be called
122 //only if both routines have started successfully
123 callback.OnConnect(*this);
124
   // NOTE(review): finally() is not defined in this file (presumably it comes
   // from MemoryX.h) and presumably runs the lambda when `terminate` goes out
   // of scope, i.e. on every exit from this thread function - confirm.
125 auto terminate = finally([this, &callback]
126 {
127 {
128 //Let "sending" thread know that we're done
129 std::lock_guard lck(mSocketSync);
130 mAlive = false;
131 }
132 mSendCondition.notify_one();
133
134 //It may happen so, that "sending" thread sends some data while
135 //"reading" thread notifies callback about disconnection. It's
136 //not a big deal since a) sending may fail b) from the user code
137 //perspective IPCChannel::Send was called before it receives OnDisconnect
138 callback.OnDisconnect();
139 });
140
   // Fixed-size receive buffer, reused for every recv() call.
141 std::vector<char> buffer(DefaultInputBufferSize);
142 while(true)
143 {
144 fd_set readfds, exceptfds;
145 FD_ZERO(&readfds);
146 FD_ZERO(&exceptfds);
147 FD_SET(mSocket, &readfds);
148 FD_SET(mSocket, &exceptfds);
149
   // Blocks without a timeout until the socket is readable or flagged as
   // exceptional.
   // NOTE(review): a select() call interrupted by a signal (EINTR on POSIX)
   // would also take the final "break" and end the conversation - confirm
   // that this is intended.
150 auto ret = select(NFDS(mSocket), &readfds, nullptr, &exceptfds, nullptr);
151 if(ret == 1)
152 {
153 //try fetch data...
154 ret = recv(
155 mSocket,
156 buffer.data(),
157 static_cast<int>(buffer.size()),
158 0);
159 if(ret > 0)
160 //success
161 callback.OnDataAvailable(buffer.data(), ret);
162 else if (ret == SOCKET_ERROR)
163 {
   // A "would block" condition is not fatal: the loop goes back to select().
   // Every other error ends the loop.
   // NOTE(review): EAGAIN is an errno constant; comparing it with the value
   // returned by WSAGetLastError() looks unintended - confirm.
164#ifdef _WIN32
165 auto err = WSAGetLastError();
166 if(err != WSAEWOULDBLOCK && err != EAGAIN)
167 break;
168#else
169 if(errno != EWOULDBLOCK && errno != EAGAIN)
170 break;
171#endif
172 }
173 else
174 break;//closed by remote(0)
175 }
176 else//SOCKET_ERROR
177 break;
178 }
179
180 });
181}
182
std::condition_variable mSendCondition
void Send(const void *bytes, size_t length) override
Thread-safe.
std::vector< char > mOutputBuffer
static constexpr int DefaultOutputBufferCapacity
static constexpr int DefaultInputBufferSize
~BufferedIPCChannel() override
Destroys channel and stops any data exchange.
std::unique_ptr< std::thread > mSendRoutine
std::unique_ptr< std::thread > mRecvRoutine
void StartConversation(SOCKET socket, IPCChannelStatusCallback &callback)
Allowed to be called only once during object lifetime. Takes ownership over a socket....
Interface for listening connection status changes.
Definition: IPCChannel.h:37
virtual void OnDataAvailable(const void *data, size_t size) noexcept=0
Called when data chunk received as a result of a preceding call to IPCChannel::Send....
virtual void OnDisconnect() noexcept=0
Invalidates IPCChannel passed as argument in OnConnect.
virtual void OnConnect(IPCChannel &channel) noexcept=0
Called when connection established.
#define NFDS(x)
Definition: ipc-types.h:32
#define INVALID_SOCKET
Definition: ipc-types.h:28
#define SOCKET_ERROR
Definition: ipc-types.h:29
#define CLOSE_SOCKET
Definition: ipc-types.h:31
#define SOCKET
Definition: ipc-types.h:30
void swap(std::unique_ptr< Alg_seq > &a, std::unique_ptr< Alg_seq > &b)
Definition: NoteTrack.cpp:634