From a5b9971fd77c0c4bf70656be7f3e7999f59d9f85 Mon Sep 17 00:00:00 2001 From: John Hurliman Date: Fri, 9 Oct 2009 01:53:06 -0700 Subject: * Added a lock object for the write functions in LLUDPClientCollection (immutable != concurrent write safety) * Allow the UDP server to bind to a user-specified port again * Updated to a newer version of OpenSimUDPBase that streamlines the code even more. This also reintroduces the highly concurrent packet handling which needs more testing --- .../Region/ClientStack/LindenUDP/OpenSimUDPBase.cs | 303 ++++++++------------- 1 file changed, 111 insertions(+), 192 deletions(-) (limited to 'OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs') diff --git a/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs b/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs index 218aaac..9b1751d 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs @@ -29,101 +29,90 @@ using System; using System.Net; using System.Net.Sockets; using System.Threading; -using OpenMetaverse; +using log4net; -namespace OpenSim.Region.ClientStack.LindenUDP +namespace OpenMetaverse { /// - /// + /// Base UDP server /// public abstract class OpenSimUDPBase { - // these abstract methods must be implemented in a derived class to actually do - // something with the packets that are sent and received. + private static readonly ILog m_log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType); + + /// + /// This method is called when an incoming packet is received + /// + /// Incoming packet buffer protected abstract void PacketReceived(UDPPacketBuffer buffer); + + /// + /// This method is called when an outgoing packet is sent + /// + /// Outgoing packet buffer + /// Number of bytes written to the wire protected abstract void PacketSent(UDPPacketBuffer buffer, int bytesSent); - // the port to listen on - internal int udpPort; - - // the UDP socket - private Socket udpSocket; + /// UDP port to bind to in server mode + protected int m_udpPort; - // the ReaderWriterLock is used solely for the purposes of shutdown (Stop()). - // since there are potentially many "reader" threads in the internal .NET IOCP - // thread pool, this is a cheaper synchronization primitive than using - // a Mutex object. This allows many UDP socket "reads" concurrently - when - // Stop() is called, it attempts to obtain a writer lock which will then - // wait until all outstanding operations are completed before shutting down. - // this avoids the problem of closing the socket with outstanding operations - // and trying to catch the inevitable ObjectDisposedException. - private ReaderWriterLock rwLock = new ReaderWriterLock(); + /// Local IP address to bind to in server mode + protected IPAddress m_localBindAddress; - // number of outstanding operations. This is a reference count - // which we use to ensure that the threads exit cleanly. Note that - // we need this because the threads will potentially still need to process - // data even after the socket is closed. - private int rwOperationCount = 0; + /// UDP socket, used in either client or server mode + private Socket m_udpSocket; - // the all important shutdownFlag. This is synchronized through the ReaderWriterLock. - private volatile bool shutdownFlag = true; - - // the remote endpoint to communicate with - protected IPEndPoint remoteEndPoint = null; + /// The all important shutdown flag + private volatile bool m_shutdownFlag = true; + /// Returns true if the server is currently listening, otherwise false + public bool IsRunning { get { return !m_shutdownFlag; } } /// - /// Initialize the UDP packet handler in server mode + /// Default constructor /// + /// Local IP address to bind the server to /// Port to listening for incoming UDP packets on - public OpenSimUDPBase(int port) - { - udpPort = port; - } - - /// - /// Initialize the UDP packet handler in client mode - /// - /// Remote UDP server to connect to - public OpenSimUDPBase(IPEndPoint endPoint) + public OpenSimUDPBase(IPAddress bindAddress, int port) { - remoteEndPoint = endPoint; - udpPort = 0; + m_localBindAddress = bindAddress; + m_udpPort = port; } /// - /// + /// Start the UDP server /// + /// This method will attempt to set the SIO_UDP_CONNRESET flag + /// on the socket to get newer versions of Windows to behave in a sane + /// manner (not throwing an exception when the remote side resets the + /// connection). This call is ignored on Mono where the flag is not + /// necessary public void Start() { - if (shutdownFlag) + if (m_shutdownFlag) { - if (remoteEndPoint == null) - { - // Server mode + const int SIO_UDP_CONNRESET = -1744830452; - // create and bind the socket - IPEndPoint ipep = new IPEndPoint(Settings.BIND_ADDR, udpPort); - udpSocket = new Socket( - AddressFamily.InterNetwork, - SocketType.Dgram, - ProtocolType.Udp); - udpSocket.Bind(ipep); + IPEndPoint ipep = new IPEndPoint(m_localBindAddress, m_udpPort); + m_udpSocket = new Socket( + AddressFamily.InterNetwork, + SocketType.Dgram, + ProtocolType.Udp); + try + { + // this udp socket flag is not supported under mono, + // so we'll catch the exception and continue + m_udpSocket.IOControl(SIO_UDP_CONNRESET, new byte[] { 0 }, null); + m_log.Debug("[UDPBASE]: SIO_UDP_CONNRESET flag set"); } - else + catch (SocketException) { - // Client mode - IPEndPoint ipep = new IPEndPoint(Settings.BIND_ADDR, udpPort); - udpSocket = new Socket( - AddressFamily.InterNetwork, - SocketType.Dgram, - ProtocolType.Udp); - udpSocket.Bind(ipep); - //udpSocket.Connect(remoteEndPoint); + m_log.Debug("[UDPBASE]: SIO_UDP_CONNRESET flag not supported on this platform, ignoring"); } + m_udpSocket.Bind(ipep); // we're not shutting down, we're starting up - shutdownFlag = false; + m_shutdownFlag = false; // kick off an async receive. The Start() method will return, the // actual receives will occur asynchronously and will be caught in @@ -133,104 +122,85 @@ namespace OpenSim.Region.ClientStack.LindenUDP } /// - /// + /// Stops the UDP server /// public void Stop() { - if (!shutdownFlag) + if (!m_shutdownFlag) { // wait indefinitely for a writer lock. Once this is called, the .NET runtime // will deny any more reader locks, in effect blocking all other send/receive // threads. Once we have the lock, we set shutdownFlag to inform the other // threads that the socket is closed. - rwLock.AcquireWriterLock(-1); - shutdownFlag = true; - udpSocket.Close(); - rwLock.ReleaseWriterLock(); - - // wait for any pending operations to complete on other - // threads before exiting. - const int FORCE_STOP = 100; - int i = 0; - while (rwOperationCount > 0 && i < FORCE_STOP) - { - Thread.Sleep(10); - ++i; - } - - if (i >= FORCE_STOP) - { - Logger.Log("UDPBase.Stop() forced shutdown while waiting on pending operations", - Helpers.LogLevel.Warning); - } + m_shutdownFlag = true; + m_udpSocket.Close(); } } - /// - /// - /// - public bool IsRunning - { - get { return !shutdownFlag; } - } - private void AsyncBeginReceive() { - // this method actually kicks off the async read on the socket. - // we aquire a reader lock here to ensure that no other thread - // is trying to set shutdownFlag and close the socket. - rwLock.AcquireReaderLock(-1); + // allocate a packet buffer + //WrappedObject wrappedBuffer = Pool.CheckOut(); + UDPPacketBuffer buf = new UDPPacketBuffer(); - if (!shutdownFlag) + if (!m_shutdownFlag) { - // increment the count of pending operations - Interlocked.Increment(ref rwOperationCount); - - // allocate a packet buffer - //WrappedObject wrappedBuffer = Pool.CheckOut(); - UDPPacketBuffer buf = new UDPPacketBuffer(); - try { // kick off an async read - udpSocket.BeginReceiveFrom( + m_udpSocket.BeginReceiveFrom( //wrappedBuffer.Instance.Data, buf.Data, 0, UDPPacketBuffer.BUFFER_SIZE, SocketFlags.None, - //ref wrappedBuffer.Instance.RemoteEndPoint, ref buf.RemoteEndPoint, - new AsyncCallback(AsyncEndReceive), + AsyncEndReceive, //wrappedBuffer); buf); } - catch (SocketException) + catch (SocketException e) { - // something bad happened - //Logger.Log( - // "A SocketException occurred in UDPServer.AsyncBeginReceive()", - // Helpers.LogLevel.Error, se); - - // an error occurred, therefore the operation is void. Decrement the reference count. - Interlocked.Decrement(ref rwOperationCount); + if (e.SocketErrorCode == SocketError.ConnectionReset) + { + m_log.Warn("[UDPBASE]: SIO_UDP_CONNRESET was ignored, attempting to salvage the UDP listener on port " + m_udpPort); + bool salvaged = false; + while (!salvaged) + { + try + { + m_udpSocket.BeginReceiveFrom( + //wrappedBuffer.Instance.Data, + buf.Data, + 0, + UDPPacketBuffer.BUFFER_SIZE, + SocketFlags.None, + ref buf.RemoteEndPoint, + AsyncEndReceive, + //wrappedBuffer); + buf); + salvaged = true; + } + catch (SocketException) { } + catch (ObjectDisposedException) { return; } + } + + m_log.Warn("[UDPBASE]: Salvaged the UDP listener on port " + m_udpPort); + } } + catch (ObjectDisposedException) { } } - - // we're done with the socket for now, release the reader lock. - rwLock.ReleaseReaderLock(); } private void AsyncEndReceive(IAsyncResult iar) { // Asynchronous receive operations will complete here through the call // to AsyncBeginReceive - - // aquire a reader lock - rwLock.AcquireReaderLock(-1); - - if (!shutdownFlag) + if (!m_shutdownFlag) { + // start another receive - this keeps the server going! + AsyncBeginReceive(); + // get the buffer that was created in AsyncBeginReceive // this is the received data //WrappedObject wrappedBuffer = (WrappedObject)iar.AsyncState; @@ -241,100 +211,49 @@ namespace OpenSim.Region.ClientStack.LindenUDP { // get the length of data actually read from the socket, store it with the // buffer - buffer.DataLength = udpSocket.EndReceiveFrom(iar, ref buffer.RemoteEndPoint); - - // this operation is now complete, decrement the reference count - Interlocked.Decrement(ref rwOperationCount); - - // we're done with the socket, release the reader lock - rwLock.ReleaseReaderLock(); + buffer.DataLength = m_udpSocket.EndReceiveFrom(iar, ref buffer.RemoteEndPoint); // call the abstract method PacketReceived(), passing the buffer that // has just been filled from the socket read. PacketReceived(buffer); } - catch (SocketException) - { - // an error occurred, therefore the operation is void. Decrement the reference count. - Interlocked.Decrement(ref rwOperationCount); - - // we're done with the socket for now, release the reader lock. - rwLock.ReleaseReaderLock(); - } - finally - { - // start another receive - this keeps the server going! - AsyncBeginReceive(); - - //wrappedBuffer.Dispose(); - } - } - else - { - // nothing bad happened, but we are done with the operation - // decrement the reference count and release the reader lock - Interlocked.Decrement(ref rwOperationCount); - rwLock.ReleaseReaderLock(); + catch (SocketException) { } + catch (ObjectDisposedException) { } + //finally { wrappedBuffer.Dispose(); } } } public void AsyncBeginSend(UDPPacketBuffer buf) { - rwLock.AcquireReaderLock(-1); - - if (!shutdownFlag) + if (!m_shutdownFlag) { try { - Interlocked.Increment(ref rwOperationCount); - udpSocket.BeginSendTo( + m_udpSocket.BeginSendTo( buf.Data, 0, buf.DataLength, SocketFlags.None, buf.RemoteEndPoint, - new AsyncCallback(AsyncEndSend), + AsyncEndSend, buf); } - catch (SocketException) - { - //Logger.Log( - // "A SocketException occurred in UDPServer.AsyncBeginSend()", - // Helpers.LogLevel.Error, se); - } + catch (SocketException) { } + catch (ObjectDisposedException) { } } - - rwLock.ReleaseReaderLock(); } - private void AsyncEndSend(IAsyncResult iar) + void AsyncEndSend(IAsyncResult result) { - rwLock.AcquireReaderLock(-1); - - if (!shutdownFlag) + try { - UDPPacketBuffer buffer = (UDPPacketBuffer)iar.AsyncState; + UDPPacketBuffer buf = (UDPPacketBuffer)result.AsyncState; + int bytesSent = m_udpSocket.EndSendTo(result); - try - { - int bytesSent = udpSocket.EndSendTo(iar); - - // note that call to the abstract PacketSent() method - we are passing the number - // of bytes sent in a separate parameter, since we can't use buffer.DataLength which - // is the number of bytes to send (or bytes received depending upon whether this - // buffer was part of a send or a receive). - PacketSent(buffer, bytesSent); - } - catch (SocketException) - { - //Logger.Log( - // "A SocketException occurred in UDPServer.AsyncEndSend()", - // Helpers.LogLevel.Error, se); - } + PacketSent(buf, bytesSent); } - - Interlocked.Decrement(ref rwOperationCount); - rwLock.ReleaseReaderLock(); + catch (SocketException) { } + catch (ObjectDisposedException) { } } } } -- cgit v1.1 From 23586b69a16c75b8c7cc9a99d1a7693686285ff7 Mon Sep 17 00:00:00 2001 From: Melanie Date: Fri, 9 Oct 2009 12:17:55 +0100 Subject: Slow down the packet receiving code again after new reports of thread storms. --- OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) (limited to 'OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs') diff --git a/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs b/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs index 9b1751d..e78a4fe 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs @@ -198,9 +198,6 @@ namespace OpenMetaverse // to AsyncBeginReceive if (!m_shutdownFlag) { - // start another receive - this keeps the server going! - AsyncBeginReceive(); - // get the buffer that was created in AsyncBeginReceive // this is the received data //WrappedObject wrappedBuffer = (WrappedObject)iar.AsyncState; @@ -219,7 +216,14 @@ namespace OpenMetaverse } catch (SocketException) { } catch (ObjectDisposedException) { } - //finally { wrappedBuffer.Dispose(); } + finally + { + // wrappedBuffer.Dispose(); + + // start another receive - this keeps the server going! + AsyncBeginReceive(); + } + } } -- cgit v1.1 From 29543514e6a8ad91a90e244a9488e9d0408f45bf Mon Sep 17 00:00:00 2001 From: John Hurliman Date: Fri, 9 Oct 2009 16:33:50 -0700 Subject: * Changed the "Packet exceeded buffer size" log line to debug and include the packet type. This message is normal, but could be evidence of a message marked for zerocoding that probably shouldn't be * Changing OpenSimUDPBase back to high concurrency. High concurrency mode seems to make other problems happen faster, so it's helpful for working out other bugs and will probably --- OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) (limited to 'OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs') diff --git a/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs b/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs index e78a4fe..fad2ea8 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs @@ -198,6 +198,9 @@ namespace OpenMetaverse // to AsyncBeginReceive if (!m_shutdownFlag) { + // start another receive - this keeps the server going! + AsyncBeginReceive(); + // get the buffer that was created in AsyncBeginReceive // this is the received data //WrappedObject wrappedBuffer = (WrappedObject)iar.AsyncState; @@ -216,13 +219,7 @@ namespace OpenMetaverse } catch (SocketException) { } catch (ObjectDisposedException) { } - finally - { - // wrappedBuffer.Dispose(); - - // start another receive - this keeps the server going! - AsyncBeginReceive(); - } + //finally { wrappedBuffer.Dispose(); } } } -- cgit v1.1 From 0d2e6463d714bce8a6a628bd647c625feeeae8f6 Mon Sep 17 00:00:00 2001 From: John Hurliman Date: Wed, 14 Oct 2009 11:43:31 -0700 Subject: * Minimized the number of times textures are pulled off the priority queue * OnQueueEmpty is still called async, but will not be called for a given category if the previous callback for that category is still running. This is the most balanced behavior I could find, and seems to work well * Added support for the old [ClientStack.LindenUDP] settings (including setting the receive buffer size) and added the new token bucket and global throttle settings * Added the AssetLoaderEnabled config variable to optionally disable loading assets from XML every startup. This gives a dramatic improvement in startup times for those who don't need the functionality every startup --- OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) (limited to 'OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs') diff --git a/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs b/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs index fad2ea8..44a6ed6 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs @@ -73,6 +73,7 @@ namespace OpenMetaverse /// /// Local IP address to bind the server to /// Port to listening for incoming UDP packets on + /// public OpenSimUDPBase(IPAddress bindAddress, int port) { m_localBindAddress = bindAddress; @@ -82,25 +83,31 @@ namespace OpenMetaverse /// /// Start the UDP server /// + /// The size of the receive buffer for + /// the UDP socket. This value is passed up to the operating system + /// and used in the system networking stack. Use zero to leave this + /// value as the default /// This method will attempt to set the SIO_UDP_CONNRESET flag /// on the socket to get newer versions of Windows to behave in a sane /// manner (not throwing an exception when the remote side resets the /// connection). This call is ignored on Mono where the flag is not /// necessary - public void Start() + public void Start(int recvBufferSize) { if (m_shutdownFlag) { const int SIO_UDP_CONNRESET = -1744830452; IPEndPoint ipep = new IPEndPoint(m_localBindAddress, m_udpPort); + m_udpSocket = new Socket( AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + try { - // this udp socket flag is not supported under mono, + // This udp socket flag is not supported under mono, // so we'll catch the exception and continue m_udpSocket.IOControl(SIO_UDP_CONNRESET, new byte[] { 0 }, null); m_log.Debug("[UDPBASE]: SIO_UDP_CONNRESET flag set"); @@ -109,6 +116,10 @@ namespace OpenMetaverse { m_log.Debug("[UDPBASE]: SIO_UDP_CONNRESET flag not supported on this platform, ignoring"); } + + if (recvBufferSize != 0) + m_udpSocket.ReceiveBufferSize = recvBufferSize; + m_udpSocket.Bind(ipep); // we're not shutting down, we're starting up -- cgit v1.1 From 06990b074c17c2201ed379bf1ae4c7191ab3187f Mon Sep 17 00:00:00 2001 From: John Hurliman Date: Wed, 14 Oct 2009 16:48:27 -0700 Subject: Allow the LLUDP server to run in either synchronous or asynchronous mode with a config setting, defaulting to synchronous mode --- .../Region/ClientStack/LindenUDP/OpenSimUDPBase.cs | 28 ++++++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) (limited to 'OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs') diff --git a/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs b/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs index 44a6ed6..d16837d 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs @@ -62,6 +62,9 @@ namespace OpenMetaverse /// UDP socket, used in either client or server mode private Socket m_udpSocket; + /// Flag to process packets asynchronously or synchronously + private bool m_asyncPacketHandling; + /// The all important shutdown flag private volatile bool m_shutdownFlag = true; @@ -73,7 +76,6 @@ namespace OpenMetaverse /// /// Local IP address to bind the server to /// Port to listening for incoming UDP packets on - /// public OpenSimUDPBase(IPAddress bindAddress, int port) { m_localBindAddress = bindAddress; @@ -87,13 +89,19 @@ namespace OpenMetaverse /// the UDP socket. This value is passed up to the operating system /// and used in the system networking stack. Use zero to leave this /// value as the default + /// Set this to true to start + /// receiving more packets while current packet handler callbacks are + /// still running. Setting this to false will complete each packet + /// callback before the next packet is processed /// This method will attempt to set the SIO_UDP_CONNRESET flag /// on the socket to get newer versions of Windows to behave in a sane /// manner (not throwing an exception when the remote side resets the /// connection). This call is ignored on Mono where the flag is not /// necessary - public void Start(int recvBufferSize) + public void Start(int recvBufferSize, bool asyncPacketHandling) { + m_asyncPacketHandling = asyncPacketHandling; + if (m_shutdownFlag) { const int SIO_UDP_CONNRESET = -1744830452; @@ -209,8 +217,10 @@ namespace OpenMetaverse // to AsyncBeginReceive if (!m_shutdownFlag) { - // start another receive - this keeps the server going! - AsyncBeginReceive(); + // Asynchronous mode will start another receive before the + // callback for this packet is even fired. Very parallel :-) + if (m_asyncPacketHandling) + AsyncBeginReceive(); // get the buffer that was created in AsyncBeginReceive // this is the received data @@ -230,7 +240,15 @@ namespace OpenMetaverse } catch (SocketException) { } catch (ObjectDisposedException) { } - //finally { wrappedBuffer.Dispose(); } + finally + { + //wrappedBuffer.Dispose(); + + // Synchronous mode waits until the packet callback completes + // before starting the receive to fetch another packet + if (!m_asyncPacketHandling) + AsyncBeginReceive(); + } } } -- cgit v1.1