diff options
author | John Hurliman | 2009-10-09 01:53:06 -0700 |
---|---|---|
committer | John Hurliman | 2009-10-09 01:53:06 -0700 |
commit | a5b9971fd77c0c4bf70656be7f3e7999f59d9f85 (patch) | |
tree | 9c062f865511d9efde455b800c55f9d94b4921b4 /OpenSim | |
parent | Simplified LLUDPClientCollection from three collections down to one. This wil... (diff) | |
download | opensim-SC-a5b9971fd77c0c4bf70656be7f3e7999f59d9f85.zip opensim-SC-a5b9971fd77c0c4bf70656be7f3e7999f59d9f85.tar.gz opensim-SC-a5b9971fd77c0c4bf70656be7f3e7999f59d9f85.tar.bz2 opensim-SC-a5b9971fd77c0c4bf70656be7f3e7999f59d9f85.tar.xz |
* Added a lock object for the write functions in LLUDPClientCollection (immutable != concurrent write safety)
* Allow the UDP server to bind to a user-specified port again
* Updated to a newer version of OpenSimUDPBase that streamlines the code even more. This also reintroduces the highly concurrent packet handling which needs more testing
Diffstat (limited to 'OpenSim')
3 files changed, 169 insertions, 210 deletions
diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClientCollection.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClientCollection.cs index abf3882..2222a33 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClientCollection.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClientCollection.cs | |||
@@ -36,6 +36,9 @@ using ReaderWriterLockImpl = OpenMetaverse.ReaderWriterLockSlim; | |||
36 | 36 | ||
37 | namespace OpenSim.Region.ClientStack.LindenUDP | 37 | namespace OpenSim.Region.ClientStack.LindenUDP |
38 | { | 38 | { |
39 | /// <summary> | ||
40 | /// A thread safe mapping from endpoints to client references | ||
41 | /// </summary> | ||
39 | public sealed class UDPClientCollection | 42 | public sealed class UDPClientCollection |
40 | { | 43 | { |
41 | #region IComparers | 44 | #region IComparers |
@@ -52,43 +55,80 @@ namespace OpenSim.Region.ClientStack.LindenUDP | |||
52 | 55 | ||
53 | #endregion IComparers | 56 | #endregion IComparers |
54 | 57 | ||
58 | /// <summary>An immutable dictionary mapping from <seealso cref="IPEndPoint"/> | ||
59 | /// to <seealso cref="LLUDPClient"/> references</summary> | ||
55 | private ImmutableMap<IPEndPoint, LLUDPClient> m_dict; | 60 | private ImmutableMap<IPEndPoint, LLUDPClient> m_dict; |
61 | /// <summary>Immutability grants thread safety for concurrent reads and | ||
62 | /// read-writes, but not concurrent writes</summary> | ||
63 | private object m_writeLock; | ||
56 | 64 | ||
65 | /// <summary>Number of clients in the collection</summary> | ||
66 | public int Count { get { return m_dict.Count; } } | ||
67 | |||
68 | /// <summary> | ||
69 | /// Default constructor | ||
70 | /// </summary> | ||
57 | public UDPClientCollection() | 71 | public UDPClientCollection() |
58 | { | 72 | { |
59 | m_dict = new ImmutableMap<IPEndPoint, LLUDPClient>(new IPEndPointComparer()); | 73 | m_dict = new ImmutableMap<IPEndPoint, LLUDPClient>(new IPEndPointComparer()); |
60 | } | 74 | } |
61 | 75 | ||
76 | /// <summary> | ||
77 | /// Add a client reference to the collection | ||
78 | /// </summary> | ||
79 | /// <param name="key">Remote endpoint of the client</param> | ||
80 | /// <param name="value">Reference to the client object</param> | ||
62 | public void Add(IPEndPoint key, LLUDPClient value) | 81 | public void Add(IPEndPoint key, LLUDPClient value) |
63 | { | 82 | { |
64 | m_dict = m_dict.Add(key, value); | 83 | lock (m_writeLock) |
84 | m_dict = m_dict.Add(key, value); | ||
65 | } | 85 | } |
66 | 86 | ||
87 | /// <summary> | ||
88 | /// Remove a client from the collection | ||
89 | /// </summary> | ||
90 | /// <param name="key">Remote endpoint of the client</param> | ||
67 | public void Remove(IPEndPoint key) | 91 | public void Remove(IPEndPoint key) |
68 | { | 92 | { |
69 | m_dict = m_dict.Delete(key); | 93 | lock (m_writeLock) |
94 | m_dict = m_dict.Delete(key); | ||
70 | } | 95 | } |
71 | 96 | ||
97 | /// <summary> | ||
98 | /// Resets the client collection | ||
99 | /// </summary> | ||
72 | public void Clear() | 100 | public void Clear() |
73 | { | 101 | { |
74 | m_dict = new ImmutableMap<IPEndPoint, LLUDPClient>(new IPEndPointComparer()); | 102 | lock (m_writeLock) |
75 | } | 103 | m_dict = new ImmutableMap<IPEndPoint, LLUDPClient>(new IPEndPointComparer()); |
76 | |||
77 | public int Count | ||
78 | { | ||
79 | get { return m_dict.Count; } | ||
80 | } | 104 | } |
81 | 105 | ||
106 | /// <summary> | ||
107 | /// Checks if an endpoint is in the collection | ||
108 | /// </summary> | ||
109 | /// <param name="key">Endpoint to check for</param> | ||
110 | /// <returns>True if the endpoint was found in the collection, otherwise false</returns> | ||
82 | public bool ContainsKey(IPEndPoint key) | 111 | public bool ContainsKey(IPEndPoint key) |
83 | { | 112 | { |
84 | return m_dict.ContainsKey(key); | 113 | return m_dict.ContainsKey(key); |
85 | } | 114 | } |
86 | 115 | ||
116 | /// <summary> | ||
117 | /// Attempts to fetch a value out of the collection | ||
118 | /// </summary> | ||
119 | /// <param name="key">Endpoint of the client to retrieve</param> | ||
120 | /// <param name="value">Retrieved client, or null on lookup failure</param> | ||
121 | /// <returns>True if the lookup succeeded, otherwise false</returns> | ||
87 | public bool TryGetValue(IPEndPoint key, out LLUDPClient value) | 122 | public bool TryGetValue(IPEndPoint key, out LLUDPClient value) |
88 | { | 123 | { |
89 | return m_dict.TryGetValue(key, out value); | 124 | return m_dict.TryGetValue(key, out value); |
90 | } | 125 | } |
91 | 126 | ||
127 | /// <summary> | ||
128 | /// Performs a given task in parallel for each of the elements in the | ||
129 | /// collection | ||
130 | /// </summary> | ||
131 | /// <param name="action">Action to perform on each element</param> | ||
92 | public void ForEach(Action<LLUDPClient> action) | 132 | public void ForEach(Action<LLUDPClient> action) |
93 | { | 133 | { |
94 | Parallel.ForEach<LLUDPClient>(m_dict.Values, action); | 134 | Parallel.ForEach<LLUDPClient>(m_dict.Values, action); |
diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs index a6aa048..9aeea9a 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs | |||
@@ -96,7 +96,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP | |||
96 | /// <summary>Incoming packets that are awaiting handling</summary> | 96 | /// <summary>Incoming packets that are awaiting handling</summary> |
97 | private OpenMetaverse.BlockingQueue<IncomingPacket> packetInbox = new OpenMetaverse.BlockingQueue<IncomingPacket>(); | 97 | private OpenMetaverse.BlockingQueue<IncomingPacket> packetInbox = new OpenMetaverse.BlockingQueue<IncomingPacket>(); |
98 | /// <summary></summary> | 98 | /// <summary></summary> |
99 | private UDPClientCollection clients = new UDPClientCollection(); | 99 | private UDPClientCollection m_clients = new UDPClientCollection(); |
100 | /// <summary>Bandwidth throttle for this UDP server</summary> | 100 | /// <summary>Bandwidth throttle for this UDP server</summary> |
101 | private TokenBucket m_throttle; | 101 | private TokenBucket m_throttle; |
102 | /// <summary>Bandwidth throttle rates for this UDP server</summary> | 102 | /// <summary>Bandwidth throttle rates for this UDP server</summary> |
@@ -115,7 +115,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP | |||
115 | public Socket Server { get { return null; } } | 115 | public Socket Server { get { return null; } } |
116 | 116 | ||
117 | public LLUDPServer(IPAddress listenIP, ref uint port, int proxyPortOffsetParm, bool allow_alternate_port, IConfigSource configSource, AgentCircuitManager circuitManager) | 117 | public LLUDPServer(IPAddress listenIP, ref uint port, int proxyPortOffsetParm, bool allow_alternate_port, IConfigSource configSource, AgentCircuitManager circuitManager) |
118 | : base((int)port) | 118 | : base(listenIP, (int)port) |
119 | { | 119 | { |
120 | #region Environment.TickCount Measurement | 120 | #region Environment.TickCount Measurement |
121 | 121 | ||
@@ -143,7 +143,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP | |||
143 | public new void Start() | 143 | public new void Start() |
144 | { | 144 | { |
145 | if (m_scene == null) | 145 | if (m_scene == null) |
146 | throw new InvalidOperationException("Cannot LLUDPServer.Start() without an IScene reference"); | 146 | throw new InvalidOperationException("[LLUDPSERVER]: Cannot LLUDPServer.Start() without an IScene reference"); |
147 | 147 | ||
148 | base.Start(); | 148 | base.Start(); |
149 | 149 | ||
@@ -188,7 +188,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP | |||
188 | m_scene.ClientManager.Remove(udpClient.CircuitCode); | 188 | m_scene.ClientManager.Remove(udpClient.CircuitCode); |
189 | udpClient.ClientAPI.Close(false); | 189 | udpClient.ClientAPI.Close(false); |
190 | udpClient.Shutdown(); | 190 | udpClient.Shutdown(); |
191 | clients.Remove(udpClient.RemoteEndPoint); | 191 | m_clients.Remove(udpClient.RemoteEndPoint); |
192 | } | 192 | } |
193 | 193 | ||
194 | public void BroadcastPacket(Packet packet, ThrottleOutPacketType category, bool sendToPausedAgents, bool allowSplitting) | 194 | public void BroadcastPacket(Packet packet, ThrottleOutPacketType category, bool sendToPausedAgents, bool allowSplitting) |
@@ -208,7 +208,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP | |||
208 | for (int i = 0; i < packetCount; i++) | 208 | for (int i = 0; i < packetCount; i++) |
209 | { | 209 | { |
210 | byte[] data = datas[i]; | 210 | byte[] data = datas[i]; |
211 | clients.ForEach( | 211 | m_clients.ForEach( |
212 | delegate(LLUDPClient client) | 212 | delegate(LLUDPClient client) |
213 | { SendPacketData(client, data, data.Length, packet.Type, packet.Header.Zerocoded, category); }); | 213 | { SendPacketData(client, data, data.Length, packet.Type, packet.Header.Zerocoded, category); }); |
214 | } | 214 | } |
@@ -216,7 +216,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP | |||
216 | else | 216 | else |
217 | { | 217 | { |
218 | byte[] data = packet.ToBytes(); | 218 | byte[] data = packet.ToBytes(); |
219 | clients.ForEach( | 219 | m_clients.ForEach( |
220 | delegate(LLUDPClient client) | 220 | delegate(LLUDPClient client) |
221 | { SendPacketData(client, data, data.Length, packet.Type, packet.Header.Zerocoded, category); }); | 221 | { SendPacketData(client, data, data.Length, packet.Type, packet.Header.Zerocoded, category); }); |
222 | } | 222 | } |
@@ -502,7 +502,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP | |||
502 | } | 502 | } |
503 | 503 | ||
504 | // Determine which agent this packet came from | 504 | // Determine which agent this packet came from |
505 | if (!clients.TryGetValue(address, out client)) | 505 | if (!m_clients.TryGetValue(address, out client)) |
506 | { | 506 | { |
507 | m_log.Warn("[LLUDPSERVER]: Received a " + packet.Type + " packet from an unrecognized source: " + address); | 507 | m_log.Warn("[LLUDPSERVER]: Received a " + packet.Type + " packet from an unrecognized source: " + address); |
508 | return; | 508 | return; |
@@ -606,7 +606,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP | |||
606 | if (m_scene.RegionStatus != RegionStatus.SlaveScene) | 606 | if (m_scene.RegionStatus != RegionStatus.SlaveScene) |
607 | { | 607 | { |
608 | AuthenticateResponse sessionInfo; | 608 | AuthenticateResponse sessionInfo; |
609 | bool isNewCircuit = !clients.ContainsKey(remoteEndPoint); | 609 | bool isNewCircuit = !m_clients.ContainsKey(remoteEndPoint); |
610 | 610 | ||
611 | if (!IsClientAuthorized(useCircuitCode, out sessionInfo)) | 611 | if (!IsClientAuthorized(useCircuitCode, out sessionInfo)) |
612 | { | 612 | { |
@@ -648,7 +648,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP | |||
648 | udpClient.ClientAPI = clientApi; | 648 | udpClient.ClientAPI = clientApi; |
649 | 649 | ||
650 | // Add the new client to our list of tracked clients | 650 | // Add the new client to our list of tracked clients |
651 | clients.Add(udpClient.RemoteEndPoint, udpClient); | 651 | m_clients.Add(udpClient.RemoteEndPoint, udpClient); |
652 | } | 652 | } |
653 | 653 | ||
654 | private void AcknowledgePacket(LLUDPClient client, uint ack, int currentTime, bool fromResend) | 654 | private void AcknowledgePacket(LLUDPClient client, uint ack, int currentTime, bool fromResend) |
@@ -726,7 +726,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP | |||
726 | elapsed500MS = 0; | 726 | elapsed500MS = 0; |
727 | } | 727 | } |
728 | 728 | ||
729 | clients.ForEach( | 729 | m_clients.ForEach( |
730 | delegate(LLUDPClient client) | 730 | delegate(LLUDPClient client) |
731 | { | 731 | { |
732 | if (client.DequeueOutgoing()) | 732 | if (client.DequeueOutgoing()) |
diff --git a/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs b/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs index 218aaac..9b1751d 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/OpenSimUDPBase.cs | |||
@@ -29,101 +29,90 @@ using System; | |||
29 | using System.Net; | 29 | using System.Net; |
30 | using System.Net.Sockets; | 30 | using System.Net.Sockets; |
31 | using System.Threading; | 31 | using System.Threading; |
32 | using OpenMetaverse; | 32 | using log4net; |
33 | 33 | ||
34 | namespace OpenSim.Region.ClientStack.LindenUDP | 34 | namespace OpenMetaverse |
35 | { | 35 | { |
36 | /// <summary> | 36 | /// <summary> |
37 | /// | 37 | /// Base UDP server |
38 | /// </summary> | 38 | /// </summary> |
39 | public abstract class OpenSimUDPBase | 39 | public abstract class OpenSimUDPBase |
40 | { | 40 | { |
41 | // these abstract methods must be implemented in a derived class to actually do | 41 | private static readonly ILog m_log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType); |
42 | // something with the packets that are sent and received. | 42 | |
43 | /// <summary> | ||
44 | /// This method is called when an incoming packet is received | ||
45 | /// </summary> | ||
46 | /// <param name="buffer">Incoming packet buffer</param> | ||
43 | protected abstract void PacketReceived(UDPPacketBuffer buffer); | 47 | protected abstract void PacketReceived(UDPPacketBuffer buffer); |
48 | |||
49 | /// <summary> | ||
50 | /// This method is called when an outgoing packet is sent | ||
51 | /// </summary> | ||
52 | /// <param name="buffer">Outgoing packet buffer</param> | ||
53 | /// <param name="bytesSent">Number of bytes written to the wire</param> | ||
44 | protected abstract void PacketSent(UDPPacketBuffer buffer, int bytesSent); | 54 | protected abstract void PacketSent(UDPPacketBuffer buffer, int bytesSent); |
45 | 55 | ||
46 | // the port to listen on | 56 | /// <summary>UDP port to bind to in server mode</summary> |
47 | internal int udpPort; | 57 | protected int m_udpPort; |
48 | |||
49 | // the UDP socket | ||
50 | private Socket udpSocket; | ||
51 | 58 | ||
52 | // the ReaderWriterLock is used solely for the purposes of shutdown (Stop()). | 59 | /// <summary>Local IP address to bind to in server mode</summary> |
53 | // since there are potentially many "reader" threads in the internal .NET IOCP | 60 | protected IPAddress m_localBindAddress; |
54 | // thread pool, this is a cheaper synchronization primitive than using | ||
55 | // a Mutex object. This allows many UDP socket "reads" concurrently - when | ||
56 | // Stop() is called, it attempts to obtain a writer lock which will then | ||
57 | // wait until all outstanding operations are completed before shutting down. | ||
58 | // this avoids the problem of closing the socket with outstanding operations | ||
59 | // and trying to catch the inevitable ObjectDisposedException. | ||
60 | private ReaderWriterLock rwLock = new ReaderWriterLock(); | ||
61 | 61 | ||
62 | // number of outstanding operations. This is a reference count | 62 | /// <summary>UDP socket, used in either client or server mode</summary> |
63 | // which we use to ensure that the threads exit cleanly. Note that | 63 | private Socket m_udpSocket; |
64 | // we need this because the threads will potentially still need to process | ||
65 | // data even after the socket is closed. | ||
66 | private int rwOperationCount = 0; | ||
67 | 64 | ||
68 | // the all important shutdownFlag. This is synchronized through the ReaderWriterLock. | 65 | /// <summary>The all important shutdown flag</summary> |
69 | private volatile bool shutdownFlag = true; | 66 | private volatile bool m_shutdownFlag = true; |
70 | |||
71 | // the remote endpoint to communicate with | ||
72 | protected IPEndPoint remoteEndPoint = null; | ||
73 | 67 | ||
68 | /// <summary>Returns true if the server is currently listening, otherwise false</summary> | ||
69 | public bool IsRunning { get { return !m_shutdownFlag; } } | ||
74 | 70 | ||
75 | /// <summary> | 71 | /// <summary> |
76 | /// Initialize the UDP packet handler in server mode | 72 | /// Default constructor |
77 | /// </summary> | 73 | /// </summary> |
74 | /// <param name="bindAddress">Local IP address to bind the server to</param> | ||
78 | /// <param name="port">Port to listening for incoming UDP packets on</param> | 75 | /// <param name="port">Port to listening for incoming UDP packets on</param> |
79 | public OpenSimUDPBase(int port) | 76 | public OpenSimUDPBase(IPAddress bindAddress, int port) |
80 | { | ||
81 | udpPort = port; | ||
82 | } | ||
83 | |||
84 | /// <summary> | ||
85 | /// Initialize the UDP packet handler in client mode | ||
86 | /// </summary> | ||
87 | /// <param name="endPoint">Remote UDP server to connect to</param> | ||
88 | public OpenSimUDPBase(IPEndPoint endPoint) | ||
89 | { | 77 | { |
90 | remoteEndPoint = endPoint; | 78 | m_localBindAddress = bindAddress; |
91 | udpPort = 0; | 79 | m_udpPort = port; |
92 | } | 80 | } |
93 | 81 | ||
94 | /// <summary> | 82 | /// <summary> |
95 | /// | 83 | /// Start the UDP server |
96 | /// </summary> | 84 | /// </summary> |
85 | /// <remarks>This method will attempt to set the SIO_UDP_CONNRESET flag | ||
86 | /// on the socket to get newer versions of Windows to behave in a sane | ||
87 | /// manner (not throwing an exception when the remote side resets the | ||
88 | /// connection). This call is ignored on Mono where the flag is not | ||
89 | /// necessary</remarks> | ||
97 | public void Start() | 90 | public void Start() |
98 | { | 91 | { |
99 | if (shutdownFlag) | 92 | if (m_shutdownFlag) |
100 | { | 93 | { |
101 | if (remoteEndPoint == null) | 94 | const int SIO_UDP_CONNRESET = -1744830452; |
102 | { | ||
103 | // Server mode | ||
104 | 95 | ||
105 | // create and bind the socket | 96 | IPEndPoint ipep = new IPEndPoint(m_localBindAddress, m_udpPort); |
106 | IPEndPoint ipep = new IPEndPoint(Settings.BIND_ADDR, udpPort); | 97 | m_udpSocket = new Socket( |
107 | udpSocket = new Socket( | 98 | AddressFamily.InterNetwork, |
108 | AddressFamily.InterNetwork, | 99 | SocketType.Dgram, |
109 | SocketType.Dgram, | 100 | ProtocolType.Udp); |
110 | ProtocolType.Udp); | 101 | try |
111 | udpSocket.Bind(ipep); | 102 | { |
103 | // this udp socket flag is not supported under mono, | ||
104 | // so we'll catch the exception and continue | ||
105 | m_udpSocket.IOControl(SIO_UDP_CONNRESET, new byte[] { 0 }, null); | ||
106 | m_log.Debug("[UDPBASE]: SIO_UDP_CONNRESET flag set"); | ||
112 | } | 107 | } |
113 | else | 108 | catch (SocketException) |
114 | { | 109 | { |
115 | // Client mode | 110 | m_log.Debug("[UDPBASE]: SIO_UDP_CONNRESET flag not supported on this platform, ignoring"); |
116 | IPEndPoint ipep = new IPEndPoint(Settings.BIND_ADDR, udpPort); | ||
117 | udpSocket = new Socket( | ||
118 | AddressFamily.InterNetwork, | ||
119 | SocketType.Dgram, | ||
120 | ProtocolType.Udp); | ||
121 | udpSocket.Bind(ipep); | ||
122 | //udpSocket.Connect(remoteEndPoint); | ||
123 | } | 111 | } |
112 | m_udpSocket.Bind(ipep); | ||
124 | 113 | ||
125 | // we're not shutting down, we're starting up | 114 | // we're not shutting down, we're starting up |
126 | shutdownFlag = false; | 115 | m_shutdownFlag = false; |
127 | 116 | ||
128 | // kick off an async receive. The Start() method will return, the | 117 | // kick off an async receive. The Start() method will return, the |
129 | // actual receives will occur asynchronously and will be caught in | 118 | // actual receives will occur asynchronously and will be caught in |
@@ -133,104 +122,85 @@ namespace OpenSim.Region.ClientStack.LindenUDP | |||
133 | } | 122 | } |
134 | 123 | ||
135 | /// <summary> | 124 | /// <summary> |
136 | /// | 125 | /// Stops the UDP server |
137 | /// </summary> | 126 | /// </summary> |
138 | public void Stop() | 127 | public void Stop() |
139 | { | 128 | { |
140 | if (!shutdownFlag) | 129 | if (!m_shutdownFlag) |
141 | { | 130 | { |
142 | // wait indefinitely for a writer lock. Once this is called, the .NET runtime | 131 | // wait indefinitely for a writer lock. Once this is called, the .NET runtime |
143 | // will deny any more reader locks, in effect blocking all other send/receive | 132 | // will deny any more reader locks, in effect blocking all other send/receive |
144 | // threads. Once we have the lock, we set shutdownFlag to inform the other | 133 | // threads. Once we have the lock, we set shutdownFlag to inform the other |
145 | // threads that the socket is closed. | 134 | // threads that the socket is closed. |
146 | rwLock.AcquireWriterLock(-1); | 135 | m_shutdownFlag = true; |
147 | shutdownFlag = true; | 136 | m_udpSocket.Close(); |
148 | udpSocket.Close(); | ||
149 | rwLock.ReleaseWriterLock(); | ||
150 | |||
151 | // wait for any pending operations to complete on other | ||
152 | // threads before exiting. | ||
153 | const int FORCE_STOP = 100; | ||
154 | int i = 0; | ||
155 | while (rwOperationCount > 0 && i < FORCE_STOP) | ||
156 | { | ||
157 | Thread.Sleep(10); | ||
158 | ++i; | ||
159 | } | ||
160 | |||
161 | if (i >= FORCE_STOP) | ||
162 | { | ||
163 | Logger.Log("UDPBase.Stop() forced shutdown while waiting on pending operations", | ||
164 | Helpers.LogLevel.Warning); | ||
165 | } | ||
166 | } | 137 | } |
167 | } | 138 | } |
168 | 139 | ||
169 | /// <summary> | ||
170 | /// | ||
171 | /// </summary> | ||
172 | public bool IsRunning | ||
173 | { | ||
174 | get { return !shutdownFlag; } | ||
175 | } | ||
176 | |||
177 | private void AsyncBeginReceive() | 140 | private void AsyncBeginReceive() |
178 | { | 141 | { |
179 | // this method actually kicks off the async read on the socket. | 142 | // allocate a packet buffer |
180 | // we aquire a reader lock here to ensure that no other thread | 143 | //WrappedObject<UDPPacketBuffer> wrappedBuffer = Pool.CheckOut(); |
181 | // is trying to set shutdownFlag and close the socket. | 144 | UDPPacketBuffer buf = new UDPPacketBuffer(); |
182 | rwLock.AcquireReaderLock(-1); | ||
183 | 145 | ||
184 | if (!shutdownFlag) | 146 | if (!m_shutdownFlag) |
185 | { | 147 | { |
186 | // increment the count of pending operations | ||
187 | Interlocked.Increment(ref rwOperationCount); | ||
188 | |||
189 | // allocate a packet buffer | ||
190 | //WrappedObject<UDPPacketBuffer> wrappedBuffer = Pool.CheckOut(); | ||
191 | UDPPacketBuffer buf = new UDPPacketBuffer(); | ||
192 | |||
193 | try | 148 | try |
194 | { | 149 | { |
195 | // kick off an async read | 150 | // kick off an async read |
196 | udpSocket.BeginReceiveFrom( | 151 | m_udpSocket.BeginReceiveFrom( |
197 | //wrappedBuffer.Instance.Data, | 152 | //wrappedBuffer.Instance.Data, |
198 | buf.Data, | 153 | buf.Data, |
199 | 0, | 154 | 0, |
200 | UDPPacketBuffer.BUFFER_SIZE, | 155 | UDPPacketBuffer.BUFFER_SIZE, |
201 | SocketFlags.None, | 156 | SocketFlags.None, |
202 | //ref wrappedBuffer.Instance.RemoteEndPoint, | ||
203 | ref buf.RemoteEndPoint, | 157 | ref buf.RemoteEndPoint, |
204 | new AsyncCallback(AsyncEndReceive), | 158 | AsyncEndReceive, |
205 | //wrappedBuffer); | 159 | //wrappedBuffer); |
206 | buf); | 160 | buf); |
207 | } | 161 | } |
208 | catch (SocketException) | 162 | catch (SocketException e) |
209 | { | 163 | { |
210 | // something bad happened | 164 | if (e.SocketErrorCode == SocketError.ConnectionReset) |
211 | //Logger.Log( | 165 | { |
212 | // "A SocketException occurred in UDPServer.AsyncBeginReceive()", | 166 | m_log.Warn("[UDPBASE]: SIO_UDP_CONNRESET was ignored, attempting to salvage the UDP listener on port " + m_udpPort); |
213 | // Helpers.LogLevel.Error, se); | 167 | bool salvaged = false; |
214 | 168 | while (!salvaged) | |
215 | // an error occurred, therefore the operation is void. Decrement the reference count. | 169 | { |
216 | Interlocked.Decrement(ref rwOperationCount); | 170 | try |
171 | { | ||
172 | m_udpSocket.BeginReceiveFrom( | ||
173 | //wrappedBuffer.Instance.Data, | ||
174 | buf.Data, | ||
175 | 0, | ||
176 | UDPPacketBuffer.BUFFER_SIZE, | ||
177 | SocketFlags.None, | ||
178 | ref buf.RemoteEndPoint, | ||
179 | AsyncEndReceive, | ||
180 | //wrappedBuffer); | ||
181 | buf); | ||
182 | salvaged = true; | ||
183 | } | ||
184 | catch (SocketException) { } | ||
185 | catch (ObjectDisposedException) { return; } | ||
186 | } | ||
187 | |||
188 | m_log.Warn("[UDPBASE]: Salvaged the UDP listener on port " + m_udpPort); | ||
189 | } | ||
217 | } | 190 | } |
191 | catch (ObjectDisposedException) { } | ||
218 | } | 192 | } |
219 | |||
220 | // we're done with the socket for now, release the reader lock. | ||
221 | rwLock.ReleaseReaderLock(); | ||
222 | } | 193 | } |
223 | 194 | ||
224 | private void AsyncEndReceive(IAsyncResult iar) | 195 | private void AsyncEndReceive(IAsyncResult iar) |
225 | { | 196 | { |
226 | // Asynchronous receive operations will complete here through the call | 197 | // Asynchronous receive operations will complete here through the call |
227 | // to AsyncBeginReceive | 198 | // to AsyncBeginReceive |
228 | 199 | if (!m_shutdownFlag) | |
229 | // aquire a reader lock | ||
230 | rwLock.AcquireReaderLock(-1); | ||
231 | |||
232 | if (!shutdownFlag) | ||
233 | { | 200 | { |
201 | // start another receive - this keeps the server going! | ||
202 | AsyncBeginReceive(); | ||
203 | |||
234 | // get the buffer that was created in AsyncBeginReceive | 204 | // get the buffer that was created in AsyncBeginReceive |
235 | // this is the received data | 205 | // this is the received data |
236 | //WrappedObject<UDPPacketBuffer> wrappedBuffer = (WrappedObject<UDPPacketBuffer>)iar.AsyncState; | 206 | //WrappedObject<UDPPacketBuffer> wrappedBuffer = (WrappedObject<UDPPacketBuffer>)iar.AsyncState; |
@@ -241,100 +211,49 @@ namespace OpenSim.Region.ClientStack.LindenUDP | |||
241 | { | 211 | { |
242 | // get the length of data actually read from the socket, store it with the | 212 | // get the length of data actually read from the socket, store it with the |
243 | // buffer | 213 | // buffer |
244 | buffer.DataLength = udpSocket.EndReceiveFrom(iar, ref buffer.RemoteEndPoint); | 214 | buffer.DataLength = m_udpSocket.EndReceiveFrom(iar, ref buffer.RemoteEndPoint); |
245 | |||
246 | // this operation is now complete, decrement the reference count | ||
247 | Interlocked.Decrement(ref rwOperationCount); | ||
248 | |||
249 | // we're done with the socket, release the reader lock | ||
250 | rwLock.ReleaseReaderLock(); | ||
251 | 215 | ||
252 | // call the abstract method PacketReceived(), passing the buffer that | 216 | // call the abstract method PacketReceived(), passing the buffer that |
253 | // has just been filled from the socket read. | 217 | // has just been filled from the socket read. |
254 | PacketReceived(buffer); | 218 | PacketReceived(buffer); |
255 | } | 219 | } |
256 | catch (SocketException) | 220 | catch (SocketException) { } |
257 | { | 221 | catch (ObjectDisposedException) { } |
258 | // an error occurred, therefore the operation is void. Decrement the reference count. | 222 | //finally { wrappedBuffer.Dispose(); } |
259 | Interlocked.Decrement(ref rwOperationCount); | ||
260 | |||
261 | // we're done with the socket for now, release the reader lock. | ||
262 | rwLock.ReleaseReaderLock(); | ||
263 | } | ||
264 | finally | ||
265 | { | ||
266 | // start another receive - this keeps the server going! | ||
267 | AsyncBeginReceive(); | ||
268 | |||
269 | //wrappedBuffer.Dispose(); | ||
270 | } | ||
271 | } | ||
272 | else | ||
273 | { | ||
274 | // nothing bad happened, but we are done with the operation | ||
275 | // decrement the reference count and release the reader lock | ||
276 | Interlocked.Decrement(ref rwOperationCount); | ||
277 | rwLock.ReleaseReaderLock(); | ||
278 | } | 223 | } |
279 | } | 224 | } |
280 | 225 | ||
281 | public void AsyncBeginSend(UDPPacketBuffer buf) | 226 | public void AsyncBeginSend(UDPPacketBuffer buf) |
282 | { | 227 | { |
283 | rwLock.AcquireReaderLock(-1); | 228 | if (!m_shutdownFlag) |
284 | |||
285 | if (!shutdownFlag) | ||
286 | { | 229 | { |
287 | try | 230 | try |
288 | { | 231 | { |
289 | Interlocked.Increment(ref rwOperationCount); | 232 | m_udpSocket.BeginSendTo( |
290 | udpSocket.BeginSendTo( | ||
291 | buf.Data, | 233 | buf.Data, |
292 | 0, | 234 | 0, |
293 | buf.DataLength, | 235 | buf.DataLength, |
294 | SocketFlags.None, | 236 | SocketFlags.None, |
295 | buf.RemoteEndPoint, | 237 | buf.RemoteEndPoint, |
296 | new AsyncCallback(AsyncEndSend), | 238 | AsyncEndSend, |
297 | buf); | 239 | buf); |
298 | } | 240 | } |
299 | catch (SocketException) | 241 | catch (SocketException) { } |
300 | { | 242 | catch (ObjectDisposedException) { } |
301 | //Logger.Log( | ||
302 | // "A SocketException occurred in UDPServer.AsyncBeginSend()", | ||
303 | // Helpers.LogLevel.Error, se); | ||
304 | } | ||
305 | } | 243 | } |
306 | |||
307 | rwLock.ReleaseReaderLock(); | ||
308 | } | 244 | } |
309 | 245 | ||
310 | private void AsyncEndSend(IAsyncResult iar) | 246 | void AsyncEndSend(IAsyncResult result) |
311 | { | 247 | { |
312 | rwLock.AcquireReaderLock(-1); | 248 | try |
313 | |||
314 | if (!shutdownFlag) | ||
315 | { | 249 | { |
316 | UDPPacketBuffer buffer = (UDPPacketBuffer)iar.AsyncState; | 250 | UDPPacketBuffer buf = (UDPPacketBuffer)result.AsyncState; |
251 | int bytesSent = m_udpSocket.EndSendTo(result); | ||
317 | 252 | ||
318 | try | 253 | PacketSent(buf, bytesSent); |
319 | { | ||
320 | int bytesSent = udpSocket.EndSendTo(iar); | ||
321 | |||
322 | // note that call to the abstract PacketSent() method - we are passing the number | ||
323 | // of bytes sent in a separate parameter, since we can't use buffer.DataLength which | ||
324 | // is the number of bytes to send (or bytes received depending upon whether this | ||
325 | // buffer was part of a send or a receive). | ||
326 | PacketSent(buffer, bytesSent); | ||
327 | } | ||
328 | catch (SocketException) | ||
329 | { | ||
330 | //Logger.Log( | ||
331 | // "A SocketException occurred in UDPServer.AsyncEndSend()", | ||
332 | // Helpers.LogLevel.Error, se); | ||
333 | } | ||
334 | } | 254 | } |
335 | 255 | catch (SocketException) { } | |
336 | Interlocked.Decrement(ref rwOperationCount); | 256 | catch (ObjectDisposedException) { } |
337 | rwLock.ReleaseReaderLock(); | ||
338 | } | 257 | } |
339 | } | 258 | } |
340 | } | 259 | } |