From 5e4d6cab00cb29cd088ab7b62ab13aff103b64cb Mon Sep 17 00:00:00 2001 From: onefang Date: Sun, 19 May 2019 21:24:15 +1000 Subject: Dump OpenSim 0.9.0.1 into it's own branch. --- .../Connectors/Asset/AssetServicesConnector.cs | 458 +++++++++++++++++---- 1 file changed, 369 insertions(+), 89 deletions(-) (limited to 'OpenSim/Services/Connectors/Asset') diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs index bd43552..7e81be7 100644 --- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs +++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs @@ -27,12 +27,14 @@ using log4net; using System; +using System.Threading; using System.Collections.Generic; using System.IO; using System.Reflection; +using System.Timers; using Nini.Config; using OpenSim.Framework; -using OpenSim.Framework.Console; +using OpenSim.Framework.Monitoring; using OpenSim.Services.Interfaces; using OpenMetaverse; @@ -44,15 +46,27 @@ namespace OpenSim.Services.Connectors LogManager.GetLogger( MethodBase.GetCurrentMethod().DeclaringType); + const int MAXSENDRETRIESLEN = 30; + private string m_ServerURI = String.Empty; - private IImprovedAssetCache m_Cache = null; + private IAssetCache m_Cache = null; + private int m_retryCounter; + private bool m_inRetries; + private List[] m_sendRetries = new List[MAXSENDRETRIESLEN]; + private System.Timers.Timer m_retryTimer; private int m_maxAssetRequestConcurrency = 30; - + private delegate void AssetRetrievedEx(AssetBase asset); // Keeps track of concurrent requests for the same asset, so that it's only loaded once. // Maps: Asset ID -> Handlers which will be called when the asset has been loaded - private Dictionary m_AssetHandlers = new Dictionary(); +// private Dictionary m_AssetHandlers = new Dictionary(); + + private Dictionary> m_AssetHandlers = new Dictionary>(); + + private Dictionary m_UriMap = new Dictionary(); + + private Thread[] m_fetchThreads; public int MaxAssetRequestConcurrency { @@ -91,31 +105,155 @@ namespace OpenSim.Services.Connectors string serviceURI = assetConfig.GetString("AssetServerURI", String.Empty); + m_ServerURI = serviceURI; + if (serviceURI == String.Empty) { m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); throw new Exception("Asset connector init error"); } - m_ServerURI = serviceURI; + m_retryTimer = new System.Timers.Timer(); + m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck); + m_retryTimer.AutoReset = true; + m_retryTimer.Interval = 60000; + + Uri serverUri = new Uri(m_ServerURI); + + string groupHost = serverUri.Host; + + for (int i = 0 ; i < 256 ; i++) + { + string prefix = i.ToString("x2"); + groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost); + + m_UriMap[prefix] = groupHost; + //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix); + } + + m_fetchThreads = new Thread[2]; + + for (int i = 0 ; i < 2 ; i++) + { + m_fetchThreads[i] = WorkManager.StartThread(AssetRequestProcessor, + String.Format("GetTextureWorker{0}", i), + ThreadPriority.Normal, + true, + false); + } + } + + private string MapServer(string id) + { + if (m_UriMap.Count == 0) + return m_ServerURI; + + UriBuilder serverUri = new UriBuilder(m_ServerURI); + + string prefix = id.Substring(0, 2).ToLower(); + + string host; + + // HG URLs will not be valid UUIDS + if (m_UriMap.ContainsKey(prefix)) + host = m_UriMap[prefix]; + else + host = m_UriMap["00"]; + + serverUri.Host = host; + + // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix); + + string ret = serverUri.Uri.AbsoluteUri; + if (ret.EndsWith("/")) + ret = ret.Substring(0, ret.Length - 1); + return ret; } - protected void SetCache(IImprovedAssetCache cache) + protected void retryCheck(object source, ElapsedEventArgs e) + { + lock(m_sendRetries) + { + if(m_inRetries) + return; + m_inRetries = true; + } + + m_retryCounter++; + if(m_retryCounter >= 61 ) // avoid overflow 60 is max in use below + m_retryCounter = 1; + + int inUse = 0; + int nextlevel; + int timefactor; + List retrylist; + // we need to go down + for(int i = MAXSENDRETRIESLEN - 1; i >= 0; i--) + { + lock(m_sendRetries) + retrylist = m_sendRetries[i]; + + if(retrylist == null) + continue; + + inUse++; + nextlevel = i + 1; + + //We exponentially fall back on frequency until we reach one attempt per hour + //The net result is that we end up in the queue for roughly 24 hours.. + //24 hours worth of assets could be a lot, so the hope is that the region admin + //will have gotten the asset connector back online quickly! + if(i == 0) + timefactor = 1; + else + { + timefactor = 1 << nextlevel; + if (timefactor > 60) + timefactor = 60; + } + + if(m_retryCounter < timefactor) + continue; // to update inUse; + + if (m_retryCounter % timefactor != 0) + continue; + + // a list to retry + lock(m_sendRetries) + m_sendRetries[i] = null; + + // we are the only ones with a copy of this retrylist now + foreach(AssetBase ass in retrylist) + retryStore(ass, nextlevel); + } + + lock(m_sendRetries) + { + if(inUse == 0 ) + m_retryTimer.Stop(); + + m_inRetries = false; + } + } + + protected void SetCache(IAssetCache cache) { m_Cache = cache; } public AssetBase Get(string id) { -// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Synchronous get request for {0}", id); - - string uri = m_ServerURI + "/assets/" + id; + string uri = MapServer(id) + "/assets/" + id; AssetBase asset = null; + if (m_Cache != null) - asset = m_Cache.Get(id); + { + if (!m_Cache.Get(id, out asset)) + return null; + } - if (asset == null) + if (asset == null || asset.Data == null || asset.Data.Length == 0) { // XXX: Commented out for now since this has either never been properly operational or not for some time // as m_maxAssetRequestConcurrency was being passed as the timeout, not a concurrency limiting option. @@ -128,8 +266,14 @@ namespace OpenSim.Services.Connectors asset = SynchronousRestObjectRequester.MakeRequest("GET", uri, 0, m_Auth); + if (m_Cache != null) - m_Cache.Cache(asset); + { + if (asset != null) + m_Cache.Cache(asset); + else + m_Cache.CacheNegative(id); + } } return asset; } @@ -138,23 +282,28 @@ namespace OpenSim.Services.Connectors { // m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Cache request for {0}", id); + AssetBase asset = null; if (m_Cache != null) - return m_Cache.Get(id); + { + m_Cache.Get(id, out asset); + } - return null; + return asset; } public AssetMetadata GetMetadata(string id) { if (m_Cache != null) { - AssetBase fullAsset = m_Cache.Get(id); + AssetBase fullAsset; + if (!m_Cache.Get(id, out fullAsset)) + return null; if (fullAsset != null) return fullAsset.Metadata; } - string uri = m_ServerURI + "/assets/" + id + "/metadata"; + string uri = MapServer(id) + "/assets/" + id + "/metadata"; AssetMetadata asset = SynchronousRestObjectRequester.MakeRequest("GET", uri, 0, m_Auth); return asset; @@ -164,13 +313,15 @@ namespace OpenSim.Services.Connectors { if (m_Cache != null) { - AssetBase fullAsset = m_Cache.Get(id); + AssetBase fullAsset; + if (!m_Cache.Get(id, out fullAsset)) + return null; if (fullAsset != null) return fullAsset.Data; } - using (RestClient rc = new RestClient(m_ServerURI)) + using (RestClient rc = new RestClient(MapServer(id))) { rc.AddResourcePath("assets"); rc.AddResourcePath(id); @@ -178,81 +329,110 @@ namespace OpenSim.Services.Connectors rc.RequestMethod = "GET"; - Stream s = rc.Request(m_Auth); + using (Stream s = rc.Request(m_Auth)) + { + if (s == null) + return null; - if (s == null) - return null; + if (s.Length > 0) + { + byte[] ret = new byte[s.Length]; + s.Read(ret, 0, (int)s.Length); + + return ret; + } + } + return null; + } + } - if (s.Length > 0) + private class QueuedAssetRequest + { + public string uri; + public string id; + } + + private OpenSim.Framework.BlockingQueue m_requestQueue = + new OpenSim.Framework.BlockingQueue(); + + private void AssetRequestProcessor() + { + QueuedAssetRequest r; + + while (true) + { + r = m_requestQueue.Dequeue(4500); + Watchdog.UpdateThread(); + if(r== null) + continue; + string uri = r.uri; + string id = r.id; + + try { - byte[] ret = new byte[s.Length]; - s.Read(ret, 0, (int)s.Length); + AssetBase a = SynchronousRestObjectRequester.MakeRequest("GET", uri, 0, 30000, m_Auth); - return ret; - } + if (a != null && m_Cache != null) + m_Cache.Cache(a); - return null; + List handlers; + lock (m_AssetHandlers) + { + handlers = m_AssetHandlers[id]; + m_AssetHandlers.Remove(id); + } + + if(handlers != null) + { + Util.FireAndForget(x => + { + foreach (AssetRetrievedEx h in handlers) + { + try { h.Invoke(a); } + catch { } + } + handlers.Clear(); + }); + } + } + catch { } } } public bool Get(string id, Object sender, AssetRetrieved handler) { -// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Potentially asynchronous get request for {0}", id); - - string uri = m_ServerURI + "/assets/" + id; + string uri = MapServer(id) + "/assets/" + id; AssetBase asset = null; if (m_Cache != null) - asset = m_Cache.Get(id); + { + if (!m_Cache.Get(id, out asset)) + return false; + } - if (asset == null) + if (asset == null || asset.Data == null || asset.Data.Length == 0) { lock (m_AssetHandlers) { AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); - AssetRetrievedEx handlers; + List handlers; if (m_AssetHandlers.TryGetValue(id, out handlers)) { // Someone else is already loading this asset. It will notify our handler when done. - handlers += handlerEx; + handlers.Add(handlerEx); return true; } - // Load the asset ourselves - handlers += handlerEx; - m_AssetHandlers.Add(id, handlers); - } + handlers = new List(); + handlers.Add(handlerEx); - bool success = false; - try - { - AsynchronousRestObjectRequester.MakeRequest("GET", uri, 0, - delegate(AssetBase a) - { - if (a != null && m_Cache != null) - m_Cache.Cache(a); + m_AssetHandlers.Add(id, handlers); - AssetRetrievedEx handlers; - lock (m_AssetHandlers) - { - handlers = m_AssetHandlers[id]; - m_AssetHandlers.Remove(id); - } - handlers.Invoke(a); - }, m_maxAssetRequestConcurrency, m_Auth); - - success = true; - } - finally - { - if (!success) - { - lock (m_AssetHandlers) - { - m_AssetHandlers.Remove(id); - } - } + QueuedAssetRequest request = new QueuedAssetRequest(); + request.id = id; + request.uri = uri; + m_requestQueue.Enqueue(request); } } else @@ -277,52 +457,151 @@ namespace OpenSim.Services.Connectors // This is most likely to happen because the server doesn't support this function, // so just silently return "doesn't exist" for all the assets. } - + if (exist == null) exist = new bool[ids.Length]; return exist; } + string stringUUIDZero = UUID.Zero.ToString(); + public string Store(AssetBase asset) { - if (asset.Local) + // Have to assign the asset ID here. This isn't likely to + // trigger since current callers don't pass emtpy IDs + // We need the asset ID to route the request to the proper + // cluster member, so we can't have the server assign one. + if (asset.ID == string.Empty || asset.ID == stringUUIDZero) { - if (m_Cache != null) - m_Cache.Cache(asset); + if (asset.FullID == UUID.Zero) + { + asset.FullID = UUID.Random(); + } + m_log.WarnFormat("[Assets] Zero ID: {0}",asset.Name); + asset.ID = asset.FullID.ToString(); + } + + if (asset.FullID == UUID.Zero) + { + UUID uuid = UUID.Zero; + if (UUID.TryParse(asset.ID, out uuid)) + { + asset.FullID = uuid; + } + if(asset.FullID == UUID.Zero) + { + m_log.WarnFormat("[Assets] Zero IDs: {0}",asset.Name); + asset.FullID = UUID.Random(); + asset.ID = asset.FullID.ToString(); + } + } + + if (m_Cache != null) + m_Cache.Cache(asset); + if (asset.Temporary || asset.Local) + { return asset.ID; } - string uri = m_ServerURI + "/assets/"; + string uri = MapServer(asset.FullID.ToString()) + "/assets/"; - string newID; + string newID = null; try { - newID = SynchronousRestObjectRequester.MakeRequest("POST", uri, asset, m_Auth); + newID = SynchronousRestObjectRequester. + MakeRequest("POST", uri, asset, 100000, m_Auth); } - catch (Exception e) + catch { - m_log.Warn(string.Format("[ASSET CONNECTOR]: Unable to send asset {0} to asset server. Reason: {1} ", asset.ID, e.Message), e); - return string.Empty; + newID = null; } - // TEMPORARY: SRAS returns 'null' when it's asked to store existing assets - if (newID == null) + if (newID == null || newID == String.Empty || newID == stringUUIDZero) { - m_log.DebugFormat("[ASSET CONNECTOR]: Storing of asset {0} returned null; assuming the asset already exists", asset.ID); - return asset.ID; + //The asset upload failed, try later + lock(m_sendRetries) + { + if (m_sendRetries[0] == null) + m_sendRetries[0] = new List(); + List m_queue = m_sendRetries[0]; + m_queue.Add(asset); + m_log.WarnFormat("[Assets] Upload failed: {0} type {1} will retry later", + asset.ID.ToString(), asset.Type.ToString()); + m_retryTimer.Start(); + } } + else + { + if (newID != asset.ID) + { + // Placing this here, so that this work with old asset servers that don't send any reply back + // SynchronousRestObjectRequester returns somethins that is not an empty string - if (string.IsNullOrEmpty(newID)) - return string.Empty; + asset.ID = newID; - asset.ID = newID; + if (m_Cache != null) + m_Cache.Cache(asset); + } + } + return asset.ID; + } - if (m_Cache != null) - m_Cache.Cache(asset); + public void retryStore(AssetBase asset, int nextRetryLevel) + { +/* this may be bad, so excluding + if (m_Cache != null && !m_Cache.Check(asset.ID)) + { + m_log.WarnFormat("[Assets] Upload giveup asset bc no longer in local cache: {0}", + asset.ID.ToString(); + return; // if no longer in cache, it was deleted or expired + } +*/ + string uri = MapServer(asset.FullID.ToString()) + "/assets/"; - return newID; + string newID = null; + try + { + newID = SynchronousRestObjectRequester. + MakeRequest("POST", uri, asset, 100000, m_Auth); + } + catch + { + newID = null; + } + + if (newID == null || newID == String.Empty || newID == stringUUIDZero) + { + if(nextRetryLevel >= MAXSENDRETRIESLEN) + m_log.WarnFormat("[Assets] Upload giveup after several retries id: {0} type {1}", + asset.ID.ToString(), asset.Type.ToString()); + else + { + lock(m_sendRetries) + { + if (m_sendRetries[nextRetryLevel] == null) + { + m_sendRetries[nextRetryLevel] = new List(); + } + List m_queue = m_sendRetries[nextRetryLevel]; + m_queue.Add(asset); + m_log.WarnFormat("[Assets] Upload failed: {0} type {1} will retry later", + asset.ID.ToString(), asset.Type.ToString()); + } + } + } + else + { + m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), nextRetryLevel.ToString()); + if (newID != asset.ID) + { + asset.ID = newID; + + if (m_Cache != null) + m_Cache.Cache(asset); + } + } } public bool UpdateContent(string id, byte[] data) @@ -330,7 +609,7 @@ namespace OpenSim.Services.Connectors AssetBase asset = null; if (m_Cache != null) - asset = m_Cache.Get(id); + m_Cache.Get(id, out asset); if (asset == null) { @@ -343,7 +622,7 @@ namespace OpenSim.Services.Connectors } asset.Data = data; - string uri = m_ServerURI + "/assets/" + id; + string uri = MapServer(id) + "/assets/" + id; if (SynchronousRestObjectRequester.MakeRequest("POST", uri, asset, m_Auth)) { @@ -355,9 +634,10 @@ namespace OpenSim.Services.Connectors return false; } + public bool Delete(string id) { - string uri = m_ServerURI + "/assets/" + id; + string uri = MapServer(id) + "/assets/" + id; if (SynchronousRestObjectRequester.MakeRequest("DELETE", uri, 0, m_Auth)) { -- cgit v1.1