aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Services
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs206
1 files changed, 128 insertions, 78 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
index 7ac7917..5e86771 100644
--- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
+++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
@@ -46,10 +46,12 @@ namespace OpenSim.Services.Connectors
46 LogManager.GetLogger( 46 LogManager.GetLogger(
47 MethodBase.GetCurrentMethod().DeclaringType); 47 MethodBase.GetCurrentMethod().DeclaringType);
48 48
49// const int MAXSENDRETRIESLEN = 30;
50 const int MAXSENDRETRIESLEN = 2;
49 private string m_ServerURI = String.Empty; 51 private string m_ServerURI = String.Empty;
50 private IImprovedAssetCache m_Cache = null; 52 private IImprovedAssetCache m_Cache = null;
51 private int m_retryCounter; 53 private int m_retryCounter;
52 private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>(); 54 private List<AssetBase>[] m_sendRetries = new List<AssetBase>[MAXSENDRETRIESLEN];
53 private System.Timers.Timer m_retryTimer; 55 private System.Timers.Timer m_retryTimer;
54 private int m_maxAssetRequestConcurrency = 30; 56 private int m_maxAssetRequestConcurrency = 30;
55 57
@@ -110,9 +112,9 @@ namespace OpenSim.Services.Connectors
110 throw new Exception("Asset connector init error"); 112 throw new Exception("Asset connector init error");
111 } 113 }
112 114
113
114 m_retryTimer = new System.Timers.Timer(); 115 m_retryTimer = new System.Timers.Timer();
115 m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck); 116 m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
117 m_retryTimer.AutoReset = false;
116 m_retryTimer.Interval = 60000; 118 m_retryTimer.Interval = 60000;
117 119
118 Uri serverUri = new Uri(m_ServerURI); 120 Uri serverUri = new Uri(m_ServerURI);
@@ -167,47 +169,57 @@ namespace OpenSim.Services.Connectors
167 protected void retryCheck(object source, ElapsedEventArgs e) 169 protected void retryCheck(object source, ElapsedEventArgs e)
168 { 170 {
169 m_retryCounter++; 171 m_retryCounter++;
170 if (m_retryCounter > 60) 172 if(m_retryCounter >= 61 ) // avoid overflow 60 is max in use below
171 m_retryCounter -= 60; 173 m_retryCounter = 1;
172 174
173 List<int> keys = new List<int>(); 175 int inUse = 0;
174 foreach (int a in m_retryQueue.Keys) 176 int nextlevel;
175 { 177 int timefactor;
176 keys.Add(a); 178 List<AssetBase> retrylist;
177 } 179 // we need to go down
178 foreach (int a in keys) 180 for(int i = MAXSENDRETRIESLEN - 1; i >= 0; i--)
179 { 181 {
182 lock(m_sendRetries)
183 retrylist = m_sendRetries[i];
184
185 if(retrylist == null)
186 continue;
187
188 inUse++;
189 nextlevel = i + 1;
190
180 //We exponentially fall back on frequency until we reach one attempt per hour 191 //We exponentially fall back on frequency until we reach one attempt per hour
181 //The net result is that we end up in the queue for roughly 24 hours.. 192 //The net result is that we end up in the queue for roughly 24 hours..
182 //24 hours worth of assets could be a lot, so the hope is that the region admin 193 //24 hours worth of assets could be a lot, so the hope is that the region admin
183 //will have gotten the asset connector back online quickly! 194 //will have gotten the asset connector back online quickly!
184 195 if(i == 0)
185 int timefactor = a ^ 2; 196 timefactor = 1;
186 if (timefactor > 60) 197 else
187 { 198 {
188 timefactor = 60; 199 timefactor = 1 << nextlevel;
200 if (timefactor > 60)
201 timefactor = 60;
189 } 202 }
190 203
191 //First, find out if we care about this timefactor 204 if(m_retryCounter < timefactor)
192 if (timefactor % a == 0) 205 continue; // to update inUse;
193 {
194 //Yes, we do!
195 List<AssetBase> retrylist = m_retryQueue[a];
196 m_retryQueue.Remove(a);
197 206
198 foreach(AssetBase ass in retrylist) 207 if (m_retryCounter % timefactor != 0)
199 { 208 continue;
200 Store(ass); //Store my ass. This function will put it back in the dictionary if it fails 209
201 } 210 // a list to retry
202 } 211 lock(m_sendRetries)
212 m_sendRetries[i] = null;
213
214 // we are the only ones with a copy of this retrylist now
215 foreach(AssetBase ass in retrylist)
216 retryStore(ass, nextlevel);
203 } 217 }
204 218
205 if (m_retryQueue.Count == 0) 219 lock(m_sendRetries)
206 { 220 {
207 //It might only be one tick per minute, but I have 221 if(inUse > 0 && !m_retryTimer.Enabled)
208 //repented and abandoned my wasteful ways 222 m_retryTimer.Start();
209 m_retryCounter = 0;
210 m_retryTimer.Stop();
211 } 223 }
212 } 224 }
213 225
@@ -237,8 +249,9 @@ namespace OpenSim.Services.Connectors
237 249
238 asset = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, m_Auth); 250 asset = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, m_Auth);
239 251
240 if (m_Cache != null) 252
241 m_Cache.Cache(asset); 253 if (asset != null && m_Cache != null)
254 m_Cache.Cache(asset);
242 } 255 }
243 return asset; 256 return asset;
244 } 257 }
@@ -340,25 +353,18 @@ namespace OpenSim.Services.Connectors
340 m_AssetHandlers.Remove(id); 353 m_AssetHandlers.Remove(id);
341 } 354 }
342 355
343 Util.FireAndForget(x => 356 if(handlers != null)
357 {
358 Util.FireAndForget(x =>
344 { 359 {
345
346 foreach (AssetRetrievedEx h in handlers) 360 foreach (AssetRetrievedEx h in handlers)
347 { 361 {
348 // Util.FireAndForget(x =>
349 // {
350 try { h.Invoke(a); } 362 try { h.Invoke(a); }
351 catch { } 363 catch { }
352 // });
353 } 364 }
354 365 handlers.Clear();
355 if (handlers != null)
356 handlers.Clear();
357
358 }); 366 });
359 367 }
360// if (handlers != null)
361// handlers.Clear();
362 success = true; 368 success = true;
363 } 369 }
364 } 370 }
@@ -393,29 +399,24 @@ namespace OpenSim.Services.Connectors
393 { 399 {
394 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); 400 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
395 401
396// AssetRetrievedEx handlers;
397 List<AssetRetrievedEx> handlers; 402 List<AssetRetrievedEx> handlers;
398 if (m_AssetHandlers.TryGetValue(id, out handlers)) 403 if (m_AssetHandlers.TryGetValue(id, out handlers))
399 { 404 {
400 // Someone else is already loading this asset. It will notify our handler when done. 405 // Someone else is already loading this asset. It will notify our handler when done.
401// handlers += handlerEx;
402 handlers.Add(handlerEx); 406 handlers.Add(handlerEx);
403 return true; 407 return true;
404 } 408 }
405 409
406 // Load the asset ourselves
407// handlers += handlerEx;
408 handlers = new List<AssetRetrievedEx>(); 410 handlers = new List<AssetRetrievedEx>();
409 handlers.Add(handlerEx); 411 handlers.Add(handlerEx);
410 412
411 m_AssetHandlers.Add(id, handlers); 413 m_AssetHandlers.Add(id, handlers);
412 }
413 414
414 QueuedAssetRequest request = new QueuedAssetRequest(); 415 QueuedAssetRequest request = new QueuedAssetRequest();
415 request.id = id; 416 request.id = id;
416 request.uri = uri; 417 request.uri = uri;
417 418 m_requestQueue.Enqueue(request);
418 m_requestQueue.Enqueue(request); 419 }
419 } 420 }
420 else 421 else
421 { 422 {
@@ -495,43 +496,35 @@ namespace OpenSim.Services.Connectors
495 newID = SynchronousRestObjectRequester. 496 newID = SynchronousRestObjectRequester.
496 MakeRequest<AssetBase, string>("POST", uri, asset, 100000, m_Auth); 497 MakeRequest<AssetBase, string>("POST", uri, asset, 100000, m_Auth);
497 } 498 }
498 catch {} 499 catch
500 {
501 newID = null;
502 }
499 503
500 if (newID == null || newID == String.Empty || newID == stringUUIDZero) 504 if (newID == null || newID == String.Empty || newID == stringUUIDZero)
501 { 505 {
502 //The asset upload failed, put it in a queue for later 506 //The asset upload failed, try later
503 asset.UploadAttempts++; 507 lock(m_sendRetries)
504 if (asset.UploadAttempts > 30)
505 {
506 //By this stage we've been in the queue for a good few hours;
507 //We're going to drop the asset.
508 m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString());
509 }
510 else
511 { 508 {
512 if (!m_retryQueue.ContainsKey(asset.UploadAttempts)) 509 if (m_sendRetries[0] == null)
513 { 510 m_sendRetries[0] = new List<AssetBase>();
514 m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>()); 511 List<AssetBase> m_queue = m_sendRetries[0];
515 }
516 List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts];
517 m_queue.Add(asset); 512 m_queue.Add(asset);
518 m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString()); 513 m_log.WarnFormat("[Assets] Upload failed: {0} type {1} will retry later",
519 m_retryTimer.Start(); 514 asset.ID.ToString(), asset.Type.ToString());
515 if(!m_retryTimer.Enabled)
516 m_retryTimer.Start();
520 } 517 }
521 } 518 }
522 else 519 else
523 { 520 {
524 if (asset.UploadAttempts > 0)
525 {
526 m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString());
527 }
528 if (newID != asset.ID) 521 if (newID != asset.ID)
529 { 522 {
530 // Placing this here, so that this work with old asset servers that don't send any reply back 523 // Placing this here, so that this work with old asset servers that don't send any reply back
531 // SynchronousRestObjectRequester returns somethins that is not an empty string 524 // SynchronousRestObjectRequester returns somethins that is not an empty string
532 525
533 asset.ID = newID; 526 asset.ID = newID;
534// what about FullID ???? 527
535 if (m_Cache != null) 528 if (m_Cache != null)
536 m_Cache.Cache(asset); 529 m_Cache.Cache(asset);
537 } 530 }
@@ -539,6 +532,62 @@ namespace OpenSim.Services.Connectors
539 return asset.ID; 532 return asset.ID;
540 } 533 }
541 534
535 public void retryStore(AssetBase asset, int nextRetryLevel)
536 {
537/* this may be bad, so excluding
538 if (m_Cache != null && !m_Cache.Check(asset.ID))
539 {
540 m_log.WarnFormat("[Assets] Upload giveup asset bc no longer in local cache: {0}",
541 asset.ID.ToString();
542 return; // if no longer in cache, it was deleted or expired
543 }
544*/
545 string uri = MapServer(asset.FullID.ToString()) + "/assets/";
546
547 string newID = null;
548 try
549 {
550 newID = SynchronousRestObjectRequester.
551 MakeRequest<AssetBase, string>("POST", uri, asset, 100000, m_Auth);
552 }
553 catch
554 {
555 newID = null;
556 }
557
558 if (newID == null || newID == String.Empty || newID == stringUUIDZero)
559 {
560 if(nextRetryLevel >= MAXSENDRETRIESLEN)
561 m_log.WarnFormat("[Assets] Upload giveup after several retries id: {0} type {1}",
562 asset.ID.ToString(), asset.Type.ToString());
563 else
564 {
565 lock(m_sendRetries)
566 {
567 if (m_sendRetries[nextRetryLevel] == null)
568 {
569 m_sendRetries[nextRetryLevel] = new List<AssetBase>();
570 }
571 List<AssetBase> m_queue = m_sendRetries[nextRetryLevel];
572 m_queue.Add(asset);
573 m_log.WarnFormat("[Assets] Upload failed: {0} type {1} will retry later",
574 asset.ID.ToString(), asset.Type.ToString());
575 }
576 }
577 }
578 else
579 {
580 m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), nextRetryLevel.ToString());
581 if (newID != asset.ID)
582 {
583 asset.ID = newID;
584
585 if (m_Cache != null)
586 m_Cache.Cache(asset);
587 }
588 }
589 }
590
542 public bool UpdateContent(string id, byte[] data) 591 public bool UpdateContent(string id, byte[] data)
543 { 592 {
544 AssetBase asset = null; 593 AssetBase asset = null;
@@ -569,6 +618,7 @@ namespace OpenSim.Services.Connectors
569 return false; 618 return false;
570 } 619 }
571 620
621
572 public bool Delete(string id) 622 public bool Delete(string id)
573 { 623 {
574 string uri = MapServer(id) + "/assets/" + id; 624 string uri = MapServer(id) + "/assets/" + id;