aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Services/Connectors/Asset
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs196
1 files changed, 165 insertions, 31 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
index e4c3eaf..2882906 100644
--- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
+++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
@@ -30,6 +30,7 @@ using System;
30using System.Collections.Generic; 30using System.Collections.Generic;
31using System.IO; 31using System.IO;
32using System.Reflection; 32using System.Reflection;
33using System.Timers;
33using Nini.Config; 34using Nini.Config;
34using OpenSim.Framework; 35using OpenSim.Framework;
35using OpenSim.Framework.Console; 36using OpenSim.Framework.Console;
@@ -47,13 +48,15 @@ namespace OpenSim.Services.Connectors
47 48
48 private string m_ServerURI = String.Empty; 49 private string m_ServerURI = String.Empty;
49 private IImprovedAssetCache m_Cache = null; 50 private IImprovedAssetCache m_Cache = null;
50 51 private int m_retryCounter;
52 private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>();
53 private Timer m_retryTimer;
51 private delegate void AssetRetrievedEx(AssetBase asset); 54 private delegate void AssetRetrievedEx(AssetBase asset);
52 55
53 // Keeps track of concurrent requests for the same asset, so that it's only loaded once. 56 // Keeps track of concurrent requests for the same asset, so that it's only loaded once.
54 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded 57 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded
55 private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); 58 private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>();
56 59 private Dictionary<string, string> m_UriMap = new Dictionary<string, string>();
57 60
58 public AssetServicesConnector() 61 public AssetServicesConnector()
59 { 62 {
@@ -81,13 +84,91 @@ namespace OpenSim.Services.Connectors
81 string serviceURI = assetConfig.GetString("AssetServerURI", 84 string serviceURI = assetConfig.GetString("AssetServerURI",
82 String.Empty); 85 String.Empty);
83 86
87 m_ServerURI = serviceURI;
88
84 if (serviceURI == String.Empty) 89 if (serviceURI == String.Empty)
85 { 90 {
86 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); 91 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService");
87 throw new Exception("Asset connector init error"); 92 throw new Exception("Asset connector init error");
88 } 93 }
89 94
90 m_ServerURI = serviceURI; 95
96 m_retryTimer = new Timer();
97 m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
98 m_retryTimer.Interval = 60000;
99
100 Uri serverUri = new Uri(m_ServerURI);
101
102 string groupHost = serverUri.Host;
103
104 for (int i = 0 ; i < 256 ; i++)
105 {
106 string prefix = i.ToString("x2");
107 groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost);
108
109 m_UriMap[prefix] = groupHost;
110 //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix);
111 }
112 }
113
114 private string MapServer(string id)
115 {
116 UriBuilder serverUri = new UriBuilder(m_ServerURI);
117
118 string prefix = id.Substring(0, 2).ToLower();
119
120 string host = m_UriMap[prefix];
121
122 serverUri.Host = host;
123
124 // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix);
125
126 return serverUri.Uri.AbsoluteUri;
127 }
128
129 protected void retryCheck(object source, ElapsedEventArgs e)
130 {
131 m_retryCounter++;
132 if (m_retryCounter > 60) m_retryCounter -= 60;
133 List<int> keys = new List<int>();
134 foreach (int a in m_retryQueue.Keys)
135 {
136 keys.Add(a);
137 }
138 foreach (int a in keys)
139 {
140 //We exponentially fall back on frequency until we reach one attempt per hour
141 //The net result is that we end up in the queue for roughly 24 hours..
142 //24 hours worth of assets could be a lot, so the hope is that the region admin
143 //will have gotten the asset connector back online quickly!
144
145 int timefactor = a ^ 2;
146 if (timefactor > 60)
147 {
148 timefactor = 60;
149 }
150
151 //First, find out if we care about this timefactor
152 if (timefactor % a == 0)
153 {
154 //Yes, we do!
155 List<AssetBase> retrylist = m_retryQueue[a];
156 m_retryQueue.Remove(a);
157
158 foreach(AssetBase ass in retrylist)
159 {
160 Store(ass); //Store my ass. This function will put it back in the dictionary if it fails
161 }
162 }
163 }
164
165 if (m_retryQueue.Count == 0)
166 {
167 //It might only be one tick per minute, but I have
168 //repented and abandoned my wasteful ways
169 m_retryCounter = 0;
170 m_retryTimer.Stop();
171 }
91 } 172 }
92 173
93 protected void SetCache(IImprovedAssetCache cache) 174 protected void SetCache(IImprovedAssetCache cache)
@@ -97,15 +178,13 @@ namespace OpenSim.Services.Connectors
97 178
98 public AssetBase Get(string id) 179 public AssetBase Get(string id)
99 { 180 {
100// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Synchronous get request for {0}", id); 181 string uri = MapServer(id) + "/assets/" + id;
101
102 string uri = m_ServerURI + "/assets/" + id;
103 182
104 AssetBase asset = null; 183 AssetBase asset = null;
105 if (m_Cache != null) 184 if (m_Cache != null)
106 asset = m_Cache.Get(id); 185 asset = m_Cache.Get(id);
107 186
108 if (asset == null) 187 if (asset == null || asset.Data == null || asset.Data.Length == 0)
109 { 188 {
110 asset = SynchronousRestObjectRequester. 189 asset = SynchronousRestObjectRequester.
111 MakeRequest<int, AssetBase>("GET", uri, 0); 190 MakeRequest<int, AssetBase>("GET", uri, 0);
@@ -136,7 +215,7 @@ namespace OpenSim.Services.Connectors
136 return fullAsset.Metadata; 215 return fullAsset.Metadata;
137 } 216 }
138 217
139 string uri = m_ServerURI + "/assets/" + id + "/metadata"; 218 string uri = MapServer(id) + "/assets/" + id + "/metadata";
140 219
141 AssetMetadata asset = SynchronousRestObjectRequester. 220 AssetMetadata asset = SynchronousRestObjectRequester.
142 MakeRequest<int, AssetMetadata>("GET", uri, 0); 221 MakeRequest<int, AssetMetadata>("GET", uri, 0);
@@ -153,7 +232,7 @@ namespace OpenSim.Services.Connectors
153 return fullAsset.Data; 232 return fullAsset.Data;
154 } 233 }
155 234
156 RestClient rc = new RestClient(m_ServerURI); 235 RestClient rc = new RestClient(MapServer(id));
157 rc.AddResourcePath("assets"); 236 rc.AddResourcePath("assets");
158 rc.AddResourcePath(id); 237 rc.AddResourcePath(id);
159 rc.AddResourcePath("data"); 238 rc.AddResourcePath("data");
@@ -178,15 +257,13 @@ namespace OpenSim.Services.Connectors
178 257
179 public bool Get(string id, Object sender, AssetRetrieved handler) 258 public bool Get(string id, Object sender, AssetRetrieved handler)
180 { 259 {
181// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Potentially asynchronous get request for {0}", id); 260 string uri = MapServer(id) + "/assets/" + id;
182
183 string uri = m_ServerURI + "/assets/" + id;
184 261
185 AssetBase asset = null; 262 AssetBase asset = null;
186 if (m_Cache != null) 263 if (m_Cache != null)
187 asset = m_Cache.Get(id); 264 asset = m_Cache.Get(id);
188 265
189 if (asset == null) 266 if (asset == null || asset.Data == null || asset.Data.Length == 0)
190 { 267 {
191 lock (m_AssetHandlers) 268 lock (m_AssetHandlers)
192 { 269 {
@@ -246,38 +323,95 @@ namespace OpenSim.Services.Connectors
246 323
247 public string Store(AssetBase asset) 324 public string Store(AssetBase asset)
248 { 325 {
249 if (asset.Temporary || asset.Local) 326 // Have to assign the asset ID here. This isn't likely to
327 // trigger since current callers don't pass emtpy IDs
328 // We need the asset ID to route the request to the proper
329 // cluster member, so we can't have the server assign one.
330 if (asset.ID == string.Empty)
250 { 331 {
251 if (m_Cache != null) 332 if (asset.FullID == UUID.Zero)
252 m_Cache.Cache(asset); 333 {
334 asset.FullID = UUID.Random();
335 }
336 asset.ID = asset.FullID.ToString();
337 }
338 else if (asset.FullID == UUID.Zero)
339 {
340 UUID uuid = UUID.Zero;
341 if (UUID.TryParse(asset.ID, out uuid))
342 {
343 asset.FullID = uuid;
344 }
345 else
346 {
347 asset.FullID = UUID.Random();
348 }
349 }
253 350
351 if (m_Cache != null)
352 m_Cache.Cache(asset);
353 if (asset.Temporary || asset.Local)
354 {
254 return asset.ID; 355 return asset.ID;
255 } 356 }
256 357
257 string uri = m_ServerURI + "/assets/"; 358 string uri = MapServer(asset.FullID.ToString()) + "/assets/";
258 359
259 string newID = string.Empty; 360 string newID = string.Empty;
260 try 361 try
261 { 362 {
262 newID = SynchronousRestObjectRequester. 363 newID = SynchronousRestObjectRequester.
263 MakeRequest<AssetBase, string>("POST", uri, asset); 364 MakeRequest<AssetBase, string>("POST", uri, asset, 25);
365 if (newID == null || newID == "")
366 {
367 newID = UUID.Zero.ToString();
368 }
264 } 369 }
265 catch (Exception e) 370 catch (Exception e)
266 { 371 {
267 m_log.WarnFormat("[ASSET CONNECTOR]: Unable to send asset {0} to asset server. Reason: {1}", asset.ID, e.Message); 372 newID = UUID.Zero.ToString();
268 } 373 }
269 374
270 if (newID != String.Empty) 375 if (newID == UUID.Zero.ToString())
271 { 376 {
272 // Placing this here, so that this work with old asset servers that don't send any reply back 377 //The asset upload failed, put it in a queue for later
273 // SynchronousRestObjectRequester returns somethins that is not an empty string 378 asset.UploadAttempts++;
274 if (newID != null) 379 if (asset.UploadAttempts > 30)
275 asset.ID = newID; 380 {
381 //By this stage we've been in the queue for a good few hours;
382 //We're going to drop the asset.
383 m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString());
384 }
385 else
386 {
387 if (!m_retryQueue.ContainsKey(asset.UploadAttempts))
388 {
389 m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>());
390 }
391 List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts];
392 m_queue.Add(asset);
393 m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString());
394 m_retryTimer.Start();
395 }
396 }
397 else
398 {
399 if (asset.UploadAttempts > 0)
400 {
401 m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString());
402 }
403 if (newID != String.Empty)
404 {
405 // Placing this here, so that this work with old asset servers that don't send any reply back
406 // SynchronousRestObjectRequester returns somethins that is not an empty string
407 if (newID != null)
408 asset.ID = newID;
276 409
277 if (m_Cache != null) 410 if (m_Cache != null)
278 m_Cache.Cache(asset); 411 m_Cache.Cache(asset);
412 }
279 } 413 }
280 return newID; 414 return asset.ID;
281 } 415 }
282 416
283 public bool UpdateContent(string id, byte[] data) 417 public bool UpdateContent(string id, byte[] data)
@@ -298,7 +432,7 @@ namespace OpenSim.Services.Connectors
298 } 432 }
299 asset.Data = data; 433 asset.Data = data;
300 434
301 string uri = m_ServerURI + "/assets/" + id; 435 string uri = MapServer(id) + "/assets/" + id;
302 436
303 if (SynchronousRestObjectRequester. 437 if (SynchronousRestObjectRequester.
304 MakeRequest<AssetBase, bool>("POST", uri, asset)) 438 MakeRequest<AssetBase, bool>("POST", uri, asset))
@@ -313,7 +447,7 @@ namespace OpenSim.Services.Connectors
313 447
314 public bool Delete(string id) 448 public bool Delete(string id)
315 { 449 {
316 string uri = m_ServerURI + "/assets/" + id; 450 string uri = MapServer(id) + "/assets/" + id;
317 451
318 if (SynchronousRestObjectRequester. 452 if (SynchronousRestObjectRequester.
319 MakeRequest<int, bool>("DELETE", uri, 0)) 453 MakeRequest<int, bool>("DELETE", uri, 0))
@@ -326,4 +460,4 @@ namespace OpenSim.Services.Connectors
326 return false; 460 return false;
327 } 461 }
328 } 462 }
329} \ No newline at end of file 463}