aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs')
-rw-r--r--OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs341
1 files changed, 275 insertions, 66 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
index bd43552..8f3cb80 100644
--- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
+++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
@@ -27,9 +27,11 @@
27 27
28using log4net; 28using log4net;
29using System; 29using System;
30using System.Threading;
30using System.Collections.Generic; 31using System.Collections.Generic;
31using System.IO; 32using System.IO;
32using System.Reflection; 33using System.Reflection;
34using System.Timers;
33using Nini.Config; 35using Nini.Config;
34using OpenSim.Framework; 36using OpenSim.Framework;
35using OpenSim.Framework.Console; 37using OpenSim.Framework.Console;
@@ -46,13 +48,22 @@ namespace OpenSim.Services.Connectors
46 48
47 private string m_ServerURI = String.Empty; 49 private string m_ServerURI = String.Empty;
48 private IImprovedAssetCache m_Cache = null; 50 private IImprovedAssetCache m_Cache = null;
51 private int m_retryCounter;
52 private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>();
53 private System.Timers.Timer m_retryTimer;
49 private int m_maxAssetRequestConcurrency = 30; 54 private int m_maxAssetRequestConcurrency = 30;
50 55
51 private delegate void AssetRetrievedEx(AssetBase asset); 56 private delegate void AssetRetrievedEx(AssetBase asset);
52 57
53 // Keeps track of concurrent requests for the same asset, so that it's only loaded once. 58 // Keeps track of concurrent requests for the same asset, so that it's only loaded once.
54 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded 59 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded
55 private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); 60// private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>();
61
62 private Dictionary<string, List<AssetRetrievedEx>> m_AssetHandlers = new Dictionary<string, List<AssetRetrievedEx>>();
63
64 private Dictionary<string, string> m_UriMap = new Dictionary<string, string>();
65
66 private Thread[] m_fetchThreads;
56 67
57 public int MaxAssetRequestConcurrency 68 public int MaxAssetRequestConcurrency
58 { 69 {
@@ -91,13 +102,108 @@ namespace OpenSim.Services.Connectors
91 string serviceURI = assetConfig.GetString("AssetServerURI", 102 string serviceURI = assetConfig.GetString("AssetServerURI",
92 String.Empty); 103 String.Empty);
93 104
105 m_ServerURI = serviceURI;
106
94 if (serviceURI == String.Empty) 107 if (serviceURI == String.Empty)
95 { 108 {
96 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); 109 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService");
97 throw new Exception("Asset connector init error"); 110 throw new Exception("Asset connector init error");
98 } 111 }
99 112
100 m_ServerURI = serviceURI; 113
114 m_retryTimer = new System.Timers.Timer();
115 m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
116 m_retryTimer.Interval = 60000;
117
118 Uri serverUri = new Uri(m_ServerURI);
119
120 string groupHost = serverUri.Host;
121
122 for (int i = 0 ; i < 256 ; i++)
123 {
124 string prefix = i.ToString("x2");
125 groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost);
126
127 m_UriMap[prefix] = groupHost;
128 //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix);
129 }
130
131 m_fetchThreads = new Thread[2];
132
133 for (int i = 0 ; i < 2 ; i++)
134 {
135 m_fetchThreads[i] = new Thread(AssetRequestProcessor);
136 m_fetchThreads[i].Start();
137 }
138 }
139
140 private string MapServer(string id)
141 {
142 UriBuilder serverUri = new UriBuilder(m_ServerURI);
143
144 string prefix = id.Substring(0, 2).ToLower();
145
146 string host;
147
148 // HG URLs will not be valid UUIDS
149 if (m_UriMap.ContainsKey(prefix))
150 host = m_UriMap[prefix];
151 else
152 host = m_UriMap["00"];
153
154 serverUri.Host = host;
155
156 // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix);
157
158 string ret = serverUri.Uri.AbsoluteUri;
159 if (ret.EndsWith("/"))
160 ret = ret.Substring(0, ret.Length - 1);
161 return ret;
162 }
163
164 protected void retryCheck(object source, ElapsedEventArgs e)
165 {
166 m_retryCounter++;
167 if (m_retryCounter > 60) m_retryCounter -= 60;
168 List<int> keys = new List<int>();
169 foreach (int a in m_retryQueue.Keys)
170 {
171 keys.Add(a);
172 }
173 foreach (int a in keys)
174 {
175 //We exponentially fall back on frequency until we reach one attempt per hour
176 //The net result is that we end up in the queue for roughly 24 hours..
177 //24 hours worth of assets could be a lot, so the hope is that the region admin
178 //will have gotten the asset connector back online quickly!
179
180 int timefactor = a ^ 2;
181 if (timefactor > 60)
182 {
183 timefactor = 60;
184 }
185
186 //First, find out if we care about this timefactor
187 if (timefactor % a == 0)
188 {
189 //Yes, we do!
190 List<AssetBase> retrylist = m_retryQueue[a];
191 m_retryQueue.Remove(a);
192
193 foreach(AssetBase ass in retrylist)
194 {
195 Store(ass); //Store my ass. This function will put it back in the dictionary if it fails
196 }
197 }
198 }
199
200 if (m_retryQueue.Count == 0)
201 {
202 //It might only be one tick per minute, but I have
203 //repented and abandoned my wasteful ways
204 m_retryCounter = 0;
205 m_retryTimer.Stop();
206 }
101 } 207 }
102 208
103 protected void SetCache(IImprovedAssetCache cache) 209 protected void SetCache(IImprovedAssetCache cache)
@@ -107,15 +213,13 @@ namespace OpenSim.Services.Connectors
107 213
108 public AssetBase Get(string id) 214 public AssetBase Get(string id)
109 { 215 {
110// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Synchronous get request for {0}", id); 216 string uri = MapServer(id) + "/assets/" + id;
111
112 string uri = m_ServerURI + "/assets/" + id;
113 217
114 AssetBase asset = null; 218 AssetBase asset = null;
115 if (m_Cache != null) 219 if (m_Cache != null)
116 asset = m_Cache.Get(id); 220 asset = m_Cache.Get(id);
117 221
118 if (asset == null) 222 if (asset == null || asset.Data == null || asset.Data.Length == 0)
119 { 223 {
120 // XXX: Commented out for now since this has either never been properly operational or not for some time 224 // XXX: Commented out for now since this has either never been properly operational or not for some time
121 // as m_maxAssetRequestConcurrency was being passed as the timeout, not a concurrency limiting option. 225 // as m_maxAssetRequestConcurrency was being passed as the timeout, not a concurrency limiting option.
@@ -154,7 +258,7 @@ namespace OpenSim.Services.Connectors
154 return fullAsset.Metadata; 258 return fullAsset.Metadata;
155 } 259 }
156 260
157 string uri = m_ServerURI + "/assets/" + id + "/metadata"; 261 string uri = MapServer(id) + "/assets/" + id + "/metadata";
158 262
159 AssetMetadata asset = SynchronousRestObjectRequester.MakeRequest<int, AssetMetadata>("GET", uri, 0, m_Auth); 263 AssetMetadata asset = SynchronousRestObjectRequester.MakeRequest<int, AssetMetadata>("GET", uri, 0, m_Auth);
160 return asset; 264 return asset;
@@ -170,7 +274,7 @@ namespace OpenSim.Services.Connectors
170 return fullAsset.Data; 274 return fullAsset.Data;
171 } 275 }
172 276
173 using (RestClient rc = new RestClient(m_ServerURI)) 277 using (RestClient rc = new RestClient(MapServer(id)))
174 { 278 {
175 rc.AddResourcePath("assets"); 279 rc.AddResourcePath("assets");
176 rc.AddResourcePath(id); 280 rc.AddResourcePath(id);
@@ -195,66 +299,119 @@ namespace OpenSim.Services.Connectors
195 } 299 }
196 } 300 }
197 301
198 public bool Get(string id, Object sender, AssetRetrieved handler) 302 private class QueuedAssetRequest
199 { 303 {
200// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Potentially asynchronous get request for {0}", id); 304 public string uri;
305 public string id;
306 }
201 307
202 string uri = m_ServerURI + "/assets/" + id; 308 private OpenMetaverse.BlockingQueue<QueuedAssetRequest> m_requestQueue =
309 new OpenMetaverse.BlockingQueue<QueuedAssetRequest>();
203 310
204 AssetBase asset = null; 311 private void AssetRequestProcessor()
205 if (m_Cache != null) 312 {
206 asset = m_Cache.Get(id); 313 QueuedAssetRequest r;
207 314
208 if (asset == null) 315 while (true)
209 { 316 {
210 lock (m_AssetHandlers) 317 r = m_requestQueue.Dequeue();
211 {
212 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
213 318
214 AssetRetrievedEx handlers; 319 string uri = r.uri;
215 if (m_AssetHandlers.TryGetValue(id, out handlers)) 320 string id = r.id;
216 {
217 // Someone else is already loading this asset. It will notify our handler when done.
218 handlers += handlerEx;
219 return true;
220 }
221
222 // Load the asset ourselves
223 handlers += handlerEx;
224 m_AssetHandlers.Add(id, handlers);
225 }
226 321
227 bool success = false; 322 bool success = false;
228 try 323 try
229 { 324 {
230 AsynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, 325 AssetBase a = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, 30);
231 delegate(AssetBase a) 326 if (a != null)
327 {
328 if (m_Cache != null)
329 m_Cache.Cache(a);
330
331 List<AssetRetrievedEx> handlers;
332 lock (m_AssetHandlers)
232 { 333 {
233 if (a != null && m_Cache != null) 334 handlers = m_AssetHandlers[id];
234 m_Cache.Cache(a); 335 m_AssetHandlers.Remove(id);
336 }
235 337
236 AssetRetrievedEx handlers; 338 Util.FireAndForget(x =>
237 lock (m_AssetHandlers)
238 { 339 {
239 handlers = m_AssetHandlers[id]; 340
240 m_AssetHandlers.Remove(id); 341 foreach (AssetRetrievedEx h in handlers)
241 } 342 {
242 handlers.Invoke(a); 343 // Util.FireAndForget(x =>
243 }, m_maxAssetRequestConcurrency, m_Auth); 344 // {
244 345 try { h.Invoke(a); }
245 success = true; 346 catch { }
347 // });
348 }
349
350 if (handlers != null)
351 handlers.Clear();
352
353 });
354
355// if (handlers != null)
356// handlers.Clear();
357 success = true;
358 }
246 } 359 }
247 finally 360 finally
248 { 361 {
249 if (!success) 362 if (!success)
250 { 363 {
364 List<AssetRetrievedEx> handlers;
251 lock (m_AssetHandlers) 365 lock (m_AssetHandlers)
252 { 366 {
367 handlers = m_AssetHandlers[id];
253 m_AssetHandlers.Remove(id); 368 m_AssetHandlers.Remove(id);
254 } 369 }
370 if (handlers != null)
371 handlers.Clear();
255 } 372 }
256 } 373 }
257 } 374 }
375 }
376
377 public bool Get(string id, Object sender, AssetRetrieved handler)
378 {
379 string uri = MapServer(id) + "/assets/" + id;
380
381 AssetBase asset = null;
382 if (m_Cache != null)
383 asset = m_Cache.Get(id);
384
385 if (asset == null || asset.Data == null || asset.Data.Length == 0)
386 {
387 lock (m_AssetHandlers)
388 {
389 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
390
391// AssetRetrievedEx handlers;
392 List<AssetRetrievedEx> handlers;
393 if (m_AssetHandlers.TryGetValue(id, out handlers))
394 {
395 // Someone else is already loading this asset. It will notify our handler when done.
396// handlers += handlerEx;
397 handlers.Add(handlerEx);
398 return true;
399 }
400
401 // Load the asset ourselves
402// handlers += handlerEx;
403 handlers = new List<AssetRetrievedEx>();
404 handlers.Add(handlerEx);
405
406 m_AssetHandlers.Add(id, handlers);
407 }
408
409 QueuedAssetRequest request = new QueuedAssetRequest();
410 request.id = id;
411 request.uri = uri;
412
413 m_requestQueue.Enqueue(request);
414 }
258 else 415 else
259 { 416 {
260 handler(id, sender, asset); 417 handler(id, sender, asset);
@@ -286,43 +443,95 @@ namespace OpenSim.Services.Connectors
286 443
287 public string Store(AssetBase asset) 444 public string Store(AssetBase asset)
288 { 445 {
289 if (asset.Local) 446 // Have to assign the asset ID here. This isn't likely to
447 // trigger since current callers don't pass emtpy IDs
448 // We need the asset ID to route the request to the proper
449 // cluster member, so we can't have the server assign one.
450 if (asset.ID == string.Empty)
290 { 451 {
291 if (m_Cache != null) 452 if (asset.FullID == UUID.Zero)
292 m_Cache.Cache(asset); 453 {
454 asset.FullID = UUID.Random();
455 }
456 asset.ID = asset.FullID.ToString();
457 }
458 else if (asset.FullID == UUID.Zero)
459 {
460 UUID uuid = UUID.Zero;
461 if (UUID.TryParse(asset.ID, out uuid))
462 {
463 asset.FullID = uuid;
464 }
465 else
466 {
467 asset.FullID = UUID.Random();
468 }
469 }
293 470
471 if (m_Cache != null)
472 m_Cache.Cache(asset);
473 if (asset.Temporary || asset.Local)
474 {
294 return asset.ID; 475 return asset.ID;
295 } 476 }
296 477
297 string uri = m_ServerURI + "/assets/"; 478 string uri = MapServer(asset.FullID.ToString()) + "/assets/";
298 479
299 string newID; 480 string newID;
300 try 481 try
301 { 482 {
302 newID = SynchronousRestObjectRequester.MakeRequest<AssetBase, string>("POST", uri, asset, m_Auth); 483 newID = SynchronousRestObjectRequester.
484 MakeRequest<AssetBase, string>("POST", uri, asset, 100);
485 if (newID == null || newID == "")
486 {
487 newID = UUID.Zero.ToString();
488 }
303 } 489 }
304 catch (Exception e) 490 catch (Exception e)
305 { 491 {
306 m_log.Warn(string.Format("[ASSET CONNECTOR]: Unable to send asset {0} to asset server. Reason: {1} ", asset.ID, e.Message), e); 492 newID = UUID.Zero.ToString();
307 return string.Empty;
308 } 493 }
309 494
310 // TEMPORARY: SRAS returns 'null' when it's asked to store existing assets 495 if (newID == UUID.Zero.ToString())
311 if (newID == null)
312 { 496 {
313 m_log.DebugFormat("[ASSET CONNECTOR]: Storing of asset {0} returned null; assuming the asset already exists", asset.ID); 497 //The asset upload failed, put it in a queue for later
314 return asset.ID; 498 asset.UploadAttempts++;
499 if (asset.UploadAttempts > 30)
500 {
501 //By this stage we've been in the queue for a good few hours;
502 //We're going to drop the asset.
503 m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString());
504 }
505 else
506 {
507 if (!m_retryQueue.ContainsKey(asset.UploadAttempts))
508 {
509 m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>());
510 }
511 List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts];
512 m_queue.Add(asset);
513 m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString());
514 m_retryTimer.Start();
515 }
315 } 516 }
517 else
518 {
519 if (asset.UploadAttempts > 0)
520 {
521 m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString());
522 }
523 if (newID != String.Empty)
524 {
525 // Placing this here, so that this work with old asset servers that don't send any reply back
526 // SynchronousRestObjectRequester returns somethins that is not an empty string
527 if (newID != null)
528 asset.ID = newID;
316 529
317 if (string.IsNullOrEmpty(newID)) 530 if (m_Cache != null)
318 return string.Empty; 531 m_Cache.Cache(asset);
319 532 }
320 asset.ID = newID; 533 }
321 534 return asset.ID;
322 if (m_Cache != null)
323 m_Cache.Cache(asset);
324
325 return newID;
326 } 535 }
327 536
328 public bool UpdateContent(string id, byte[] data) 537 public bool UpdateContent(string id, byte[] data)
@@ -343,7 +552,7 @@ namespace OpenSim.Services.Connectors
343 } 552 }
344 asset.Data = data; 553 asset.Data = data;
345 554
346 string uri = m_ServerURI + "/assets/" + id; 555 string uri = MapServer(id) + "/assets/" + id;
347 556
348 if (SynchronousRestObjectRequester.MakeRequest<AssetBase, bool>("POST", uri, asset, m_Auth)) 557 if (SynchronousRestObjectRequester.MakeRequest<AssetBase, bool>("POST", uri, asset, m_Auth))
349 { 558 {
@@ -357,7 +566,7 @@ namespace OpenSim.Services.Connectors
357 566
358 public bool Delete(string id) 567 public bool Delete(string id)
359 { 568 {
360 string uri = m_ServerURI + "/assets/" + id; 569 string uri = MapServer(id) + "/assets/" + id;
361 570
362 if (SynchronousRestObjectRequester.MakeRequest<int, bool>("DELETE", uri, 0, m_Auth)) 571 if (SynchronousRestObjectRequester.MakeRequest<int, bool>("DELETE", uri, 0, m_Auth))
363 { 572 {