Audacity 3.2.0
Public Member Functions | Private Attributes | Static Private Attributes | List of all members
BufferedIPCChannel Class Referencefinal

Socket-based implementation of IPCChannel that uses intermediate buffer for data exchange between client and server. More...

#include <BufferedIPCChannel.h>

Inheritance diagram for BufferedIPCChannel:
[legend]
Collaboration diagram for BufferedIPCChannel:
[legend]

Public Member Functions

 BufferedIPCChannel ()
 
 ~BufferedIPCChannel () override
 Destroys channel and stops any data exchange. More...
 
void Send (const void *bytes, size_t length) override
 Thread-safe. More...
 
void StartConversation (SOCKET socket, IPCChannelStatusCallback &callback)
 Allowed to be called only once during object lifetime. Takes ownership over a socket. Callback should be guaranteed to be alive between IPCChannelStatusCallback::OnConnect and IPCChannelStatusCallback::OnDisconnect, and will be accessed from multiple threads. More...
 
- Public Member Functions inherited from IPCChannel
virtual ~IPCChannel ()
 
virtual void Send (const void *bytes, size_t length)=0
 Write data to the channel. More...
 

Private Attributes

bool mAlive {true}
 
std::mutex mSocketSync
 
std::condition_variable mSendCondition
 
std::unique_ptr< std::thread > mRecvRoutine
 
std::unique_ptr< std::thread > mSendRoutine
 
SOCKET mSocket {INVALID_SOCKET}
 
std::vector< char > mOutputBuffer
 

Static Private Attributes

static constexpr int DefaultOutputBufferCapacity { 2048 }
 
static constexpr int DefaultInputBufferSize { 2048 }
 

Detailed Description

Socket-based implementation of IPCChannel that uses intermediate buffer for data exchange between client and server.

Definition at line 30 of file BufferedIPCChannel.h.

Constructor & Destructor Documentation

◆ BufferedIPCChannel()

BufferedIPCChannel::BufferedIPCChannel ( )

Definition at line 21 of file BufferedIPCChannel.cpp.

22{
24}
std::vector< char > mOutputBuffer
static constexpr int DefaultOutputBufferCapacity

References DefaultOutputBufferCapacity, and mOutputBuffer.

◆ ~BufferedIPCChannel()

BufferedIPCChannel::~BufferedIPCChannel ( )
override

Destroys channel and stops any data exchange.

Definition at line 26 of file BufferedIPCChannel.cpp.

27{
29 {
30 //Shut down connection and wake up select
31 //No need to check possible error codes set by this call
32#ifdef _WIN32
33 shutdown(mSocket, SD_BOTH);
34#else
35 shutdown(mSocket, SHUT_RDWR);
36#endif
37 //Make sure all socket IO operations complete before close
38 if(mSendRoutine)
39 mSendRoutine->join();
40 if(mRecvRoutine)
41 mRecvRoutine->join();
42
44 }
45}
std::unique_ptr< std::thread > mSendRoutine
std::unique_ptr< std::thread > mRecvRoutine
#define INVALID_SOCKET
Definition: ipc-types.h:28
#define CLOSE_SOCKET
Definition: ipc-types.h:31

References CLOSE_SOCKET, INVALID_SOCKET, mRecvRoutine, mSendRoutine, and mSocket.

Member Function Documentation

◆ Send()

void BufferedIPCChannel::Send ( const void *  bytes,
size_t  length 
)
overridevirtual

Thread-safe.

Implements IPCChannel.

Definition at line 47 of file BufferedIPCChannel.cpp.

48{
49 assert(length > 0);
50 if(length == 0)
51 return;
52
53 {
54 std::lock_guard lck(mSocketSync);
55
56 auto offset = mOutputBuffer.size();
57 mOutputBuffer.resize(offset + length);
58 std::memcpy(mOutputBuffer.data() + offset, bytes, length);
59 }
60 mSendCondition.notify_one();
61}
std::condition_variable mSendCondition

References mOutputBuffer, mSendCondition, and mSocketSync.

◆ StartConversation()

void BufferedIPCChannel::StartConversation ( SOCKET  socket,
IPCChannelStatusCallback callback 
)

Allowed to be called only once during object lifetime. Takes ownership over a socket. Callback should be guaranteed to be alive between IPCChannelStatusCallback::OnConnect and IPCChannelStatusCallback::OnDisconnect, and will be accessed from multiple threads.

Parameters
socketA valid socket on which data exchange should happen
callbackUsed to send status updates

Definition at line 63 of file BufferedIPCChannel.cpp.

64{
65 assert(socket != INVALID_SOCKET);
67 mSocket = socket;
68
69 //create "sending" thread first, it should be blocked
70 //on condition until both IPCChannelStatusCallback::OnConnect
71 //and IPCChannel::Send are called (in that sequence)
72 mSendRoutine = std::make_unique<std::thread>([this]
73 {
74 std::vector<char> secondaryOutputBuffer;
75 secondaryOutputBuffer.reserve(DefaultOutputBufferCapacity);
76
77 while(true)
78 {
79 std::unique_lock lck(mSocketSync);
80 mSendCondition.wait(lck, [this]{ return !mAlive || !mOutputBuffer.empty(); });
81
82 if(!mAlive)
83 return;
84
85 std::swap(secondaryOutputBuffer, mOutputBuffer);
86 mOutputBuffer.clear();
87
88 lck.unlock();
89
90 {
91 int offset = 0;
92 while(offset < static_cast<int>(secondaryOutputBuffer.size()))
93 {
94 fd_set writefds, exceptfds;
95 FD_ZERO(&writefds);
96 FD_ZERO(&exceptfds);
97 FD_SET(mSocket, &writefds);
98 FD_SET(mSocket, &exceptfds);
99 auto ret = select(NFDS(mSocket), nullptr, &writefds, &exceptfds, nullptr);
100 if(ret == 1)
101 {
102 //try to send data...
103 ret = send(
104 mSocket,
105 secondaryOutputBuffer.data() + offset,
106 static_cast<int>(secondaryOutputBuffer.size()) - offset,
107 0);
108 if(ret > 0)
109 offset += ret;
110 else if (ret == SOCKET_ERROR)
111 break;//error
112 }
113 else
114 break;//error
115 }
116 }
117 }
118 });
119
120 mRecvRoutine = std::make_unique<std::thread>([this, &callback]{
121 //such order guarantees that IPCStatusCallback::OnConnect will be called
122 //only if both routines have started successfully
123 callback.OnConnect(*this);
124
125 auto terminate = finally([this, &callback]
126 {
127 {
128 //Let "sending" thread know that we're done
129 std::lock_guard lck(mSocketSync);
130 mAlive = false;
131 }
132 mSendCondition.notify_one();
133
134 //It may happen so, that "sending" thread sends some data while
135 //"reading" thread notifies callback about disconnection. It's
136 //not a big deal since a) sending may fail b) from the user code
137 //perspective IPCChannel::Send was called before it receives OnDisconnect
138 callback.OnDisconnect();
139 });
140
141 std::vector<char> buffer(DefaultInputBufferSize);
142 while(true)
143 {
144 fd_set readfds, exceptfds;
145 FD_ZERO(&readfds);
146 FD_ZERO(&exceptfds);
147 FD_SET(mSocket, &readfds);
148 FD_SET(mSocket, &exceptfds);
149
150 auto ret = select(NFDS(mSocket), &readfds, nullptr, &exceptfds, nullptr);
151 if(ret == 1)
152 {
153 //try fetch data...
154 ret = recv(
155 mSocket,
156 buffer.data(),
157 static_cast<int>(buffer.size()),
158 0);
159 if(ret > 0)
160 //success
161 callback.OnDataAvailable(buffer.data(), ret);
162 else if (ret == SOCKET_ERROR)
163 {
164#ifdef _WIN32
165 auto err = WSAGetLastError();
166 if(err != WSAEWOULDBLOCK && err != EAGAIN)
167 break;
168#else
169 if(errno != EWOULDBLOCK && errno != EAGAIN)
170 break;
171#endif
172 }
173 else
174 break;//closed by remote(0)
175 }
176 else//SOCKET_ERROR
177 break;
178 }
179
180 });
181}
static constexpr int DefaultInputBufferSize
virtual void OnDataAvailable(const void *data, size_t size) noexcept=0
Called when data chunk received as a result of a preceding call to IPCChannel::Send....
virtual void OnDisconnect() noexcept=0
Invalidates IPCChannel passed as argument in OnConnect.
virtual void OnConnect(IPCChannel &channel) noexcept=0
Called when connection established.
#define NFDS(x)
Definition: ipc-types.h:32
#define SOCKET_ERROR
Definition: ipc-types.h:29
void swap(std::unique_ptr< Alg_seq > &a, std::unique_ptr< Alg_seq > &b)
Definition: NoteTrack.cpp:628

References DefaultInputBufferSize, DefaultOutputBufferCapacity, INVALID_SOCKET, mAlive, mOutputBuffer, mRecvRoutine, mSendCondition, mSendRoutine, mSocket, mSocketSync, NFDS, IPCChannelStatusCallback::OnConnect(), IPCChannelStatusCallback::OnDataAvailable(), IPCChannelStatusCallback::OnDisconnect(), SOCKET_ERROR, and anonymous_namespace{NoteTrack.cpp}::swap().

Here is the call graph for this function:

Member Data Documentation

◆ DefaultInputBufferSize

constexpr int BufferedIPCChannel::DefaultInputBufferSize { 2048 }
staticconstexprprivate

Definition at line 33 of file BufferedIPCChannel.h.

Referenced by StartConversation().

◆ DefaultOutputBufferCapacity

constexpr int BufferedIPCChannel::DefaultOutputBufferCapacity { 2048 }
staticconstexprprivate

Definition at line 32 of file BufferedIPCChannel.h.

Referenced by BufferedIPCChannel(), and StartConversation().

◆ mAlive

bool BufferedIPCChannel::mAlive {true}
private

Definition at line 35 of file BufferedIPCChannel.h.

Referenced by StartConversation().

◆ mOutputBuffer

std::vector<char> BufferedIPCChannel::mOutputBuffer
private

Definition at line 44 of file BufferedIPCChannel.h.

Referenced by BufferedIPCChannel(), Send(), and StartConversation().

◆ mRecvRoutine

std::unique_ptr<std::thread> BufferedIPCChannel::mRecvRoutine
private

Definition at line 39 of file BufferedIPCChannel.h.

Referenced by StartConversation(), and ~BufferedIPCChannel().

◆ mSendCondition

std::condition_variable BufferedIPCChannel::mSendCondition
private

Definition at line 37 of file BufferedIPCChannel.h.

Referenced by Send(), and StartConversation().

◆ mSendRoutine

std::unique_ptr<std::thread> BufferedIPCChannel::mSendRoutine
private

Definition at line 40 of file BufferedIPCChannel.h.

Referenced by StartConversation(), and ~BufferedIPCChannel().

◆ mSocket

SOCKET BufferedIPCChannel::mSocket {INVALID_SOCKET}
private

Definition at line 42 of file BufferedIPCChannel.h.

Referenced by StartConversation(), and ~BufferedIPCChannel().

◆ mSocketSync

std::mutex BufferedIPCChannel::mSocketSync
private

Definition at line 36 of file BufferedIPCChannel.h.

Referenced by Send(), and StartConversation().


The documentation for this class was generated from the following files: