aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs')
-rw-r--r--OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs323
1 files changed, 263 insertions, 60 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
index 2b2f11f..8b702e0 100644
--- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
+++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
@@ -27,9 +27,11 @@
27 27
28using log4net; 28using log4net;
29using System; 29using System;
30using System.Threading;
30using System.Collections.Generic; 31using System.Collections.Generic;
31using System.IO; 32using System.IO;
32using System.Reflection; 33using System.Reflection;
34using System.Timers;
33using Nini.Config; 35using Nini.Config;
34using OpenSim.Framework; 36using OpenSim.Framework;
35using OpenSim.Framework.Console; 37using OpenSim.Framework.Console;
@@ -47,14 +49,22 @@ namespace OpenSim.Services.Connectors
47 49
48 private string m_ServerURI = String.Empty; 50 private string m_ServerURI = String.Empty;
49 private IImprovedAssetCache m_Cache = null; 51 private IImprovedAssetCache m_Cache = null;
52 private int m_retryCounter;
53 private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>();
54 private System.Timers.Timer m_retryTimer;
50 private int m_maxAssetRequestConcurrency = 30; 55 private int m_maxAssetRequestConcurrency = 30;
51 56
52 private delegate void AssetRetrievedEx(AssetBase asset); 57 private delegate void AssetRetrievedEx(AssetBase asset);
53 58
54 // Keeps track of concurrent requests for the same asset, so that it's only loaded once. 59 // Keeps track of concurrent requests for the same asset, so that it's only loaded once.
55 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded 60 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded
56 private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); 61// private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>();
57 62
63 private Dictionary<string, List<AssetRetrievedEx>> m_AssetHandlers = new Dictionary<string, List<AssetRetrievedEx>>();
64
65 private Dictionary<string, string> m_UriMap = new Dictionary<string, string>();
66
67 private Thread[] m_fetchThreads;
58 68
59 public AssetServicesConnector() 69 public AssetServicesConnector()
60 { 70 {
@@ -86,13 +96,108 @@ namespace OpenSim.Services.Connectors
86 string serviceURI = assetConfig.GetString("AssetServerURI", 96 string serviceURI = assetConfig.GetString("AssetServerURI",
87 String.Empty); 97 String.Empty);
88 98
99 m_ServerURI = serviceURI;
100
89 if (serviceURI == String.Empty) 101 if (serviceURI == String.Empty)
90 { 102 {
91 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); 103 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService");
92 throw new Exception("Asset connector init error"); 104 throw new Exception("Asset connector init error");
93 } 105 }
94 106
95 m_ServerURI = serviceURI; 107
108 m_retryTimer = new System.Timers.Timer();
109 m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
110 m_retryTimer.Interval = 60000;
111
112 Uri serverUri = new Uri(m_ServerURI);
113
114 string groupHost = serverUri.Host;
115
116 for (int i = 0 ; i < 256 ; i++)
117 {
118 string prefix = i.ToString("x2");
119 groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost);
120
121 m_UriMap[prefix] = groupHost;
122 //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix);
123 }
124
125 m_fetchThreads = new Thread[2];
126
127 for (int i = 0 ; i < 2 ; i++)
128 {
129 m_fetchThreads[i] = new Thread(AssetRequestProcessor);
130 m_fetchThreads[i].Start();
131 }
132 }
133
134 private string MapServer(string id)
135 {
136 UriBuilder serverUri = new UriBuilder(m_ServerURI);
137
138 string prefix = id.Substring(0, 2).ToLower();
139
140 string host;
141
142 // HG URLs will not be valid UUIDS
143 if (m_UriMap.ContainsKey(prefix))
144 host = m_UriMap[prefix];
145 else
146 host = m_UriMap["00"];
147
148 serverUri.Host = host;
149
150 // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix);
151
152 string ret = serverUri.Uri.AbsoluteUri;
153 if (ret.EndsWith("/"))
154 ret = ret.Substring(0, ret.Length - 1);
155 return ret;
156 }
157
158 protected void retryCheck(object source, ElapsedEventArgs e)
159 {
160 m_retryCounter++;
161 if (m_retryCounter > 60) m_retryCounter -= 60;
162 List<int> keys = new List<int>();
163 foreach (int a in m_retryQueue.Keys)
164 {
165 keys.Add(a);
166 }
167 foreach (int a in keys)
168 {
169 //We exponentially fall back on frequency until we reach one attempt per hour
170 //The net result is that we end up in the queue for roughly 24 hours..
171 //24 hours worth of assets could be a lot, so the hope is that the region admin
172 //will have gotten the asset connector back online quickly!
173
174 int timefactor = a ^ 2;
175 if (timefactor > 60)
176 {
177 timefactor = 60;
178 }
179
180 //First, find out if we care about this timefactor
181 if (timefactor % a == 0)
182 {
183 //Yes, we do!
184 List<AssetBase> retrylist = m_retryQueue[a];
185 m_retryQueue.Remove(a);
186
187 foreach(AssetBase ass in retrylist)
188 {
189 Store(ass); //Store my ass. This function will put it back in the dictionary if it fails
190 }
191 }
192 }
193
194 if (m_retryQueue.Count == 0)
195 {
196 //It might only be one tick per minute, but I have
197 //repented and abandoned my wasteful ways
198 m_retryCounter = 0;
199 m_retryTimer.Stop();
200 }
96 } 201 }
97 202
98 protected void SetCache(IImprovedAssetCache cache) 203 protected void SetCache(IImprovedAssetCache cache)
@@ -102,15 +207,13 @@ namespace OpenSim.Services.Connectors
102 207
103 public AssetBase Get(string id) 208 public AssetBase Get(string id)
104 { 209 {
105// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Synchronous get request for {0}", id); 210 string uri = MapServer(id) + "/assets/" + id;
106
107 string uri = m_ServerURI + "/assets/" + id;
108 211
109 AssetBase asset = null; 212 AssetBase asset = null;
110 if (m_Cache != null) 213 if (m_Cache != null)
111 asset = m_Cache.Get(id); 214 asset = m_Cache.Get(id);
112 215
113 if (asset == null) 216 if (asset == null || asset.Data == null || asset.Data.Length == 0)
114 { 217 {
115 asset = SynchronousRestObjectRequester. 218 asset = SynchronousRestObjectRequester.
116 MakeRequest<int, AssetBase>("GET", uri, 0, m_maxAssetRequestConcurrency); 219 MakeRequest<int, AssetBase>("GET", uri, 0, m_maxAssetRequestConcurrency);
@@ -141,7 +244,7 @@ namespace OpenSim.Services.Connectors
141 return fullAsset.Metadata; 244 return fullAsset.Metadata;
142 } 245 }
143 246
144 string uri = m_ServerURI + "/assets/" + id + "/metadata"; 247 string uri = MapServer(id) + "/assets/" + id + "/metadata";
145 248
146 AssetMetadata asset = SynchronousRestObjectRequester. 249 AssetMetadata asset = SynchronousRestObjectRequester.
147 MakeRequest<int, AssetMetadata>("GET", uri, 0); 250 MakeRequest<int, AssetMetadata>("GET", uri, 0);
@@ -158,7 +261,7 @@ namespace OpenSim.Services.Connectors
158 return fullAsset.Data; 261 return fullAsset.Data;
159 } 262 }
160 263
161 RestClient rc = new RestClient(m_ServerURI); 264 RestClient rc = new RestClient(MapServer(id));
162 rc.AddResourcePath("assets"); 265 rc.AddResourcePath("assets");
163 rc.AddResourcePath(id); 266 rc.AddResourcePath(id);
164 rc.AddResourcePath("data"); 267 rc.AddResourcePath("data");
@@ -181,66 +284,109 @@ namespace OpenSim.Services.Connectors
181 return null; 284 return null;
182 } 285 }
183 286
184 public bool Get(string id, Object sender, AssetRetrieved handler) 287 private class QueuedAssetRequest
185 { 288 {
186// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Potentially asynchronous get request for {0}", id); 289 public string uri;
290 public string id;
291 }
187 292
188 string uri = m_ServerURI + "/assets/" + id; 293 private OpenMetaverse.BlockingQueue<QueuedAssetRequest> m_requestQueue =
294 new OpenMetaverse.BlockingQueue<QueuedAssetRequest>();
189 295
190 AssetBase asset = null; 296 private void AssetRequestProcessor()
191 if (m_Cache != null) 297 {
192 asset = m_Cache.Get(id); 298 QueuedAssetRequest r;
193 299
194 if (asset == null) 300 while (true)
195 { 301 {
196 lock (m_AssetHandlers) 302 r = m_requestQueue.Dequeue();
197 {
198 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
199
200 AssetRetrievedEx handlers;
201 if (m_AssetHandlers.TryGetValue(id, out handlers))
202 {
203 // Someone else is already loading this asset. It will notify our handler when done.
204 handlers += handlerEx;
205 return true;
206 }
207 303
208 // Load the asset ourselves 304 string uri = r.uri;
209 handlers += handlerEx; 305 string id = r.id;
210 m_AssetHandlers.Add(id, handlers);
211 }
212 306
213 bool success = false; 307 bool success = false;
214 try 308 try
215 { 309 {
216 AsynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, 310 AssetBase a = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, 30);
217 delegate(AssetBase a) 311 if (a != null)
218 { 312 {
219 if (m_Cache != null) 313 if (m_Cache != null)
220 m_Cache.Cache(a); 314 m_Cache.Cache(a);
221 315
222 AssetRetrievedEx handlers; 316 List<AssetRetrievedEx> handlers;
223 lock (m_AssetHandlers) 317 lock (m_AssetHandlers)
318 {
319 handlers = m_AssetHandlers[id];
320 m_AssetHandlers.Remove(id);
321 }
322 foreach (AssetRetrievedEx h in handlers)
323 {
324 Util.FireAndForget(x =>
224 { 325 {
225 handlers = m_AssetHandlers[id]; 326 h.Invoke(a);
226 m_AssetHandlers.Remove(id); 327 });
227 } 328 }
228 handlers.Invoke(a); 329 if (handlers != null)
229 }, m_maxAssetRequestConcurrency); 330 handlers.Clear();
230 331
231 success = true; 332 success = true;
333 }
232 } 334 }
233 finally 335 finally
234 { 336 {
235 if (!success) 337 if (!success)
236 { 338 {
339 List<AssetRetrievedEx> handlers;
237 lock (m_AssetHandlers) 340 lock (m_AssetHandlers)
238 { 341 {
342 handlers = m_AssetHandlers[id];
239 m_AssetHandlers.Remove(id); 343 m_AssetHandlers.Remove(id);
240 } 344 }
345 if (handlers != null)
346 handlers.Clear();
241 } 347 }
242 } 348 }
243 } 349 }
350 }
351
352 public bool Get(string id, Object sender, AssetRetrieved handler)
353 {
354 string uri = MapServer(id) + "/assets/" + id;
355
356 AssetBase asset = null;
357 if (m_Cache != null)
358 asset = m_Cache.Get(id);
359
360 if (asset == null || asset.Data == null || asset.Data.Length == 0)
361 {
362 lock (m_AssetHandlers)
363 {
364 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
365
366// AssetRetrievedEx handlers;
367 List<AssetRetrievedEx> handlers;
368 if (m_AssetHandlers.TryGetValue(id, out handlers))
369 {
370 // Someone else is already loading this asset. It will notify our handler when done.
371// handlers += handlerEx;
372 handlers.Add(handlerEx);
373 return true;
374 }
375
376 // Load the asset ourselves
377// handlers += handlerEx;
378 handlers = new List<AssetRetrievedEx>();
379 handlers.Add(handlerEx);
380
381 m_AssetHandlers.Add(id, handlers);
382 }
383
384 QueuedAssetRequest request = new QueuedAssetRequest();
385 request.id = id;
386 request.uri = uri;
387
388 m_requestQueue.Enqueue(request);
389 }
244 else 390 else
245 { 391 {
246 handler(id, sender, asset); 392 handler(id, sender, asset);
@@ -251,38 +397,95 @@ namespace OpenSim.Services.Connectors
251 397
252 public string Store(AssetBase asset) 398 public string Store(AssetBase asset)
253 { 399 {
254 if (asset.Temporary || asset.Local) 400 // Have to assign the asset ID here. This isn't likely to
401 // trigger since current callers don't pass emtpy IDs
402 // We need the asset ID to route the request to the proper
403 // cluster member, so we can't have the server assign one.
404 if (asset.ID == string.Empty)
255 { 405 {
256 if (m_Cache != null) 406 if (asset.FullID == UUID.Zero)
257 m_Cache.Cache(asset); 407 {
408 asset.FullID = UUID.Random();
409 }
410 asset.ID = asset.FullID.ToString();
411 }
412 else if (asset.FullID == UUID.Zero)
413 {
414 UUID uuid = UUID.Zero;
415 if (UUID.TryParse(asset.ID, out uuid))
416 {
417 asset.FullID = uuid;
418 }
419 else
420 {
421 asset.FullID = UUID.Random();
422 }
423 }
258 424
425 if (m_Cache != null)
426 m_Cache.Cache(asset);
427 if (asset.Temporary || asset.Local)
428 {
259 return asset.ID; 429 return asset.ID;
260 } 430 }
261 431
262 string uri = m_ServerURI + "/assets/"; 432 string uri = MapServer(asset.FullID.ToString()) + "/assets/";
263 433
264 string newID = string.Empty; 434 string newID = string.Empty;
265 try 435 try
266 { 436 {
267 newID = SynchronousRestObjectRequester. 437 newID = SynchronousRestObjectRequester.
268 MakeRequest<AssetBase, string>("POST", uri, asset); 438 MakeRequest<AssetBase, string>("POST", uri, asset, 25);
439 if (newID == null || newID == "")
440 {
441 newID = UUID.Zero.ToString();
442 }
269 } 443 }
270 catch (Exception e) 444 catch (Exception e)
271 { 445 {
272 m_log.WarnFormat("[ASSET CONNECTOR]: Unable to send asset {0} to asset server. Reason: {1}", asset.ID, e.Message); 446 newID = UUID.Zero.ToString();
273 } 447 }
274 448
275 if (newID != String.Empty) 449 if (newID == UUID.Zero.ToString())
450 {
451 //The asset upload failed, put it in a queue for later
452 asset.UploadAttempts++;
453 if (asset.UploadAttempts > 30)
454 {
455 //By this stage we've been in the queue for a good few hours;
456 //We're going to drop the asset.
457 m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString());
458 }
459 else
460 {
461 if (!m_retryQueue.ContainsKey(asset.UploadAttempts))
462 {
463 m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>());
464 }
465 List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts];
466 m_queue.Add(asset);
467 m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString());
468 m_retryTimer.Start();
469 }
470 }
471 else
276 { 472 {
277 // Placing this here, so that this work with old asset servers that don't send any reply back 473 if (asset.UploadAttempts > 0)
278 // SynchronousRestObjectRequester returns somethins that is not an empty string 474 {
279 if (newID != null) 475 m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString());
280 asset.ID = newID; 476 }
477 if (newID != String.Empty)
478 {
479 // Placing this here, so that this work with old asset servers that don't send any reply back
480 // SynchronousRestObjectRequester returns somethins that is not an empty string
481 if (newID != null)
482 asset.ID = newID;
281 483
282 if (m_Cache != null) 484 if (m_Cache != null)
283 m_Cache.Cache(asset); 485 m_Cache.Cache(asset);
486 }
284 } 487 }
285 return newID; 488 return asset.ID;
286 } 489 }
287 490
288 public bool UpdateContent(string id, byte[] data) 491 public bool UpdateContent(string id, byte[] data)
@@ -303,7 +506,7 @@ namespace OpenSim.Services.Connectors
303 } 506 }
304 asset.Data = data; 507 asset.Data = data;
305 508
306 string uri = m_ServerURI + "/assets/" + id; 509 string uri = MapServer(id) + "/assets/" + id;
307 510
308 if (SynchronousRestObjectRequester. 511 if (SynchronousRestObjectRequester.
309 MakeRequest<AssetBase, bool>("POST", uri, asset)) 512 MakeRequest<AssetBase, bool>("POST", uri, asset))
@@ -318,7 +521,7 @@ namespace OpenSim.Services.Connectors
318 521
319 public bool Delete(string id) 522 public bool Delete(string id)
320 { 523 {
321 string uri = m_ServerURI + "/assets/" + id; 524 string uri = MapServer(id) + "/assets/" + id;
322 525
323 if (SynchronousRestObjectRequester. 526 if (SynchronousRestObjectRequester.
324 MakeRequest<int, bool>("DELETE", uri, 0)) 527 MakeRequest<int, bool>("DELETE", uri, 0))