aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Services/Connectors/Asset
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Services/Connectors/Asset')
-rw-r--r--OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs347
1 files changed, 306 insertions, 41 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
index badacb7..795ca2e 100644
--- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
+++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
@@ -27,9 +27,11 @@
27 27
28using log4net; 28using log4net;
29using System; 29using System;
30using System.Threading;
30using System.Collections.Generic; 31using System.Collections.Generic;
31using System.IO; 32using System.IO;
32using System.Reflection; 33using System.Reflection;
34using System.Timers;
33using Nini.Config; 35using Nini.Config;
34using OpenSim.Framework; 36using OpenSim.Framework;
35using OpenSim.Framework.Console; 37using OpenSim.Framework.Console;
@@ -47,13 +49,22 @@ namespace OpenSim.Services.Connectors
47 49
48 private string m_ServerURI = String.Empty; 50 private string m_ServerURI = String.Empty;
49 private IImprovedAssetCache m_Cache = null; 51 private IImprovedAssetCache m_Cache = null;
52 private int m_retryCounter;
53 private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>();
54 private System.Timers.Timer m_retryTimer;
50 private int m_maxAssetRequestConcurrency = 30; 55 private int m_maxAssetRequestConcurrency = 30;
51 56
52 private delegate void AssetRetrievedEx(AssetBase asset); 57 private delegate void AssetRetrievedEx(AssetBase asset);
53 58
54 // Keeps track of concurrent requests for the same asset, so that it's only loaded once. 59 // Keeps track of concurrent requests for the same asset, so that it's only loaded once.
55 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded 60 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded
56 private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); 61// private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>();
62
63 private Dictionary<string, List<AssetRetrievedEx>> m_AssetHandlers = new Dictionary<string, List<AssetRetrievedEx>>();
64
65 private Dictionary<string, string> m_UriMap = new Dictionary<string, string>();
66
67 private Thread[] m_fetchThreads;
57 68
58 public int MaxAssetRequestConcurrency 69 public int MaxAssetRequestConcurrency
59 { 70 {
@@ -92,13 +103,108 @@ namespace OpenSim.Services.Connectors
92 string serviceURI = assetConfig.GetString("AssetServerURI", 103 string serviceURI = assetConfig.GetString("AssetServerURI",
93 String.Empty); 104 String.Empty);
94 105
106 m_ServerURI = serviceURI;
107
95 if (serviceURI == String.Empty) 108 if (serviceURI == String.Empty)
96 { 109 {
97 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); 110 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService");
98 throw new Exception("Asset connector init error"); 111 throw new Exception("Asset connector init error");
99 } 112 }
100 113
101 m_ServerURI = serviceURI; 114
115 m_retryTimer = new System.Timers.Timer();
116 m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
117 m_retryTimer.Interval = 60000;
118
119 Uri serverUri = new Uri(m_ServerURI);
120
121 string groupHost = serverUri.Host;
122
123 for (int i = 0 ; i < 256 ; i++)
124 {
125 string prefix = i.ToString("x2");
126 groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost);
127
128 m_UriMap[prefix] = groupHost;
129 //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix);
130 }
131
132 m_fetchThreads = new Thread[2];
133
134 for (int i = 0 ; i < 2 ; i++)
135 {
136 m_fetchThreads[i] = new Thread(AssetRequestProcessor);
137 m_fetchThreads[i].Start();
138 }
139 }
140
141 private string MapServer(string id)
142 {
143 UriBuilder serverUri = new UriBuilder(m_ServerURI);
144
145 string prefix = id.Substring(0, 2).ToLower();
146
147 string host;
148
149 // HG URLs will not be valid UUIDS
150 if (m_UriMap.ContainsKey(prefix))
151 host = m_UriMap[prefix];
152 else
153 host = m_UriMap["00"];
154
155 serverUri.Host = host;
156
157 // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix);
158
159 string ret = serverUri.Uri.AbsoluteUri;
160 if (ret.EndsWith("/"))
161 ret = ret.Substring(0, ret.Length - 1);
162 return ret;
163 }
164
165 protected void retryCheck(object source, ElapsedEventArgs e)
166 {
167 m_retryCounter++;
168 if (m_retryCounter > 60) m_retryCounter -= 60;
169 List<int> keys = new List<int>();
170 foreach (int a in m_retryQueue.Keys)
171 {
172 keys.Add(a);
173 }
174 foreach (int a in keys)
175 {
176 //We exponentially fall back on frequency until we reach one attempt per hour
177 //The net result is that we end up in the queue for roughly 24 hours..
178 //24 hours worth of assets could be a lot, so the hope is that the region admin
179 //will have gotten the asset connector back online quickly!
180
181 int timefactor = a ^ 2;
182 if (timefactor > 60)
183 {
184 timefactor = 60;
185 }
186
187 //First, find out if we care about this timefactor
188 if (timefactor % a == 0)
189 {
190 //Yes, we do!
191 List<AssetBase> retrylist = m_retryQueue[a];
192 m_retryQueue.Remove(a);
193
194 foreach(AssetBase ass in retrylist)
195 {
196 Store(ass); //Store my ass. This function will put it back in the dictionary if it fails
197 }
198 }
199 }
200
201 if (m_retryQueue.Count == 0)
202 {
203 //It might only be one tick per minute, but I have
204 //repented and abandoned my wasteful ways
205 m_retryCounter = 0;
206 m_retryTimer.Stop();
207 }
102 } 208 }
103 209
104 protected void SetCache(IImprovedAssetCache cache) 210 protected void SetCache(IImprovedAssetCache cache)
@@ -108,15 +214,13 @@ namespace OpenSim.Services.Connectors
108 214
109 public AssetBase Get(string id) 215 public AssetBase Get(string id)
110 { 216 {
111// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Synchronous get request for {0}", id); 217 string uri = MapServer(id) + "/assets/" + id;
112
113 string uri = m_ServerURI + "/assets/" + id;
114 218
115 AssetBase asset = null; 219 AssetBase asset = null;
116 if (m_Cache != null) 220 if (m_Cache != null)
117 asset = m_Cache.Get(id); 221 asset = m_Cache.Get(id);
118 222
119 if (asset == null) 223 if (asset == null || asset.Data == null || asset.Data.Length == 0)
120 { 224 {
121 // XXX: Commented out for now since this has either never been properly operational or not for some time 225 // XXX: Commented out for now since this has either never been properly operational or not for some time
122 // as m_maxAssetRequestConcurrency was being passed as the timeout, not a concurrency limiting option. 226 // as m_maxAssetRequestConcurrency was being passed as the timeout, not a concurrency limiting option.
@@ -155,7 +259,7 @@ namespace OpenSim.Services.Connectors
155 return fullAsset.Metadata; 259 return fullAsset.Metadata;
156 } 260 }
157 261
158 string uri = m_ServerURI + "/assets/" + id + "/metadata"; 262 string uri = MapServer(id) + "/assets/" + id + "/metadata";
159 263
160 AssetMetadata asset = SynchronousRestObjectRequester.MakeRequest<int, AssetMetadata>("GET", uri, 0, m_Auth); 264 AssetMetadata asset = SynchronousRestObjectRequester.MakeRequest<int, AssetMetadata>("GET", uri, 0, m_Auth);
161 return asset; 265 return asset;
@@ -171,11 +275,18 @@ namespace OpenSim.Services.Connectors
171 return fullAsset.Data; 275 return fullAsset.Data;
172 } 276 }
173 277
278<<<<<<< HEAD
174 using (RestClient rc = new RestClient(m_ServerURI)) 279 using (RestClient rc = new RestClient(m_ServerURI))
175 { 280 {
176 rc.AddResourcePath("assets"); 281 rc.AddResourcePath("assets");
177 rc.AddResourcePath(id); 282 rc.AddResourcePath(id);
178 rc.AddResourcePath("data"); 283 rc.AddResourcePath("data");
284=======
285 RestClient rc = new RestClient(MapServer(id));
286 rc.AddResourcePath("assets");
287 rc.AddResourcePath(id);
288 rc.AddResourcePath("data");
289>>>>>>> avn/ubitvar
179 290
180 rc.RequestMethod = "GET"; 291 rc.RequestMethod = "GET";
181 292
@@ -189,72 +300,147 @@ namespace OpenSim.Services.Connectors
189 byte[] ret = new byte[s.Length]; 300 byte[] ret = new byte[s.Length];
190 s.Read(ret, 0, (int)s.Length); 301 s.Read(ret, 0, (int)s.Length);
191 302
303<<<<<<< HEAD
192 return ret; 304 return ret;
193 } 305 }
194 306
195 return null; 307 return null;
196 } 308 }
309=======
310 s.Close();
311 return ret;
312 }
313
314 s.Close();
315 return null;
316>>>>>>> avn/ubitvar
197 } 317 }
198 318
199 public bool Get(string id, Object sender, AssetRetrieved handler) 319 private class QueuedAssetRequest
200 { 320 {
201// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Potentially asynchronous get request for {0}", id); 321 public string uri;
322 public string id;
323 }
202 324
203 string uri = m_ServerURI + "/assets/" + id; 325 private OpenMetaverse.BlockingQueue<QueuedAssetRequest> m_requestQueue =
326 new OpenMetaverse.BlockingQueue<QueuedAssetRequest>();
204 327
205 AssetBase asset = null; 328 private void AssetRequestProcessor()
206 if (m_Cache != null) 329 {
207 asset = m_Cache.Get(id); 330 QueuedAssetRequest r;
208 331
209 if (asset == null) 332 while (true)
210 { 333 {
211 lock (m_AssetHandlers) 334 r = m_requestQueue.Dequeue();
212 {
213 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
214 335
215 AssetRetrievedEx handlers; 336 string uri = r.uri;
216 if (m_AssetHandlers.TryGetValue(id, out handlers)) 337 string id = r.id;
217 {
218 // Someone else is already loading this asset. It will notify our handler when done.
219 handlers += handlerEx;
220 return true;
221 }
222
223 // Load the asset ourselves
224 handlers += handlerEx;
225 m_AssetHandlers.Add(id, handlers);
226 }
227 338
228 bool success = false; 339 bool success = false;
229 try 340 try
230 { 341 {
231 AsynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, 342 AssetBase a = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, 30);
232 delegate(AssetBase a) 343 if (a != null)
344 {
345 if (m_Cache != null)
346 m_Cache.Cache(a);
347
348 List<AssetRetrievedEx> handlers;
349 lock (m_AssetHandlers)
233 { 350 {
351<<<<<<< HEAD
234 if (a != null && m_Cache != null) 352 if (a != null && m_Cache != null)
235 m_Cache.Cache(a); 353 m_Cache.Cache(a);
354=======
355 handlers = m_AssetHandlers[id];
356 m_AssetHandlers.Remove(id);
357 }
358>>>>>>> avn/ubitvar
236 359
237 AssetRetrievedEx handlers; 360 Util.FireAndForget(x =>
238 lock (m_AssetHandlers)
239 { 361 {
362<<<<<<< HEAD
240 handlers = m_AssetHandlers[id]; 363 handlers = m_AssetHandlers[id];
241 m_AssetHandlers.Remove(id); 364 m_AssetHandlers.Remove(id);
242 } 365 }
243 handlers.Invoke(a); 366 handlers.Invoke(a);
244 }, m_maxAssetRequestConcurrency, m_Auth); 367 }, m_maxAssetRequestConcurrency, m_Auth);
368=======
369 foreach (AssetRetrievedEx h in handlers)
370 {
371 // Util.FireAndForget(x =>
372 // {
373 try { h.Invoke(a); }
374 catch { }
375 // });
376 }
377
378 if (handlers != null)
379 handlers.Clear();
380
381 });
382
383// if (handlers != null)
384// handlers.Clear();
385>>>>>>> avn/ubitvar
245 386
246 success = true; 387 success = true;
388 }
247 } 389 }
248 finally 390 finally
249 { 391 {
250 if (!success) 392 if (!success)
251 { 393 {
394 List<AssetRetrievedEx> handlers;
252 lock (m_AssetHandlers) 395 lock (m_AssetHandlers)
253 { 396 {
397 handlers = m_AssetHandlers[id];
254 m_AssetHandlers.Remove(id); 398 m_AssetHandlers.Remove(id);
255 } 399 }
400 if (handlers != null)
401 handlers.Clear();
402 }
403 }
404 }
405 }
406
407 public bool Get(string id, Object sender, AssetRetrieved handler)
408 {
409 string uri = MapServer(id) + "/assets/" + id;
410
411 AssetBase asset = null;
412 if (m_Cache != null)
413 asset = m_Cache.Get(id);
414
415 if (asset == null || asset.Data == null || asset.Data.Length == 0)
416 {
417 lock (m_AssetHandlers)
418 {
419 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
420
421// AssetRetrievedEx handlers;
422 List<AssetRetrievedEx> handlers;
423 if (m_AssetHandlers.TryGetValue(id, out handlers))
424 {
425 // Someone else is already loading this asset. It will notify our handler when done.
426// handlers += handlerEx;
427 handlers.Add(handlerEx);
428 return true;
256 } 429 }
430
431 // Load the asset ourselves
432// handlers += handlerEx;
433 handlers = new List<AssetRetrievedEx>();
434 handlers.Add(handlerEx);
435
436 m_AssetHandlers.Add(id, handlers);
257 } 437 }
438
439 QueuedAssetRequest request = new QueuedAssetRequest();
440 request.id = id;
441 request.uri = uri;
442
443 m_requestQueue.Enqueue(request);
258 } 444 }
259 else 445 else
260 { 446 {
@@ -287,19 +473,44 @@ namespace OpenSim.Services.Connectors
287 473
288 public string Store(AssetBase asset) 474 public string Store(AssetBase asset)
289 { 475 {
290 if (asset.Local) 476 // Have to assign the asset ID here. This isn't likely to
477 // trigger since current callers don't pass emtpy IDs
478 // We need the asset ID to route the request to the proper
479 // cluster member, so we can't have the server assign one.
480 if (asset.ID == string.Empty)
291 { 481 {
292 if (m_Cache != null) 482 if (asset.FullID == UUID.Zero)
293 m_Cache.Cache(asset); 483 {
484 asset.FullID = UUID.Random();
485 }
486 asset.ID = asset.FullID.ToString();
487 }
488 else if (asset.FullID == UUID.Zero)
489 {
490 UUID uuid = UUID.Zero;
491 if (UUID.TryParse(asset.ID, out uuid))
492 {
493 asset.FullID = uuid;
494 }
495 else
496 {
497 asset.FullID = UUID.Random();
498 }
499 }
294 500
501 if (m_Cache != null)
502 m_Cache.Cache(asset);
503 if (asset.Temporary || asset.Local)
504 {
295 return asset.ID; 505 return asset.ID;
296 } 506 }
297 507
298 string uri = m_ServerURI + "/assets/"; 508 string uri = MapServer(asset.FullID.ToString()) + "/assets/";
299 509
300 string newID; 510 string newID;
301 try 511 try
302 { 512 {
513<<<<<<< HEAD
303 newID = SynchronousRestObjectRequester.MakeRequest<AssetBase, string>("POST", uri, asset, m_Auth); 514 newID = SynchronousRestObjectRequester.MakeRequest<AssetBase, string>("POST", uri, asset, m_Auth);
304 } 515 }
305 catch (Exception e) 516 catch (Exception e)
@@ -324,6 +535,60 @@ namespace OpenSim.Services.Connectors
324 m_Cache.Cache(asset); 535 m_Cache.Cache(asset);
325 536
326 return newID; 537 return newID;
538=======
539 newID = SynchronousRestObjectRequester.
540 MakeRequest<AssetBase, string>("POST", uri, asset, 25);
541 if (newID == null || newID == "")
542 {
543 newID = UUID.Zero.ToString();
544 }
545 }
546 catch (Exception e)
547 {
548 newID = UUID.Zero.ToString();
549 }
550
551 if (newID == UUID.Zero.ToString())
552 {
553 //The asset upload failed, put it in a queue for later
554 asset.UploadAttempts++;
555 if (asset.UploadAttempts > 30)
556 {
557 //By this stage we've been in the queue for a good few hours;
558 //We're going to drop the asset.
559 m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString());
560 }
561 else
562 {
563 if (!m_retryQueue.ContainsKey(asset.UploadAttempts))
564 {
565 m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>());
566 }
567 List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts];
568 m_queue.Add(asset);
569 m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString());
570 m_retryTimer.Start();
571 }
572 }
573 else
574 {
575 if (asset.UploadAttempts > 0)
576 {
577 m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString());
578 }
579 if (newID != String.Empty)
580 {
581 // Placing this here, so that this work with old asset servers that don't send any reply back
582 // SynchronousRestObjectRequester returns somethins that is not an empty string
583 if (newID != null)
584 asset.ID = newID;
585
586 if (m_Cache != null)
587 m_Cache.Cache(asset);
588 }
589 }
590 return asset.ID;
591>>>>>>> avn/ubitvar
327 } 592 }
328 593
329 public bool UpdateContent(string id, byte[] data) 594 public bool UpdateContent(string id, byte[] data)
@@ -344,7 +609,7 @@ namespace OpenSim.Services.Connectors
344 } 609 }
345 asset.Data = data; 610 asset.Data = data;
346 611
347 string uri = m_ServerURI + "/assets/" + id; 612 string uri = MapServer(id) + "/assets/" + id;
348 613
349 if (SynchronousRestObjectRequester.MakeRequest<AssetBase, bool>("POST", uri, asset, m_Auth)) 614 if (SynchronousRestObjectRequester.MakeRequest<AssetBase, bool>("POST", uri, asset, m_Auth))
350 { 615 {
@@ -358,7 +623,7 @@ namespace OpenSim.Services.Connectors
358 623
359 public bool Delete(string id) 624 public bool Delete(string id)
360 { 625 {
361 string uri = m_ServerURI + "/assets/" + id; 626 string uri = MapServer(id) + "/assets/" + id;
362 627
363 if (SynchronousRestObjectRequester.MakeRequest<int, bool>("DELETE", uri, 0, m_Auth)) 628 if (SynchronousRestObjectRequester.MakeRequest<int, bool>("DELETE", uri, 0, m_Auth))
364 { 629 {