From 1e9a66cbaae97759c5c4e936664b5cc7a4feca89 Mon Sep 17 00:00:00 2001 From: Tedd Hansen Date: Sat, 12 Jan 2008 00:48:58 +0000 Subject: ScriptServer communication protocol (v1), primitive RPC-like TCP client/server --- .../Region/ScriptEngine/Common/TRPC/TCPClient.cs | 109 ++++++++++++++++ .../Region/ScriptEngine/Common/TRPC/TCPCommon.cs | 33 +++++ .../Region/ScriptEngine/Common/TRPC/TCPServer.cs | 106 +++++++++++++++ .../Region/ScriptEngine/Common/TRPC/TCPSocket.cs | 86 ++++++++++++ OpenSim/Region/ScriptEngine/Common/TRPC_Remote.cs | 144 +++++++++++++++++++++ 5 files changed, 478 insertions(+) create mode 100644 OpenSim/Region/ScriptEngine/Common/TRPC/TCPClient.cs create mode 100644 OpenSim/Region/ScriptEngine/Common/TRPC/TCPCommon.cs create mode 100644 OpenSim/Region/ScriptEngine/Common/TRPC/TCPServer.cs create mode 100644 OpenSim/Region/ScriptEngine/Common/TRPC/TCPSocket.cs create mode 100644 OpenSim/Region/ScriptEngine/Common/TRPC_Remote.cs (limited to 'OpenSim/Region/ScriptEngine/Common') diff --git a/OpenSim/Region/ScriptEngine/Common/TRPC/TCPClient.cs b/OpenSim/Region/ScriptEngine/Common/TRPC/TCPClient.cs new file mode 100644 index 0000000..3230614 --- /dev/null +++ b/OpenSim/Region/ScriptEngine/Common/TRPC/TCPClient.cs @@ -0,0 +1,109 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Net; +using System.Net.Sockets; +using System.Text; + +namespace OpenSim.Region.ScriptEngine.Common.TRPC +{ + public class TCPClient: TCPCommon.ClientInterface + { + + public TCPClient() + { + } + private readonly Dictionary Clients = new Dictionary(); + private int ClientCount = 0; + + + public event TCPCommon.ClientConnectedDelegate ClientConnected; + public event TCPCommon.DataReceivedDelegate DataReceived; + public event TCPCommon.DataSentDelegate DataSent; + public event TCPCommon.CloseDelegate Close; + public event TCPCommon.ConnectErrorDelegate ConnectError; + + + /// + /// Creates client connection + /// + public void Connect(string RemoteHost, int RemotePort) + { + Socket newsock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + IPEndPoint ipe = new IPEndPoint(IPAddress.Parse(RemoteHost), RemotePort); + newsock.BeginConnect(ipe, new AsyncCallback(asyncConnected), newsock); + + + } + + public void Disconnect(int ID) + { + Clients[ID].Disconnect(); + } + + void asyncConnected(IAsyncResult iar) + { + Socket client = (Socket)iar.AsyncState; + try + { + client.EndConnect(iar); + + + int id = ClientCount++; + TCPSocket S = new TCPSocket(id, client); + + // Add to dictionary + Clients.Add(id, S); + + // Add event handlers + S.Close += new TCPSocket.CloseDelegate(S_Close); + S.DataReceived += new TCPSocket.DataReceivedDelegate(S_DataReceived); + S.DataSent += new TCPSocket.DataSentDelegate(S_DataSent); + + // Start it + S.Start(); + + Debug.WriteLine("Connection established: " + client.RemoteEndPoint.ToString()); + + // Fire Connected-event + if (ClientConnected != null) + ClientConnected(id, client.RemoteEndPoint); + + } + catch (SocketException sex) + { + if (ConnectError != null) + ConnectError(sex.Message); + } + } + + + + + void S_DataSent(int ID, int length) + { + if (DataSent != null) + DataSent(ID, length); + } + + void S_DataReceived(int ID, byte[] data, int offset, int length) + { + if (DataReceived != null) + DataReceived(ID, data, offset, length); + } + + void S_Close(int ID) + { + if (Close != null) + Close(ID); + Clients.Remove(ID); + } + + public void Send(int clientID, byte[] data, int offset, int len) + { + Clients[clientID].Send(clientID, data, offset, len); + } + + + } +} \ No newline at end of file diff --git a/OpenSim/Region/ScriptEngine/Common/TRPC/TCPCommon.cs b/OpenSim/Region/ScriptEngine/Common/TRPC/TCPCommon.cs new file mode 100644 index 0000000..83548b4 --- /dev/null +++ b/OpenSim/Region/ScriptEngine/Common/TRPC/TCPCommon.cs @@ -0,0 +1,33 @@ +namespace OpenSim.Region.ScriptEngine.Common.TRPC +{ + public class TCPCommon + { + public delegate void ClientConnectedDelegate(int ID, System.Net.EndPoint Remote); + public delegate void DataReceivedDelegate(int ID, byte[] data, int offset, int length); + public delegate void DataSentDelegate(int ID, int length); + public delegate void CloseDelegate(int ID); + public delegate void ConnectErrorDelegate(string Reason); + + + public interface ServerAndClientInterface + { + void Send(int clientID, byte[] data, int offset, int len); + event ClientConnectedDelegate ClientConnected; + event DataReceivedDelegate DataReceived; + event DataSentDelegate DataSent; + event CloseDelegate Close; + } + public interface ClientInterface : ServerAndClientInterface + { + event TCPCommon.ConnectErrorDelegate ConnectError; + void Connect(string RemoteHost, int RemotePort); + void Disconnect(int ID); + } + public interface ServerInterface : ServerAndClientInterface + { + void StartListen(); + void StopListen(); + } + + } +} \ No newline at end of file diff --git a/OpenSim/Region/ScriptEngine/Common/TRPC/TCPServer.cs b/OpenSim/Region/ScriptEngine/Common/TRPC/TCPServer.cs new file mode 100644 index 0000000..3af898a --- /dev/null +++ b/OpenSim/Region/ScriptEngine/Common/TRPC/TCPServer.cs @@ -0,0 +1,106 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Net; +using System.Net.Sockets; +using TCPCommon=OpenSim.Region.ScriptEngine.Common.TRPC.TCPCommon; + +namespace OpenSim.Region.ScriptEngine.Common.TRPC +{ + public class TCPServer: TCPCommon.ServerInterface + { + public readonly int LocalPort; + public TCPServer(int localPort) + { + LocalPort = localPort; + } + + private Socket server; + + /// + /// Starts listening for new connections + /// + public void StartListen() + { + server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + IPEndPoint ipe = new IPEndPoint(IPAddress.Any, LocalPort); + server.Bind(ipe); + server.Listen(10); + server.BeginAccept(new AsyncCallback(AsyncAcceptConnections), server); + } + /// + /// Stops listening for new connections + /// + public void StopListen() + { + server.Close(); + server = null; + } + + private readonly Dictionary Clients = new Dictionary(); + private int ClientCount = 0; + + + public event TCPCommon.ClientConnectedDelegate ClientConnected; + public event TCPCommon.DataReceivedDelegate DataReceived; + public event TCPCommon.DataSentDelegate DataSent; + public event TCPCommon.CloseDelegate Close; + + /// + /// Async callback for new connections + /// + /// + private void AsyncAcceptConnections(IAsyncResult ar) + { + int id = ClientCount++; + Socket oldserver = (Socket)ar.AsyncState; + Socket client = oldserver.EndAccept(ar); + TCPSocket S = new TCPSocket(id, client); + + // Add to dictionary + Clients.Add(id, S); + + // Add event handlers + S.Close += new TCPSocket.CloseDelegate(S_Close); + S.DataReceived += new TCPSocket.DataReceivedDelegate(S_DataReceived); + S.DataSent += new TCPSocket.DataSentDelegate(S_DataSent); + + // Start it + S.Start(); + + Debug.WriteLine("Connection received: " + client.RemoteEndPoint.ToString()); + + // Fire Connected-event + if (ClientConnected != null) + ClientConnected(id, client.RemoteEndPoint); + + } + + void S_DataSent(int ID, int length) + { + if (DataSent != null) + DataSent(ID, length); + } + + void S_DataReceived(int ID, byte[] data, int offset, int length) + { + if (DataReceived != null) + DataReceived(ID, data, offset, length); + } + + void S_Close(int ID) + { + if (Close != null) + Close(ID); + Clients.Remove(ID); + } + + public void Send(int clientID, byte[] data, int offset, int len) + { + Clients[clientID].Send(clientID, data, offset, len); + } + + + + } +} \ No newline at end of file diff --git a/OpenSim/Region/ScriptEngine/Common/TRPC/TCPSocket.cs b/OpenSim/Region/ScriptEngine/Common/TRPC/TCPSocket.cs new file mode 100644 index 0000000..1079846 --- /dev/null +++ b/OpenSim/Region/ScriptEngine/Common/TRPC/TCPSocket.cs @@ -0,0 +1,86 @@ +using System; +using System.Net.Sockets; + +namespace OpenSim.Region.ScriptEngine.Common.TRPC +{ + public class TCPSocket + { + + public readonly Socket Client; + public readonly int ID; + + public delegate void DataReceivedDelegate(int ID, byte[] data, int offset, int length); + public delegate void DataSentDelegate(int ID, int length); + public delegate void CloseDelegate(int ID); + public event DataReceivedDelegate DataReceived; + public event DataSentDelegate DataSent; + public event CloseDelegate Close; + + private byte[] RecvQueue = new byte[4096]; + private int RecvQueueSize = 4096; + + public TCPSocket(int id, Socket client) + { + ID = id; + Client = client; + } + public void Start() + { + // Start listening + BeginReceive(); + } + + private void BeginReceive() + { + Client.BeginReceive(RecvQueue, 0, RecvQueueSize, SocketFlags.None, new AsyncCallback(asyncDataReceived), Client); + } + + /// + /// Callback for successful receive (or connection close) + /// + /// + private void asyncDataReceived(IAsyncResult ar) + { + Socket client = (Socket)ar.AsyncState; + int recv = client.EndReceive(ar); + + // Is connection closed? + if (recv == 0) + { + client.Close(); + Close(ID); + return; + } + + // Call receive event + DataReceived(ID, RecvQueue, 0, recv); + + // Start new receive + BeginReceive(); + + } + + + public void Send(int clientID, byte[] data, int offset, int len) + { + Client.BeginSend(data, offset, len, SocketFlags.None, new AsyncCallback(asyncDataSent), Client); + } + + /// + /// Callback for successful send + /// + /// + void asyncDataSent(IAsyncResult ar) + { + Socket client = (Socket)ar.AsyncState; + int sent = client.EndSend(ar); + DataSent(ID, sent); + } + + public void Disconnect() + { + Client.Close(); + Close(ID); + } + } +} \ No newline at end of file diff --git a/OpenSim/Region/ScriptEngine/Common/TRPC_Remote.cs b/OpenSim/Region/ScriptEngine/Common/TRPC_Remote.cs new file mode 100644 index 0000000..f8ec7b5 --- /dev/null +++ b/OpenSim/Region/ScriptEngine/Common/TRPC_Remote.cs @@ -0,0 +1,144 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using OpenSim.Region.ScriptEngine.Common.TRPC; + +namespace OpenSim.Region.ScriptEngine.Common +{ + public class TRPC_Remote + { + public readonly int MaxQueueSize = 1024 * 10; + public readonly TCPCommon.ServerAndClientInterface TCPS; + + public delegate void ReceiveCommandDelegate(int ID, string Command, params object[] p); + public event ReceiveCommandDelegate ReceiveCommand; + + // TODO: Maybe we should move queue into TCPSocket so we won't have to keep one queue instance per connection + private System.Collections.Generic.Dictionary InQueue = new Dictionary(); + private class InQueueStruct + { + public byte[] Queue; + public int QueueSize; + public object QueueLockObject = new object(); + } + + public TRPC_Remote(TCPCommon.ServerAndClientInterface TCPClientOrServer) + { + TCPS = TCPClientOrServer; + TCPS.Close += new TCPCommon.CloseDelegate(TCPS_Close); + TCPS.ClientConnected += new TCPCommon.ClientConnectedDelegate(TCPS_ClientConnected); + TCPS.DataReceived += new TCPCommon.DataReceivedDelegate(TCPS_DataReceived); + //TCPS.StartListen(); + } + + void TCPS_ClientConnected(int ID, System.Net.EndPoint Remote) + { + // Create a incoming queue for this connection + InQueueStruct iq = new InQueueStruct(); + iq.Queue = new byte[MaxQueueSize]; + iq.QueueSize = 0; + InQueue.Add(ID, iq); + } + + void TCPS_Close(int ID) + { + // Remove queue + InQueue.Remove(ID); + } + + void TCPS_DataReceived(int ID, byte[] data, int offset, int length) + { + // Copy new data to incoming queue + lock (InQueue[ID].QueueLockObject) + { + Array.Copy(data, offset, InQueue[ID].Queue, InQueue[ID].QueueSize, length); + InQueue[ID].QueueSize += length; + + // Process incoming queue + ProcessQueue(ID); + } + } + + private void ProcessQueue(int ID) + { + + // This is just a temp implementation -- not so fast :) + + InQueueStruct myIQS = InQueue[ID]; + if (myIQS.QueueSize == 0) + return; + + string receivedData = Encoding.ASCII.GetString(myIQS.Queue, 0, myIQS.QueueSize); + Debug.WriteLine("RAW: " + receivedData); + + + byte newLine = 10; + while (true) + { + bool ShouldProcess = false; + int lineEndPos = 0; + + // Look for newline + for (int i = 0; i < myIQS.QueueSize; i++) + { + if (myIQS.Queue[i] == newLine) + { + ShouldProcess = true; + lineEndPos = i; + break; + } + } + + // Process it? + if (!ShouldProcess) + return; + // Yes + string cmdLine = Encoding.ASCII.GetString(myIQS.Queue, 0, lineEndPos); + Debug.WriteLine("Command: " + cmdLine); + + // Fix remaining queue in an inefficient way + byte[] newQueue = new byte[MaxQueueSize]; + Array.Copy(myIQS.Queue, lineEndPos, newQueue, 0, myIQS.QueueSize - lineEndPos); + myIQS.Queue = newQueue; + myIQS.QueueSize -= (lineEndPos + 1); + + // Now back to the command + string[] parts = cmdLine.Split(','); + if (parts.Length > 0) + { + string cmd = parts[0]; + int paramCount = parts.Length - 1; + string[] param = null; + + if (paramCount > 0) + { + // Process all parameters (decoding them from URL encoding) + param = new string[paramCount]; + for (int i = 1; i < parts.Length; i++) + { + param[i - 1] = System.Web.HttpUtility.UrlDecode(parts[i]); + } + } + + ReceiveCommand(ID, cmd, param); + } + } + } + + public void SendCommand(int ID, string Command, params object[] p) + { + // Call PacketFactory to have it create a packet for us + + //string[] tmpP = new string[p.Length]; + string tmpStr = Command; + for (int i = 0; i < p.Length; i++) + { + tmpStr += "," + System.Web.HttpUtility.UrlEncode(p[i].ToString()); // .Replace(",", "%44") + } + tmpStr += "\n"; + byte[] byteData = Encoding.ASCII.GetBytes(tmpStr); + TCPS.Send(ID, byteData, 0, byteData.Length); + } + } +} \ No newline at end of file -- cgit v1.1