From 5e4d6cab00cb29cd088ab7b62ab13aff103b64cb Mon Sep 17 00:00:00 2001 From: onefang Date: Sun, 19 May 2019 21:24:15 +1000 Subject: Dump OpenSim 0.9.0.1 into it's own branch. --- .../Region/CoreModules/Agent/Xfer/XferModule.cs | 365 +++++++++++++++------ 1 file changed, 269 insertions(+), 96 deletions(-) (limited to 'OpenSim/Region/CoreModules/Agent/Xfer/XferModule.cs') diff --git a/OpenSim/Region/CoreModules/Agent/Xfer/XferModule.cs b/OpenSim/Region/CoreModules/Agent/Xfer/XferModule.cs index 4299726..1b6401a 100644 --- a/OpenSim/Region/CoreModules/Agent/Xfer/XferModule.cs +++ b/OpenSim/Region/CoreModules/Agent/Xfer/XferModule.cs @@ -28,10 +28,12 @@ using System; using System.Collections.Generic; using System.Reflection; +using System.Threading; using Nini.Config; using log4net; using OpenMetaverse; using OpenSim.Framework; +using OpenSim.Framework.Monitoring; using OpenSim.Region.Framework.Interfaces; using OpenSim.Region.Framework.Scenes; @@ -45,9 +47,13 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer private Scene m_scene; private Dictionary NewFiles = new Dictionary(); private Dictionary Transfers = new Dictionary(); - private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + private object timeTickLock = new object(); + private double lastTimeTick = 0.0; + private double lastFilesExpire = 0.0; + private bool inTimeTick = false; + public struct XferRequest { public IClientAPI remoteClient; @@ -59,26 +65,30 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer private class FileData { public byte[] Data; - public int Count; + public int refsCount; + public double timeStampMS; } - + #region INonSharedRegionModule Members public void Initialise(IConfigSource config) { + lastTimeTick = Util.GetTimeStampMS() + 30000.0; + lastFilesExpire = lastTimeTick + 180000.0; } public void AddRegion(Scene scene) { m_scene = scene; - m_scene.EventManager.OnNewClient += NewClient; - m_scene.RegisterModuleInterface(this); + m_scene.EventManager.OnNewClient += NewClient; + m_scene.EventManager.OnRegionHeartbeatEnd += OnTimeTick; } public void RemoveRegion(Scene scene) { m_scene.EventManager.OnNewClient -= NewClient; + m_scene.EventManager.OnRegionHeartbeatEnd -= OnTimeTick; m_scene.UnregisterModuleInterface(this); m_scene = null; @@ -104,6 +114,41 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer #endregion + public void OnTimeTick(Scene scene) + { + // we are on a heartbeat thread we there can be several + if(Monitor.TryEnter(timeTickLock)) + { + if(!inTimeTick) + { + double now = Util.GetTimeStampMS(); + if(now - lastTimeTick > 1750.0) + { + + if(Transfers.Count == 0 && NewFiles.Count == 0) + lastTimeTick = now; + else + { + inTimeTick = true; + + //don't overload busy heartbeat + WorkManager.RunInThreadPool( + delegate + { + transfersTimeTick(now); + expireFiles(now); + + lastTimeTick = now; + inTimeTick = false; + }, + null, + "XferTimeTick"); + } + } + } + Monitor.Exit(timeTickLock); + } + } #region IXfer Members /// @@ -118,24 +163,45 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer { lock (NewFiles) { + double now = Util.GetTimeStampMS(); if (NewFiles.ContainsKey(fileName)) { - NewFiles[fileName].Count++; + NewFiles[fileName].refsCount++; NewFiles[fileName].Data = data; + NewFiles[fileName].timeStampMS = now; } else { FileData fd = new FileData(); - fd.Count = 1; + fd.refsCount = 1; fd.Data = data; + fd.timeStampMS = now; NewFiles.Add(fileName, fd); } } - return true; } #endregion + public void expireFiles(double now) + { + lock (NewFiles) + { + // hopefully we will not have many files so nasty code will do it + if(now - lastFilesExpire > 120000.0) + { + lastFilesExpire = now; + List expires = new List(); + foreach(string fname in NewFiles.Keys) + { + if(NewFiles[fname].refsCount == 0 && now - NewFiles[fname].timeStampMS > 120000.0) + expires.Add(fname); + } + foreach(string fname in expires) + NewFiles.Remove(fname); + } + } + } public void NewClient(IClientAPI client) { @@ -144,6 +210,51 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer client.OnAbortXfer += AbortXfer; } + public void OnClientClosed(IClientAPI client) + { + client.OnRequestXfer -= RequestXfer; + client.OnConfirmXfer -= AckPacket; + client.OnAbortXfer -= AbortXfer; + } + + private void RemoveOrDecrementFile(string fileName) + { + // NewFiles must be locked + + if (NewFiles.ContainsKey(fileName)) + { + if (NewFiles[fileName].refsCount == 1) + NewFiles.Remove(fileName); + else + NewFiles[fileName].refsCount--; + } + } + + public void transfersTimeTick(double now) + { + XferDownLoad[] xfrs; + lock(Transfers) + { + if(Transfers.Count == 0) + return; + + xfrs = new XferDownLoad[Transfers.Count]; + Transfers.Values.CopyTo(xfrs,0); + } + foreach(XferDownLoad xfr in xfrs) + { + if(xfr.checkTime(now)) + { + ulong xfrID = xfr.XferID; + lock(Transfers) + { + if(Transfers.ContainsKey(xfrID)) + Transfers.Remove(xfrID); + } + } + } + } + /// /// /// @@ -156,78 +267,52 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer { if (NewFiles.ContainsKey(fileName)) { - if (!Transfers.ContainsKey(xferID)) + lock(Transfers) { - byte[] fileData = NewFiles[fileName].Data; - XferDownLoad transaction = new XferDownLoad(fileName, fileData, xferID, remoteClient); - - Transfers.Add(xferID, transaction); - - if (transaction.StartSend()) - RemoveXferData(xferID); - - // The transaction for this file is either complete or on its way - RemoveOrDecrement(fileName); - + if (!Transfers.ContainsKey(xferID)) + { + byte[] fileData = NewFiles[fileName].Data; + int burstSize = remoteClient.GetAgentThrottleSilent((int)ThrottleOutPacketType.Asset) >> 11; + if(Transfers.Count > 1) + burstSize /= Transfers.Count; + XferDownLoad transaction = + new XferDownLoad(fileName, fileData, xferID, remoteClient, burstSize); + + Transfers.Add(xferID, transaction); + + transaction.StartSend(); + + // The transaction for this file is on its way + RemoveOrDecrementFile(fileName); + } } } else m_log.WarnFormat("[Xfer]: {0} not found", fileName); - } } public void AckPacket(IClientAPI remoteClient, ulong xferID, uint packet) { - lock (NewFiles) // This is actually to lock Transfers + lock (Transfers) { if (Transfers.ContainsKey(xferID)) { - XferDownLoad dl = Transfers[xferID]; if (Transfers[xferID].AckPacket(packet)) - { - RemoveXferData(xferID); - RemoveOrDecrement(dl.FileName); - } + Transfers.Remove(xferID); } } } - private void RemoveXferData(ulong xferID) - { - // NewFiles must be locked! - if (Transfers.ContainsKey(xferID)) - { - XferModule.XferDownLoad xferItem = Transfers[xferID]; - //string filename = xferItem.FileName; - Transfers.Remove(xferID); - xferItem.Data = new byte[0]; // Clear the data - xferItem.DataPointer = 0; - - } - } - public void AbortXfer(IClientAPI remoteClient, ulong xferID) { - lock (NewFiles) + lock (Transfers) { if (Transfers.ContainsKey(xferID)) - RemoveOrDecrement(Transfers[xferID].FileName); - - RemoveXferData(xferID); - } - } - - private void RemoveOrDecrement(string fileName) - { - // NewFiles must be locked - - if (NewFiles.ContainsKey(fileName)) - { - if (NewFiles[fileName].Count == 1) - NewFiles.Remove(fileName); - else - NewFiles[fileName].Count--; + { + Transfers[xferID].done(); + Transfers.Remove(xferID); + } } } @@ -236,52 +321,124 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer public class XferDownLoad { public IClientAPI Client; - private bool complete; public byte[] Data = new byte[0]; - public int DataPointer = 0; public string FileName = String.Empty; - public uint Packet = 0; - public uint Serial = 1; public ulong XferID = 0; - - public XferDownLoad(string fileName, byte[] data, ulong xferID, IClientAPI client) + public bool isDeleted = false; + + private object myLock = new object(); + private double lastsendTimeMS; + private int LastPacket; + private int lastBytes; + private int lastSentPacket; + private int lastAckPacket; + private int burstSize; + private int retries = 0; + + public XferDownLoad(string fileName, byte[] data, ulong xferID, IClientAPI client, int burstsz) { FileName = fileName; Data = data; XferID = xferID; Client = client; + burstSize = burstsz; } public XferDownLoad() { } + public void done() + { + if(!isDeleted) + { + Data = new byte[0]; + isDeleted = true; + } + } + /// /// Start a transfer /// /// True if the transfer is complete, false if not - public bool StartSend() + public void StartSend() { - if (Data.Length < 1000) + lock(myLock) { - // for now (testing) we only support files under 1000 bytes - byte[] transferData = new byte[Data.Length + 4]; - Array.Copy(Utils.IntToBytes(Data.Length), 0, transferData, 0, 4); - Array.Copy(Data, 0, transferData, 4, Data.Length); - Client.SendXferPacket(XferID, 0 + 0x80000000, transferData); - complete = true; + if(Data.Length == 0) //?? + { + LastPacket = 0; + lastBytes = 0; + burstSize = 0; + } + else + { + // payload of 1024bytes + LastPacket = Data.Length >> 10; + lastBytes = Data.Length & 0x3ff; + if(lastBytes == 0) + { + lastBytes = 1024; + LastPacket--; + } + + } + + lastAckPacket = -1; + lastSentPacket = -1; + + double now = Util.GetTimeStampMS(); + + SendBurst(now); + return; + } + } + + private void SendBurst(double now) + { + int start = lastAckPacket + 1; + int end = start + burstSize; + if (end > LastPacket) + end = LastPacket; + while(start <= end) + SendPacket(start++ , now); + } + + private void SendPacket(int pkt, double now) + { + if(pkt > LastPacket) + return; + + int pktsize; + uint pktid; + if (pkt == LastPacket) + { + pktsize = lastBytes; + pktid = (uint)pkt | 0x80000000u; } else { - byte[] transferData = new byte[1000 + 4]; + pktsize = 1024; + pktid = (uint)pkt; + } + + byte[] transferData; + if(pkt == 0) + { + transferData = new byte[pktsize + 4]; Array.Copy(Utils.IntToBytes(Data.Length), 0, transferData, 0, 4); - Array.Copy(Data, 0, transferData, 4, 1000); - Client.SendXferPacket(XferID, 0, transferData); - Packet++; - DataPointer = 1000; + Array.Copy(Data, 0, transferData, 4, pktsize); } + else + { + transferData = new byte[pktsize]; + Array.Copy(Data, pkt << 10, transferData, 0, pktsize); + } + + Client.SendXferPacket(XferID, pktid, transferData, false); - return complete; + lastSentPacket = pkt; + lastsendTimeMS = now; } /// @@ -291,30 +448,46 @@ namespace OpenSim.Region.CoreModules.Agent.Xfer /// True if the transfer is complete, false otherwise public bool AckPacket(uint packet) { - if (!complete) + lock(myLock) { - if ((Data.Length - DataPointer) > 1000) + if(isDeleted) + return true; + + packet &= 0x7fffffff; + if(lastAckPacket < packet) + lastAckPacket = (int)packet; + + if(lastAckPacket == LastPacket) { - byte[] transferData = new byte[1000]; - Array.Copy(Data, DataPointer, transferData, 0, 1000); - Client.SendXferPacket(XferID, Packet, transferData); - Packet++; - DataPointer += 1000; + done(); + return true; } - else + double now = Util.GetTimeStampMS(); + SendPacket(lastSentPacket + 1, now); + return false; + } + } + + public bool checkTime(double now) + { + if(Monitor.TryEnter(myLock)) + { + if(!isDeleted) { - byte[] transferData = new byte[Data.Length - DataPointer]; - Array.Copy(Data, DataPointer, transferData, 0, Data.Length - DataPointer); - uint endPacket = Packet |= (uint) 0x80000000; - Client.SendXferPacket(XferID, endPacket, transferData); - Packet++; - DataPointer += (Data.Length - DataPointer); - - complete = true; + double timeMS = now - lastsendTimeMS; + if(timeMS > 60000.0) + done(); + else if(timeMS > 3500.0 && retries++ < 3) + { + burstSize >>= 1; + SendBurst(now); + } } - } - return complete; + Monitor.Exit(myLock); + return isDeleted; + } + return false; } } -- cgit v1.1