From 2aa7a22129e2b5c226d3937c31bdcbdbc65a20f8 Mon Sep 17 00:00:00 2001 From: Melanie Date: Fri, 14 Sep 2012 23:09:07 +0200 Subject: Sequence/throttle asset retrievals. --- .../Connectors/Asset/AssetServicesConnector.cs | 104 +++++++++++++-------- 1 file changed, 67 insertions(+), 37 deletions(-) (limited to 'OpenSim/Services/Connectors') diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs index daf38bc..9d6d9ad 100644 --- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs +++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs @@ -27,6 +27,7 @@ using log4net; using System; +using System.Threading; using System.Collections.Generic; using System.IO; using System.Reflection; @@ -50,7 +51,7 @@ namespace OpenSim.Services.Connectors private IImprovedAssetCache m_Cache = null; private int m_retryCounter; private Dictionary> m_retryQueue = new Dictionary>(); - private Timer m_retryTimer; + private System.Timers.Timer m_retryTimer; private delegate void AssetRetrievedEx(AssetBase asset); // Keeps track of concurrent requests for the same asset, so that it's only loaded once. @@ -61,6 +62,8 @@ namespace OpenSim.Services.Connectors private Dictionary m_UriMap = new Dictionary(); + private Thread[] m_fetchThreads; + public AssetServicesConnector() { } @@ -96,7 +99,7 @@ namespace OpenSim.Services.Connectors } - m_retryTimer = new Timer(); + m_retryTimer = new System.Timers.Timer(); m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck); m_retryTimer.Interval = 60000; @@ -112,6 +115,14 @@ namespace OpenSim.Services.Connectors m_UriMap[prefix] = groupHost; //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix); } + + m_fetchThreads = new Thread[2]; + + for (int i = 0 ; i < 2 ; i++) + { + m_fetchThreads[i] = new Thread(AssetRequestProcessor); + m_fetchThreads[i].Start(); + } } private string MapServer(string id) @@ -261,37 +272,25 @@ namespace OpenSim.Services.Connectors return null; } - public bool Get(string id, Object sender, AssetRetrieved handler) + private class QueuedAssetRequest { - string uri = MapServer(id) + "/assets/" + id; - - AssetBase asset = null; - if (m_Cache != null) - asset = m_Cache.Get(id); + public string uri; + public string id; + } - if (asset == null || asset.Data == null || asset.Data.Length == 0) - { - lock (m_AssetHandlers) - { - AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); + private OpenMetaverse.BlockingQueue m_requestQueue = + new OpenMetaverse.BlockingQueue(); -// AssetRetrievedEx handlers; - List handlers; - if (m_AssetHandlers.TryGetValue(id, out handlers)) - { - // Someone else is already loading this asset. It will notify our handler when done. -// handlers += handlerEx; - handlers.Add(handlerEx); - return true; - } + private void AssetRequestProcessor() + { + QueuedAssetRequest r; - // Load the asset ourselves -// handlers += handlerEx; - handlers = new List(); - handlers.Add(handlerEx); + while (true) + { + r = m_requestQueue.Dequeue(); - m_AssetHandlers.Add(id, handlers); - } + string uri = r.uri; + string id = r.id; bool success = false; try @@ -301,16 +300,7 @@ namespace OpenSim.Services.Connectors { if (m_Cache != null) m_Cache.Cache(a); -/* - AssetRetrievedEx handlers; - lock (m_AssetHandlers) - { - handlers = m_AssetHandlers[id]; - m_AssetHandlers.Remove(id); - } - handlers.Invoke(a); -*/ List handlers; lock (m_AssetHandlers) { @@ -340,6 +330,46 @@ namespace OpenSim.Services.Connectors } } } + } + + public bool Get(string id, Object sender, AssetRetrieved handler) + { + string uri = MapServer(id) + "/assets/" + id; + + AssetBase asset = null; + if (m_Cache != null) + asset = m_Cache.Get(id); + + if (asset == null || asset.Data == null || asset.Data.Length == 0) + { + lock (m_AssetHandlers) + { + AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); + +// AssetRetrievedEx handlers; + List handlers; + if (m_AssetHandlers.TryGetValue(id, out handlers)) + { + // Someone else is already loading this asset. It will notify our handler when done. +// handlers += handlerEx; + handlers.Add(handlerEx); + return true; + } + + // Load the asset ourselves +// handlers += handlerEx; + handlers = new List(); + handlers.Add(handlerEx); + + m_AssetHandlers.Add(id, handlers); + } + + QueuedAssetRequest request = new QueuedAssetRequest(); + request.id = id; + request.uri = uri; + + m_requestQueue.Enqueue(request); + } else { handler(id, sender, asset); -- cgit v1.1