From a5b9971fd77c0c4bf70656be7f3e7999f59d9f85 Mon Sep 17 00:00:00 2001
From: John Hurliman
Date: Fri, 9 Oct 2009 01:53:06 -0700
Subject: * Added a lock object for the write functions in
LLUDPClientCollection (immutable != concurrent write safety) * Allow the UDP
server to bind to a user-specified port again * Updated to a newer version of
OpenSimUDPBase that streamlines the code even more. This also reintroduces
the highly concurrent packet handling which needs more testing
---
.../ClientStack/LindenUDP/LLUDPClientCollection.cs | 56 +++-
.../Region/ClientStack/LindenUDP/LLUDPServer.cs | 20 +-
.../Region/ClientStack/LindenUDP/OpenSimUDPBase.cs | 303 ++++++++-------------
3 files changed, 169 insertions(+), 210 deletions(-)
(limited to 'OpenSim')
diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClientCollection.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClientCollection.cs
index abf3882..2222a33 100644
--- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClientCollection.cs
+++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClientCollection.cs
@@ -36,6 +36,9 @@ using ReaderWriterLockImpl = OpenMetaverse.ReaderWriterLockSlim;
namespace OpenSim.Region.ClientStack.LindenUDP
{
+ ///
+ /// A thread safe mapping from endpoints to client references
+ ///
public sealed class UDPClientCollection
{
#region IComparers
@@ -52,43 +55,80 @@ namespace OpenSim.Region.ClientStack.LindenUDP
#endregion IComparers
+ /// An immutable dictionary mapping from
+ /// to references
private ImmutableMap m_dict;
+ /// Immutability grants thread safety for concurrent reads and
+ /// read-writes, but not concurrent writes
+ private object m_writeLock;
+ /// Number of clients in the collection
+ public int Count { get { return m_dict.Count; } }
+
+ ///
+ /// Default constructor
+ ///
public UDPClientCollection()
{
m_dict = new ImmutableMap(new IPEndPointComparer());
}
+ ///
+ /// Add a client reference to the collection
+ ///
+ /// Remote endpoint of the client
+ /// Reference to the client object
public void Add(IPEndPoint key, LLUDPClient value)
{
- m_dict = m_dict.Add(key, value);
+ lock (m_writeLock)
+ m_dict = m_dict.Add(key, value);
}
+ ///
+ /// Remove a client from the collection
+ ///
+ /// Remote endpoint of the client
public void Remove(IPEndPoint key)
{
- m_dict = m_dict.Delete(key);
+ lock (m_writeLock)
+ m_dict = m_dict.Delete(key);
}
+ ///
+ /// Resets the client collection
+ ///
public void Clear()
{
- m_dict = new ImmutableMap(new IPEndPointComparer());
- }
-
- public int Count
- {
- get { return m_dict.Count; }
+ lock (m_writeLock)
+ m_dict = new ImmutableMap(new IPEndPointComparer());
}
+ ///
+ /// Checks if an endpoint is in the collection
+ ///
+ /// Endpoint to check for
+ /// True if the endpoint was found in the collection, otherwise false
public bool ContainsKey(IPEndPoint key)
{
return m_dict.ContainsKey(key);
}
+ ///
+ /// Attempts to fetch a value out of the collection
+ ///
+ /// Endpoint of the client to retrieve
+ /// Retrieved client, or null on lookup failure
+ /// True if the lookup succeeded, otherwise false
public bool TryGetValue(IPEndPoint key, out LLUDPClient value)
{
return m_dict.TryGetValue(key, out value);
}
+ ///
+ /// Performs a given task in parallel for each of the elements in the
+ /// collection
+ ///
+ /// Action to perform on each element
public void ForEach(Action action)
{
Parallel.ForEach(m_dict.Values, action);
diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
index a6aa048..9aeea9a 100644
--- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
+++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
@@ -96,7 +96,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// Incoming packets that are awaiting handling
private OpenMetaverse.BlockingQueue packetInbox = new OpenMetaverse.BlockingQueue();
///
- private UDPClientCollection clients = new UDPClientCollection();
+ private UDPClientCollection m_clients = new UDPClientCollection();
/// Bandwidth throttle for this UDP server
private TokenBucket m_throttle;
/// Bandwidth throttle rates for this UDP server
@@ -115,7 +115,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
public Socket Server { get { return null; } }
public LLUDPServer(IPAddress listenIP, ref uint port, int proxyPortOffsetParm, bool allow_alternate_port, IConfigSource configSource, AgentCircuitManager circuitManager)
- : base((int)port)
+ : base(listenIP, (int)port)
{
#region Environment.TickCount Measurement
@@ -143,7 +143,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
public new void Start()
{
if (m_scene == null)
- throw new InvalidOperationException("Cannot LLUDPServer.Start() without an IScene reference");
+ throw new InvalidOperationException("[LLUDPSERVER]: Cannot LLUDPServer.Start() without an IScene reference");
base.Start();
@@ -188,7 +188,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
m_scene.ClientManager.Remove(udpClient.CircuitCode);
udpClient.ClientAPI.Close(false);
udpClient.Shutdown();
- clients.Remove(udpClient.RemoteEndPoint);
+ m_clients.Remove(udpClient.RemoteEndPoint);
}
public void BroadcastPacket(Packet packet, ThrottleOutPacketType category, bool sendToPausedAgents, bool allowSplitting)
@@ -208,7 +208,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
for (int i = 0; i < packetCount; i++)
{
byte[] data = datas[i];
- clients.ForEach(
+ m_clients.ForEach(
delegate(LLUDPClient client)
{ SendPacketData(client, data, data.Length, packet.Type, packet.Header.Zerocoded, category); });
}
@@ -216,7 +216,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
else
{
byte[] data = packet.ToBytes();
- clients.ForEach(
+ m_clients.ForEach(
delegate(LLUDPClient client)
{ SendPacketData(client, data, data.Length, packet.Type, packet.Header.Zerocoded, category); });
}
@@ -502,7 +502,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
}
// Determine which agent this packet came from
- if (!clients.TryGetValue(address, out client))
+ if (!m_clients.TryGetValue(address, out client))
{
m_log.Warn("[LLUDPSERVER]: Received a " + packet.Type + " packet from an unrecognized source: " + address);
return;
@@ -606,7 +606,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if (m_scene.RegionStatus != RegionStatus.SlaveScene)
{
AuthenticateResponse sessionInfo;
- bool isNewCircuit = !clients.ContainsKey(remoteEndPoint);
+ bool isNewCircuit = !m_clients.ContainsKey(remoteEndPoint);
if (!IsClientAuthorized(useCircuitCode, out sessionInfo))
{
@@ -648,7 +648,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
udpClient.ClientAPI = clientApi;
// Add the new client to our list of tracked clients
- clients.Add(udpClient.RemoteEndPoint, udpClient);
+ m_clients.Add(udpClient.RemoteEndPoint, udpClient);
}
private void AcknowledgePacket(LLUDPClient client, uint ack, int currentTime, bool fromResend)
@@ -726,7 +726,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
elapsed500MS = 0;
}
- clients.ForEach(
+ m_clients.ForEach(
delegate(LLUDPClient client)
{
if (client.DequeueOutgoing())
diff --git a/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs b/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs
index 218aaac..9b1751d 100644
--- a/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs
+++ b/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs
@@ -29,101 +29,90 @@ using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
-using OpenMetaverse;
+using log4net;
-namespace OpenSim.Region.ClientStack.LindenUDP
+namespace OpenMetaverse
{
///
- ///
+ /// Base UDP server
///
public abstract class OpenSimUDPBase
{
- // these abstract methods must be implemented in a derived class to actually do
- // something with the packets that are sent and received.
+ private static readonly ILog m_log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
+
+ ///
+ /// This method is called when an incoming packet is received
+ ///
+ /// Incoming packet buffer
protected abstract void PacketReceived(UDPPacketBuffer buffer);
+
+ ///
+ /// This method is called when an outgoing packet is sent
+ ///
+ /// Outgoing packet buffer
+ /// Number of bytes written to the wire
protected abstract void PacketSent(UDPPacketBuffer buffer, int bytesSent);
- // the port to listen on
- internal int udpPort;
-
- // the UDP socket
- private Socket udpSocket;
+ /// UDP port to bind to in server mode
+ protected int m_udpPort;
- // the ReaderWriterLock is used solely for the purposes of shutdown (Stop()).
- // since there are potentially many "reader" threads in the internal .NET IOCP
- // thread pool, this is a cheaper synchronization primitive than using
- // a Mutex object. This allows many UDP socket "reads" concurrently - when
- // Stop() is called, it attempts to obtain a writer lock which will then
- // wait until all outstanding operations are completed before shutting down.
- // this avoids the problem of closing the socket with outstanding operations
- // and trying to catch the inevitable ObjectDisposedException.
- private ReaderWriterLock rwLock = new ReaderWriterLock();
+ /// Local IP address to bind to in server mode
+ protected IPAddress m_localBindAddress;
- // number of outstanding operations. This is a reference count
- // which we use to ensure that the threads exit cleanly. Note that
- // we need this because the threads will potentially still need to process
- // data even after the socket is closed.
- private int rwOperationCount = 0;
+ /// UDP socket, used in either client or server mode
+ private Socket m_udpSocket;
- // the all important shutdownFlag. This is synchronized through the ReaderWriterLock.
- private volatile bool shutdownFlag = true;
-
- // the remote endpoint to communicate with
- protected IPEndPoint remoteEndPoint = null;
+ /// The all important shutdown flag
+ private volatile bool m_shutdownFlag = true;
+ /// Returns true if the server is currently listening, otherwise false
+ public bool IsRunning { get { return !m_shutdownFlag; } }
///
- /// Initialize the UDP packet handler in server mode
+ /// Default constructor
///
+ /// Local IP address to bind the server to
/// Port to listening for incoming UDP packets on
- public OpenSimUDPBase(int port)
- {
- udpPort = port;
- }
-
- ///
- /// Initialize the UDP packet handler in client mode
- ///
- /// Remote UDP server to connect to
- public OpenSimUDPBase(IPEndPoint endPoint)
+ public OpenSimUDPBase(IPAddress bindAddress, int port)
{
- remoteEndPoint = endPoint;
- udpPort = 0;
+ m_localBindAddress = bindAddress;
+ m_udpPort = port;
}
///
- ///
+ /// Start the UDP server
///
+ /// This method will attempt to set the SIO_UDP_CONNRESET flag
+ /// on the socket to get newer versions of Windows to behave in a sane
+ /// manner (not throwing an exception when the remote side resets the
+ /// connection). This call is ignored on Mono where the flag is not
+ /// necessary
public void Start()
{
- if (shutdownFlag)
+ if (m_shutdownFlag)
{
- if (remoteEndPoint == null)
- {
- // Server mode
+ const int SIO_UDP_CONNRESET = -1744830452;
- // create and bind the socket
- IPEndPoint ipep = new IPEndPoint(Settings.BIND_ADDR, udpPort);
- udpSocket = new Socket(
- AddressFamily.InterNetwork,
- SocketType.Dgram,
- ProtocolType.Udp);
- udpSocket.Bind(ipep);
+ IPEndPoint ipep = new IPEndPoint(m_localBindAddress, m_udpPort);
+ m_udpSocket = new Socket(
+ AddressFamily.InterNetwork,
+ SocketType.Dgram,
+ ProtocolType.Udp);
+ try
+ {
+ // this udp socket flag is not supported under mono,
+ // so we'll catch the exception and continue
+ m_udpSocket.IOControl(SIO_UDP_CONNRESET, new byte[] { 0 }, null);
+ m_log.Debug("[UDPBASE]: SIO_UDP_CONNRESET flag set");
}
- else
+ catch (SocketException)
{
- // Client mode
- IPEndPoint ipep = new IPEndPoint(Settings.BIND_ADDR, udpPort);
- udpSocket = new Socket(
- AddressFamily.InterNetwork,
- SocketType.Dgram,
- ProtocolType.Udp);
- udpSocket.Bind(ipep);
- //udpSocket.Connect(remoteEndPoint);
+ m_log.Debug("[UDPBASE]: SIO_UDP_CONNRESET flag not supported on this platform, ignoring");
}
+ m_udpSocket.Bind(ipep);
// we're not shutting down, we're starting up
- shutdownFlag = false;
+ m_shutdownFlag = false;
// kick off an async receive. The Start() method will return, the
// actual receives will occur asynchronously and will be caught in
@@ -133,104 +122,85 @@ namespace OpenSim.Region.ClientStack.LindenUDP
}
///
- ///
+ /// Stops the UDP server
///
public void Stop()
{
- if (!shutdownFlag)
+ if (!m_shutdownFlag)
{
// wait indefinitely for a writer lock. Once this is called, the .NET runtime
// will deny any more reader locks, in effect blocking all other send/receive
// threads. Once we have the lock, we set shutdownFlag to inform the other
// threads that the socket is closed.
- rwLock.AcquireWriterLock(-1);
- shutdownFlag = true;
- udpSocket.Close();
- rwLock.ReleaseWriterLock();
-
- // wait for any pending operations to complete on other
- // threads before exiting.
- const int FORCE_STOP = 100;
- int i = 0;
- while (rwOperationCount > 0 && i < FORCE_STOP)
- {
- Thread.Sleep(10);
- ++i;
- }
-
- if (i >= FORCE_STOP)
- {
- Logger.Log("UDPBase.Stop() forced shutdown while waiting on pending operations",
- Helpers.LogLevel.Warning);
- }
+ m_shutdownFlag = true;
+ m_udpSocket.Close();
}
}
- ///
- ///
- ///
- public bool IsRunning
- {
- get { return !shutdownFlag; }
- }
-
private void AsyncBeginReceive()
{
- // this method actually kicks off the async read on the socket.
- // we aquire a reader lock here to ensure that no other thread
- // is trying to set shutdownFlag and close the socket.
- rwLock.AcquireReaderLock(-1);
+ // allocate a packet buffer
+ //WrappedObject wrappedBuffer = Pool.CheckOut();
+ UDPPacketBuffer buf = new UDPPacketBuffer();
- if (!shutdownFlag)
+ if (!m_shutdownFlag)
{
- // increment the count of pending operations
- Interlocked.Increment(ref rwOperationCount);
-
- // allocate a packet buffer
- //WrappedObject wrappedBuffer = Pool.CheckOut();
- UDPPacketBuffer buf = new UDPPacketBuffer();
-
try
{
// kick off an async read
- udpSocket.BeginReceiveFrom(
+ m_udpSocket.BeginReceiveFrom(
//wrappedBuffer.Instance.Data,
buf.Data,
0,
UDPPacketBuffer.BUFFER_SIZE,
SocketFlags.None,
- //ref wrappedBuffer.Instance.RemoteEndPoint,
ref buf.RemoteEndPoint,
- new AsyncCallback(AsyncEndReceive),
+ AsyncEndReceive,
//wrappedBuffer);
buf);
}
- catch (SocketException)
+ catch (SocketException e)
{
- // something bad happened
- //Logger.Log(
- // "A SocketException occurred in UDPServer.AsyncBeginReceive()",
- // Helpers.LogLevel.Error, se);
-
- // an error occurred, therefore the operation is void. Decrement the reference count.
- Interlocked.Decrement(ref rwOperationCount);
+ if (e.SocketErrorCode == SocketError.ConnectionReset)
+ {
+ m_log.Warn("[UDPBASE]: SIO_UDP_CONNRESET was ignored, attempting to salvage the UDP listener on port " + m_udpPort);
+ bool salvaged = false;
+ while (!salvaged)
+ {
+ try
+ {
+ m_udpSocket.BeginReceiveFrom(
+ //wrappedBuffer.Instance.Data,
+ buf.Data,
+ 0,
+ UDPPacketBuffer.BUFFER_SIZE,
+ SocketFlags.None,
+ ref buf.RemoteEndPoint,
+ AsyncEndReceive,
+ //wrappedBuffer);
+ buf);
+ salvaged = true;
+ }
+ catch (SocketException) { }
+ catch (ObjectDisposedException) { return; }
+ }
+
+ m_log.Warn("[UDPBASE]: Salvaged the UDP listener on port " + m_udpPort);
+ }
}
+ catch (ObjectDisposedException) { }
}
-
- // we're done with the socket for now, release the reader lock.
- rwLock.ReleaseReaderLock();
}
private void AsyncEndReceive(IAsyncResult iar)
{
// Asynchronous receive operations will complete here through the call
// to AsyncBeginReceive
-
- // aquire a reader lock
- rwLock.AcquireReaderLock(-1);
-
- if (!shutdownFlag)
+ if (!m_shutdownFlag)
{
+ // start another receive - this keeps the server going!
+ AsyncBeginReceive();
+
// get the buffer that was created in AsyncBeginReceive
// this is the received data
//WrappedObject wrappedBuffer = (WrappedObject)iar.AsyncState;
@@ -241,100 +211,49 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{
// get the length of data actually read from the socket, store it with the
// buffer
- buffer.DataLength = udpSocket.EndReceiveFrom(iar, ref buffer.RemoteEndPoint);
-
- // this operation is now complete, decrement the reference count
- Interlocked.Decrement(ref rwOperationCount);
-
- // we're done with the socket, release the reader lock
- rwLock.ReleaseReaderLock();
+ buffer.DataLength = m_udpSocket.EndReceiveFrom(iar, ref buffer.RemoteEndPoint);
// call the abstract method PacketReceived(), passing the buffer that
// has just been filled from the socket read.
PacketReceived(buffer);
}
- catch (SocketException)
- {
- // an error occurred, therefore the operation is void. Decrement the reference count.
- Interlocked.Decrement(ref rwOperationCount);
-
- // we're done with the socket for now, release the reader lock.
- rwLock.ReleaseReaderLock();
- }
- finally
- {
- // start another receive - this keeps the server going!
- AsyncBeginReceive();
-
- //wrappedBuffer.Dispose();
- }
- }
- else
- {
- // nothing bad happened, but we are done with the operation
- // decrement the reference count and release the reader lock
- Interlocked.Decrement(ref rwOperationCount);
- rwLock.ReleaseReaderLock();
+ catch (SocketException) { }
+ catch (ObjectDisposedException) { }
+ //finally { wrappedBuffer.Dispose(); }
}
}
public void AsyncBeginSend(UDPPacketBuffer buf)
{
- rwLock.AcquireReaderLock(-1);
-
- if (!shutdownFlag)
+ if (!m_shutdownFlag)
{
try
{
- Interlocked.Increment(ref rwOperationCount);
- udpSocket.BeginSendTo(
+ m_udpSocket.BeginSendTo(
buf.Data,
0,
buf.DataLength,
SocketFlags.None,
buf.RemoteEndPoint,
- new AsyncCallback(AsyncEndSend),
+ AsyncEndSend,
buf);
}
- catch (SocketException)
- {
- //Logger.Log(
- // "A SocketException occurred in UDPServer.AsyncBeginSend()",
- // Helpers.LogLevel.Error, se);
- }
+ catch (SocketException) { }
+ catch (ObjectDisposedException) { }
}
-
- rwLock.ReleaseReaderLock();
}
- private void AsyncEndSend(IAsyncResult iar)
+ void AsyncEndSend(IAsyncResult result)
{
- rwLock.AcquireReaderLock(-1);
-
- if (!shutdownFlag)
+ try
{
- UDPPacketBuffer buffer = (UDPPacketBuffer)iar.AsyncState;
+ UDPPacketBuffer buf = (UDPPacketBuffer)result.AsyncState;
+ int bytesSent = m_udpSocket.EndSendTo(result);
- try
- {
- int bytesSent = udpSocket.EndSendTo(iar);
-
- // note that call to the abstract PacketSent() method - we are passing the number
- // of bytes sent in a separate parameter, since we can't use buffer.DataLength which
- // is the number of bytes to send (or bytes received depending upon whether this
- // buffer was part of a send or a receive).
- PacketSent(buffer, bytesSent);
- }
- catch (SocketException)
- {
- //Logger.Log(
- // "A SocketException occurred in UDPServer.AsyncEndSend()",
- // Helpers.LogLevel.Error, se);
- }
+ PacketSent(buf, bytesSent);
}
-
- Interlocked.Decrement(ref rwOperationCount);
- rwLock.ReleaseReaderLock();
+ catch (SocketException) { }
+ catch (ObjectDisposedException) { }
}
}
}
--
cgit v1.1