From e6d57a1e49ce6f284dc03f7b8a57d08ade7b7ce4 Mon Sep 17 00:00:00 2001 From: UbitUmarov Date: Sun, 10 Jul 2016 15:25:05 +0100 Subject: change Xfermanager file sending --- .../Region/CoreModules/Agent/Xfer/XferModule.cs | 290 ++++++++++++++------- 1 file changed, 192 insertions(+), 98 deletions(-) (limited to 'OpenSim/Region') diff --git a/OpenSim/Region/CoreModules/Agent/Xfer/XferModule.cs b/OpenSim/Region/CoreModules/Agent/Xfer/XferModule.cs index 468dccf..437cbcd 100644 --- a/OpenSim/Region/CoreModules/Agent/Xfer/XferModule.cs +++ b/OpenSim/Region/CoreModules/Agent/Xfer/XferModule.cs @@ -49,12 +49,11 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer private Dictionary Transfers = new Dictionary(); private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - - private object timeTickLock = new object(); - private double lastTimeTick = 0.0; - private bool inTimeTick = false; - private double lastFilesExpire = 0.0; - + private object timeTickLock = new object(); + private double lastTimeTick = 0.0; + private double lastFilesExpire = 0.0; + private bool inTimeTick = false; + public struct XferRequest { public IClientAPI remoteClient; @@ -81,15 +80,15 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer public void AddRegion(Scene scene) { m_scene = scene; + m_scene.RegisterModuleInterface(this); m_scene.EventManager.OnNewClient += NewClient; m_scene.EventManager.OnRegionHeartbeatEnd += OnTimeTick; - m_scene.RegisterModuleInterface(this); } public void RemoveRegion(Scene scene) { m_scene.EventManager.OnNewClient -= NewClient; - m_scene.EventManager.OnRegionHeartbeatEnd -= OnTimeTick; + m_scene.EventManager.OnRegionHeartbeatEnd -= OnTimeTick; m_scene.UnregisterModuleInterface(this); m_scene = null; @@ -123,7 +122,7 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer if(!inTimeTick) { double now = Util.GetTimeStampMS(); - if(now - lastTimeTick > 1500.0) // 1.5 second + if(now - lastTimeTick > 1750.0) { inTimeTick = true; @@ -197,7 +196,7 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer } } } - + public void NewClient(IClientAPI client) { client.OnRequestXfer += RequestXfer; @@ -205,9 +204,49 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer client.OnAbortXfer += AbortXfer; } + public void OnClientClosed(IClientAPI client) + { + client.OnRequestXfer -= RequestXfer; + client.OnConfirmXfer -= AckPacket; + client.OnAbortXfer -= AbortXfer; + } + + private void RemoveOrDecrementFile(string fileName) + { + // NewFiles must be locked + + if (NewFiles.ContainsKey(fileName)) + { + if (NewFiles[fileName].refsCount == 1) + NewFiles.Remove(fileName); + else + NewFiles[fileName].refsCount--; + } + } + public void transfersTimeTick(double now) { - + XferDownLoad[] xfrs; + lock(Transfers) + { + if(Transfers.Count == 0) + return; + + xfrs = new XferDownLoad[Transfers.Count]; + Transfers.Values.CopyTo(xfrs,0); + } + foreach(XferDownLoad xfr in xfrs) + { + if(xfr.checkTime(now)) + { + ulong xfrID = xfr.XferID; + lock(Transfers) + { + if(Transfers.ContainsKey(xfrID)) + Transfers.Remove(xfrID); + } + } + } } /// @@ -222,80 +261,49 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer { if (NewFiles.ContainsKey(fileName)) { - if (!Transfers.ContainsKey(xferID)) + lock(Transfers) { - byte[] fileData = NewFiles[fileName].Data; - XferDownLoad transaction = new XferDownLoad(fileName, fileData, xferID, remoteClient); - if (fileName.StartsWith("inventory_")) - transaction.isTaskInventory = true; - - Transfers.Add(xferID, transaction); + if (!Transfers.ContainsKey(xferID)) + { + byte[] fileData = NewFiles[fileName].Data; + XferDownLoad transaction = + new XferDownLoad(fileName, fileData, xferID, remoteClient); - if (transaction.StartSend()) - RemoveXferData(xferID); + Transfers.Add(xferID, transaction); - // The transaction for this file is either complete or on its way - RemoveOrDecrement(fileName); + transaction.StartSend(); + // The transaction for this file is on its way + RemoveOrDecrementFile(fileName); + } } } else - m_log.WarnFormat("[Xfer]: {0} not found", fileName); - + m_log.WarnFormat("[Xfer]: {0} not found", fileName); } } public void AckPacket(IClientAPI remoteClient, ulong xferID, uint packet) { - lock (NewFiles) // This is actually to lock Transfers + lock (Transfers) { if (Transfers.ContainsKey(xferID)) { - XferDownLoad dl = Transfers[xferID]; if (Transfers[xferID].AckPacket(packet)) - { - RemoveXferData(xferID); - RemoveOrDecrement(dl.FileName); - } + Transfers.Remove(xferID); } } } - private void RemoveXferData(ulong xferID) - { - // NewFiles must be locked! - if (Transfers.ContainsKey(xferID)) - { - XferModule.XferDownLoad xferItem = Transfers[xferID]; - //string filename = xferItem.FileName; - Transfers.Remove(xferID); - xferItem.Data = new byte[0]; // Clear the data - xferItem.DataPointer = 0; - - } - } - public void AbortXfer(IClientAPI remoteClient, ulong xferID) { - lock (NewFiles) + lock (Transfers) { if (Transfers.ContainsKey(xferID)) - RemoveOrDecrement(Transfers[xferID].FileName); - - RemoveXferData(xferID); - } - } - - private void RemoveOrDecrement(string fileName) - { - // NewFiles must be locked - - if (NewFiles.ContainsKey(fileName)) - { - if (NewFiles[fileName].refsCount == 1) - NewFiles.Remove(fileName); - else - NewFiles[fileName].refsCount--; + { + Transfers[xferID].done(); + Transfers.Remove(xferID); + } } } @@ -304,14 +312,19 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer public class XferDownLoad { public IClientAPI Client; - private bool complete; public byte[] Data = new byte[0]; - public int DataPointer = 0; public string FileName = String.Empty; - public uint Packet = 0; - public uint Serial = 1; public ulong XferID = 0; - public bool isTaskInventory = false; + public bool isDeleted = false; + + private object myLock = new object(); + private double lastsendTimeMS; + private int LastPacket; + private int lastBytes; + private int lastSentPacket; + private int lastAckPacket; + private int burstSize; + private int retries = 0; public XferDownLoad(string fileName, byte[] data, ulong xferID, IClientAPI client) { @@ -325,32 +338,97 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer { } + public void done() + { + if(!isDeleted) + { + Data = new byte[0]; + isDeleted = true; + } + } + /// /// Start a transfer /// /// True if the transfer is complete, false if not - public bool StartSend() + public void StartSend() { - if (Data.Length < 1000) + lock(myLock) { - // for now (testing) we only support files under 1000 bytes - byte[] transferData = new byte[Data.Length + 4]; - Array.Copy(Utils.IntToBytes(Data.Length), 0, transferData, 0, 4); - Array.Copy(Data, 0, transferData, 4, Data.Length); - Client.SendXferPacket(XferID, 0 + 0x80000000, transferData, isTaskInventory); - complete = true; + if(Data.Length == 0) //?? + { + LastPacket = 0; + lastBytes = 0; + burstSize = 0; + } + else + { + // payload of 1024bytes + LastPacket = Data.Length >> 10; + lastBytes = Data.Length & 0x3ff; + if(lastBytes == 0) + { + lastBytes = 1024; + LastPacket--; + } + burstSize = Client.GetAgentThrottleSilent((int)ThrottleOutPacketType.Asset) >> 11; + } + + lastAckPacket = -1; + lastSentPacket = -1; + + double now = Util.GetTimeStampMS(); + + SendBurst(now); + return; + } + } + + private void SendBurst(double now) + { + int start = lastAckPacket + 1; + int end = start + burstSize; + if (end > LastPacket) + end = LastPacket; + while(start <= end) + SendPacket(start++ , now); + } + + private void SendPacket(int pkt, double now) + { + if(pkt > LastPacket) + return; + + int pktsize; + uint pktid; + if (pkt == LastPacket) + { + pktsize = lastBytes; + pktid = (uint)pkt | 0x80000000u; } else { - byte[] transferData = new byte[1000 + 4]; + pktsize = 1024; + pktid = (uint)pkt; + } + + byte[] transferData; + if(pkt == 0) + { + transferData = new byte[pktsize + 4]; Array.Copy(Utils.IntToBytes(Data.Length), 0, transferData, 0, 4); - Array.Copy(Data, 0, transferData, 4, 1000); - Client.SendXferPacket(XferID, 0, transferData, isTaskInventory); - Packet++; - DataPointer = 1000; + Array.Copy(Data, 0, transferData, 4, pktsize); + } + else + { + transferData = new byte[pktsize]; + Array.Copy(Data, pkt << 10, transferData, 0, pktsize); } - return complete; + Client.SendXferPacket(XferID, pktid, transferData, false); + + lastSentPacket = pkt; + lastsendTimeMS = now; } /// @@ -360,30 +438,46 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer /// True if the transfer is complete, false otherwise public bool AckPacket(uint packet) { - if (!complete) + lock(myLock) { - if ((Data.Length - DataPointer) > 1000) + if(isDeleted) + return true; + + packet &= 0x7fffffff; + if(lastAckPacket < packet) + lastAckPacket = (int)packet; + + if(lastAckPacket == LastPacket) { - byte[] transferData = new byte[1000]; - Array.Copy(Data, DataPointer, transferData, 0, 1000); - Client.SendXferPacket(XferID, Packet, transferData, isTaskInventory); - Packet++; - DataPointer += 1000; + done(); + return true; } - else + double now = Util.GetTimeStampMS(); + SendPacket(lastSentPacket + 1, now); + return false; + } + } + + public bool checkTime(double now) + { + if(Monitor.TryEnter(myLock)) + { + if(!isDeleted) { - byte[] transferData = new byte[Data.Length - DataPointer]; - Array.Copy(Data, DataPointer, transferData, 0, Data.Length - DataPointer); - uint endPacket = Packet |= (uint) 0x80000000; - Client.SendXferPacket(XferID, endPacket, transferData, isTaskInventory); - Packet++; - DataPointer += (Data.Length - DataPointer); - - complete = true; + double timeMS = now - lastsendTimeMS; + if(timeMS > 60000.0) + done(); + else if(timeMS > 3500.0 && retries++ < 3) + { + burstSize >>= 1; + SendBurst(now); + } } - } - return complete; + Monitor.Exit(myLock); + return isDeleted; + } + return false; } } -- cgit v1.1