29 void run()
override { owner.runThread(); }
37 : useMessageThread (callbacksOnMessageThread),
38 magicMessageHeader (magicMessageHeaderNumber)
45 callbackConnectionState =
false;
47 masterReference.clear();
53 int portNumber,
int timeOutMillisecs)
60 if (socket->connect (hostName, portNumber, timeOutMillisecs))
62 threadIsRunning =
true;
64 thread->startThread();
76 std::unique_ptr<NamedPipe> newPipe (
new NamedPipe());
78 if (newPipe->openExisting (pipeName))
81 pipeReceiveMessageTimeout = timeoutMs;
82 initialiseWithPipe (newPipe.release());
93 std::unique_ptr<NamedPipe> newPipe (
new NamedPipe());
95 if (newPipe->createNewPipe (pipeName, mustNotExist))
98 pipeReceiveMessageTimeout = timeoutMs;
99 initialiseWithPipe (newPipe.release());
108 thread->signalThreadShouldExit();
112 if (socket !=
nullptr) socket->close();
113 if (pipe !=
nullptr) pipe->close();
116 thread->stopThread (4000);
117 deletePipeAndSocket();
121void InterprocessConnection::deletePipeAndSocket()
132 return ((socket !=
nullptr && socket->isConnected())
133 || (pipe !=
nullptr && pipe->isOpen()))
142 if (pipe ==
nullptr && socket ==
nullptr)
145 if (socket !=
nullptr && ! socket->isLocal())
146 return socket->getHostName();
149 return IPAddress::local().toString();
159 messageData.
copyFrom (messageHeader, 0,
sizeof (messageHeader));
162 return writeData (messageData.
getData(), (
int) messageData.
getSize()) == (int) messageData.
getSize();
165int InterprocessConnection::writeData (
void* data,
int dataSize)
169 if (socket !=
nullptr)
170 return socket->write (data, dataSize);
173 return pipe->write (data, dataSize, pipeReceiveMessageTimeout);
179void InterprocessConnection::initialiseWithSocket (StreamingSocket* newSocket)
181 jassert (socket ==
nullptr && pipe ==
nullptr);
182 socket.reset (newSocket);
184 threadIsRunning =
true;
186 thread->startThread();
189void InterprocessConnection::initialiseWithPipe (NamedPipe* newPipe)
191 jassert (socket ==
nullptr && pipe ==
nullptr);
192 pipe.reset (newPipe);
194 threadIsRunning =
true;
196 thread->startThread();
203 : owner (ipc), connectionMade (connected)
206 void messageCallback()
override
208 if (
auto* ipc = owner.get())
223void InterprocessConnection::connectionMadeInt()
225 if (! callbackConnectionState)
227 callbackConnectionState =
true;
229 if (useMessageThread)
236void InterprocessConnection::connectionLostInt()
238 if (callbackConnectionState)
240 callbackConnectionState =
false;
242 if (useMessageThread)
243 (
new ConnectionStateMessage (
this,
false))->post();
252 : owner (ipc), data (d)
255 void messageCallback()
override
257 if (
auto* ipc = owner.get())
265void InterprocessConnection::deliverDataInt (
const MemoryBlock& data)
267 jassert (callbackConnectionState);
269 if (useMessageThread)
276int InterprocessConnection::readData (
void* data,
int num)
278 if (socket !=
nullptr)
279 return socket->read (data, num,
true);
282 return pipe->read (data, num, pipeReceiveMessageTimeout);
288bool InterprocessConnection::readNextMessage()
290 uint32 messageHeader[2];
291 auto bytes = readData (messageHeader,
sizeof (messageHeader));
293 if (bytes ==
sizeof (messageHeader)
298 if (bytesInMessage > 0)
300 MemoryBlock messageData ((
size_t) bytesInMessage,
true);
303 while (bytesInMessage > 0)
305 if (thread->threadShouldExit())
308 auto numThisTime = jmin (bytesInMessage, 65536);
309 auto bytesIn = readData (addBytesToPointer (messageData.getData(), bytesRead), numThisTime);
314 bytesRead += bytesIn;
315 bytesInMessage -= bytesIn;
319 deliverDataInt (messageData);
327 if (socket !=
nullptr)
328 deletePipeAndSocket();
336void InterprocessConnection::runThread()
338 while (! thread->threadShouldExit())
340 if (socket !=
nullptr)
342 auto ready = socket->waitUntilReady (
true, 100);
346 deletePipeAndSocket();
357 else if (pipe !=
nullptr)
359 if (! pipe->isOpen())
361 deletePipeAndSocket();
371 if (thread->threadShouldExit() || ! readNextMessage())
375 threadIsRunning =
false;
static Type swapIfBigEndian(Type value) noexcept
Swaps the byte order of a signed or unsigned integer if the CPU is big-endian.
Automatically locks and unlocks a mutex object.
Manages a simple two-way messaging connection to another process, using either a socket or a named pipe as the transport medium.
virtual void connectionMade()=0
Called when the connection is first connected.
virtual void messageReceived(const MemoryBlock &message)=0
Called when a message arrives.
String getConnectedHostName() const
Returns the name of the machine at the other end of this connection.
InterprocessConnection(bool callbacksOnMessageThread=true, uint32 magicMessageHeaderNumber=0xf2b49e2c)
Creates a connection.
bool createPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs, bool mustNotExist=false)
Tries to create a new pipe for other processes to connect to.
bool connectToPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs)
Tries to connect the object to an existing named pipe.
void disconnect()
Disconnects and closes any currently-open sockets or pipes.
bool isConnected() const
True if a socket or pipe is currently active.
virtual ~InterprocessConnection()
Destructor.
virtual void connectionLost()=0
Called when the connection is broken.
bool sendMessage(const MemoryBlock &message)
Tries to send a message to the other end of this connection.
bool connectToSocket(const String &hostName, int portNumber, int timeOutMillisecs)
Tries to connect this object to a socket.
A class to hold a resizable block of raw data.
void copyFrom(const void *srcData, int destinationOffset, size_t numBytes) noexcept
Copies data into this MemoryBlock from a memory address.
size_t getSize() const noexcept
Returns the block's current allocated size, in bytes.
void * getData() const noexcept
Returns a void pointer to the data.
Internal class used as the base class for all message objects.
The base class for objects that can be sent to a MessageListener.
A cross-process pipe that can have data written to and read from it.
A wrapper for a streaming (TCP) socket.
This class acts as a pointer which will automatically become null if the object to which it points is deleted.
void run() override
Must be implemented to perform the thread's actual code.