aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs211
1 files changed, 135 insertions, 76 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
index 7ac7917..99119d3 100644
--- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
+++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
@@ -46,10 +46,13 @@ namespace OpenSim.Services.Connectors
46 LogManager.GetLogger( 46 LogManager.GetLogger(
47 MethodBase.GetCurrentMethod().DeclaringType); 47 MethodBase.GetCurrentMethod().DeclaringType);
48 48
49 const int MAXSENDRETRIESLEN = 30;
50
49 private string m_ServerURI = String.Empty; 51 private string m_ServerURI = String.Empty;
50 private IImprovedAssetCache m_Cache = null; 52 private IImprovedAssetCache m_Cache = null;
51 private int m_retryCounter; 53 private int m_retryCounter;
52 private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>(); 54 private bool m_inRetries;
55 private List<AssetBase>[] m_sendRetries = new List<AssetBase>[MAXSENDRETRIESLEN];
53 private System.Timers.Timer m_retryTimer; 56 private System.Timers.Timer m_retryTimer;
54 private int m_maxAssetRequestConcurrency = 30; 57 private int m_maxAssetRequestConcurrency = 30;
55 58
@@ -110,9 +113,9 @@ namespace OpenSim.Services.Connectors
110 throw new Exception("Asset connector init error"); 113 throw new Exception("Asset connector init error");
111 } 114 }
112 115
113
114 m_retryTimer = new System.Timers.Timer(); 116 m_retryTimer = new System.Timers.Timer();
115 m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck); 117 m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
118 m_retryTimer.AutoReset = true;
116 m_retryTimer.Interval = 60000; 119 m_retryTimer.Interval = 60000;
117 120
118 Uri serverUri = new Uri(m_ServerURI); 121 Uri serverUri = new Uri(m_ServerURI);
@@ -166,48 +169,67 @@ namespace OpenSim.Services.Connectors
166 169
167 protected void retryCheck(object source, ElapsedEventArgs e) 170 protected void retryCheck(object source, ElapsedEventArgs e)
168 { 171 {
169 m_retryCounter++; 172 lock(m_sendRetries)
170 if (m_retryCounter > 60)
171 m_retryCounter -= 60;
172
173 List<int> keys = new List<int>();
174 foreach (int a in m_retryQueue.Keys)
175 { 173 {
176 keys.Add(a); 174 if(m_inRetries)
175 return;
176 m_inRetries = true;
177 } 177 }
178 foreach (int a in keys) 178
179 m_retryCounter++;
180 if(m_retryCounter >= 61 ) // avoid overflow 60 is max in use below
181 m_retryCounter = 1;
182
183 int inUse = 0;
184 int nextlevel;
185 int timefactor;
186 List<AssetBase> retrylist;
187 // we need to go down
188 for(int i = MAXSENDRETRIESLEN - 1; i >= 0; i--)
179 { 189 {
190 lock(m_sendRetries)
191 retrylist = m_sendRetries[i];
192
193 if(retrylist == null)
194 continue;
195
196 inUse++;
197 nextlevel = i + 1;
198
180 //We exponentially fall back on frequency until we reach one attempt per hour 199 //We exponentially fall back on frequency until we reach one attempt per hour
181 //The net result is that we end up in the queue for roughly 24 hours.. 200 //The net result is that we end up in the queue for roughly 24 hours..
182 //24 hours worth of assets could be a lot, so the hope is that the region admin 201 //24 hours worth of assets could be a lot, so the hope is that the region admin
183 //will have gotten the asset connector back online quickly! 202 //will have gotten the asset connector back online quickly!
184 203 if(i == 0)
185 int timefactor = a ^ 2; 204 timefactor = 1;
186 if (timefactor > 60) 205 else
187 { 206 {
188 timefactor = 60; 207 timefactor = 1 << nextlevel;
208 if (timefactor > 60)
209 timefactor = 60;
189 } 210 }
190 211
191 //First, find out if we care about this timefactor 212 if(m_retryCounter < timefactor)
192 if (timefactor % a == 0) 213 continue; // to update inUse;
193 {
194 //Yes, we do!
195 List<AssetBase> retrylist = m_retryQueue[a];
196 m_retryQueue.Remove(a);
197 214
198 foreach(AssetBase ass in retrylist) 215 if (m_retryCounter % timefactor != 0)
199 { 216 continue;
200 Store(ass); //Store my ass. This function will put it back in the dictionary if it fails 217
201 } 218 // a list to retry
202 } 219 lock(m_sendRetries)
220 m_sendRetries[i] = null;
221
222 // we are the only ones with a copy of this retrylist now
223 foreach(AssetBase ass in retrylist)
224 retryStore(ass, nextlevel);
203 } 225 }
204 226
205 if (m_retryQueue.Count == 0) 227 lock(m_sendRetries)
206 { 228 {
207 //It might only be one tick per minute, but I have 229 if(inUse == 0 )
208 //repented and abandoned my wasteful ways 230 m_retryTimer.Stop();
209 m_retryCounter = 0; 231
210 m_retryTimer.Stop(); 232 m_inRetries = false;
211 } 233 }
212 } 234 }
213 235
@@ -237,8 +259,9 @@ namespace OpenSim.Services.Connectors
237 259
238 asset = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, m_Auth); 260 asset = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, m_Auth);
239 261
240 if (m_Cache != null) 262
241 m_Cache.Cache(asset); 263 if (asset != null && m_Cache != null)
264 m_Cache.Cache(asset);
242 } 265 }
243 return asset; 266 return asset;
244 } 267 }
@@ -340,25 +363,18 @@ namespace OpenSim.Services.Connectors
340 m_AssetHandlers.Remove(id); 363 m_AssetHandlers.Remove(id);
341 } 364 }
342 365
343 Util.FireAndForget(x => 366 if(handlers != null)
367 {
368 Util.FireAndForget(x =>
344 { 369 {
345
346 foreach (AssetRetrievedEx h in handlers) 370 foreach (AssetRetrievedEx h in handlers)
347 { 371 {
348 // Util.FireAndForget(x =>
349 // {
350 try { h.Invoke(a); } 372 try { h.Invoke(a); }
351 catch { } 373 catch { }
352 // });
353 } 374 }
354 375 handlers.Clear();
355 if (handlers != null)
356 handlers.Clear();
357
358 }); 376 });
359 377 }
360// if (handlers != null)
361// handlers.Clear();
362 success = true; 378 success = true;
363 } 379 }
364 } 380 }
@@ -393,29 +409,24 @@ namespace OpenSim.Services.Connectors
393 { 409 {
394 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); 410 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
395 411
396// AssetRetrievedEx handlers;
397 List<AssetRetrievedEx> handlers; 412 List<AssetRetrievedEx> handlers;
398 if (m_AssetHandlers.TryGetValue(id, out handlers)) 413 if (m_AssetHandlers.TryGetValue(id, out handlers))
399 { 414 {
400 // Someone else is already loading this asset. It will notify our handler when done. 415 // Someone else is already loading this asset. It will notify our handler when done.
401// handlers += handlerEx;
402 handlers.Add(handlerEx); 416 handlers.Add(handlerEx);
403 return true; 417 return true;
404 } 418 }
405 419
406 // Load the asset ourselves
407// handlers += handlerEx;
408 handlers = new List<AssetRetrievedEx>(); 420 handlers = new List<AssetRetrievedEx>();
409 handlers.Add(handlerEx); 421 handlers.Add(handlerEx);
410 422
411 m_AssetHandlers.Add(id, handlers); 423 m_AssetHandlers.Add(id, handlers);
412 }
413
414 QueuedAssetRequest request = new QueuedAssetRequest();
415 request.id = id;
416 request.uri = uri;
417 424
418 m_requestQueue.Enqueue(request); 425 QueuedAssetRequest request = new QueuedAssetRequest();
426 request.id = id;
427 request.uri = uri;
428 m_requestQueue.Enqueue(request);
429 }
419 } 430 }
420 else 431 else
421 { 432 {
@@ -495,43 +506,34 @@ namespace OpenSim.Services.Connectors
495 newID = SynchronousRestObjectRequester. 506 newID = SynchronousRestObjectRequester.
496 MakeRequest<AssetBase, string>("POST", uri, asset, 100000, m_Auth); 507 MakeRequest<AssetBase, string>("POST", uri, asset, 100000, m_Auth);
497 } 508 }
498 catch {} 509 catch
510 {
511 newID = null;
512 }
499 513
500 if (newID == null || newID == String.Empty || newID == stringUUIDZero) 514 if (newID == null || newID == String.Empty || newID == stringUUIDZero)
501 { 515 {
502 //The asset upload failed, put it in a queue for later 516 //The asset upload failed, try later
503 asset.UploadAttempts++; 517 lock(m_sendRetries)
504 if (asset.UploadAttempts > 30)
505 { 518 {
506 //By this stage we've been in the queue for a good few hours; 519 if (m_sendRetries[0] == null)
507 //We're going to drop the asset. 520 m_sendRetries[0] = new List<AssetBase>();
508 m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString()); 521 List<AssetBase> m_queue = m_sendRetries[0];
509 }
510 else
511 {
512 if (!m_retryQueue.ContainsKey(asset.UploadAttempts))
513 {
514 m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>());
515 }
516 List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts];
517 m_queue.Add(asset); 522 m_queue.Add(asset);
518 m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString()); 523 m_log.WarnFormat("[Assets] Upload failed: {0} type {1} will retry later",
524 asset.ID.ToString(), asset.Type.ToString());
519 m_retryTimer.Start(); 525 m_retryTimer.Start();
520 } 526 }
521 } 527 }
522 else 528 else
523 { 529 {
524 if (asset.UploadAttempts > 0)
525 {
526 m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString());
527 }
528 if (newID != asset.ID) 530 if (newID != asset.ID)
529 { 531 {
530 // Placing this here, so that this work with old asset servers that don't send any reply back 532 // Placing this here, so that this work with old asset servers that don't send any reply back
531 // SynchronousRestObjectRequester returns somethins that is not an empty string 533 // SynchronousRestObjectRequester returns somethins that is not an empty string
532 534
533 asset.ID = newID; 535 asset.ID = newID;
534// what about FullID ???? 536
535 if (m_Cache != null) 537 if (m_Cache != null)
536 m_Cache.Cache(asset); 538 m_Cache.Cache(asset);
537 } 539 }
@@ -539,6 +541,62 @@ namespace OpenSim.Services.Connectors
539 return asset.ID; 541 return asset.ID;
540 } 542 }
541 543
544 public void retryStore(AssetBase asset, int nextRetryLevel)
545 {
546/* this may be bad, so excluding
547 if (m_Cache != null && !m_Cache.Check(asset.ID))
548 {
549 m_log.WarnFormat("[Assets] Upload giveup asset bc no longer in local cache: {0}",
550 asset.ID.ToString();
551 return; // if no longer in cache, it was deleted or expired
552 }
553*/
554 string uri = MapServer(asset.FullID.ToString()) + "/assets/";
555
556 string newID = null;
557 try
558 {
559 newID = SynchronousRestObjectRequester.
560 MakeRequest<AssetBase, string>("POST", uri, asset, 100000, m_Auth);
561 }
562 catch
563 {
564 newID = null;
565 }
566
567 if (newID == null || newID == String.Empty || newID == stringUUIDZero)
568 {
569 if(nextRetryLevel >= MAXSENDRETRIESLEN)
570 m_log.WarnFormat("[Assets] Upload giveup after several retries id: {0} type {1}",
571 asset.ID.ToString(), asset.Type.ToString());
572 else
573 {
574 lock(m_sendRetries)
575 {
576 if (m_sendRetries[nextRetryLevel] == null)
577 {
578 m_sendRetries[nextRetryLevel] = new List<AssetBase>();
579 }
580 List<AssetBase> m_queue = m_sendRetries[nextRetryLevel];
581 m_queue.Add(asset);
582 m_log.WarnFormat("[Assets] Upload failed: {0} type {1} will retry later",
583 asset.ID.ToString(), asset.Type.ToString());
584 }
585 }
586 }
587 else
588 {
589 m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), nextRetryLevel.ToString());
590 if (newID != asset.ID)
591 {
592 asset.ID = newID;
593
594 if (m_Cache != null)
595 m_Cache.Cache(asset);
596 }
597 }
598 }
599
542 public bool UpdateContent(string id, byte[] data) 600 public bool UpdateContent(string id, byte[] data)
543 { 601 {
544 AssetBase asset = null; 602 AssetBase asset = null;
@@ -569,6 +627,7 @@ namespace OpenSim.Services.Connectors
569 return false; 627 return false;
570 } 628 }
571 629
630
572 public bool Delete(string id) 631 public bool Delete(string id)
573 { 632 {
574 string uri = MapServer(id) + "/assets/" + id; 633 string uri = MapServer(id) + "/assets/" + id;