diff options
Diffstat (limited to '')
-rw-r--r-- | OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs | 211 |
1 files changed, 135 insertions, 76 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs index 7ac7917..99119d3 100644 --- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs +++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs | |||
@@ -46,10 +46,13 @@ namespace OpenSim.Services.Connectors | |||
46 | LogManager.GetLogger( | 46 | LogManager.GetLogger( |
47 | MethodBase.GetCurrentMethod().DeclaringType); | 47 | MethodBase.GetCurrentMethod().DeclaringType); |
48 | 48 | ||
49 | const int MAXSENDRETRIESLEN = 30; | ||
50 | |||
49 | private string m_ServerURI = String.Empty; | 51 | private string m_ServerURI = String.Empty; |
50 | private IImprovedAssetCache m_Cache = null; | 52 | private IImprovedAssetCache m_Cache = null; |
51 | private int m_retryCounter; | 53 | private int m_retryCounter; |
52 | private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>(); | 54 | private bool m_inRetries; |
55 | private List<AssetBase>[] m_sendRetries = new List<AssetBase>[MAXSENDRETRIESLEN]; | ||
53 | private System.Timers.Timer m_retryTimer; | 56 | private System.Timers.Timer m_retryTimer; |
54 | private int m_maxAssetRequestConcurrency = 30; | 57 | private int m_maxAssetRequestConcurrency = 30; |
55 | 58 | ||
@@ -110,9 +113,9 @@ namespace OpenSim.Services.Connectors | |||
110 | throw new Exception("Asset connector init error"); | 113 | throw new Exception("Asset connector init error"); |
111 | } | 114 | } |
112 | 115 | ||
113 | |||
114 | m_retryTimer = new System.Timers.Timer(); | 116 | m_retryTimer = new System.Timers.Timer(); |
115 | m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck); | 117 | m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck); |
118 | m_retryTimer.AutoReset = true; | ||
116 | m_retryTimer.Interval = 60000; | 119 | m_retryTimer.Interval = 60000; |
117 | 120 | ||
118 | Uri serverUri = new Uri(m_ServerURI); | 121 | Uri serverUri = new Uri(m_ServerURI); |
@@ -166,48 +169,67 @@ namespace OpenSim.Services.Connectors | |||
166 | 169 | ||
167 | protected void retryCheck(object source, ElapsedEventArgs e) | 170 | protected void retryCheck(object source, ElapsedEventArgs e) |
168 | { | 171 | { |
169 | m_retryCounter++; | 172 | lock(m_sendRetries) |
170 | if (m_retryCounter > 60) | ||
171 | m_retryCounter -= 60; | ||
172 | |||
173 | List<int> keys = new List<int>(); | ||
174 | foreach (int a in m_retryQueue.Keys) | ||
175 | { | 173 | { |
176 | keys.Add(a); | 174 | if(m_inRetries) |
175 | return; | ||
176 | m_inRetries = true; | ||
177 | } | 177 | } |
178 | foreach (int a in keys) | 178 | |
179 | m_retryCounter++; | ||
180 | if(m_retryCounter >= 61 ) // avoid overflow 60 is max in use below | ||
181 | m_retryCounter = 1; | ||
182 | |||
183 | int inUse = 0; | ||
184 | int nextlevel; | ||
185 | int timefactor; | ||
186 | List<AssetBase> retrylist; | ||
187 | // we need to go down | ||
188 | for(int i = MAXSENDRETRIESLEN - 1; i >= 0; i--) | ||
179 | { | 189 | { |
190 | lock(m_sendRetries) | ||
191 | retrylist = m_sendRetries[i]; | ||
192 | |||
193 | if(retrylist == null) | ||
194 | continue; | ||
195 | |||
196 | inUse++; | ||
197 | nextlevel = i + 1; | ||
198 | |||
180 | //We exponentially fall back on frequency until we reach one attempt per hour | 199 | //We exponentially fall back on frequency until we reach one attempt per hour |
181 | //The net result is that we end up in the queue for roughly 24 hours.. | 200 | //The net result is that we end up in the queue for roughly 24 hours.. |
182 | //24 hours worth of assets could be a lot, so the hope is that the region admin | 201 | //24 hours worth of assets could be a lot, so the hope is that the region admin |
183 | //will have gotten the asset connector back online quickly! | 202 | //will have gotten the asset connector back online quickly! |
184 | 203 | if(i == 0) | |
185 | int timefactor = a ^ 2; | 204 | timefactor = 1; |
186 | if (timefactor > 60) | 205 | else |
187 | { | 206 | { |
188 | timefactor = 60; | 207 | timefactor = 1 << nextlevel; |
208 | if (timefactor > 60) | ||
209 | timefactor = 60; | ||
189 | } | 210 | } |
190 | 211 | ||
191 | //First, find out if we care about this timefactor | 212 | if(m_retryCounter < timefactor) |
192 | if (timefactor % a == 0) | 213 | continue; // to update inUse; |
193 | { | ||
194 | //Yes, we do! | ||
195 | List<AssetBase> retrylist = m_retryQueue[a]; | ||
196 | m_retryQueue.Remove(a); | ||
197 | 214 | ||
198 | foreach(AssetBase ass in retrylist) | 215 | if (m_retryCounter % timefactor != 0) |
199 | { | 216 | continue; |
200 | Store(ass); //Store my ass. This function will put it back in the dictionary if it fails | 217 | |
201 | } | 218 | // a list to retry |
202 | } | 219 | lock(m_sendRetries) |
220 | m_sendRetries[i] = null; | ||
221 | |||
222 | // we are the only ones with a copy of this retrylist now | ||
223 | foreach(AssetBase ass in retrylist) | ||
224 | retryStore(ass, nextlevel); | ||
203 | } | 225 | } |
204 | 226 | ||
205 | if (m_retryQueue.Count == 0) | 227 | lock(m_sendRetries) |
206 | { | 228 | { |
207 | //It might only be one tick per minute, but I have | 229 | if(inUse == 0 ) |
208 | //repented and abandoned my wasteful ways | 230 | m_retryTimer.Stop(); |
209 | m_retryCounter = 0; | 231 | |
210 | m_retryTimer.Stop(); | 232 | m_inRetries = false; |
211 | } | 233 | } |
212 | } | 234 | } |
213 | 235 | ||
@@ -237,8 +259,9 @@ namespace OpenSim.Services.Connectors | |||
237 | 259 | ||
238 | asset = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, m_Auth); | 260 | asset = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, m_Auth); |
239 | 261 | ||
240 | if (m_Cache != null) | 262 | |
241 | m_Cache.Cache(asset); | 263 | if (asset != null && m_Cache != null) |
264 | m_Cache.Cache(asset); | ||
242 | } | 265 | } |
243 | return asset; | 266 | return asset; |
244 | } | 267 | } |
@@ -340,25 +363,18 @@ namespace OpenSim.Services.Connectors | |||
340 | m_AssetHandlers.Remove(id); | 363 | m_AssetHandlers.Remove(id); |
341 | } | 364 | } |
342 | 365 | ||
343 | Util.FireAndForget(x => | 366 | if(handlers != null) |
367 | { | ||
368 | Util.FireAndForget(x => | ||
344 | { | 369 | { |
345 | |||
346 | foreach (AssetRetrievedEx h in handlers) | 370 | foreach (AssetRetrievedEx h in handlers) |
347 | { | 371 | { |
348 | // Util.FireAndForget(x => | ||
349 | // { | ||
350 | try { h.Invoke(a); } | 372 | try { h.Invoke(a); } |
351 | catch { } | 373 | catch { } |
352 | // }); | ||
353 | } | 374 | } |
354 | 375 | handlers.Clear(); | |
355 | if (handlers != null) | ||
356 | handlers.Clear(); | ||
357 | |||
358 | }); | 376 | }); |
359 | 377 | } | |
360 | // if (handlers != null) | ||
361 | // handlers.Clear(); | ||
362 | success = true; | 378 | success = true; |
363 | } | 379 | } |
364 | } | 380 | } |
@@ -393,29 +409,24 @@ namespace OpenSim.Services.Connectors | |||
393 | { | 409 | { |
394 | AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); | 410 | AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); |
395 | 411 | ||
396 | // AssetRetrievedEx handlers; | ||
397 | List<AssetRetrievedEx> handlers; | 412 | List<AssetRetrievedEx> handlers; |
398 | if (m_AssetHandlers.TryGetValue(id, out handlers)) | 413 | if (m_AssetHandlers.TryGetValue(id, out handlers)) |
399 | { | 414 | { |
400 | // Someone else is already loading this asset. It will notify our handler when done. | 415 | // Someone else is already loading this asset. It will notify our handler when done. |
401 | // handlers += handlerEx; | ||
402 | handlers.Add(handlerEx); | 416 | handlers.Add(handlerEx); |
403 | return true; | 417 | return true; |
404 | } | 418 | } |
405 | 419 | ||
406 | // Load the asset ourselves | ||
407 | // handlers += handlerEx; | ||
408 | handlers = new List<AssetRetrievedEx>(); | 420 | handlers = new List<AssetRetrievedEx>(); |
409 | handlers.Add(handlerEx); | 421 | handlers.Add(handlerEx); |
410 | 422 | ||
411 | m_AssetHandlers.Add(id, handlers); | 423 | m_AssetHandlers.Add(id, handlers); |
412 | } | ||
413 | |||
414 | QueuedAssetRequest request = new QueuedAssetRequest(); | ||
415 | request.id = id; | ||
416 | request.uri = uri; | ||
417 | 424 | ||
418 | m_requestQueue.Enqueue(request); | 425 | QueuedAssetRequest request = new QueuedAssetRequest(); |
426 | request.id = id; | ||
427 | request.uri = uri; | ||
428 | m_requestQueue.Enqueue(request); | ||
429 | } | ||
419 | } | 430 | } |
420 | else | 431 | else |
421 | { | 432 | { |
@@ -495,43 +506,34 @@ namespace OpenSim.Services.Connectors | |||
495 | newID = SynchronousRestObjectRequester. | 506 | newID = SynchronousRestObjectRequester. |
496 | MakeRequest<AssetBase, string>("POST", uri, asset, 100000, m_Auth); | 507 | MakeRequest<AssetBase, string>("POST", uri, asset, 100000, m_Auth); |
497 | } | 508 | } |
498 | catch {} | 509 | catch |
510 | { | ||
511 | newID = null; | ||
512 | } | ||
499 | 513 | ||
500 | if (newID == null || newID == String.Empty || newID == stringUUIDZero) | 514 | if (newID == null || newID == String.Empty || newID == stringUUIDZero) |
501 | { | 515 | { |
502 | //The asset upload failed, put it in a queue for later | 516 | //The asset upload failed, try later |
503 | asset.UploadAttempts++; | 517 | lock(m_sendRetries) |
504 | if (asset.UploadAttempts > 30) | ||
505 | { | 518 | { |
506 | //By this stage we've been in the queue for a good few hours; | 519 | if (m_sendRetries[0] == null) |
507 | //We're going to drop the asset. | 520 | m_sendRetries[0] = new List<AssetBase>(); |
508 | m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString()); | 521 | List<AssetBase> m_queue = m_sendRetries[0]; |
509 | } | ||
510 | else | ||
511 | { | ||
512 | if (!m_retryQueue.ContainsKey(asset.UploadAttempts)) | ||
513 | { | ||
514 | m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>()); | ||
515 | } | ||
516 | List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts]; | ||
517 | m_queue.Add(asset); | 522 | m_queue.Add(asset); |
518 | m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString()); | 523 | m_log.WarnFormat("[Assets] Upload failed: {0} type {1} will retry later", |
524 | asset.ID.ToString(), asset.Type.ToString()); | ||
519 | m_retryTimer.Start(); | 525 | m_retryTimer.Start(); |
520 | } | 526 | } |
521 | } | 527 | } |
522 | else | 528 | else |
523 | { | 529 | { |
524 | if (asset.UploadAttempts > 0) | ||
525 | { | ||
526 | m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString()); | ||
527 | } | ||
528 | if (newID != asset.ID) | 530 | if (newID != asset.ID) |
529 | { | 531 | { |
530 | // Placing this here, so that this work with old asset servers that don't send any reply back | 532 | // Placing this here, so that this work with old asset servers that don't send any reply back |
531 | // SynchronousRestObjectRequester returns somethins that is not an empty string | 533 | // SynchronousRestObjectRequester returns somethins that is not an empty string |
532 | 534 | ||
533 | asset.ID = newID; | 535 | asset.ID = newID; |
534 | // what about FullID ???? | 536 | |
535 | if (m_Cache != null) | 537 | if (m_Cache != null) |
536 | m_Cache.Cache(asset); | 538 | m_Cache.Cache(asset); |
537 | } | 539 | } |
@@ -539,6 +541,62 @@ namespace OpenSim.Services.Connectors | |||
539 | return asset.ID; | 541 | return asset.ID; |
540 | } | 542 | } |
541 | 543 | ||
544 | public void retryStore(AssetBase asset, int nextRetryLevel) | ||
545 | { | ||
546 | /* this may be bad, so excluding | ||
547 | if (m_Cache != null && !m_Cache.Check(asset.ID)) | ||
548 | { | ||
549 | m_log.WarnFormat("[Assets] Upload giveup asset bc no longer in local cache: {0}", | ||
550 | asset.ID.ToString(); | ||
551 | return; // if no longer in cache, it was deleted or expired | ||
552 | } | ||
553 | */ | ||
554 | string uri = MapServer(asset.FullID.ToString()) + "/assets/"; | ||
555 | |||
556 | string newID = null; | ||
557 | try | ||
558 | { | ||
559 | newID = SynchronousRestObjectRequester. | ||
560 | MakeRequest<AssetBase, string>("POST", uri, asset, 100000, m_Auth); | ||
561 | } | ||
562 | catch | ||
563 | { | ||
564 | newID = null; | ||
565 | } | ||
566 | |||
567 | if (newID == null || newID == String.Empty || newID == stringUUIDZero) | ||
568 | { | ||
569 | if(nextRetryLevel >= MAXSENDRETRIESLEN) | ||
570 | m_log.WarnFormat("[Assets] Upload giveup after several retries id: {0} type {1}", | ||
571 | asset.ID.ToString(), asset.Type.ToString()); | ||
572 | else | ||
573 | { | ||
574 | lock(m_sendRetries) | ||
575 | { | ||
576 | if (m_sendRetries[nextRetryLevel] == null) | ||
577 | { | ||
578 | m_sendRetries[nextRetryLevel] = new List<AssetBase>(); | ||
579 | } | ||
580 | List<AssetBase> m_queue = m_sendRetries[nextRetryLevel]; | ||
581 | m_queue.Add(asset); | ||
582 | m_log.WarnFormat("[Assets] Upload failed: {0} type {1} will retry later", | ||
583 | asset.ID.ToString(), asset.Type.ToString()); | ||
584 | } | ||
585 | } | ||
586 | } | ||
587 | else | ||
588 | { | ||
589 | m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), nextRetryLevel.ToString()); | ||
590 | if (newID != asset.ID) | ||
591 | { | ||
592 | asset.ID = newID; | ||
593 | |||
594 | if (m_Cache != null) | ||
595 | m_Cache.Cache(asset); | ||
596 | } | ||
597 | } | ||
598 | } | ||
599 | |||
542 | public bool UpdateContent(string id, byte[] data) | 600 | public bool UpdateContent(string id, byte[] data) |
543 | { | 601 | { |
544 | AssetBase asset = null; | 602 | AssetBase asset = null; |
@@ -569,6 +627,7 @@ namespace OpenSim.Services.Connectors | |||
569 | return false; | 627 | return false; |
570 | } | 628 | } |
571 | 629 | ||
630 | |||
572 | public bool Delete(string id) | 631 | public bool Delete(string id) |
573 | { | 632 | { |
574 | string uri = MapServer(id) + "/assets/" + id; | 633 | string uri = MapServer(id) + "/assets/" + id; |