diff options
Diffstat (limited to '')
-rw-r--r-- | OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs | 317 |
1 files changed, 257 insertions, 60 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs index 2b2f11f..4b502b7 100644 --- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs +++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs | |||
@@ -27,9 +27,11 @@ | |||
27 | 27 | ||
28 | using log4net; | 28 | using log4net; |
29 | using System; | 29 | using System; |
30 | using System.Threading; | ||
30 | using System.Collections.Generic; | 31 | using System.Collections.Generic; |
31 | using System.IO; | 32 | using System.IO; |
32 | using System.Reflection; | 33 | using System.Reflection; |
34 | using System.Timers; | ||
33 | using Nini.Config; | 35 | using Nini.Config; |
34 | using OpenSim.Framework; | 36 | using OpenSim.Framework; |
35 | using OpenSim.Framework.Console; | 37 | using OpenSim.Framework.Console; |
@@ -47,14 +49,22 @@ namespace OpenSim.Services.Connectors | |||
47 | 49 | ||
48 | private string m_ServerURI = String.Empty; | 50 | private string m_ServerURI = String.Empty; |
49 | private IImprovedAssetCache m_Cache = null; | 51 | private IImprovedAssetCache m_Cache = null; |
52 | private int m_retryCounter; | ||
53 | private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>(); | ||
54 | private System.Timers.Timer m_retryTimer; | ||
50 | private int m_maxAssetRequestConcurrency = 30; | 55 | private int m_maxAssetRequestConcurrency = 30; |
51 | 56 | ||
52 | private delegate void AssetRetrievedEx(AssetBase asset); | 57 | private delegate void AssetRetrievedEx(AssetBase asset); |
53 | 58 | ||
54 | // Keeps track of concurrent requests for the same asset, so that it's only loaded once. | 59 | // Keeps track of concurrent requests for the same asset, so that it's only loaded once. |
55 | // Maps: Asset ID -> Handlers which will be called when the asset has been loaded | 60 | // Maps: Asset ID -> Handlers which will be called when the asset has been loaded |
56 | private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); | 61 | // private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); |
57 | 62 | ||
63 | private Dictionary<string, List<AssetRetrievedEx>> m_AssetHandlers = new Dictionary<string, List<AssetRetrievedEx>>(); | ||
64 | |||
65 | private Dictionary<string, string> m_UriMap = new Dictionary<string, string>(); | ||
66 | |||
67 | private Thread[] m_fetchThreads; | ||
58 | 68 | ||
59 | public AssetServicesConnector() | 69 | public AssetServicesConnector() |
60 | { | 70 | { |
@@ -86,13 +96,102 @@ namespace OpenSim.Services.Connectors | |||
86 | string serviceURI = assetConfig.GetString("AssetServerURI", | 96 | string serviceURI = assetConfig.GetString("AssetServerURI", |
87 | String.Empty); | 97 | String.Empty); |
88 | 98 | ||
99 | m_ServerURI = serviceURI; | ||
100 | |||
89 | if (serviceURI == String.Empty) | 101 | if (serviceURI == String.Empty) |
90 | { | 102 | { |
91 | m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); | 103 | m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); |
92 | throw new Exception("Asset connector init error"); | 104 | throw new Exception("Asset connector init error"); |
93 | } | 105 | } |
94 | 106 | ||
95 | m_ServerURI = serviceURI; | 107 | |
108 | m_retryTimer = new System.Timers.Timer(); | ||
109 | m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck); | ||
110 | m_retryTimer.Interval = 60000; | ||
111 | |||
112 | Uri serverUri = new Uri(m_ServerURI); | ||
113 | |||
114 | string groupHost = serverUri.Host; | ||
115 | |||
116 | for (int i = 0 ; i < 256 ; i++) | ||
117 | { | ||
118 | string prefix = i.ToString("x2"); | ||
119 | groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost); | ||
120 | |||
121 | m_UriMap[prefix] = groupHost; | ||
122 | //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix); | ||
123 | } | ||
124 | |||
125 | m_fetchThreads = new Thread[2]; | ||
126 | |||
127 | for (int i = 0 ; i < 2 ; i++) | ||
128 | { | ||
129 | m_fetchThreads[i] = new Thread(AssetRequestProcessor); | ||
130 | m_fetchThreads[i].Start(); | ||
131 | } | ||
132 | } | ||
133 | |||
134 | private string MapServer(string id) | ||
135 | { | ||
136 | UriBuilder serverUri = new UriBuilder(m_ServerURI); | ||
137 | |||
138 | string prefix = id.Substring(0, 2).ToLower(); | ||
139 | |||
140 | string host = m_UriMap[prefix]; | ||
141 | |||
142 | serverUri.Host = host; | ||
143 | |||
144 | // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix); | ||
145 | |||
146 | string ret = serverUri.Uri.AbsoluteUri; | ||
147 | if (ret.EndsWith("/")) | ||
148 | ret = ret.Substring(0, ret.Length - 1); | ||
149 | return ret; | ||
150 | } | ||
151 | |||
152 | protected void retryCheck(object source, ElapsedEventArgs e) | ||
153 | { | ||
154 | m_retryCounter++; | ||
155 | if (m_retryCounter > 60) m_retryCounter -= 60; | ||
156 | List<int> keys = new List<int>(); | ||
157 | foreach (int a in m_retryQueue.Keys) | ||
158 | { | ||
159 | keys.Add(a); | ||
160 | } | ||
161 | foreach (int a in keys) | ||
162 | { | ||
163 | //We exponentially fall back on frequency until we reach one attempt per hour | ||
164 | //The net result is that we end up in the queue for roughly 24 hours.. | ||
165 | //24 hours worth of assets could be a lot, so the hope is that the region admin | ||
166 | //will have gotten the asset connector back online quickly! | ||
167 | |||
168 | int timefactor = a ^ 2; | ||
169 | if (timefactor > 60) | ||
170 | { | ||
171 | timefactor = 60; | ||
172 | } | ||
173 | |||
174 | //First, find out if we care about this timefactor | ||
175 | if (timefactor % a == 0) | ||
176 | { | ||
177 | //Yes, we do! | ||
178 | List<AssetBase> retrylist = m_retryQueue[a]; | ||
179 | m_retryQueue.Remove(a); | ||
180 | |||
181 | foreach(AssetBase ass in retrylist) | ||
182 | { | ||
183 | Store(ass); //Store my ass. This function will put it back in the dictionary if it fails | ||
184 | } | ||
185 | } | ||
186 | } | ||
187 | |||
188 | if (m_retryQueue.Count == 0) | ||
189 | { | ||
190 | //It might only be one tick per minute, but I have | ||
191 | //repented and abandoned my wasteful ways | ||
192 | m_retryCounter = 0; | ||
193 | m_retryTimer.Stop(); | ||
194 | } | ||
96 | } | 195 | } |
97 | 196 | ||
98 | protected void SetCache(IImprovedAssetCache cache) | 197 | protected void SetCache(IImprovedAssetCache cache) |
@@ -102,15 +201,13 @@ namespace OpenSim.Services.Connectors | |||
102 | 201 | ||
103 | public AssetBase Get(string id) | 202 | public AssetBase Get(string id) |
104 | { | 203 | { |
105 | // m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Synchronous get request for {0}", id); | 204 | string uri = MapServer(id) + "/assets/" + id; |
106 | |||
107 | string uri = m_ServerURI + "/assets/" + id; | ||
108 | 205 | ||
109 | AssetBase asset = null; | 206 | AssetBase asset = null; |
110 | if (m_Cache != null) | 207 | if (m_Cache != null) |
111 | asset = m_Cache.Get(id); | 208 | asset = m_Cache.Get(id); |
112 | 209 | ||
113 | if (asset == null) | 210 | if (asset == null || asset.Data == null || asset.Data.Length == 0) |
114 | { | 211 | { |
115 | asset = SynchronousRestObjectRequester. | 212 | asset = SynchronousRestObjectRequester. |
116 | MakeRequest<int, AssetBase>("GET", uri, 0, m_maxAssetRequestConcurrency); | 213 | MakeRequest<int, AssetBase>("GET", uri, 0, m_maxAssetRequestConcurrency); |
@@ -141,7 +238,7 @@ namespace OpenSim.Services.Connectors | |||
141 | return fullAsset.Metadata; | 238 | return fullAsset.Metadata; |
142 | } | 239 | } |
143 | 240 | ||
144 | string uri = m_ServerURI + "/assets/" + id + "/metadata"; | 241 | string uri = MapServer(id) + "/assets/" + id + "/metadata"; |
145 | 242 | ||
146 | AssetMetadata asset = SynchronousRestObjectRequester. | 243 | AssetMetadata asset = SynchronousRestObjectRequester. |
147 | MakeRequest<int, AssetMetadata>("GET", uri, 0); | 244 | MakeRequest<int, AssetMetadata>("GET", uri, 0); |
@@ -158,7 +255,7 @@ namespace OpenSim.Services.Connectors | |||
158 | return fullAsset.Data; | 255 | return fullAsset.Data; |
159 | } | 256 | } |
160 | 257 | ||
161 | RestClient rc = new RestClient(m_ServerURI); | 258 | RestClient rc = new RestClient(MapServer(id)); |
162 | rc.AddResourcePath("assets"); | 259 | rc.AddResourcePath("assets"); |
163 | rc.AddResourcePath(id); | 260 | rc.AddResourcePath(id); |
164 | rc.AddResourcePath("data"); | 261 | rc.AddResourcePath("data"); |
@@ -181,66 +278,109 @@ namespace OpenSim.Services.Connectors | |||
181 | return null; | 278 | return null; |
182 | } | 279 | } |
183 | 280 | ||
184 | public bool Get(string id, Object sender, AssetRetrieved handler) | 281 | private class QueuedAssetRequest |
185 | { | 282 | { |
186 | // m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Potentially asynchronous get request for {0}", id); | 283 | public string uri; |
284 | public string id; | ||
285 | } | ||
187 | 286 | ||
188 | string uri = m_ServerURI + "/assets/" + id; | 287 | private OpenMetaverse.BlockingQueue<QueuedAssetRequest> m_requestQueue = |
288 | new OpenMetaverse.BlockingQueue<QueuedAssetRequest>(); | ||
189 | 289 | ||
190 | AssetBase asset = null; | 290 | private void AssetRequestProcessor() |
191 | if (m_Cache != null) | 291 | { |
192 | asset = m_Cache.Get(id); | 292 | QueuedAssetRequest r; |
193 | 293 | ||
194 | if (asset == null) | 294 | while (true) |
195 | { | 295 | { |
196 | lock (m_AssetHandlers) | 296 | r = m_requestQueue.Dequeue(); |
197 | { | ||
198 | AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); | ||
199 | |||
200 | AssetRetrievedEx handlers; | ||
201 | if (m_AssetHandlers.TryGetValue(id, out handlers)) | ||
202 | { | ||
203 | // Someone else is already loading this asset. It will notify our handler when done. | ||
204 | handlers += handlerEx; | ||
205 | return true; | ||
206 | } | ||
207 | 297 | ||
208 | // Load the asset ourselves | 298 | string uri = r.uri; |
209 | handlers += handlerEx; | 299 | string id = r.id; |
210 | m_AssetHandlers.Add(id, handlers); | ||
211 | } | ||
212 | 300 | ||
213 | bool success = false; | 301 | bool success = false; |
214 | try | 302 | try |
215 | { | 303 | { |
216 | AsynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, | 304 | AssetBase a = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, 30); |
217 | delegate(AssetBase a) | 305 | if (a != null) |
218 | { | 306 | { |
219 | if (m_Cache != null) | 307 | if (m_Cache != null) |
220 | m_Cache.Cache(a); | 308 | m_Cache.Cache(a); |
221 | 309 | ||
222 | AssetRetrievedEx handlers; | 310 | List<AssetRetrievedEx> handlers; |
223 | lock (m_AssetHandlers) | 311 | lock (m_AssetHandlers) |
312 | { | ||
313 | handlers = m_AssetHandlers[id]; | ||
314 | m_AssetHandlers.Remove(id); | ||
315 | } | ||
316 | foreach (AssetRetrievedEx h in handlers) | ||
317 | { | ||
318 | Util.FireAndForget(x => | ||
224 | { | 319 | { |
225 | handlers = m_AssetHandlers[id]; | 320 | h.Invoke(a); |
226 | m_AssetHandlers.Remove(id); | 321 | }); |
227 | } | 322 | } |
228 | handlers.Invoke(a); | 323 | if (handlers != null) |
229 | }, m_maxAssetRequestConcurrency); | 324 | handlers.Clear(); |
230 | 325 | ||
231 | success = true; | 326 | success = true; |
327 | } | ||
232 | } | 328 | } |
233 | finally | 329 | finally |
234 | { | 330 | { |
235 | if (!success) | 331 | if (!success) |
236 | { | 332 | { |
333 | List<AssetRetrievedEx> handlers; | ||
237 | lock (m_AssetHandlers) | 334 | lock (m_AssetHandlers) |
238 | { | 335 | { |
336 | handlers = m_AssetHandlers[id]; | ||
239 | m_AssetHandlers.Remove(id); | 337 | m_AssetHandlers.Remove(id); |
240 | } | 338 | } |
339 | if (handlers != null) | ||
340 | handlers.Clear(); | ||
241 | } | 341 | } |
242 | } | 342 | } |
243 | } | 343 | } |
344 | } | ||
345 | |||
346 | public bool Get(string id, Object sender, AssetRetrieved handler) | ||
347 | { | ||
348 | string uri = MapServer(id) + "/assets/" + id; | ||
349 | |||
350 | AssetBase asset = null; | ||
351 | if (m_Cache != null) | ||
352 | asset = m_Cache.Get(id); | ||
353 | |||
354 | if (asset == null || asset.Data == null || asset.Data.Length == 0) | ||
355 | { | ||
356 | lock (m_AssetHandlers) | ||
357 | { | ||
358 | AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); | ||
359 | |||
360 | // AssetRetrievedEx handlers; | ||
361 | List<AssetRetrievedEx> handlers; | ||
362 | if (m_AssetHandlers.TryGetValue(id, out handlers)) | ||
363 | { | ||
364 | // Someone else is already loading this asset. It will notify our handler when done. | ||
365 | // handlers += handlerEx; | ||
366 | handlers.Add(handlerEx); | ||
367 | return true; | ||
368 | } | ||
369 | |||
370 | // Load the asset ourselves | ||
371 | // handlers += handlerEx; | ||
372 | handlers = new List<AssetRetrievedEx>(); | ||
373 | handlers.Add(handlerEx); | ||
374 | |||
375 | m_AssetHandlers.Add(id, handlers); | ||
376 | } | ||
377 | |||
378 | QueuedAssetRequest request = new QueuedAssetRequest(); | ||
379 | request.id = id; | ||
380 | request.uri = uri; | ||
381 | |||
382 | m_requestQueue.Enqueue(request); | ||
383 | } | ||
244 | else | 384 | else |
245 | { | 385 | { |
246 | handler(id, sender, asset); | 386 | handler(id, sender, asset); |
@@ -251,38 +391,95 @@ namespace OpenSim.Services.Connectors | |||
251 | 391 | ||
252 | public string Store(AssetBase asset) | 392 | public string Store(AssetBase asset) |
253 | { | 393 | { |
254 | if (asset.Temporary || asset.Local) | 394 | // Have to assign the asset ID here. This isn't likely to |
395 | // trigger since current callers don't pass emtpy IDs | ||
396 | // We need the asset ID to route the request to the proper | ||
397 | // cluster member, so we can't have the server assign one. | ||
398 | if (asset.ID == string.Empty) | ||
255 | { | 399 | { |
256 | if (m_Cache != null) | 400 | if (asset.FullID == UUID.Zero) |
257 | m_Cache.Cache(asset); | 401 | { |
402 | asset.FullID = UUID.Random(); | ||
403 | } | ||
404 | asset.ID = asset.FullID.ToString(); | ||
405 | } | ||
406 | else if (asset.FullID == UUID.Zero) | ||
407 | { | ||
408 | UUID uuid = UUID.Zero; | ||
409 | if (UUID.TryParse(asset.ID, out uuid)) | ||
410 | { | ||
411 | asset.FullID = uuid; | ||
412 | } | ||
413 | else | ||
414 | { | ||
415 | asset.FullID = UUID.Random(); | ||
416 | } | ||
417 | } | ||
258 | 418 | ||
419 | if (m_Cache != null) | ||
420 | m_Cache.Cache(asset); | ||
421 | if (asset.Temporary || asset.Local) | ||
422 | { | ||
259 | return asset.ID; | 423 | return asset.ID; |
260 | } | 424 | } |
261 | 425 | ||
262 | string uri = m_ServerURI + "/assets/"; | 426 | string uri = MapServer(asset.FullID.ToString()) + "/assets/"; |
263 | 427 | ||
264 | string newID = string.Empty; | 428 | string newID = string.Empty; |
265 | try | 429 | try |
266 | { | 430 | { |
267 | newID = SynchronousRestObjectRequester. | 431 | newID = SynchronousRestObjectRequester. |
268 | MakeRequest<AssetBase, string>("POST", uri, asset); | 432 | MakeRequest<AssetBase, string>("POST", uri, asset, 25); |
433 | if (newID == null || newID == "") | ||
434 | { | ||
435 | newID = UUID.Zero.ToString(); | ||
436 | } | ||
269 | } | 437 | } |
270 | catch (Exception e) | 438 | catch (Exception e) |
271 | { | 439 | { |
272 | m_log.WarnFormat("[ASSET CONNECTOR]: Unable to send asset {0} to asset server. Reason: {1}", asset.ID, e.Message); | 440 | newID = UUID.Zero.ToString(); |
273 | } | 441 | } |
274 | 442 | ||
275 | if (newID != String.Empty) | 443 | if (newID == UUID.Zero.ToString()) |
444 | { | ||
445 | //The asset upload failed, put it in a queue for later | ||
446 | asset.UploadAttempts++; | ||
447 | if (asset.UploadAttempts > 30) | ||
448 | { | ||
449 | //By this stage we've been in the queue for a good few hours; | ||
450 | //We're going to drop the asset. | ||
451 | m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString()); | ||
452 | } | ||
453 | else | ||
454 | { | ||
455 | if (!m_retryQueue.ContainsKey(asset.UploadAttempts)) | ||
456 | { | ||
457 | m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>()); | ||
458 | } | ||
459 | List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts]; | ||
460 | m_queue.Add(asset); | ||
461 | m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString()); | ||
462 | m_retryTimer.Start(); | ||
463 | } | ||
464 | } | ||
465 | else | ||
276 | { | 466 | { |
277 | // Placing this here, so that this work with old asset servers that don't send any reply back | 467 | if (asset.UploadAttempts > 0) |
278 | // SynchronousRestObjectRequester returns somethins that is not an empty string | 468 | { |
279 | if (newID != null) | 469 | m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString()); |
280 | asset.ID = newID; | 470 | } |
471 | if (newID != String.Empty) | ||
472 | { | ||
473 | // Placing this here, so that this work with old asset servers that don't send any reply back | ||
474 | // SynchronousRestObjectRequester returns somethins that is not an empty string | ||
475 | if (newID != null) | ||
476 | asset.ID = newID; | ||
281 | 477 | ||
282 | if (m_Cache != null) | 478 | if (m_Cache != null) |
283 | m_Cache.Cache(asset); | 479 | m_Cache.Cache(asset); |
480 | } | ||
284 | } | 481 | } |
285 | return newID; | 482 | return asset.ID; |
286 | } | 483 | } |
287 | 484 | ||
288 | public bool UpdateContent(string id, byte[] data) | 485 | public bool UpdateContent(string id, byte[] data) |
@@ -303,7 +500,7 @@ namespace OpenSim.Services.Connectors | |||
303 | } | 500 | } |
304 | asset.Data = data; | 501 | asset.Data = data; |
305 | 502 | ||
306 | string uri = m_ServerURI + "/assets/" + id; | 503 | string uri = MapServer(id) + "/assets/" + id; |
307 | 504 | ||
308 | if (SynchronousRestObjectRequester. | 505 | if (SynchronousRestObjectRequester. |
309 | MakeRequest<AssetBase, bool>("POST", uri, asset)) | 506 | MakeRequest<AssetBase, bool>("POST", uri, asset)) |
@@ -318,7 +515,7 @@ namespace OpenSim.Services.Connectors | |||
318 | 515 | ||
319 | public bool Delete(string id) | 516 | public bool Delete(string id) |
320 | { | 517 | { |
321 | string uri = m_ServerURI + "/assets/" + id; | 518 | string uri = MapServer(id) + "/assets/" + id; |
322 | 519 | ||
323 | if (SynchronousRestObjectRequester. | 520 | if (SynchronousRestObjectRequester. |
324 | MakeRequest<int, bool>("DELETE", uri, 0)) | 521 | MakeRequest<int, bool>("DELETE", uri, 0)) |