diff options
Diffstat (limited to 'OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs')
-rw-r--r-- | OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs | 324 |
1 files changed, 264 insertions, 60 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs index 8b04d7f..bf0cc35 100644 --- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs +++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs | |||
@@ -27,9 +27,11 @@ | |||
27 | 27 | ||
28 | using log4net; | 28 | using log4net; |
29 | using System; | 29 | using System; |
30 | using System.Threading; | ||
30 | using System.Collections.Generic; | 31 | using System.Collections.Generic; |
31 | using System.IO; | 32 | using System.IO; |
32 | using System.Reflection; | 33 | using System.Reflection; |
34 | using System.Timers; | ||
33 | using Nini.Config; | 35 | using Nini.Config; |
34 | using OpenSim.Framework; | 36 | using OpenSim.Framework; |
35 | using OpenSim.Framework.Console; | 37 | using OpenSim.Framework.Console; |
@@ -47,13 +49,22 @@ namespace OpenSim.Services.Connectors | |||
47 | 49 | ||
48 | private string m_ServerURI = String.Empty; | 50 | private string m_ServerURI = String.Empty; |
49 | private IImprovedAssetCache m_Cache = null; | 51 | private IImprovedAssetCache m_Cache = null; |
52 | private int m_retryCounter; | ||
53 | private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>(); | ||
54 | private System.Timers.Timer m_retryTimer; | ||
50 | private int m_maxAssetRequestConcurrency = 30; | 55 | private int m_maxAssetRequestConcurrency = 30; |
51 | 56 | ||
52 | private delegate void AssetRetrievedEx(AssetBase asset); | 57 | private delegate void AssetRetrievedEx(AssetBase asset); |
53 | 58 | ||
54 | // Keeps track of concurrent requests for the same asset, so that it's only loaded once. | 59 | // Keeps track of concurrent requests for the same asset, so that it's only loaded once. |
55 | // Maps: Asset ID -> Handlers which will be called when the asset has been loaded | 60 | // Maps: Asset ID -> Handlers which will be called when the asset has been loaded |
56 | private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); | 61 | // private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); |
62 | |||
63 | private Dictionary<string, List<AssetRetrievedEx>> m_AssetHandlers = new Dictionary<string, List<AssetRetrievedEx>>(); | ||
64 | |||
65 | private Dictionary<string, string> m_UriMap = new Dictionary<string, string>(); | ||
66 | |||
67 | private Thread[] m_fetchThreads; | ||
57 | 68 | ||
58 | public int MaxAssetRequestConcurrency | 69 | public int MaxAssetRequestConcurrency |
59 | { | 70 | { |
@@ -91,13 +102,108 @@ namespace OpenSim.Services.Connectors | |||
91 | string serviceURI = assetConfig.GetString("AssetServerURI", | 102 | string serviceURI = assetConfig.GetString("AssetServerURI", |
92 | String.Empty); | 103 | String.Empty); |
93 | 104 | ||
105 | m_ServerURI = serviceURI; | ||
106 | |||
94 | if (serviceURI == String.Empty) | 107 | if (serviceURI == String.Empty) |
95 | { | 108 | { |
96 | m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); | 109 | m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); |
97 | throw new Exception("Asset connector init error"); | 110 | throw new Exception("Asset connector init error"); |
98 | } | 111 | } |
99 | 112 | ||
100 | m_ServerURI = serviceURI; | 113 | |
114 | m_retryTimer = new System.Timers.Timer(); | ||
115 | m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck); | ||
116 | m_retryTimer.Interval = 60000; | ||
117 | |||
118 | Uri serverUri = new Uri(m_ServerURI); | ||
119 | |||
120 | string groupHost = serverUri.Host; | ||
121 | |||
122 | for (int i = 0 ; i < 256 ; i++) | ||
123 | { | ||
124 | string prefix = i.ToString("x2"); | ||
125 | groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost); | ||
126 | |||
127 | m_UriMap[prefix] = groupHost; | ||
128 | //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix); | ||
129 | } | ||
130 | |||
131 | m_fetchThreads = new Thread[2]; | ||
132 | |||
133 | for (int i = 0 ; i < 2 ; i++) | ||
134 | { | ||
135 | m_fetchThreads[i] = new Thread(AssetRequestProcessor); | ||
136 | m_fetchThreads[i].Start(); | ||
137 | } | ||
138 | } | ||
139 | |||
140 | private string MapServer(string id) | ||
141 | { | ||
142 | UriBuilder serverUri = new UriBuilder(m_ServerURI); | ||
143 | |||
144 | string prefix = id.Substring(0, 2).ToLower(); | ||
145 | |||
146 | string host; | ||
147 | |||
148 | // HG URLs will not be valid UUIDS | ||
149 | if (m_UriMap.ContainsKey(prefix)) | ||
150 | host = m_UriMap[prefix]; | ||
151 | else | ||
152 | host = m_UriMap["00"]; | ||
153 | |||
154 | serverUri.Host = host; | ||
155 | |||
156 | // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix); | ||
157 | |||
158 | string ret = serverUri.Uri.AbsoluteUri; | ||
159 | if (ret.EndsWith("/")) | ||
160 | ret = ret.Substring(0, ret.Length - 1); | ||
161 | return ret; | ||
162 | } | ||
163 | |||
164 | protected void retryCheck(object source, ElapsedEventArgs e) | ||
165 | { | ||
166 | m_retryCounter++; | ||
167 | if (m_retryCounter > 60) m_retryCounter -= 60; | ||
168 | List<int> keys = new List<int>(); | ||
169 | foreach (int a in m_retryQueue.Keys) | ||
170 | { | ||
171 | keys.Add(a); | ||
172 | } | ||
173 | foreach (int a in keys) | ||
174 | { | ||
175 | //We exponentially fall back on frequency until we reach one attempt per hour | ||
176 | //The net result is that we end up in the queue for roughly 24 hours.. | ||
177 | //24 hours worth of assets could be a lot, so the hope is that the region admin | ||
178 | //will have gotten the asset connector back online quickly! | ||
179 | |||
180 | int timefactor = a ^ 2; | ||
181 | if (timefactor > 60) | ||
182 | { | ||
183 | timefactor = 60; | ||
184 | } | ||
185 | |||
186 | //First, find out if we care about this timefactor | ||
187 | if (timefactor % a == 0) | ||
188 | { | ||
189 | //Yes, we do! | ||
190 | List<AssetBase> retrylist = m_retryQueue[a]; | ||
191 | m_retryQueue.Remove(a); | ||
192 | |||
193 | foreach(AssetBase ass in retrylist) | ||
194 | { | ||
195 | Store(ass); //Store my ass. This function will put it back in the dictionary if it fails | ||
196 | } | ||
197 | } | ||
198 | } | ||
199 | |||
200 | if (m_retryQueue.Count == 0) | ||
201 | { | ||
202 | //It might only be one tick per minute, but I have | ||
203 | //repented and abandoned my wasteful ways | ||
204 | m_retryCounter = 0; | ||
205 | m_retryTimer.Stop(); | ||
206 | } | ||
101 | } | 207 | } |
102 | 208 | ||
103 | protected void SetCache(IImprovedAssetCache cache) | 209 | protected void SetCache(IImprovedAssetCache cache) |
@@ -107,15 +213,13 @@ namespace OpenSim.Services.Connectors | |||
107 | 213 | ||
108 | public AssetBase Get(string id) | 214 | public AssetBase Get(string id) |
109 | { | 215 | { |
110 | // m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Synchronous get request for {0}", id); | 216 | string uri = MapServer(id) + "/assets/" + id; |
111 | |||
112 | string uri = m_ServerURI + "/assets/" + id; | ||
113 | 217 | ||
114 | AssetBase asset = null; | 218 | AssetBase asset = null; |
115 | if (m_Cache != null) | 219 | if (m_Cache != null) |
116 | asset = m_Cache.Get(id); | 220 | asset = m_Cache.Get(id); |
117 | 221 | ||
118 | if (asset == null) | 222 | if (asset == null || asset.Data == null || asset.Data.Length == 0) |
119 | { | 223 | { |
120 | asset = SynchronousRestObjectRequester. | 224 | asset = SynchronousRestObjectRequester. |
121 | MakeRequest<int, AssetBase>("GET", uri, 0, m_maxAssetRequestConcurrency); | 225 | MakeRequest<int, AssetBase>("GET", uri, 0, m_maxAssetRequestConcurrency); |
@@ -146,7 +250,7 @@ namespace OpenSim.Services.Connectors | |||
146 | return fullAsset.Metadata; | 250 | return fullAsset.Metadata; |
147 | } | 251 | } |
148 | 252 | ||
149 | string uri = m_ServerURI + "/assets/" + id + "/metadata"; | 253 | string uri = MapServer(id) + "/assets/" + id + "/metadata"; |
150 | 254 | ||
151 | AssetMetadata asset = SynchronousRestObjectRequester. | 255 | AssetMetadata asset = SynchronousRestObjectRequester. |
152 | MakeRequest<int, AssetMetadata>("GET", uri, 0); | 256 | MakeRequest<int, AssetMetadata>("GET", uri, 0); |
@@ -163,7 +267,7 @@ namespace OpenSim.Services.Connectors | |||
163 | return fullAsset.Data; | 267 | return fullAsset.Data; |
164 | } | 268 | } |
165 | 269 | ||
166 | RestClient rc = new RestClient(m_ServerURI); | 270 | RestClient rc = new RestClient(MapServer(id)); |
167 | rc.AddResourcePath("assets"); | 271 | rc.AddResourcePath("assets"); |
168 | rc.AddResourcePath(id); | 272 | rc.AddResourcePath(id); |
169 | rc.AddResourcePath("data"); | 273 | rc.AddResourcePath("data"); |
@@ -186,66 +290,109 @@ namespace OpenSim.Services.Connectors | |||
186 | return null; | 290 | return null; |
187 | } | 291 | } |
188 | 292 | ||
189 | public bool Get(string id, Object sender, AssetRetrieved handler) | 293 | private class QueuedAssetRequest |
190 | { | 294 | { |
191 | // m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Potentially asynchronous get request for {0}", id); | 295 | public string uri; |
296 | public string id; | ||
297 | } | ||
192 | 298 | ||
193 | string uri = m_ServerURI + "/assets/" + id; | 299 | private OpenMetaverse.BlockingQueue<QueuedAssetRequest> m_requestQueue = |
300 | new OpenMetaverse.BlockingQueue<QueuedAssetRequest>(); | ||
194 | 301 | ||
195 | AssetBase asset = null; | 302 | private void AssetRequestProcessor() |
196 | if (m_Cache != null) | 303 | { |
197 | asset = m_Cache.Get(id); | 304 | QueuedAssetRequest r; |
198 | 305 | ||
199 | if (asset == null) | 306 | while (true) |
200 | { | 307 | { |
201 | lock (m_AssetHandlers) | 308 | r = m_requestQueue.Dequeue(); |
202 | { | ||
203 | AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); | ||
204 | |||
205 | AssetRetrievedEx handlers; | ||
206 | if (m_AssetHandlers.TryGetValue(id, out handlers)) | ||
207 | { | ||
208 | // Someone else is already loading this asset. It will notify our handler when done. | ||
209 | handlers += handlerEx; | ||
210 | return true; | ||
211 | } | ||
212 | 309 | ||
213 | // Load the asset ourselves | 310 | string uri = r.uri; |
214 | handlers += handlerEx; | 311 | string id = r.id; |
215 | m_AssetHandlers.Add(id, handlers); | ||
216 | } | ||
217 | 312 | ||
218 | bool success = false; | 313 | bool success = false; |
219 | try | 314 | try |
220 | { | 315 | { |
221 | AsynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, | 316 | AssetBase a = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, 30); |
222 | delegate(AssetBase a) | 317 | if (a != null) |
223 | { | 318 | { |
224 | if (m_Cache != null) | 319 | if (m_Cache != null) |
225 | m_Cache.Cache(a); | 320 | m_Cache.Cache(a); |
226 | 321 | ||
227 | AssetRetrievedEx handlers; | 322 | List<AssetRetrievedEx> handlers; |
228 | lock (m_AssetHandlers) | 323 | lock (m_AssetHandlers) |
324 | { | ||
325 | handlers = m_AssetHandlers[id]; | ||
326 | m_AssetHandlers.Remove(id); | ||
327 | } | ||
328 | foreach (AssetRetrievedEx h in handlers) | ||
329 | { | ||
330 | Util.FireAndForget(x => | ||
229 | { | 331 | { |
230 | handlers = m_AssetHandlers[id]; | 332 | h.Invoke(a); |
231 | m_AssetHandlers.Remove(id); | 333 | }); |
232 | } | 334 | } |
233 | handlers.Invoke(a); | 335 | if (handlers != null) |
234 | }, m_maxAssetRequestConcurrency); | 336 | handlers.Clear(); |
235 | 337 | ||
236 | success = true; | 338 | success = true; |
339 | } | ||
237 | } | 340 | } |
238 | finally | 341 | finally |
239 | { | 342 | { |
240 | if (!success) | 343 | if (!success) |
241 | { | 344 | { |
345 | List<AssetRetrievedEx> handlers; | ||
242 | lock (m_AssetHandlers) | 346 | lock (m_AssetHandlers) |
243 | { | 347 | { |
348 | handlers = m_AssetHandlers[id]; | ||
244 | m_AssetHandlers.Remove(id); | 349 | m_AssetHandlers.Remove(id); |
245 | } | 350 | } |
351 | if (handlers != null) | ||
352 | handlers.Clear(); | ||
246 | } | 353 | } |
247 | } | 354 | } |
248 | } | 355 | } |
356 | } | ||
357 | |||
358 | public bool Get(string id, Object sender, AssetRetrieved handler) | ||
359 | { | ||
360 | string uri = MapServer(id) + "/assets/" + id; | ||
361 | |||
362 | AssetBase asset = null; | ||
363 | if (m_Cache != null) | ||
364 | asset = m_Cache.Get(id); | ||
365 | |||
366 | if (asset == null || asset.Data == null || asset.Data.Length == 0) | ||
367 | { | ||
368 | lock (m_AssetHandlers) | ||
369 | { | ||
370 | AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); | ||
371 | |||
372 | // AssetRetrievedEx handlers; | ||
373 | List<AssetRetrievedEx> handlers; | ||
374 | if (m_AssetHandlers.TryGetValue(id, out handlers)) | ||
375 | { | ||
376 | // Someone else is already loading this asset. It will notify our handler when done. | ||
377 | // handlers += handlerEx; | ||
378 | handlers.Add(handlerEx); | ||
379 | return true; | ||
380 | } | ||
381 | |||
382 | // Load the asset ourselves | ||
383 | // handlers += handlerEx; | ||
384 | handlers = new List<AssetRetrievedEx>(); | ||
385 | handlers.Add(handlerEx); | ||
386 | |||
387 | m_AssetHandlers.Add(id, handlers); | ||
388 | } | ||
389 | |||
390 | QueuedAssetRequest request = new QueuedAssetRequest(); | ||
391 | request.id = id; | ||
392 | request.uri = uri; | ||
393 | |||
394 | m_requestQueue.Enqueue(request); | ||
395 | } | ||
249 | else | 396 | else |
250 | { | 397 | { |
251 | handler(id, sender, asset); | 398 | handler(id, sender, asset); |
@@ -256,38 +403,95 @@ namespace OpenSim.Services.Connectors | |||
256 | 403 | ||
257 | public string Store(AssetBase asset) | 404 | public string Store(AssetBase asset) |
258 | { | 405 | { |
259 | if (asset.Local) | 406 | // Have to assign the asset ID here. This isn't likely to |
407 | // trigger since current callers don't pass emtpy IDs | ||
408 | // We need the asset ID to route the request to the proper | ||
409 | // cluster member, so we can't have the server assign one. | ||
410 | if (asset.ID == string.Empty) | ||
260 | { | 411 | { |
261 | if (m_Cache != null) | 412 | if (asset.FullID == UUID.Zero) |
262 | m_Cache.Cache(asset); | 413 | { |
414 | asset.FullID = UUID.Random(); | ||
415 | } | ||
416 | asset.ID = asset.FullID.ToString(); | ||
417 | } | ||
418 | else if (asset.FullID == UUID.Zero) | ||
419 | { | ||
420 | UUID uuid = UUID.Zero; | ||
421 | if (UUID.TryParse(asset.ID, out uuid)) | ||
422 | { | ||
423 | asset.FullID = uuid; | ||
424 | } | ||
425 | else | ||
426 | { | ||
427 | asset.FullID = UUID.Random(); | ||
428 | } | ||
429 | } | ||
263 | 430 | ||
431 | if (m_Cache != null) | ||
432 | m_Cache.Cache(asset); | ||
433 | if (asset.Temporary || asset.Local) | ||
434 | { | ||
264 | return asset.ID; | 435 | return asset.ID; |
265 | } | 436 | } |
266 | 437 | ||
267 | string uri = m_ServerURI + "/assets/"; | 438 | string uri = MapServer(asset.FullID.ToString()) + "/assets/"; |
268 | 439 | ||
269 | string newID = string.Empty; | 440 | string newID = string.Empty; |
270 | try | 441 | try |
271 | { | 442 | { |
272 | newID = SynchronousRestObjectRequester. | 443 | newID = SynchronousRestObjectRequester. |
273 | MakeRequest<AssetBase, string>("POST", uri, asset); | 444 | MakeRequest<AssetBase, string>("POST", uri, asset, 25); |
445 | if (newID == null || newID == "") | ||
446 | { | ||
447 | newID = UUID.Zero.ToString(); | ||
448 | } | ||
274 | } | 449 | } |
275 | catch (Exception e) | 450 | catch (Exception e) |
276 | { | 451 | { |
277 | m_log.WarnFormat("[ASSET CONNECTOR]: Unable to send asset {0} to asset server. Reason: {1}", asset.ID, e.Message); | 452 | newID = UUID.Zero.ToString(); |
278 | } | 453 | } |
279 | 454 | ||
280 | if (newID != String.Empty) | 455 | if (newID == UUID.Zero.ToString()) |
456 | { | ||
457 | //The asset upload failed, put it in a queue for later | ||
458 | asset.UploadAttempts++; | ||
459 | if (asset.UploadAttempts > 30) | ||
460 | { | ||
461 | //By this stage we've been in the queue for a good few hours; | ||
462 | //We're going to drop the asset. | ||
463 | m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString()); | ||
464 | } | ||
465 | else | ||
466 | { | ||
467 | if (!m_retryQueue.ContainsKey(asset.UploadAttempts)) | ||
468 | { | ||
469 | m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>()); | ||
470 | } | ||
471 | List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts]; | ||
472 | m_queue.Add(asset); | ||
473 | m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString()); | ||
474 | m_retryTimer.Start(); | ||
475 | } | ||
476 | } | ||
477 | else | ||
281 | { | 478 | { |
282 | // Placing this here, so that this work with old asset servers that don't send any reply back | 479 | if (asset.UploadAttempts > 0) |
283 | // SynchronousRestObjectRequester returns somethins that is not an empty string | 480 | { |
284 | if (newID != null) | 481 | m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString()); |
285 | asset.ID = newID; | 482 | } |
483 | if (newID != String.Empty) | ||
484 | { | ||
485 | // Placing this here, so that this work with old asset servers that don't send any reply back | ||
486 | // SynchronousRestObjectRequester returns somethins that is not an empty string | ||
487 | if (newID != null) | ||
488 | asset.ID = newID; | ||
286 | 489 | ||
287 | if (m_Cache != null) | 490 | if (m_Cache != null) |
288 | m_Cache.Cache(asset); | 491 | m_Cache.Cache(asset); |
492 | } | ||
289 | } | 493 | } |
290 | return newID; | 494 | return asset.ID; |
291 | } | 495 | } |
292 | 496 | ||
293 | public bool UpdateContent(string id, byte[] data) | 497 | public bool UpdateContent(string id, byte[] data) |
@@ -308,7 +512,7 @@ namespace OpenSim.Services.Connectors | |||
308 | } | 512 | } |
309 | asset.Data = data; | 513 | asset.Data = data; |
310 | 514 | ||
311 | string uri = m_ServerURI + "/assets/" + id; | 515 | string uri = MapServer(id) + "/assets/" + id; |
312 | 516 | ||
313 | if (SynchronousRestObjectRequester. | 517 | if (SynchronousRestObjectRequester. |
314 | MakeRequest<AssetBase, bool>("POST", uri, asset)) | 518 | MakeRequest<AssetBase, bool>("POST", uri, asset)) |
@@ -323,7 +527,7 @@ namespace OpenSim.Services.Connectors | |||
323 | 527 | ||
324 | public bool Delete(string id) | 528 | public bool Delete(string id) |
325 | { | 529 | { |
326 | string uri = m_ServerURI + "/assets/" + id; | 530 | string uri = MapServer(id) + "/assets/" + id; |
327 | 531 | ||
328 | if (SynchronousRestObjectRequester. | 532 | if (SynchronousRestObjectRequester. |
329 | MakeRequest<int, bool>("DELETE", uri, 0)) | 533 | MakeRequest<int, bool>("DELETE", uri, 0)) |