aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs')
-rw-r--r--OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs291
1 files changed, 241 insertions, 50 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
index 086b5ad..9d6d9ad 100644
--- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
+++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
@@ -27,9 +27,11 @@
27 27
28using log4net; 28using log4net;
29using System; 29using System;
30using System.Threading;
30using System.Collections.Generic; 31using System.Collections.Generic;
31using System.IO; 32using System.IO;
32using System.Reflection; 33using System.Reflection;
34using System.Timers;
33using Nini.Config; 35using Nini.Config;
34using OpenSim.Framework; 36using OpenSim.Framework;
35using OpenSim.Framework.Console; 37using OpenSim.Framework.Console;
@@ -47,13 +49,20 @@ namespace OpenSim.Services.Connectors
47 49
48 private string m_ServerURI = String.Empty; 50 private string m_ServerURI = String.Empty;
49 private IImprovedAssetCache m_Cache = null; 51 private IImprovedAssetCache m_Cache = null;
50 52 private int m_retryCounter;
53 private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>();
54 private System.Timers.Timer m_retryTimer;
51 private delegate void AssetRetrievedEx(AssetBase asset); 55 private delegate void AssetRetrievedEx(AssetBase asset);
52 56
53 // Keeps track of concurrent requests for the same asset, so that it's only loaded once. 57 // Keeps track of concurrent requests for the same asset, so that it's only loaded once.
54 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded 58 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded
55 private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); 59// private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>();
60
61 private Dictionary<string, List<AssetRetrievedEx>> m_AssetHandlers = new Dictionary<string, List<AssetRetrievedEx>>();
56 62
63 private Dictionary<string, string> m_UriMap = new Dictionary<string, string>();
64
65 private Thread[] m_fetchThreads;
57 66
58 public AssetServicesConnector() 67 public AssetServicesConnector()
59 { 68 {
@@ -81,13 +90,102 @@ namespace OpenSim.Services.Connectors
81 string serviceURI = assetConfig.GetString("AssetServerURI", 90 string serviceURI = assetConfig.GetString("AssetServerURI",
82 String.Empty); 91 String.Empty);
83 92
93 m_ServerURI = serviceURI;
94
84 if (serviceURI == String.Empty) 95 if (serviceURI == String.Empty)
85 { 96 {
86 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); 97 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService");
87 throw new Exception("Asset connector init error"); 98 throw new Exception("Asset connector init error");
88 } 99 }
89 100
90 m_ServerURI = serviceURI; 101
102 m_retryTimer = new System.Timers.Timer();
103 m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
104 m_retryTimer.Interval = 60000;
105
106 Uri serverUri = new Uri(m_ServerURI);
107
108 string groupHost = serverUri.Host;
109
110 for (int i = 0 ; i < 256 ; i++)
111 {
112 string prefix = i.ToString("x2");
113 groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost);
114
115 m_UriMap[prefix] = groupHost;
116 //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix);
117 }
118
119 m_fetchThreads = new Thread[2];
120
121 for (int i = 0 ; i < 2 ; i++)
122 {
123 m_fetchThreads[i] = new Thread(AssetRequestProcessor);
124 m_fetchThreads[i].Start();
125 }
126 }
127
128 private string MapServer(string id)
129 {
130 UriBuilder serverUri = new UriBuilder(m_ServerURI);
131
132 string prefix = id.Substring(0, 2).ToLower();
133
134 string host = m_UriMap[prefix];
135
136 serverUri.Host = host;
137
138 // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix);
139
140 string ret = serverUri.Uri.AbsoluteUri;
141 if (ret.EndsWith("/"))
142 ret = ret.Substring(0, ret.Length - 1);
143 return ret;
144 }
145
146 protected void retryCheck(object source, ElapsedEventArgs e)
147 {
148 m_retryCounter++;
149 if (m_retryCounter > 60) m_retryCounter -= 60;
150 List<int> keys = new List<int>();
151 foreach (int a in m_retryQueue.Keys)
152 {
153 keys.Add(a);
154 }
155 foreach (int a in keys)
156 {
157 //We exponentially fall back on frequency until we reach one attempt per hour
158 //The net result is that we end up in the queue for roughly 24 hours..
159 //24 hours worth of assets could be a lot, so the hope is that the region admin
160 //will have gotten the asset connector back online quickly!
161
162 int timefactor = a ^ 2;
163 if (timefactor > 60)
164 {
165 timefactor = 60;
166 }
167
168 //First, find out if we care about this timefactor
169 if (timefactor % a == 0)
170 {
171 //Yes, we do!
172 List<AssetBase> retrylist = m_retryQueue[a];
173 m_retryQueue.Remove(a);
174
175 foreach(AssetBase ass in retrylist)
176 {
177 Store(ass); //Store my ass. This function will put it back in the dictionary if it fails
178 }
179 }
180 }
181
182 if (m_retryQueue.Count == 0)
183 {
184 //It might only be one tick per minute, but I have
185 //repented and abandoned my wasteful ways
186 m_retryCounter = 0;
187 m_retryTimer.Stop();
188 }
91 } 189 }
92 190
93 protected void SetCache(IImprovedAssetCache cache) 191 protected void SetCache(IImprovedAssetCache cache)
@@ -97,15 +195,13 @@ namespace OpenSim.Services.Connectors
97 195
98 public AssetBase Get(string id) 196 public AssetBase Get(string id)
99 { 197 {
100// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Synchronous get request for {0}", id); 198 string uri = MapServer(id) + "/assets/" + id;
101
102 string uri = m_ServerURI + "/assets/" + id;
103 199
104 AssetBase asset = null; 200 AssetBase asset = null;
105 if (m_Cache != null) 201 if (m_Cache != null)
106 asset = m_Cache.Get(id); 202 asset = m_Cache.Get(id);
107 203
108 if (asset == null) 204 if (asset == null || asset.Data == null || asset.Data.Length == 0)
109 { 205 {
110 asset = SynchronousRestObjectRequester. 206 asset = SynchronousRestObjectRequester.
111 MakeRequest<int, AssetBase>("GET", uri, 0, 30); 207 MakeRequest<int, AssetBase>("GET", uri, 0, 30);
@@ -136,7 +232,7 @@ namespace OpenSim.Services.Connectors
136 return fullAsset.Metadata; 232 return fullAsset.Metadata;
137 } 233 }
138 234
139 string uri = m_ServerURI + "/assets/" + id + "/metadata"; 235 string uri = MapServer(id) + "/assets/" + id + "/metadata";
140 236
141 AssetMetadata asset = SynchronousRestObjectRequester. 237 AssetMetadata asset = SynchronousRestObjectRequester.
142 MakeRequest<int, AssetMetadata>("GET", uri, 0); 238 MakeRequest<int, AssetMetadata>("GET", uri, 0);
@@ -153,7 +249,7 @@ namespace OpenSim.Services.Connectors
153 return fullAsset.Data; 249 return fullAsset.Data;
154 } 250 }
155 251
156 RestClient rc = new RestClient(m_ServerURI); 252 RestClient rc = new RestClient(MapServer(id));
157 rc.AddResourcePath("assets"); 253 rc.AddResourcePath("assets");
158 rc.AddResourcePath(id); 254 rc.AddResourcePath(id);
159 rc.AddResourcePath("data"); 255 rc.AddResourcePath("data");
@@ -176,34 +272,25 @@ namespace OpenSim.Services.Connectors
176 return null; 272 return null;
177 } 273 }
178 274
179 public bool Get(string id, Object sender, AssetRetrieved handler) 275 private class QueuedAssetRequest
180 { 276 {
181// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Potentially asynchronous get request for {0}", id); 277 public string uri;
278 public string id;
279 }
182 280
183 string uri = m_ServerURI + "/assets/" + id; 281 private OpenMetaverse.BlockingQueue<QueuedAssetRequest> m_requestQueue =
282 new OpenMetaverse.BlockingQueue<QueuedAssetRequest>();
184 283
185 AssetBase asset = null; 284 private void AssetRequestProcessor()
186 if (m_Cache != null) 285 {
187 asset = m_Cache.Get(id); 286 QueuedAssetRequest r;
188 287
189 if (asset == null) 288 while (true)
190 { 289 {
191 lock (m_AssetHandlers) 290 r = m_requestQueue.Dequeue();
192 {
193 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
194 291
195 AssetRetrievedEx handlers; 292 string uri = r.uri;
196 if (m_AssetHandlers.TryGetValue(id, out handlers)) 293 string id = r.id;
197 {
198 // Someone else is already loading this asset. It will notify our handler when done.
199 handlers += handlerEx;
200 return true;
201 }
202
203 // Load the asset ourselves
204 handlers += handlerEx;
205 m_AssetHandlers.Add(id, handlers);
206 }
207 294
208 bool success = false; 295 bool success = false;
209 try 296 try
@@ -214,13 +301,16 @@ namespace OpenSim.Services.Connectors
214 if (m_Cache != null) 301 if (m_Cache != null)
215 m_Cache.Cache(a); 302 m_Cache.Cache(a);
216 303
217 AssetRetrievedEx handlers; 304 List<AssetRetrievedEx> handlers;
218 lock (m_AssetHandlers) 305 lock (m_AssetHandlers)
219 { 306 {
220 handlers = m_AssetHandlers[id]; 307 handlers = m_AssetHandlers[id];
221 m_AssetHandlers.Remove(id); 308 m_AssetHandlers.Remove(id);
222 } 309 }
223 handlers.Invoke(a); 310 foreach (AssetRetrievedEx h in handlers)
311 h.Invoke(a);
312 if (handlers != null)
313 handlers.Clear();
224 }, 30); 314 }, 30);
225 315
226 success = true; 316 success = true;
@@ -229,13 +319,57 @@ namespace OpenSim.Services.Connectors
229 { 319 {
230 if (!success) 320 if (!success)
231 { 321 {
322 List<AssetRetrievedEx> handlers;
232 lock (m_AssetHandlers) 323 lock (m_AssetHandlers)
233 { 324 {
325 handlers = m_AssetHandlers[id];
234 m_AssetHandlers.Remove(id); 326 m_AssetHandlers.Remove(id);
235 } 327 }
328 if (handlers != null)
329 handlers.Clear();
236 } 330 }
237 } 331 }
238 } 332 }
333 }
334
335 public bool Get(string id, Object sender, AssetRetrieved handler)
336 {
337 string uri = MapServer(id) + "/assets/" + id;
338
339 AssetBase asset = null;
340 if (m_Cache != null)
341 asset = m_Cache.Get(id);
342
343 if (asset == null || asset.Data == null || asset.Data.Length == 0)
344 {
345 lock (m_AssetHandlers)
346 {
347 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
348
349// AssetRetrievedEx handlers;
350 List<AssetRetrievedEx> handlers;
351 if (m_AssetHandlers.TryGetValue(id, out handlers))
352 {
353 // Someone else is already loading this asset. It will notify our handler when done.
354// handlers += handlerEx;
355 handlers.Add(handlerEx);
356 return true;
357 }
358
359 // Load the asset ourselves
360// handlers += handlerEx;
361 handlers = new List<AssetRetrievedEx>();
362 handlers.Add(handlerEx);
363
364 m_AssetHandlers.Add(id, handlers);
365 }
366
367 QueuedAssetRequest request = new QueuedAssetRequest();
368 request.id = id;
369 request.uri = uri;
370
371 m_requestQueue.Enqueue(request);
372 }
239 else 373 else
240 { 374 {
241 handler(id, sender, asset); 375 handler(id, sender, asset);
@@ -246,38 +380,95 @@ namespace OpenSim.Services.Connectors
246 380
247 public string Store(AssetBase asset) 381 public string Store(AssetBase asset)
248 { 382 {
249 if (asset.Temporary || asset.Local) 383 // Have to assign the asset ID here. This isn't likely to
384 // trigger since current callers don't pass emtpy IDs
385 // We need the asset ID to route the request to the proper
386 // cluster member, so we can't have the server assign one.
387 if (asset.ID == string.Empty)
250 { 388 {
251 if (m_Cache != null) 389 if (asset.FullID == UUID.Zero)
252 m_Cache.Cache(asset); 390 {
391 asset.FullID = UUID.Random();
392 }
393 asset.ID = asset.FullID.ToString();
394 }
395 else if (asset.FullID == UUID.Zero)
396 {
397 UUID uuid = UUID.Zero;
398 if (UUID.TryParse(asset.ID, out uuid))
399 {
400 asset.FullID = uuid;
401 }
402 else
403 {
404 asset.FullID = UUID.Random();
405 }
406 }
253 407
408 if (m_Cache != null)
409 m_Cache.Cache(asset);
410 if (asset.Temporary || asset.Local)
411 {
254 return asset.ID; 412 return asset.ID;
255 } 413 }
256 414
257 string uri = m_ServerURI + "/assets/"; 415 string uri = MapServer(asset.FullID.ToString()) + "/assets/";
258 416
259 string newID = string.Empty; 417 string newID = string.Empty;
260 try 418 try
261 { 419 {
262 newID = SynchronousRestObjectRequester. 420 newID = SynchronousRestObjectRequester.
263 MakeRequest<AssetBase, string>("POST", uri, asset); 421 MakeRequest<AssetBase, string>("POST", uri, asset, 25);
422 if (newID == null || newID == "")
423 {
424 newID = UUID.Zero.ToString();
425 }
264 } 426 }
265 catch (Exception e) 427 catch (Exception e)
266 { 428 {
267 m_log.WarnFormat("[ASSET CONNECTOR]: Unable to send asset {0} to asset server. Reason: {1}", asset.ID, e.Message); 429 newID = UUID.Zero.ToString();
268 } 430 }
269 431
270 if (newID != String.Empty) 432 if (newID == UUID.Zero.ToString())
271 { 433 {
272 // Placing this here, so that this work with old asset servers that don't send any reply back 434 //The asset upload failed, put it in a queue for later
273 // SynchronousRestObjectRequester returns somethins that is not an empty string 435 asset.UploadAttempts++;
274 if (newID != null) 436 if (asset.UploadAttempts > 30)
275 asset.ID = newID; 437 {
438 //By this stage we've been in the queue for a good few hours;
439 //We're going to drop the asset.
440 m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString());
441 }
442 else
443 {
444 if (!m_retryQueue.ContainsKey(asset.UploadAttempts))
445 {
446 m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>());
447 }
448 List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts];
449 m_queue.Add(asset);
450 m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString());
451 m_retryTimer.Start();
452 }
453 }
454 else
455 {
456 if (asset.UploadAttempts > 0)
457 {
458 m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString());
459 }
460 if (newID != String.Empty)
461 {
462 // Placing this here, so that this work with old asset servers that don't send any reply back
463 // SynchronousRestObjectRequester returns somethins that is not an empty string
464 if (newID != null)
465 asset.ID = newID;
276 466
277 if (m_Cache != null) 467 if (m_Cache != null)
278 m_Cache.Cache(asset); 468 m_Cache.Cache(asset);
469 }
279 } 470 }
280 return newID; 471 return asset.ID;
281 } 472 }
282 473
283 public bool UpdateContent(string id, byte[] data) 474 public bool UpdateContent(string id, byte[] data)
@@ -298,7 +489,7 @@ namespace OpenSim.Services.Connectors
298 } 489 }
299 asset.Data = data; 490 asset.Data = data;
300 491
301 string uri = m_ServerURI + "/assets/" + id; 492 string uri = MapServer(id) + "/assets/" + id;
302 493
303 if (SynchronousRestObjectRequester. 494 if (SynchronousRestObjectRequester.
304 MakeRequest<AssetBase, bool>("POST", uri, asset)) 495 MakeRequest<AssetBase, bool>("POST", uri, asset))
@@ -313,7 +504,7 @@ namespace OpenSim.Services.Connectors
313 504
314 public bool Delete(string id) 505 public bool Delete(string id)
315 { 506 {
316 string uri = m_ServerURI + "/assets/" + id; 507 string uri = MapServer(id) + "/assets/" + id;
317 508
318 if (SynchronousRestObjectRequester. 509 if (SynchronousRestObjectRequester.
319 MakeRequest<int, bool>("DELETE", uri, 0)) 510 MakeRequest<int, bool>("DELETE", uri, 0))