aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs')
-rw-r--r--OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs318
1 files changed, 258 insertions, 60 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
index 8b04d7f..3e06cf0 100644
--- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
+++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
@@ -27,9 +27,11 @@
27 27
28using log4net; 28using log4net;
29using System; 29using System;
30using System.Threading;
30using System.Collections.Generic; 31using System.Collections.Generic;
31using System.IO; 32using System.IO;
32using System.Reflection; 33using System.Reflection;
34using System.Timers;
33using Nini.Config; 35using Nini.Config;
34using OpenSim.Framework; 36using OpenSim.Framework;
35using OpenSim.Framework.Console; 37using OpenSim.Framework.Console;
@@ -47,13 +49,22 @@ namespace OpenSim.Services.Connectors
47 49
48 private string m_ServerURI = String.Empty; 50 private string m_ServerURI = String.Empty;
49 private IImprovedAssetCache m_Cache = null; 51 private IImprovedAssetCache m_Cache = null;
52 private int m_retryCounter;
53 private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>();
54 private System.Timers.Timer m_retryTimer;
50 private int m_maxAssetRequestConcurrency = 30; 55 private int m_maxAssetRequestConcurrency = 30;
51 56
52 private delegate void AssetRetrievedEx(AssetBase asset); 57 private delegate void AssetRetrievedEx(AssetBase asset);
53 58
54 // Keeps track of concurrent requests for the same asset, so that it's only loaded once. 59 // Keeps track of concurrent requests for the same asset, so that it's only loaded once.
55 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded 60 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded
56 private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); 61// private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>();
62
63 private Dictionary<string, List<AssetRetrievedEx>> m_AssetHandlers = new Dictionary<string, List<AssetRetrievedEx>>();
64
65 private Dictionary<string, string> m_UriMap = new Dictionary<string, string>();
66
67 private Thread[] m_fetchThreads;
57 68
58 public int MaxAssetRequestConcurrency 69 public int MaxAssetRequestConcurrency
59 { 70 {
@@ -91,13 +102,102 @@ namespace OpenSim.Services.Connectors
91 string serviceURI = assetConfig.GetString("AssetServerURI", 102 string serviceURI = assetConfig.GetString("AssetServerURI",
92 String.Empty); 103 String.Empty);
93 104
105 m_ServerURI = serviceURI;
106
94 if (serviceURI == String.Empty) 107 if (serviceURI == String.Empty)
95 { 108 {
96 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); 109 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService");
97 throw new Exception("Asset connector init error"); 110 throw new Exception("Asset connector init error");
98 } 111 }
99 112
100 m_ServerURI = serviceURI; 113
114 m_retryTimer = new System.Timers.Timer();
115 m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
116 m_retryTimer.Interval = 60000;
117
118 Uri serverUri = new Uri(m_ServerURI);
119
120 string groupHost = serverUri.Host;
121
122 for (int i = 0 ; i < 256 ; i++)
123 {
124 string prefix = i.ToString("x2");
125 groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost);
126
127 m_UriMap[prefix] = groupHost;
128 //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix);
129 }
130
131 m_fetchThreads = new Thread[2];
132
133 for (int i = 0 ; i < 2 ; i++)
134 {
135 m_fetchThreads[i] = new Thread(AssetRequestProcessor);
136 m_fetchThreads[i].Start();
137 }
138 }
139
140 private string MapServer(string id)
141 {
142 UriBuilder serverUri = new UriBuilder(m_ServerURI);
143
144 string prefix = id.Substring(0, 2).ToLower();
145
146 string host = m_UriMap[prefix];
147
148 serverUri.Host = host;
149
150 // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix);
151
152 string ret = serverUri.Uri.AbsoluteUri;
153 if (ret.EndsWith("/"))
154 ret = ret.Substring(0, ret.Length - 1);
155 return ret;
156 }
157
158 protected void retryCheck(object source, ElapsedEventArgs e)
159 {
160 m_retryCounter++;
161 if (m_retryCounter > 60) m_retryCounter -= 60;
162 List<int> keys = new List<int>();
163 foreach (int a in m_retryQueue.Keys)
164 {
165 keys.Add(a);
166 }
167 foreach (int a in keys)
168 {
169 //We exponentially fall back on frequency until we reach one attempt per hour
170 //The net result is that we end up in the queue for roughly 24 hours..
171 //24 hours worth of assets could be a lot, so the hope is that the region admin
172 //will have gotten the asset connector back online quickly!
173
174 int timefactor = a ^ 2;
175 if (timefactor > 60)
176 {
177 timefactor = 60;
178 }
179
180 //First, find out if we care about this timefactor
181 if (timefactor % a == 0)
182 {
183 //Yes, we do!
184 List<AssetBase> retrylist = m_retryQueue[a];
185 m_retryQueue.Remove(a);
186
187 foreach(AssetBase ass in retrylist)
188 {
189 Store(ass); //Store my ass. This function will put it back in the dictionary if it fails
190 }
191 }
192 }
193
194 if (m_retryQueue.Count == 0)
195 {
196 //It might only be one tick per minute, but I have
197 //repented and abandoned my wasteful ways
198 m_retryCounter = 0;
199 m_retryTimer.Stop();
200 }
101 } 201 }
102 202
103 protected void SetCache(IImprovedAssetCache cache) 203 protected void SetCache(IImprovedAssetCache cache)
@@ -107,15 +207,13 @@ namespace OpenSim.Services.Connectors
107 207
108 public AssetBase Get(string id) 208 public AssetBase Get(string id)
109 { 209 {
110// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Synchronous get request for {0}", id); 210 string uri = MapServer(id) + "/assets/" + id;
111
112 string uri = m_ServerURI + "/assets/" + id;
113 211
114 AssetBase asset = null; 212 AssetBase asset = null;
115 if (m_Cache != null) 213 if (m_Cache != null)
116 asset = m_Cache.Get(id); 214 asset = m_Cache.Get(id);
117 215
118 if (asset == null) 216 if (asset == null || asset.Data == null || asset.Data.Length == 0)
119 { 217 {
120 asset = SynchronousRestObjectRequester. 218 asset = SynchronousRestObjectRequester.
121 MakeRequest<int, AssetBase>("GET", uri, 0, m_maxAssetRequestConcurrency); 219 MakeRequest<int, AssetBase>("GET", uri, 0, m_maxAssetRequestConcurrency);
@@ -146,7 +244,7 @@ namespace OpenSim.Services.Connectors
146 return fullAsset.Metadata; 244 return fullAsset.Metadata;
147 } 245 }
148 246
149 string uri = m_ServerURI + "/assets/" + id + "/metadata"; 247 string uri = MapServer(id) + "/assets/" + id + "/metadata";
150 248
151 AssetMetadata asset = SynchronousRestObjectRequester. 249 AssetMetadata asset = SynchronousRestObjectRequester.
152 MakeRequest<int, AssetMetadata>("GET", uri, 0); 250 MakeRequest<int, AssetMetadata>("GET", uri, 0);
@@ -163,7 +261,7 @@ namespace OpenSim.Services.Connectors
163 return fullAsset.Data; 261 return fullAsset.Data;
164 } 262 }
165 263
166 RestClient rc = new RestClient(m_ServerURI); 264 RestClient rc = new RestClient(MapServer(id));
167 rc.AddResourcePath("assets"); 265 rc.AddResourcePath("assets");
168 rc.AddResourcePath(id); 266 rc.AddResourcePath(id);
169 rc.AddResourcePath("data"); 267 rc.AddResourcePath("data");
@@ -186,66 +284,109 @@ namespace OpenSim.Services.Connectors
186 return null; 284 return null;
187 } 285 }
188 286
189 public bool Get(string id, Object sender, AssetRetrieved handler) 287 private class QueuedAssetRequest
190 { 288 {
191// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Potentially asynchronous get request for {0}", id); 289 public string uri;
290 public string id;
291 }
192 292
193 string uri = m_ServerURI + "/assets/" + id; 293 private OpenMetaverse.BlockingQueue<QueuedAssetRequest> m_requestQueue =
294 new OpenMetaverse.BlockingQueue<QueuedAssetRequest>();
194 295
195 AssetBase asset = null; 296 private void AssetRequestProcessor()
196 if (m_Cache != null) 297 {
197 asset = m_Cache.Get(id); 298 QueuedAssetRequest r;
198 299
199 if (asset == null) 300 while (true)
200 { 301 {
201 lock (m_AssetHandlers) 302 r = m_requestQueue.Dequeue();
202 {
203 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
204
205 AssetRetrievedEx handlers;
206 if (m_AssetHandlers.TryGetValue(id, out handlers))
207 {
208 // Someone else is already loading this asset. It will notify our handler when done.
209 handlers += handlerEx;
210 return true;
211 }
212 303
213 // Load the asset ourselves 304 string uri = r.uri;
214 handlers += handlerEx; 305 string id = r.id;
215 m_AssetHandlers.Add(id, handlers);
216 }
217 306
218 bool success = false; 307 bool success = false;
219 try 308 try
220 { 309 {
221 AsynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, 310 AssetBase a = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, 30);
222 delegate(AssetBase a) 311 if (a != null)
223 { 312 {
224 if (m_Cache != null) 313 if (m_Cache != null)
225 m_Cache.Cache(a); 314 m_Cache.Cache(a);
226 315
227 AssetRetrievedEx handlers; 316 List<AssetRetrievedEx> handlers;
228 lock (m_AssetHandlers) 317 lock (m_AssetHandlers)
318 {
319 handlers = m_AssetHandlers[id];
320 m_AssetHandlers.Remove(id);
321 }
322 foreach (AssetRetrievedEx h in handlers)
323 {
324 Util.FireAndForget(x =>
229 { 325 {
230 handlers = m_AssetHandlers[id]; 326 h.Invoke(a);
231 m_AssetHandlers.Remove(id); 327 });
232 } 328 }
233 handlers.Invoke(a); 329 if (handlers != null)
234 }, m_maxAssetRequestConcurrency); 330 handlers.Clear();
235 331
236 success = true; 332 success = true;
333 }
237 } 334 }
238 finally 335 finally
239 { 336 {
240 if (!success) 337 if (!success)
241 { 338 {
339 List<AssetRetrievedEx> handlers;
242 lock (m_AssetHandlers) 340 lock (m_AssetHandlers)
243 { 341 {
342 handlers = m_AssetHandlers[id];
244 m_AssetHandlers.Remove(id); 343 m_AssetHandlers.Remove(id);
245 } 344 }
345 if (handlers != null)
346 handlers.Clear();
246 } 347 }
247 } 348 }
248 } 349 }
350 }
351
352 public bool Get(string id, Object sender, AssetRetrieved handler)
353 {
354 string uri = MapServer(id) + "/assets/" + id;
355
356 AssetBase asset = null;
357 if (m_Cache != null)
358 asset = m_Cache.Get(id);
359
360 if (asset == null || asset.Data == null || asset.Data.Length == 0)
361 {
362 lock (m_AssetHandlers)
363 {
364 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
365
366// AssetRetrievedEx handlers;
367 List<AssetRetrievedEx> handlers;
368 if (m_AssetHandlers.TryGetValue(id, out handlers))
369 {
370 // Someone else is already loading this asset. It will notify our handler when done.
371// handlers += handlerEx;
372 handlers.Add(handlerEx);
373 return true;
374 }
375
376 // Load the asset ourselves
377// handlers += handlerEx;
378 handlers = new List<AssetRetrievedEx>();
379 handlers.Add(handlerEx);
380
381 m_AssetHandlers.Add(id, handlers);
382 }
383
384 QueuedAssetRequest request = new QueuedAssetRequest();
385 request.id = id;
386 request.uri = uri;
387
388 m_requestQueue.Enqueue(request);
389 }
249 else 390 else
250 { 391 {
251 handler(id, sender, asset); 392 handler(id, sender, asset);
@@ -256,38 +397,95 @@ namespace OpenSim.Services.Connectors
256 397
257 public string Store(AssetBase asset) 398 public string Store(AssetBase asset)
258 { 399 {
259 if (asset.Local) 400 // Have to assign the asset ID here. This isn't likely to
401 // trigger since current callers don't pass emtpy IDs
402 // We need the asset ID to route the request to the proper
403 // cluster member, so we can't have the server assign one.
404 if (asset.ID == string.Empty)
260 { 405 {
261 if (m_Cache != null) 406 if (asset.FullID == UUID.Zero)
262 m_Cache.Cache(asset); 407 {
408 asset.FullID = UUID.Random();
409 }
410 asset.ID = asset.FullID.ToString();
411 }
412 else if (asset.FullID == UUID.Zero)
413 {
414 UUID uuid = UUID.Zero;
415 if (UUID.TryParse(asset.ID, out uuid))
416 {
417 asset.FullID = uuid;
418 }
419 else
420 {
421 asset.FullID = UUID.Random();
422 }
423 }
263 424
425 if (m_Cache != null)
426 m_Cache.Cache(asset);
427 if (asset.Temporary || asset.Local)
428 {
264 return asset.ID; 429 return asset.ID;
265 } 430 }
266 431
267 string uri = m_ServerURI + "/assets/"; 432 string uri = MapServer(asset.FullID.ToString()) + "/assets/";
268 433
269 string newID = string.Empty; 434 string newID = string.Empty;
270 try 435 try
271 { 436 {
272 newID = SynchronousRestObjectRequester. 437 newID = SynchronousRestObjectRequester.
273 MakeRequest<AssetBase, string>("POST", uri, asset); 438 MakeRequest<AssetBase, string>("POST", uri, asset, 25);
439 if (newID == null || newID == "")
440 {
441 newID = UUID.Zero.ToString();
442 }
274 } 443 }
275 catch (Exception e) 444 catch (Exception e)
276 { 445 {
277 m_log.WarnFormat("[ASSET CONNECTOR]: Unable to send asset {0} to asset server. Reason: {1}", asset.ID, e.Message); 446 newID = UUID.Zero.ToString();
278 } 447 }
279 448
280 if (newID != String.Empty) 449 if (newID == UUID.Zero.ToString())
450 {
451 //The asset upload failed, put it in a queue for later
452 asset.UploadAttempts++;
453 if (asset.UploadAttempts > 30)
454 {
455 //By this stage we've been in the queue for a good few hours;
456 //We're going to drop the asset.
457 m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString());
458 }
459 else
460 {
461 if (!m_retryQueue.ContainsKey(asset.UploadAttempts))
462 {
463 m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>());
464 }
465 List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts];
466 m_queue.Add(asset);
467 m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString());
468 m_retryTimer.Start();
469 }
470 }
471 else
281 { 472 {
282 // Placing this here, so that this work with old asset servers that don't send any reply back 473 if (asset.UploadAttempts > 0)
283 // SynchronousRestObjectRequester returns somethins that is not an empty string 474 {
284 if (newID != null) 475 m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString());
285 asset.ID = newID; 476 }
477 if (newID != String.Empty)
478 {
479 // Placing this here, so that this work with old asset servers that don't send any reply back
480 // SynchronousRestObjectRequester returns somethins that is not an empty string
481 if (newID != null)
482 asset.ID = newID;
286 483
287 if (m_Cache != null) 484 if (m_Cache != null)
288 m_Cache.Cache(asset); 485 m_Cache.Cache(asset);
486 }
289 } 487 }
290 return newID; 488 return asset.ID;
291 } 489 }
292 490
293 public bool UpdateContent(string id, byte[] data) 491 public bool UpdateContent(string id, byte[] data)
@@ -308,7 +506,7 @@ namespace OpenSim.Services.Connectors
308 } 506 }
309 asset.Data = data; 507 asset.Data = data;
310 508
311 string uri = m_ServerURI + "/assets/" + id; 509 string uri = MapServer(id) + "/assets/" + id;
312 510
313 if (SynchronousRestObjectRequester. 511 if (SynchronousRestObjectRequester.
314 MakeRequest<AssetBase, bool>("POST", uri, asset)) 512 MakeRequest<AssetBase, bool>("POST", uri, asset))
@@ -323,7 +521,7 @@ namespace OpenSim.Services.Connectors
323 521
324 public bool Delete(string id) 522 public bool Delete(string id)
325 { 523 {
326 string uri = m_ServerURI + "/assets/" + id; 524 string uri = MapServer(id) + "/assets/" + id;
327 525
328 if (SynchronousRestObjectRequester. 526 if (SynchronousRestObjectRequester.
329 MakeRequest<int, bool>("DELETE", uri, 0)) 527 MakeRequest<int, bool>("DELETE", uri, 0))