aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/linden/indra/llplugin/llpluginmessagepipe.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'linden/indra/llplugin/llpluginmessagepipe.cpp')
-rwxr-xr-xlinden/indra/llplugin/llpluginmessagepipe.cpp121
1 files changed, 91 insertions, 30 deletions
diff --git a/linden/indra/llplugin/llpluginmessagepipe.cpp b/linden/indra/llplugin/llpluginmessagepipe.cpp
index b16381c..8168b32 100755
--- a/linden/indra/llplugin/llpluginmessagepipe.cpp
+++ b/linden/indra/llplugin/llpluginmessagepipe.cpp
@@ -98,11 +98,14 @@ void LLPluginMessagePipeOwner::killMessagePipe(void)
98 } 98 }
99} 99}
100 100
101LLPluginMessagePipe::LLPluginMessagePipe(LLPluginMessagePipeOwner *owner, LLSocket::ptr_t socket) 101LLPluginMessagePipe::LLPluginMessagePipe(LLPluginMessagePipeOwner *owner, LLSocket::ptr_t socket):
102 mInputMutex(gAPRPoolp),
103 mOutputMutex(gAPRPoolp),
104 mOwner(owner),
105 mSocket(socket)
102{ 106{
103 mOwner = owner; 107
104 mOwner->setMessagePipe(this); 108 mOwner->setMessagePipe(this);
105 mSocket = socket;
106} 109}
107 110
108LLPluginMessagePipe::~LLPluginMessagePipe() 111LLPluginMessagePipe::~LLPluginMessagePipe()
@@ -116,6 +119,7 @@ LLPluginMessagePipe::~LLPluginMessagePipe()
116bool LLPluginMessagePipe::addMessage(const std::string &message) 119bool LLPluginMessagePipe::addMessage(const std::string &message)
117{ 120{
118 // queue the message for later output 121 // queue the message for later output
122 LLMutexLock lock(&mOutputMutex);
119 mOutput += message; 123 mOutput += message;
120 mOutput += MESSAGE_DELIMITER; // message separator 124 mOutput += MESSAGE_DELIMITER; // message separator
121 125
@@ -151,6 +155,18 @@ void LLPluginMessagePipe::setSocketTimeout(apr_interval_time_t timeout_usec)
151 155
152bool LLPluginMessagePipe::pump(F64 timeout) 156bool LLPluginMessagePipe::pump(F64 timeout)
153{ 157{
158 bool result = pumpOutput();
159
160 if(result)
161 {
162 result = pumpInput(timeout);
163 }
164
165 return result;
166}
167
168bool LLPluginMessagePipe::pumpOutput()
169{
154 bool result = true; 170 bool result = true;
155 171
156 if(mSocket) 172 if(mSocket)
@@ -158,6 +174,7 @@ bool LLPluginMessagePipe::pump(F64 timeout)
158 apr_status_t status; 174 apr_status_t status;
159 apr_size_t size; 175 apr_size_t size;
160 176
177 LLMutexLock lock(&mOutputMutex);
161 if(!mOutput.empty()) 178 if(!mOutput.empty())
162 { 179 {
163 // write any outgoing messages 180 // write any outgoing messages
@@ -185,6 +202,17 @@ bool LLPluginMessagePipe::pump(F64 timeout)
185 // remove the written part from the buffer and try again later. 202 // remove the written part from the buffer and try again later.
186 mOutput = mOutput.substr(size); 203 mOutput = mOutput.substr(size);
187 } 204 }
205 else if(APR_STATUS_IS_EOF(status))
206 {
207 // This is what we normally expect when a plugin exits.
208 llinfos << "Got EOF from plugin socket. " << llendl;
209
210 if(mOwner)
211 {
212 mOwner->socketError(status);
213 }
214 result = false;
215 }
188 else 216 else
189 { 217 {
190 // some other error 218 // some other error
@@ -198,6 +226,19 @@ bool LLPluginMessagePipe::pump(F64 timeout)
198 result = false; 226 result = false;
199 } 227 }
200 } 228 }
229 }
230
231 return result;
232}
233
234bool LLPluginMessagePipe::pumpInput(F64 timeout)
235{
236 bool result = true;
237
238 if(mSocket)
239 {
240 apr_status_t status;
241 apr_size_t size;
201 242
202 // FIXME: For some reason, the apr timeout stuff isn't working properly on windows. 243 // FIXME: For some reason, the apr timeout stuff isn't working properly on windows.
203 // Until such time as we figure out why, don't try to use the socket timeout -- just sleep here instead. 244 // Until such time as we figure out why, don't try to use the socket timeout -- just sleep here instead.
@@ -218,8 +259,16 @@ bool LLPluginMessagePipe::pump(F64 timeout)
218 char input_buf[1024]; 259 char input_buf[1024];
219 apr_size_t request_size; 260 apr_size_t request_size;
220 261
221 // Start out by reading one byte, so that any data received will wake us up. 262 if(timeout == 0.0f)
222 request_size = 1; 263 {
264 // If we have no timeout, start out with a full read.
265 request_size = sizeof(input_buf);
266 }
267 else
268 {
269 // Start out by reading one byte, so that any data received will wake us up.
270 request_size = 1;
271 }
223 272
224 // and use the timeout so we'll sleep if no data is available. 273 // and use the timeout so we'll sleep if no data is available.
225 setSocketTimeout((apr_interval_time_t)(timeout * 1000000)); 274 setSocketTimeout((apr_interval_time_t)(timeout * 1000000));
@@ -238,11 +287,14 @@ bool LLPluginMessagePipe::pump(F64 timeout)
238// LL_INFOS("Plugin") << "after apr_socket_recv, size = " << size << LL_ENDL; 287// LL_INFOS("Plugin") << "after apr_socket_recv, size = " << size << LL_ENDL;
239 288
240 if(size > 0) 289 if(size > 0)
290 {
291 LLMutexLock lock(&mInputMutex);
241 mInput.append(input_buf, size); 292 mInput.append(input_buf, size);
293 }
242 294
243 if(status == APR_SUCCESS) 295 if(status == APR_SUCCESS)
244 { 296 {
245// llinfos << "success, read " << size << llendl; 297 LL_DEBUGS("PluginSocket") << "success, read " << size << LL_ENDL;
246 298
247 if(size != request_size) 299 if(size != request_size)
248 { 300 {
@@ -252,16 +304,28 @@ bool LLPluginMessagePipe::pump(F64 timeout)
252 } 304 }
253 else if(APR_STATUS_IS_TIMEUP(status)) 305 else if(APR_STATUS_IS_TIMEUP(status))
254 { 306 {
255// llinfos << "TIMEUP, read " << size << llendl; 307 LL_DEBUGS("PluginSocket") << "TIMEUP, read " << size << LL_ENDL;
256 308
257 // Timeout was hit. Since the initial read is 1 byte, this should never be a partial read. 309 // Timeout was hit. Since the initial read is 1 byte, this should never be a partial read.
258 break; 310 break;
259 } 311 }
260 else if(APR_STATUS_IS_EAGAIN(status)) 312 else if(APR_STATUS_IS_EAGAIN(status))
261 { 313 {
262// llinfos << "EAGAIN, read " << size << llendl; 314 LL_DEBUGS("PluginSocket") << "EAGAIN, read " << size << LL_ENDL;
263 315
264 // We've been doing partial reads, and we're done now. 316 // Non-blocking read returned immediately.
317 break;
318 }
319 else if(APR_STATUS_IS_EOF(status))
320 {
321 // This is what we normally expect when a plugin exits.
322 LL_INFOS("PluginSocket") << "Got EOF from plugin socket. " << LL_ENDL;
323
324 if(mOwner)
325 {
326 mOwner->socketError(status);
327 }
328 result = false;
265 break; 329 break;
266 } 330 }
267 else 331 else
@@ -278,22 +342,18 @@ bool LLPluginMessagePipe::pump(F64 timeout)
278 break; 342 break;
279 } 343 }
280 344
281 // Second and subsequent reads should not use the timeout 345 if(timeout != 0.0f)
282 setSocketTimeout(0); 346 {
283 // and should try to fill the input buffer 347 // Second and subsequent reads should not use the timeout
284 request_size = sizeof(input_buf); 348 setSocketTimeout(0);
349 // and should try to fill the input buffer
350 request_size = sizeof(input_buf);
351 }
285 } 352 }
286 353
287 processInput(); 354 processInput();
288 } 355 }
289 } 356 }
290
291 if(!result)
292 {
293 // If we got an error, we're done.
294 LL_INFOS("Plugin") << "Error from socket, cleaning up." << LL_ENDL;
295 delete this;
296 }
297 357
298 return result; 358 return result;
299} 359}
@@ -301,26 +361,27 @@ bool LLPluginMessagePipe::pump(F64 timeout)
301void LLPluginMessagePipe::processInput(void) 361void LLPluginMessagePipe::processInput(void)
302{ 362{
303 // Look for input delimiter(s) in the input buffer. 363 // Look for input delimiter(s) in the input buffer.
304 int start = 0;
305 int delim; 364 int delim;
306 while((delim = mInput.find(MESSAGE_DELIMITER, start)) != std::string::npos) 365 mInputMutex.lock();
366 while((delim = mInput.find(MESSAGE_DELIMITER)) != std::string::npos)
307 { 367 {
308 // Let the owner process this message 368 // Let the owner process this message
309 if (mOwner) 369 if (mOwner)
310 { 370 {
311 mOwner->receiveMessageRaw(mInput.substr(start, delim - start)); 371 // Pull the message out of the input buffer before calling receiveMessageRaw.
372 // It's now possible for this function to get called recursively (in the case where the plugin makes a blocking request)
373 // and this guarantees that the messages will get dequeued correctly.
374 std::string message(mInput, 0, delim);
375 mInput.erase(0, delim + 1);
376 mInputMutex.unlock();
377 mOwner->receiveMessageRaw(message);
378 mInputMutex.lock();
312 } 379 }
313 else 380 else
314 { 381 {
315 LL_WARNS("Plugin") << "!mOwner" << LL_ENDL; 382 LL_WARNS("Plugin") << "!mOwner" << LL_ENDL;
316 } 383 }
317
318 start = delim + 1;
319 } 384 }
320 385 mInputMutex.unlock();
321 // Remove delivered messages from the input buffer.
322 if(start != 0)
323 mInput = mInput.substr(start);
324
325} 386}
326 387