aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs')
-rw-r--r--OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs366
1 files changed, 290 insertions, 76 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
index bd43552..622780a 100644
--- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
+++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
@@ -27,9 +27,11 @@
27 27
28using log4net; 28using log4net;
29using System; 29using System;
30using System.Threading;
30using System.Collections.Generic; 31using System.Collections.Generic;
31using System.IO; 32using System.IO;
32using System.Reflection; 33using System.Reflection;
34using System.Timers;
33using Nini.Config; 35using Nini.Config;
34using OpenSim.Framework; 36using OpenSim.Framework;
35using OpenSim.Framework.Console; 37using OpenSim.Framework.Console;
@@ -46,13 +48,22 @@ namespace OpenSim.Services.Connectors
46 48
47 private string m_ServerURI = String.Empty; 49 private string m_ServerURI = String.Empty;
48 private IImprovedAssetCache m_Cache = null; 50 private IImprovedAssetCache m_Cache = null;
51 private int m_retryCounter;
52 private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>();
53 private System.Timers.Timer m_retryTimer;
49 private int m_maxAssetRequestConcurrency = 30; 54 private int m_maxAssetRequestConcurrency = 30;
50 55
51 private delegate void AssetRetrievedEx(AssetBase asset); 56 private delegate void AssetRetrievedEx(AssetBase asset);
52 57
53 // Keeps track of concurrent requests for the same asset, so that it's only loaded once. 58 // Keeps track of concurrent requests for the same asset, so that it's only loaded once.
54 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded 59 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded
55 private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); 60// private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>();
61
62 private Dictionary<string, List<AssetRetrievedEx>> m_AssetHandlers = new Dictionary<string, List<AssetRetrievedEx>>();
63
64 private Dictionary<string, string> m_UriMap = new Dictionary<string, string>();
65
66 private Thread[] m_fetchThreads;
56 67
57 public int MaxAssetRequestConcurrency 68 public int MaxAssetRequestConcurrency
58 { 69 {
@@ -91,13 +102,113 @@ namespace OpenSim.Services.Connectors
91 string serviceURI = assetConfig.GetString("AssetServerURI", 102 string serviceURI = assetConfig.GetString("AssetServerURI",
92 String.Empty); 103 String.Empty);
93 104
105 m_ServerURI = serviceURI;
106
94 if (serviceURI == String.Empty) 107 if (serviceURI == String.Empty)
95 { 108 {
96 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); 109 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService");
97 throw new Exception("Asset connector init error"); 110 throw new Exception("Asset connector init error");
98 } 111 }
99 112
100 m_ServerURI = serviceURI; 113
114 m_retryTimer = new System.Timers.Timer();
115 m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
116 m_retryTimer.Interval = 60000;
117
118 Uri serverUri = new Uri(m_ServerURI);
119
120 string groupHost = serverUri.Host;
121
122 for (int i = 0 ; i < 256 ; i++)
123 {
124 string prefix = i.ToString("x2");
125 groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost);
126
127 m_UriMap[prefix] = groupHost;
128 //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix);
129 }
130
131 m_fetchThreads = new Thread[2];
132
133 for (int i = 0 ; i < 2 ; i++)
134 {
135 m_fetchThreads[i] = new Thread(AssetRequestProcessor);
136 m_fetchThreads[i].Start();
137 }
138 }
139
140 private string MapServer(string id)
141 {
142 if (m_UriMap.Count == 0)
143 return m_ServerURI;
144
145 UriBuilder serverUri = new UriBuilder(m_ServerURI);
146
147 string prefix = id.Substring(0, 2).ToLower();
148
149 string host;
150
151 // HG URLs will not be valid UUIDS
152 if (m_UriMap.ContainsKey(prefix))
153 host = m_UriMap[prefix];
154 else
155 host = m_UriMap["00"];
156
157 serverUri.Host = host;
158
159 // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix);
160
161 string ret = serverUri.Uri.AbsoluteUri;
162 if (ret.EndsWith("/"))
163 ret = ret.Substring(0, ret.Length - 1);
164 return ret;
165 }
166
167 protected void retryCheck(object source, ElapsedEventArgs e)
168 {
169 m_retryCounter++;
170 if (m_retryCounter > 60)
171 m_retryCounter -= 60;
172
173 List<int> keys = new List<int>();
174 foreach (int a in m_retryQueue.Keys)
175 {
176 keys.Add(a);
177 }
178 foreach (int a in keys)
179 {
180 //We exponentially fall back on frequency until we reach one attempt per hour
181 //The net result is that we end up in the queue for roughly 24 hours..
182 //24 hours worth of assets could be a lot, so the hope is that the region admin
183 //will have gotten the asset connector back online quickly!
184
185 int timefactor = a ^ 2;
186 if (timefactor > 60)
187 {
188 timefactor = 60;
189 }
190
191 //First, find out if we care about this timefactor
192 if (timefactor % a == 0)
193 {
194 //Yes, we do!
195 List<AssetBase> retrylist = m_retryQueue[a];
196 m_retryQueue.Remove(a);
197
198 foreach(AssetBase ass in retrylist)
199 {
200 Store(ass); //Store my ass. This function will put it back in the dictionary if it fails
201 }
202 }
203 }
204
205 if (m_retryQueue.Count == 0)
206 {
207 //It might only be one tick per minute, but I have
208 //repented and abandoned my wasteful ways
209 m_retryCounter = 0;
210 m_retryTimer.Stop();
211 }
101 } 212 }
102 213
103 protected void SetCache(IImprovedAssetCache cache) 214 protected void SetCache(IImprovedAssetCache cache)
@@ -107,15 +218,13 @@ namespace OpenSim.Services.Connectors
107 218
108 public AssetBase Get(string id) 219 public AssetBase Get(string id)
109 { 220 {
110// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Synchronous get request for {0}", id); 221 string uri = MapServer(id) + "/assets/" + id;
111
112 string uri = m_ServerURI + "/assets/" + id;
113 222
114 AssetBase asset = null; 223 AssetBase asset = null;
115 if (m_Cache != null) 224 if (m_Cache != null)
116 asset = m_Cache.Get(id); 225 asset = m_Cache.Get(id);
117 226
118 if (asset == null) 227 if (asset == null || asset.Data == null || asset.Data.Length == 0)
119 { 228 {
120 // XXX: Commented out for now since this has either never been properly operational or not for some time 229 // XXX: Commented out for now since this has either never been properly operational or not for some time
121 // as m_maxAssetRequestConcurrency was being passed as the timeout, not a concurrency limiting option. 230 // as m_maxAssetRequestConcurrency was being passed as the timeout, not a concurrency limiting option.
@@ -154,7 +263,7 @@ namespace OpenSim.Services.Connectors
154 return fullAsset.Metadata; 263 return fullAsset.Metadata;
155 } 264 }
156 265
157 string uri = m_ServerURI + "/assets/" + id + "/metadata"; 266 string uri = MapServer(id) + "/assets/" + id + "/metadata";
158 267
159 AssetMetadata asset = SynchronousRestObjectRequester.MakeRequest<int, AssetMetadata>("GET", uri, 0, m_Auth); 268 AssetMetadata asset = SynchronousRestObjectRequester.MakeRequest<int, AssetMetadata>("GET", uri, 0, m_Auth);
160 return asset; 269 return asset;
@@ -170,7 +279,7 @@ namespace OpenSim.Services.Connectors
170 return fullAsset.Data; 279 return fullAsset.Data;
171 } 280 }
172 281
173 using (RestClient rc = new RestClient(m_ServerURI)) 282 using (RestClient rc = new RestClient(MapServer(id)))
174 { 283 {
175 rc.AddResourcePath("assets"); 284 rc.AddResourcePath("assets");
176 rc.AddResourcePath(id); 285 rc.AddResourcePath(id);
@@ -178,83 +287,136 @@ namespace OpenSim.Services.Connectors
178 287
179 rc.RequestMethod = "GET"; 288 rc.RequestMethod = "GET";
180 289
181 Stream s = rc.Request(m_Auth); 290 using (Stream s = rc.Request(m_Auth))
182
183 if (s == null)
184 return null;
185
186 if (s.Length > 0)
187 { 291 {
188 byte[] ret = new byte[s.Length]; 292 if (s == null)
189 s.Read(ret, 0, (int)s.Length); 293 return null;
190 294
191 return ret; 295 if (s.Length > 0)
192 } 296 {
297 byte[] ret = new byte[s.Length];
298 s.Read(ret, 0, (int)s.Length);
193 299
300 return ret;
301 }
302 }
194 return null; 303 return null;
195 } 304 }
196 } 305 }
197 306
198 public bool Get(string id, Object sender, AssetRetrieved handler) 307 private class QueuedAssetRequest
199 { 308 {
200// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Potentially asynchronous get request for {0}", id); 309 public string uri;
310 public string id;
311 }
201 312
202 string uri = m_ServerURI + "/assets/" + id; 313 private OpenMetaverse.BlockingQueue<QueuedAssetRequest> m_requestQueue =
314 new OpenMetaverse.BlockingQueue<QueuedAssetRequest>();
203 315
204 AssetBase asset = null; 316 private void AssetRequestProcessor()
205 if (m_Cache != null) 317 {
206 asset = m_Cache.Get(id); 318 QueuedAssetRequest r;
207 319
208 if (asset == null) 320 while (true)
209 { 321 {
210 lock (m_AssetHandlers) 322 r = m_requestQueue.Dequeue();
211 {
212 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
213 323
214 AssetRetrievedEx handlers; 324 string uri = r.uri;
215 if (m_AssetHandlers.TryGetValue(id, out handlers)) 325 string id = r.id;
216 {
217 // Someone else is already loading this asset. It will notify our handler when done.
218 handlers += handlerEx;
219 return true;
220 }
221
222 // Load the asset ourselves
223 handlers += handlerEx;
224 m_AssetHandlers.Add(id, handlers);
225 }
226 326
227 bool success = false; 327 bool success = false;
228 try 328 try
229 { 329 {
230 AsynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, 330 AssetBase a = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, 30000, m_Auth);
231 delegate(AssetBase a) 331 if (a != null)
332 {
333 if (m_Cache != null)
334 m_Cache.Cache(a);
335
336 List<AssetRetrievedEx> handlers;
337 lock (m_AssetHandlers)
232 { 338 {
233 if (a != null && m_Cache != null) 339 handlers = m_AssetHandlers[id];
234 m_Cache.Cache(a); 340 m_AssetHandlers.Remove(id);
341 }
235 342
236 AssetRetrievedEx handlers; 343 Util.FireAndForget(x =>
237 lock (m_AssetHandlers)
238 { 344 {
239 handlers = m_AssetHandlers[id]; 345
240 m_AssetHandlers.Remove(id); 346 foreach (AssetRetrievedEx h in handlers)
241 } 347 {
242 handlers.Invoke(a); 348 // Util.FireAndForget(x =>
243 }, m_maxAssetRequestConcurrency, m_Auth); 349 // {
244 350 try { h.Invoke(a); }
245 success = true; 351 catch { }
352 // });
353 }
354
355 if (handlers != null)
356 handlers.Clear();
357
358 });
359
360// if (handlers != null)
361// handlers.Clear();
362 success = true;
363 }
246 } 364 }
247 finally 365 finally
248 { 366 {
249 if (!success) 367 if (!success)
250 { 368 {
369 List<AssetRetrievedEx> handlers;
251 lock (m_AssetHandlers) 370 lock (m_AssetHandlers)
252 { 371 {
372 handlers = m_AssetHandlers[id];
253 m_AssetHandlers.Remove(id); 373 m_AssetHandlers.Remove(id);
254 } 374 }
375 if (handlers != null)
376 handlers.Clear();
255 } 377 }
256 } 378 }
257 } 379 }
380 }
381
382 public bool Get(string id, Object sender, AssetRetrieved handler)
383 {
384 string uri = MapServer(id) + "/assets/" + id;
385
386 AssetBase asset = null;
387 if (m_Cache != null)
388 asset = m_Cache.Get(id);
389
390 if (asset == null || asset.Data == null || asset.Data.Length == 0)
391 {
392 lock (m_AssetHandlers)
393 {
394 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
395
396// AssetRetrievedEx handlers;
397 List<AssetRetrievedEx> handlers;
398 if (m_AssetHandlers.TryGetValue(id, out handlers))
399 {
400 // Someone else is already loading this asset. It will notify our handler when done.
401// handlers += handlerEx;
402 handlers.Add(handlerEx);
403 return true;
404 }
405
406 // Load the asset ourselves
407// handlers += handlerEx;
408 handlers = new List<AssetRetrievedEx>();
409 handlers.Add(handlerEx);
410
411 m_AssetHandlers.Add(id, handlers);
412 }
413
414 QueuedAssetRequest request = new QueuedAssetRequest();
415 request.id = id;
416 request.uri = uri;
417
418 m_requestQueue.Enqueue(request);
419 }
258 else 420 else
259 { 421 {
260 handler(id, sender, asset); 422 handler(id, sender, asset);
@@ -286,43 +448,95 @@ namespace OpenSim.Services.Connectors
286 448
287 public string Store(AssetBase asset) 449 public string Store(AssetBase asset)
288 { 450 {
289 if (asset.Local) 451 // Have to assign the asset ID here. This isn't likely to
452 // trigger since current callers don't pass emtpy IDs
453 // We need the asset ID to route the request to the proper
454 // cluster member, so we can't have the server assign one.
455 if (asset.ID == string.Empty)
290 { 456 {
291 if (m_Cache != null) 457 if (asset.FullID == UUID.Zero)
292 m_Cache.Cache(asset); 458 {
459 asset.FullID = UUID.Random();
460 }
461 asset.ID = asset.FullID.ToString();
462 }
463 else if (asset.FullID == UUID.Zero)
464 {
465 UUID uuid = UUID.Zero;
466 if (UUID.TryParse(asset.ID, out uuid))
467 {
468 asset.FullID = uuid;
469 }
470 else
471 {
472 asset.FullID = UUID.Random();
473 }
474 }
293 475
476 if (m_Cache != null)
477 m_Cache.Cache(asset);
478 if (asset.Temporary || asset.Local)
479 {
294 return asset.ID; 480 return asset.ID;
295 } 481 }
296 482
297 string uri = m_ServerURI + "/assets/"; 483 string uri = MapServer(asset.FullID.ToString()) + "/assets/";
298 484
299 string newID; 485 string newID;
300 try 486 try
301 { 487 {
302 newID = SynchronousRestObjectRequester.MakeRequest<AssetBase, string>("POST", uri, asset, m_Auth); 488 newID = SynchronousRestObjectRequester.
489 MakeRequest<AssetBase, string>("POST", uri, asset, 100000, m_Auth);
490 if (newID == null || newID == "")
491 {
492 newID = UUID.Zero.ToString();
493 }
303 } 494 }
304 catch (Exception e) 495 catch (Exception e)
305 { 496 {
306 m_log.Warn(string.Format("[ASSET CONNECTOR]: Unable to send asset {0} to asset server. Reason: {1} ", asset.ID, e.Message), e); 497 newID = UUID.Zero.ToString();
307 return string.Empty;
308 } 498 }
309 499
310 // TEMPORARY: SRAS returns 'null' when it's asked to store existing assets 500 if (newID == UUID.Zero.ToString())
311 if (newID == null)
312 { 501 {
313 m_log.DebugFormat("[ASSET CONNECTOR]: Storing of asset {0} returned null; assuming the asset already exists", asset.ID); 502 //The asset upload failed, put it in a queue for later
314 return asset.ID; 503 asset.UploadAttempts++;
504 if (asset.UploadAttempts > 30)
505 {
506 //By this stage we've been in the queue for a good few hours;
507 //We're going to drop the asset.
508 m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString());
509 }
510 else
511 {
512 if (!m_retryQueue.ContainsKey(asset.UploadAttempts))
513 {
514 m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>());
515 }
516 List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts];
517 m_queue.Add(asset);
518 m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString());
519 m_retryTimer.Start();
520 }
315 } 521 }
522 else
523 {
524 if (asset.UploadAttempts > 0)
525 {
526 m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString());
527 }
528 if (newID != String.Empty)
529 {
530 // Placing this here, so that this work with old asset servers that don't send any reply back
531 // SynchronousRestObjectRequester returns somethins that is not an empty string
532 if (newID != null)
533 asset.ID = newID;
316 534
317 if (string.IsNullOrEmpty(newID)) 535 if (m_Cache != null)
318 return string.Empty; 536 m_Cache.Cache(asset);
319 537 }
320 asset.ID = newID; 538 }
321 539 return asset.ID;
322 if (m_Cache != null)
323 m_Cache.Cache(asset);
324
325 return newID;
326 } 540 }
327 541
328 public bool UpdateContent(string id, byte[] data) 542 public bool UpdateContent(string id, byte[] data)
@@ -343,7 +557,7 @@ namespace OpenSim.Services.Connectors
343 } 557 }
344 asset.Data = data; 558 asset.Data = data;
345 559
346 string uri = m_ServerURI + "/assets/" + id; 560 string uri = MapServer(id) + "/assets/" + id;
347 561
348 if (SynchronousRestObjectRequester.MakeRequest<AssetBase, bool>("POST", uri, asset, m_Auth)) 562 if (SynchronousRestObjectRequester.MakeRequest<AssetBase, bool>("POST", uri, asset, m_Auth))
349 { 563 {
@@ -357,7 +571,7 @@ namespace OpenSim.Services.Connectors
357 571
358 public bool Delete(string id) 572 public bool Delete(string id)
359 { 573 {
360 string uri = m_ServerURI + "/assets/" + id; 574 string uri = MapServer(id) + "/assets/" + id;
361 575
362 if (SynchronousRestObjectRequester.MakeRequest<int, bool>("DELETE", uri, 0, m_Auth)) 576 if (SynchronousRestObjectRequester.MakeRequest<int, bool>("DELETE", uri, 0, m_Auth))
363 { 577 {