aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/linden/indra/llmessage/lliosocket.cpp
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--linden/indra/llmessage/lliosocket.cpp615
1 files changed, 615 insertions, 0 deletions
diff --git a/linden/indra/llmessage/lliosocket.cpp b/linden/indra/llmessage/lliosocket.cpp
new file mode 100644
index 0000000..628f884
--- /dev/null
+++ b/linden/indra/llmessage/lliosocket.cpp
@@ -0,0 +1,615 @@
1/**
2 * @file lliosocket.cpp
3 * @author Phoenix
4 * @date 2005-07-31
5 * @brief Sockets declarations for use with the io pipes
6 *
7 * Copyright (c) 2005-2007, Linden Research, Inc.
8 *
9 * The source code in this file ("Source Code") is provided by Linden Lab
10 * to you under the terms of the GNU General Public License, version 2.0
11 * ("GPL"), unless you have obtained a separate licensing agreement
12 * ("Other License"), formally executed by you and Linden Lab. Terms of
13 * the GPL can be found in doc/GPL-license.txt in this distribution, or
14 * online at http://secondlife.com/developers/opensource/gplv2
15 *
16 * There are special exceptions to the terms and conditions of the GPL as
17 * it is applied to this Source Code. View the full text of the exception
18 * in the file doc/FLOSS-exception.txt in this software distribution, or
19 * online at http://secondlife.com/developers/opensource/flossexception
20 *
21 * By copying, modifying or distributing this software, you acknowledge
22 * that you have read and understood your obligations described above,
23 * and agree to abide by those obligations.
24 *
25 * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO
26 * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY,
27 * COMPLETENESS OR PERFORMANCE.
28 */
29
30#include "linden_common.h"
31#include "lliosocket.h"
32
33#include "llapr.h"
34
35#include "llbuffer.h"
36#include "llhost.h"
37#include "llmemtype.h"
38#include "llpumpio.h"
39
40//
41// constants
42//
43
44static const S32 LL_DEFAULT_LISTEN_BACKLOG = 10;
45static const S32 LL_SEND_BUFFER_SIZE = 40000;
46static const S32 LL_RECV_BUFFER_SIZE = 40000;
47//static const U16 LL_PORT_DISCOVERY_RANGE_MIN = 13000;
48//static const U16 LL_PORT_DISCOVERY_RANGE_MAX = 13050;
49
50//
51// local methods
52//
53
54bool is_addr_in_use(apr_status_t status)
55{
56#if LL_WINDOWS
57 return (WSAEADDRINUSE == APR_TO_OS_ERROR(status));
58#else
59 return (EADDRINUSE == APR_TO_OS_ERROR(status));
60#endif
61}
62
63///
64/// LLSocket
65///
66
67// static
68LLSocket::ptr_t LLSocket::create(apr_pool_t* pool, EType type, U16 port)
69{
70 LLMemType m1(LLMemType::MTYPE_IO_TCP);
71 LLSocket::ptr_t rv;
72 apr_socket_t* socket = NULL;
73 apr_pool_t* new_pool = NULL;
74 apr_status_t status = APR_EGENERAL;
75
76 // create a pool for the socket
77 status = apr_pool_create(&new_pool, pool);
78 if(ll_apr_warn_status(status))
79 {
80 if(new_pool) apr_pool_destroy(new_pool);
81 return rv;
82 }
83
84 if(STREAM_TCP == type)
85 {
86 status = apr_socket_create(
87 &socket,
88 APR_INET,
89 SOCK_STREAM,
90 APR_PROTO_TCP,
91 new_pool);
92 }
93 else if(DATAGRAM_UDP == type)
94 {
95 status = apr_socket_create(
96 &socket,
97 APR_INET,
98 SOCK_DGRAM,
99 APR_PROTO_UDP,
100 new_pool);
101 }
102 else
103 {
104 if(new_pool) apr_pool_destroy(new_pool);
105 return rv;
106 }
107 if(ll_apr_warn_status(status))
108 {
109 if(new_pool) apr_pool_destroy(new_pool);
110 return rv;
111 }
112 rv = ptr_t(new LLSocket(socket, new_pool));
113 if(port > 0)
114 {
115 apr_sockaddr_t* sa = NULL;
116 status = apr_sockaddr_info_get(
117 &sa,
118 APR_ANYADDR,
119 APR_UNSPEC,
120 port,
121 0,
122 new_pool);
123 if(ll_apr_warn_status(status))
124 {
125 rv.reset();
126 return rv;
127 }
128 // This allows us to reuse the address on quick down/up. This
129 // is unlikely to create problems.
130 ll_apr_warn_status(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
131 status = apr_socket_bind(socket, sa);
132 if(ll_apr_warn_status(status))
133 {
134 rv.reset();
135 return rv;
136 }
137 lldebugs << "Bound " << ((DATAGRAM_UDP == type) ? "udp" : "tcp")
138 << " socket to port: " << sa->port << llendl;
139 if(STREAM_TCP == type)
140 {
141 // If it's a stream based socket, we need to tell the OS
142 // to keep a queue of incoming connections for ACCEPT.
143 lldebugs << "Setting listen state for socket." << llendl;
144 status = apr_socket_listen(
145 socket,
146 LL_DEFAULT_LISTEN_BACKLOG);
147 if(ll_apr_warn_status(status))
148 {
149 rv.reset();
150 return rv;
151 }
152 }
153 }
154 else
155 {
156 // we need to indicate that we have an ephemeral port if the
157 // previous calls were successful. It will
158 port = PORT_EPHEMERAL;
159 }
160 rv->mPort = port;
161 rv->setOptions();
162 return rv;
163}
164
165// static
166LLSocket::ptr_t LLSocket::create(apr_socket_t* socket, apr_pool_t* pool)
167{
168 LLMemType m1(LLMemType::MTYPE_IO_TCP);
169 LLSocket::ptr_t rv;
170 if(!socket)
171 {
172 return rv;
173 }
174 rv = ptr_t(new LLSocket(socket, pool));
175 rv->mPort = PORT_EPHEMERAL;
176 rv->setOptions();
177 return rv;
178}
179
180
181bool LLSocket::blockingConnect(const LLHost& host)
182{
183 if(!mSocket) return false;
184 apr_sockaddr_t* sa = NULL;
185 char ip_address[MAXADDRSTR]; /*Flawfinder: ignore*/
186 host.getIPString(ip_address, MAXADDRSTR);
187 if(ll_apr_warn_status(apr_sockaddr_info_get(
188 &sa,
189 ip_address,
190 APR_UNSPEC,
191 host.getPort(),
192 0,
193 mPool)))
194 {
195 return false;
196 }
197 apr_socket_timeout_set(mSocket, 1000);
198 if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false;
199 setOptions();
200 return true;
201}
202
203LLSocket::LLSocket(apr_socket_t* socket, apr_pool_t* pool) :
204 mSocket(socket),
205 mPool(pool),
206 mPort(PORT_INVALID)
207{
208 LLMemType m1(LLMemType::MTYPE_IO_TCP);
209}
210
211LLSocket::~LLSocket()
212{
213 LLMemType m1(LLMemType::MTYPE_IO_TCP);
214 // *FIX: clean up memory we are holding.
215 //lldebugs << "Destroying LLSocket" << llendl;
216 if(mSocket)
217 {
218 apr_socket_close(mSocket);
219 }
220 if(mPool)
221 {
222 apr_pool_destroy(mPool);
223 }
224}
225
226void LLSocket::setOptions()
227{
228 LLMemType m1(LLMemType::MTYPE_IO_TCP);
229 // set up the socket options
230 ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
231 ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
232 ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
233
234}
235
236///
237/// LLIOSocketReader
238///
239
240LLIOSocketReader::LLIOSocketReader(LLSocket::ptr_t socket) :
241 mSource(socket),
242 mInitialized(false)
243{
244 LLMemType m1(LLMemType::MTYPE_IO_TCP);
245}
246
247LLIOSocketReader::~LLIOSocketReader()
248{
249 LLMemType m1(LLMemType::MTYPE_IO_TCP);
250 //lldebugs << "Destroying LLIOSocketReader" << llendl;
251}
252
253// virtual
254LLIOPipe::EStatus LLIOSocketReader::process_impl(
255 const LLChannelDescriptors& channels,
256 buffer_ptr_t& buffer,
257 bool& eos,
258 LLSD& context,
259 LLPumpIO* pump)
260{
261 PUMP_DEBUG;
262 LLMemType m1(LLMemType::MTYPE_IO_TCP);
263 if(!mSource) return STATUS_PRECONDITION_NOT_MET;
264 if(!mInitialized)
265 {
266 PUMP_DEBUG;
267 // Since the read will not block, it's ok to initialize and
268 // attempt to read off the descriptor immediately.
269 mInitialized = true;
270 if(pump)
271 {
272 PUMP_DEBUG;
273 lldebugs << "Initializing poll descriptor for LLIOSocketReader."
274 << llendl;
275 apr_pollfd_t poll_fd;
276 poll_fd.p = NULL;
277 poll_fd.desc_type = APR_POLL_SOCKET;
278 poll_fd.reqevents = APR_POLLIN;
279 poll_fd.rtnevents = 0x0;
280 poll_fd.desc.s = mSource->getSocket();
281 poll_fd.client_data = NULL;
282 pump->setConditional(this, &poll_fd);
283 }
284 }
285 //if(!buffer)
286 //{
287 // buffer = new LLBufferArray;
288 //}
289 PUMP_DEBUG;
290 const apr_size_t READ_BUFFER_SIZE = 1024;
291 char read_buf[READ_BUFFER_SIZE]; /*Flawfinder: ignore*/
292 apr_size_t len;
293 apr_status_t status = APR_SUCCESS;
294 do
295 {
296 PUMP_DEBUG;
297 len = READ_BUFFER_SIZE;
298 status = apr_socket_recv(mSource->getSocket(), read_buf, &len);
299 buffer->append(channels.out(), (U8*)read_buf, len);
300 } while((APR_SUCCESS == status) && (READ_BUFFER_SIZE == len));
301 lldebugs << "socket read status: " << status << llendl;
302 LLIOPipe::EStatus rv = STATUS_OK;
303
304 PUMP_DEBUG;
305 // *FIX: Also need to check for broken pipe
306 if(APR_STATUS_IS_EOF(status))
307 {
308 // *FIX: Should we shut down the socket read?
309 if(pump)
310 {
311 pump->setConditional(this, NULL);
312 }
313 rv = STATUS_DONE;
314 eos = true;
315 }
316 else if(APR_STATUS_IS_EAGAIN(status))
317 {
318 // everything is fine, but we can terminate this process pump.
319 rv = STATUS_BREAK;
320 }
321 else
322 {
323 if(ll_apr_warn_status(status))
324 {
325 rv = STATUS_ERROR;
326 }
327 }
328 PUMP_DEBUG;
329 return rv;
330}
331
332///
333/// LLIOSocketWriter
334///
335
336LLIOSocketWriter::LLIOSocketWriter(LLSocket::ptr_t socket) :
337 mDestination(socket),
338 mLastWritten(NULL),
339 mInitialized(false)
340{
341 LLMemType m1(LLMemType::MTYPE_IO_TCP);
342}
343
344LLIOSocketWriter::~LLIOSocketWriter()
345{
346 LLMemType m1(LLMemType::MTYPE_IO_TCP);
347 //lldebugs << "Destroying LLIOSocketWriter" << llendl;
348}
349
350// virtual
351LLIOPipe::EStatus LLIOSocketWriter::process_impl(
352 const LLChannelDescriptors& channels,
353 buffer_ptr_t& buffer,
354 bool& eos,
355 LLSD& context,
356 LLPumpIO* pump)
357{
358 PUMP_DEBUG;
359 LLMemType m1(LLMemType::MTYPE_IO_TCP);
360 if(!mDestination) return STATUS_PRECONDITION_NOT_MET;
361 if(!mInitialized)
362 {
363 PUMP_DEBUG;
364 // Since the write will not block, it's ok to initialize and
365 // attempt to write immediately.
366 mInitialized = true;
367 if(pump)
368 {
369 PUMP_DEBUG;
370 lldebugs << "Initializing poll descriptor for LLIOSocketWriter."
371 << llendl;
372 apr_pollfd_t poll_fd;
373 poll_fd.p = NULL;
374 poll_fd.desc_type = APR_POLL_SOCKET;
375 poll_fd.reqevents = APR_POLLOUT;
376 poll_fd.rtnevents = 0x0;
377 poll_fd.desc.s = mDestination->getSocket();
378 poll_fd.client_data = NULL;
379 pump->setConditional(this, &poll_fd);
380 }
381 }
382
383 PUMP_DEBUG;
384 // *FIX: Some sort of writev implementation would be much more
385 // efficient - not only because writev() is better, but also
386 // because we won't have to do as much work to find the start
387 // address.
388 LLBufferArray::segment_iterator_t it;
389 LLBufferArray::segment_iterator_t end = buffer->endSegment();
390 LLSegment segment;
391 it = buffer->constructSegmentAfter(mLastWritten, segment);
392 /*
393 if(NULL == mLastWritten)
394 {
395 it = buffer->beginSegment();
396 segment = (*it);
397 }
398 else
399 {
400 it = buffer->getSegment(mLastWritten);
401 segment = (*it);
402 S32 size = segment.size();
403 U8* data = segment.data();
404 if((data + size) == mLastWritten)
405 {
406 ++it;
407 segment = (*it);
408 }
409 else
410 {
411 // *FIX: check the math on this one
412 segment = LLSegment(
413 (*it).getChannelMask(),
414 mLastWritten + 1,
415 size - (mLastWritten - data));
416 }
417 }
418 */
419
420 PUMP_DEBUG;
421 apr_size_t len;
422 bool done = false;
423 while(it != end)
424 {
425 PUMP_DEBUG;
426 if((*it).isOnChannel(channels.in()))
427 {
428 PUMP_DEBUG;
429 // *FIX: check return code - sockets will fail (broken, etc.)
430 len = (apr_size_t)segment.size();
431 apr_socket_send(
432 mDestination->getSocket(),
433 (const char*)segment.data(),
434 &len);
435 mLastWritten = segment.data() + len - 1;
436 PUMP_DEBUG;
437 if((S32)len < segment.size())
438 {
439 break;
440 }
441 }
442 ++it;
443 if(it != end)
444 {
445 segment = (*it);
446 }
447 else
448 {
449 done = true;
450 }
451 }
452 PUMP_DEBUG;
453 if(done && eos)
454 {
455 return STATUS_DONE;
456 }
457 return STATUS_OK;
458}
459
460
461///
462/// LLIOServerSocket
463///
464
465LLIOServerSocket::LLIOServerSocket(
466 apr_pool_t* pool,
467 LLIOServerSocket::socket_t listener,
468 factory_t factory) :
469 mPool(pool),
470 mListenSocket(listener),
471 mReactor(factory),
472 mInitialized(false),
473 mResponseTimeout(DEFAULT_CHAIN_EXPIRY_SECS)
474{
475 LLMemType m1(LLMemType::MTYPE_IO_TCP);
476}
477
478LLIOServerSocket::~LLIOServerSocket()
479{
480 LLMemType m1(LLMemType::MTYPE_IO_TCP);
481 //lldebugs << "Destroying LLIOServerSocket" << llendl;
482}
483
484void LLIOServerSocket::setResponseTimeout(F32 timeout_secs)
485{
486 mResponseTimeout = timeout_secs;
487}
488
489// virtual
490LLIOPipe::EStatus LLIOServerSocket::process_impl(
491 const LLChannelDescriptors& channels,
492 buffer_ptr_t& buffer,
493 bool& eos,
494 LLSD& context,
495 LLPumpIO* pump)
496{
497 PUMP_DEBUG;
498 LLMemType m1(LLMemType::MTYPE_IO_TCP);
499 if(!pump)
500 {
501 llwarns << "Need a pump for server socket." << llendl;
502 return STATUS_ERROR;
503 }
504 if(!mInitialized)
505 {
506 PUMP_DEBUG;
507 // This segment sets up the pump so that we do not call
508 // process again until we have an incoming read, aka connect()
509 // from a remote host.
510 lldebugs << "Initializing poll descriptor for LLIOServerSocket."
511 << llendl;
512 apr_pollfd_t poll_fd;
513 poll_fd.p = NULL;
514 poll_fd.desc_type = APR_POLL_SOCKET;
515 poll_fd.reqevents = APR_POLLIN;
516 poll_fd.rtnevents = 0x0;
517 poll_fd.desc.s = mListenSocket->getSocket();
518 poll_fd.client_data = NULL;
519 pump->setConditional(this, &poll_fd);
520 mInitialized = true;
521 return STATUS_OK;
522 }
523
524 // we are initialized, and told to process, so we must have a
525 // socket waiting for a connection.
526 lldebugs << "accepting socket" << llendl;
527
528 PUMP_DEBUG;
529 apr_pool_t* new_pool = NULL;
530 apr_status_t status = apr_pool_create(&new_pool, mPool);
531 apr_socket_t* socket = NULL;
532 status = apr_socket_accept(
533 &socket,
534 mListenSocket->getSocket(),
535 new_pool);
536 LLSocket::ptr_t llsocket(LLSocket::create(socket, new_pool));
537 //EStatus rv = STATUS_ERROR;
538 if(llsocket)
539 {
540 PUMP_DEBUG;
541 LLPumpIO::chain_t chain;
542 chain.push_back(LLIOPipe::ptr_t(new LLIOSocketReader(llsocket)));
543 if(mReactor->build(chain, LLSD()))
544 {
545 chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(llsocket)));
546 pump->addChain(chain, mResponseTimeout);
547 status = STATUS_OK;
548 }
549 else
550 {
551 llwarns << "Unable to build reactor to socket." << llendl;
552 }
553 }
554 else
555 {
556 llwarns << "Unable to create linden socket." << llendl;
557 }
558
559 PUMP_DEBUG;
560 // This needs to always return success, lest it get removed from
561 // the pump.
562 return STATUS_OK;
563}
564
565
566#if 0
567LLIODataSocket::LLIODataSocket(
568 U16 suggested_port,
569 U16 start_discovery_port,
570 apr_pool_t* pool) :
571 mSocket(NULL)
572{
573 if(!pool || (PORT_INVALID == suggested_port)) return;
574 if(ll_apr_warn_status(apr_socket_create(&mSocket, APR_INET, SOCK_DGRAM, APR_PROTO_UDP, pool))) return;
575 apr_sockaddr_t* sa = NULL;
576 if(ll_apr_warn_status(apr_sockaddr_info_get(&sa, APR_ANYADDR, APR_UNSPEC, suggested_port, 0, pool))) return;
577 apr_status_t status = apr_socket_bind(mSocket, sa);
578 if((start_discovery_port > 0) && is_addr_in_use(status))
579 {
580 const U16 MAX_ATTEMPT_PORTS = 50;
581 for(U16 attempt_port = start_discovery_port;
582 attempt_port < (start_discovery_port + MAX_ATTEMPT_PORTS);
583 ++attempt_port)
584 {
585 sa->port = attempt_port;
586 sa->sa.sin.sin_port = htons(attempt_port);
587 status = apr_socket_bind(mSocket, sa);
588 if(APR_SUCCESS == status) break;
589 if(is_addr_in_use(status)) continue;
590 (void)ll_apr_warn_status(status);
591 }
592 }
593 if(ll_apr_warn_status(status)) return;
594 if(sa->port)
595 {
596 lldebugs << "Bound datagram socket to port: " << sa->port << llendl;
597 mPort = sa->port;
598 }
599 else
600 {
601 mPort = LLIOSocket::PORT_EPHEMERAL;
602 }
603
604 // set up the socket options options
605 ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
606 ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
607 ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
608}
609
610LLIODataSocket::~LLIODataSocket()
611{
612}
613
614
615#endif