aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs')
-rw-r--r--OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs324
1 files changed, 264 insertions, 60 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
index 8b04d7f..bf0cc35 100644
--- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
+++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
@@ -27,9 +27,11 @@
27 27
28using log4net; 28using log4net;
29using System; 29using System;
30using System.Threading;
30using System.Collections.Generic; 31using System.Collections.Generic;
31using System.IO; 32using System.IO;
32using System.Reflection; 33using System.Reflection;
34using System.Timers;
33using Nini.Config; 35using Nini.Config;
34using OpenSim.Framework; 36using OpenSim.Framework;
35using OpenSim.Framework.Console; 37using OpenSim.Framework.Console;
@@ -47,13 +49,22 @@ namespace OpenSim.Services.Connectors
47 49
48 private string m_ServerURI = String.Empty; 50 private string m_ServerURI = String.Empty;
49 private IImprovedAssetCache m_Cache = null; 51 private IImprovedAssetCache m_Cache = null;
52 private int m_retryCounter;
53 private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>();
54 private System.Timers.Timer m_retryTimer;
50 private int m_maxAssetRequestConcurrency = 30; 55 private int m_maxAssetRequestConcurrency = 30;
51 56
52 private delegate void AssetRetrievedEx(AssetBase asset); 57 private delegate void AssetRetrievedEx(AssetBase asset);
53 58
54 // Keeps track of concurrent requests for the same asset, so that it's only loaded once. 59 // Keeps track of concurrent requests for the same asset, so that it's only loaded once.
55 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded 60 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded
56 private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); 61// private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>();
62
63 private Dictionary<string, List<AssetRetrievedEx>> m_AssetHandlers = new Dictionary<string, List<AssetRetrievedEx>>();
64
65 private Dictionary<string, string> m_UriMap = new Dictionary<string, string>();
66
67 private Thread[] m_fetchThreads;
57 68
58 public int MaxAssetRequestConcurrency 69 public int MaxAssetRequestConcurrency
59 { 70 {
@@ -91,13 +102,108 @@ namespace OpenSim.Services.Connectors
91 string serviceURI = assetConfig.GetString("AssetServerURI", 102 string serviceURI = assetConfig.GetString("AssetServerURI",
92 String.Empty); 103 String.Empty);
93 104
105 m_ServerURI = serviceURI;
106
94 if (serviceURI == String.Empty) 107 if (serviceURI == String.Empty)
95 { 108 {
96 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); 109 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService");
97 throw new Exception("Asset connector init error"); 110 throw new Exception("Asset connector init error");
98 } 111 }
99 112
100 m_ServerURI = serviceURI; 113
114 m_retryTimer = new System.Timers.Timer();
115 m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
116 m_retryTimer.Interval = 60000;
117
118 Uri serverUri = new Uri(m_ServerURI);
119
120 string groupHost = serverUri.Host;
121
122 for (int i = 0 ; i < 256 ; i++)
123 {
124 string prefix = i.ToString("x2");
125 groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost);
126
127 m_UriMap[prefix] = groupHost;
128 //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix);
129 }
130
131 m_fetchThreads = new Thread[2];
132
133 for (int i = 0 ; i < 2 ; i++)
134 {
135 m_fetchThreads[i] = new Thread(AssetRequestProcessor);
136 m_fetchThreads[i].Start();
137 }
138 }
139
140 private string MapServer(string id)
141 {
142 UriBuilder serverUri = new UriBuilder(m_ServerURI);
143
144 string prefix = id.Substring(0, 2).ToLower();
145
146 string host;
147
148 // HG URLs will not be valid UUIDS
149 if (m_UriMap.ContainsKey(prefix))
150 host = m_UriMap[prefix];
151 else
152 host = m_UriMap["00"];
153
154 serverUri.Host = host;
155
156 // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix);
157
158 string ret = serverUri.Uri.AbsoluteUri;
159 if (ret.EndsWith("/"))
160 ret = ret.Substring(0, ret.Length - 1);
161 return ret;
162 }
163
164 protected void retryCheck(object source, ElapsedEventArgs e)
165 {
166 m_retryCounter++;
167 if (m_retryCounter > 60) m_retryCounter -= 60;
168 List<int> keys = new List<int>();
169 foreach (int a in m_retryQueue.Keys)
170 {
171 keys.Add(a);
172 }
173 foreach (int a in keys)
174 {
175 //We exponentially fall back on frequency until we reach one attempt per hour
176 //The net result is that we end up in the queue for roughly 24 hours..
177 //24 hours worth of assets could be a lot, so the hope is that the region admin
178 //will have gotten the asset connector back online quickly!
179
180 int timefactor = a ^ 2;
181 if (timefactor > 60)
182 {
183 timefactor = 60;
184 }
185
186 //First, find out if we care about this timefactor
187 if (timefactor % a == 0)
188 {
189 //Yes, we do!
190 List<AssetBase> retrylist = m_retryQueue[a];
191 m_retryQueue.Remove(a);
192
193 foreach(AssetBase ass in retrylist)
194 {
195 Store(ass); //Store my ass. This function will put it back in the dictionary if it fails
196 }
197 }
198 }
199
200 if (m_retryQueue.Count == 0)
201 {
202 //It might only be one tick per minute, but I have
203 //repented and abandoned my wasteful ways
204 m_retryCounter = 0;
205 m_retryTimer.Stop();
206 }
101 } 207 }
102 208
103 protected void SetCache(IImprovedAssetCache cache) 209 protected void SetCache(IImprovedAssetCache cache)
@@ -107,15 +213,13 @@ namespace OpenSim.Services.Connectors
107 213
108 public AssetBase Get(string id) 214 public AssetBase Get(string id)
109 { 215 {
110// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Synchronous get request for {0}", id); 216 string uri = MapServer(id) + "/assets/" + id;
111
112 string uri = m_ServerURI + "/assets/" + id;
113 217
114 AssetBase asset = null; 218 AssetBase asset = null;
115 if (m_Cache != null) 219 if (m_Cache != null)
116 asset = m_Cache.Get(id); 220 asset = m_Cache.Get(id);
117 221
118 if (asset == null) 222 if (asset == null || asset.Data == null || asset.Data.Length == 0)
119 { 223 {
120 asset = SynchronousRestObjectRequester. 224 asset = SynchronousRestObjectRequester.
121 MakeRequest<int, AssetBase>("GET", uri, 0, m_maxAssetRequestConcurrency); 225 MakeRequest<int, AssetBase>("GET", uri, 0, m_maxAssetRequestConcurrency);
@@ -146,7 +250,7 @@ namespace OpenSim.Services.Connectors
146 return fullAsset.Metadata; 250 return fullAsset.Metadata;
147 } 251 }
148 252
149 string uri = m_ServerURI + "/assets/" + id + "/metadata"; 253 string uri = MapServer(id) + "/assets/" + id + "/metadata";
150 254
151 AssetMetadata asset = SynchronousRestObjectRequester. 255 AssetMetadata asset = SynchronousRestObjectRequester.
152 MakeRequest<int, AssetMetadata>("GET", uri, 0); 256 MakeRequest<int, AssetMetadata>("GET", uri, 0);
@@ -163,7 +267,7 @@ namespace OpenSim.Services.Connectors
163 return fullAsset.Data; 267 return fullAsset.Data;
164 } 268 }
165 269
166 RestClient rc = new RestClient(m_ServerURI); 270 RestClient rc = new RestClient(MapServer(id));
167 rc.AddResourcePath("assets"); 271 rc.AddResourcePath("assets");
168 rc.AddResourcePath(id); 272 rc.AddResourcePath(id);
169 rc.AddResourcePath("data"); 273 rc.AddResourcePath("data");
@@ -186,66 +290,109 @@ namespace OpenSim.Services.Connectors
186 return null; 290 return null;
187 } 291 }
188 292
189 public bool Get(string id, Object sender, AssetRetrieved handler) 293 private class QueuedAssetRequest
190 { 294 {
191// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Potentially asynchronous get request for {0}", id); 295 public string uri;
296 public string id;
297 }
192 298
193 string uri = m_ServerURI + "/assets/" + id; 299 private OpenMetaverse.BlockingQueue<QueuedAssetRequest> m_requestQueue =
300 new OpenMetaverse.BlockingQueue<QueuedAssetRequest>();
194 301
195 AssetBase asset = null; 302 private void AssetRequestProcessor()
196 if (m_Cache != null) 303 {
197 asset = m_Cache.Get(id); 304 QueuedAssetRequest r;
198 305
199 if (asset == null) 306 while (true)
200 { 307 {
201 lock (m_AssetHandlers) 308 r = m_requestQueue.Dequeue();
202 {
203 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
204
205 AssetRetrievedEx handlers;
206 if (m_AssetHandlers.TryGetValue(id, out handlers))
207 {
208 // Someone else is already loading this asset. It will notify our handler when done.
209 handlers += handlerEx;
210 return true;
211 }
212 309
213 // Load the asset ourselves 310 string uri = r.uri;
214 handlers += handlerEx; 311 string id = r.id;
215 m_AssetHandlers.Add(id, handlers);
216 }
217 312
218 bool success = false; 313 bool success = false;
219 try 314 try
220 { 315 {
221 AsynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, 316 AssetBase a = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, 30);
222 delegate(AssetBase a) 317 if (a != null)
223 { 318 {
224 if (m_Cache != null) 319 if (m_Cache != null)
225 m_Cache.Cache(a); 320 m_Cache.Cache(a);
226 321
227 AssetRetrievedEx handlers; 322 List<AssetRetrievedEx> handlers;
228 lock (m_AssetHandlers) 323 lock (m_AssetHandlers)
324 {
325 handlers = m_AssetHandlers[id];
326 m_AssetHandlers.Remove(id);
327 }
328 foreach (AssetRetrievedEx h in handlers)
329 {
330 Util.FireAndForget(x =>
229 { 331 {
230 handlers = m_AssetHandlers[id]; 332 h.Invoke(a);
231 m_AssetHandlers.Remove(id); 333 });
232 } 334 }
233 handlers.Invoke(a); 335 if (handlers != null)
234 }, m_maxAssetRequestConcurrency); 336 handlers.Clear();
235 337
236 success = true; 338 success = true;
339 }
237 } 340 }
238 finally 341 finally
239 { 342 {
240 if (!success) 343 if (!success)
241 { 344 {
345 List<AssetRetrievedEx> handlers;
242 lock (m_AssetHandlers) 346 lock (m_AssetHandlers)
243 { 347 {
348 handlers = m_AssetHandlers[id];
244 m_AssetHandlers.Remove(id); 349 m_AssetHandlers.Remove(id);
245 } 350 }
351 if (handlers != null)
352 handlers.Clear();
246 } 353 }
247 } 354 }
248 } 355 }
356 }
357
358 public bool Get(string id, Object sender, AssetRetrieved handler)
359 {
360 string uri = MapServer(id) + "/assets/" + id;
361
362 AssetBase asset = null;
363 if (m_Cache != null)
364 asset = m_Cache.Get(id);
365
366 if (asset == null || asset.Data == null || asset.Data.Length == 0)
367 {
368 lock (m_AssetHandlers)
369 {
370 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
371
372// AssetRetrievedEx handlers;
373 List<AssetRetrievedEx> handlers;
374 if (m_AssetHandlers.TryGetValue(id, out handlers))
375 {
376 // Someone else is already loading this asset. It will notify our handler when done.
377// handlers += handlerEx;
378 handlers.Add(handlerEx);
379 return true;
380 }
381
382 // Load the asset ourselves
383// handlers += handlerEx;
384 handlers = new List<AssetRetrievedEx>();
385 handlers.Add(handlerEx);
386
387 m_AssetHandlers.Add(id, handlers);
388 }
389
390 QueuedAssetRequest request = new QueuedAssetRequest();
391 request.id = id;
392 request.uri = uri;
393
394 m_requestQueue.Enqueue(request);
395 }
249 else 396 else
250 { 397 {
251 handler(id, sender, asset); 398 handler(id, sender, asset);
@@ -256,38 +403,95 @@ namespace OpenSim.Services.Connectors
256 403
257 public string Store(AssetBase asset) 404 public string Store(AssetBase asset)
258 { 405 {
259 if (asset.Local) 406 // Have to assign the asset ID here. This isn't likely to
407 // trigger since current callers don't pass emtpy IDs
408 // We need the asset ID to route the request to the proper
409 // cluster member, so we can't have the server assign one.
410 if (asset.ID == string.Empty)
260 { 411 {
261 if (m_Cache != null) 412 if (asset.FullID == UUID.Zero)
262 m_Cache.Cache(asset); 413 {
414 asset.FullID = UUID.Random();
415 }
416 asset.ID = asset.FullID.ToString();
417 }
418 else if (asset.FullID == UUID.Zero)
419 {
420 UUID uuid = UUID.Zero;
421 if (UUID.TryParse(asset.ID, out uuid))
422 {
423 asset.FullID = uuid;
424 }
425 else
426 {
427 asset.FullID = UUID.Random();
428 }
429 }
263 430
431 if (m_Cache != null)
432 m_Cache.Cache(asset);
433 if (asset.Temporary || asset.Local)
434 {
264 return asset.ID; 435 return asset.ID;
265 } 436 }
266 437
267 string uri = m_ServerURI + "/assets/"; 438 string uri = MapServer(asset.FullID.ToString()) + "/assets/";
268 439
269 string newID = string.Empty; 440 string newID = string.Empty;
270 try 441 try
271 { 442 {
272 newID = SynchronousRestObjectRequester. 443 newID = SynchronousRestObjectRequester.
273 MakeRequest<AssetBase, string>("POST", uri, asset); 444 MakeRequest<AssetBase, string>("POST", uri, asset, 25);
445 if (newID == null || newID == "")
446 {
447 newID = UUID.Zero.ToString();
448 }
274 } 449 }
275 catch (Exception e) 450 catch (Exception e)
276 { 451 {
277 m_log.WarnFormat("[ASSET CONNECTOR]: Unable to send asset {0} to asset server. Reason: {1}", asset.ID, e.Message); 452 newID = UUID.Zero.ToString();
278 } 453 }
279 454
280 if (newID != String.Empty) 455 if (newID == UUID.Zero.ToString())
456 {
457 //The asset upload failed, put it in a queue for later
458 asset.UploadAttempts++;
459 if (asset.UploadAttempts > 30)
460 {
461 //By this stage we've been in the queue for a good few hours;
462 //We're going to drop the asset.
463 m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString());
464 }
465 else
466 {
467 if (!m_retryQueue.ContainsKey(asset.UploadAttempts))
468 {
469 m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>());
470 }
471 List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts];
472 m_queue.Add(asset);
473 m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString());
474 m_retryTimer.Start();
475 }
476 }
477 else
281 { 478 {
282 // Placing this here, so that this work with old asset servers that don't send any reply back 479 if (asset.UploadAttempts > 0)
283 // SynchronousRestObjectRequester returns somethins that is not an empty string 480 {
284 if (newID != null) 481 m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString());
285 asset.ID = newID; 482 }
483 if (newID != String.Empty)
484 {
485 // Placing this here, so that this work with old asset servers that don't send any reply back
486 // SynchronousRestObjectRequester returns somethins that is not an empty string
487 if (newID != null)
488 asset.ID = newID;
286 489
287 if (m_Cache != null) 490 if (m_Cache != null)
288 m_Cache.Cache(asset); 491 m_Cache.Cache(asset);
492 }
289 } 493 }
290 return newID; 494 return asset.ID;
291 } 495 }
292 496
293 public bool UpdateContent(string id, byte[] data) 497 public bool UpdateContent(string id, byte[] data)
@@ -308,7 +512,7 @@ namespace OpenSim.Services.Connectors
308 } 512 }
309 asset.Data = data; 513 asset.Data = data;
310 514
311 string uri = m_ServerURI + "/assets/" + id; 515 string uri = MapServer(id) + "/assets/" + id;
312 516
313 if (SynchronousRestObjectRequester. 517 if (SynchronousRestObjectRequester.
314 MakeRequest<AssetBase, bool>("POST", uri, asset)) 518 MakeRequest<AssetBase, bool>("POST", uri, asset))
@@ -323,7 +527,7 @@ namespace OpenSim.Services.Connectors
323 527
324 public bool Delete(string id) 528 public bool Delete(string id)
325 { 529 {
326 string uri = m_ServerURI + "/assets/" + id; 530 string uri = MapServer(id) + "/assets/" + id;
327 531
328 if (SynchronousRestObjectRequester. 532 if (SynchronousRestObjectRequester.
329 MakeRequest<int, bool>("DELETE", uri, 0)) 533 MakeRequest<int, bool>("DELETE", uri, 0))