diff options
Diffstat (limited to 'OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs')
-rw-r--r-- | OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs | 291 |
1 files changed, 241 insertions, 50 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs index 086b5ad..9d6d9ad 100644 --- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs +++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs | |||
@@ -27,9 +27,11 @@ | |||
27 | 27 | ||
28 | using log4net; | 28 | using log4net; |
29 | using System; | 29 | using System; |
30 | using System.Threading; | ||
30 | using System.Collections.Generic; | 31 | using System.Collections.Generic; |
31 | using System.IO; | 32 | using System.IO; |
32 | using System.Reflection; | 33 | using System.Reflection; |
34 | using System.Timers; | ||
33 | using Nini.Config; | 35 | using Nini.Config; |
34 | using OpenSim.Framework; | 36 | using OpenSim.Framework; |
35 | using OpenSim.Framework.Console; | 37 | using OpenSim.Framework.Console; |
@@ -47,13 +49,20 @@ namespace OpenSim.Services.Connectors | |||
47 | 49 | ||
48 | private string m_ServerURI = String.Empty; | 50 | private string m_ServerURI = String.Empty; |
49 | private IImprovedAssetCache m_Cache = null; | 51 | private IImprovedAssetCache m_Cache = null; |
50 | 52 | private int m_retryCounter; | |
53 | private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>(); | ||
54 | private System.Timers.Timer m_retryTimer; | ||
51 | private delegate void AssetRetrievedEx(AssetBase asset); | 55 | private delegate void AssetRetrievedEx(AssetBase asset); |
52 | 56 | ||
53 | // Keeps track of concurrent requests for the same asset, so that it's only loaded once. | 57 | // Keeps track of concurrent requests for the same asset, so that it's only loaded once. |
54 | // Maps: Asset ID -> Handlers which will be called when the asset has been loaded | 58 | // Maps: Asset ID -> Handlers which will be called when the asset has been loaded |
55 | private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); | 59 | // private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); |
60 | |||
61 | private Dictionary<string, List<AssetRetrievedEx>> m_AssetHandlers = new Dictionary<string, List<AssetRetrievedEx>>(); | ||
56 | 62 | ||
63 | private Dictionary<string, string> m_UriMap = new Dictionary<string, string>(); | ||
64 | |||
65 | private Thread[] m_fetchThreads; | ||
57 | 66 | ||
58 | public AssetServicesConnector() | 67 | public AssetServicesConnector() |
59 | { | 68 | { |
@@ -81,13 +90,102 @@ namespace OpenSim.Services.Connectors | |||
81 | string serviceURI = assetConfig.GetString("AssetServerURI", | 90 | string serviceURI = assetConfig.GetString("AssetServerURI", |
82 | String.Empty); | 91 | String.Empty); |
83 | 92 | ||
93 | m_ServerURI = serviceURI; | ||
94 | |||
84 | if (serviceURI == String.Empty) | 95 | if (serviceURI == String.Empty) |
85 | { | 96 | { |
86 | m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); | 97 | m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); |
87 | throw new Exception("Asset connector init error"); | 98 | throw new Exception("Asset connector init error"); |
88 | } | 99 | } |
89 | 100 | ||
90 | m_ServerURI = serviceURI; | 101 | |
102 | m_retryTimer = new System.Timers.Timer(); | ||
103 | m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck); | ||
104 | m_retryTimer.Interval = 60000; | ||
105 | |||
106 | Uri serverUri = new Uri(m_ServerURI); | ||
107 | |||
108 | string groupHost = serverUri.Host; | ||
109 | |||
110 | for (int i = 0 ; i < 256 ; i++) | ||
111 | { | ||
112 | string prefix = i.ToString("x2"); | ||
113 | groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost); | ||
114 | |||
115 | m_UriMap[prefix] = groupHost; | ||
116 | //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix); | ||
117 | } | ||
118 | |||
119 | m_fetchThreads = new Thread[2]; | ||
120 | |||
121 | for (int i = 0 ; i < 2 ; i++) | ||
122 | { | ||
123 | m_fetchThreads[i] = new Thread(AssetRequestProcessor); | ||
124 | m_fetchThreads[i].Start(); | ||
125 | } | ||
126 | } | ||
127 | |||
128 | private string MapServer(string id) | ||
129 | { | ||
130 | UriBuilder serverUri = new UriBuilder(m_ServerURI); | ||
131 | |||
132 | string prefix = id.Substring(0, 2).ToLower(); | ||
133 | |||
134 | string host = m_UriMap[prefix]; | ||
135 | |||
136 | serverUri.Host = host; | ||
137 | |||
138 | // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix); | ||
139 | |||
140 | string ret = serverUri.Uri.AbsoluteUri; | ||
141 | if (ret.EndsWith("/")) | ||
142 | ret = ret.Substring(0, ret.Length - 1); | ||
143 | return ret; | ||
144 | } | ||
145 | |||
146 | protected void retryCheck(object source, ElapsedEventArgs e) | ||
147 | { | ||
148 | m_retryCounter++; | ||
149 | if (m_retryCounter > 60) m_retryCounter -= 60; | ||
150 | List<int> keys = new List<int>(); | ||
151 | foreach (int a in m_retryQueue.Keys) | ||
152 | { | ||
153 | keys.Add(a); | ||
154 | } | ||
155 | foreach (int a in keys) | ||
156 | { | ||
157 | //We exponentially fall back on frequency until we reach one attempt per hour | ||
158 | //The net result is that we end up in the queue for roughly 24 hours.. | ||
159 | //24 hours worth of assets could be a lot, so the hope is that the region admin | ||
160 | //will have gotten the asset connector back online quickly! | ||
161 | |||
162 | int timefactor = a ^ 2; | ||
163 | if (timefactor > 60) | ||
164 | { | ||
165 | timefactor = 60; | ||
166 | } | ||
167 | |||
168 | //First, find out if we care about this timefactor | ||
169 | if (timefactor % a == 0) | ||
170 | { | ||
171 | //Yes, we do! | ||
172 | List<AssetBase> retrylist = m_retryQueue[a]; | ||
173 | m_retryQueue.Remove(a); | ||
174 | |||
175 | foreach(AssetBase ass in retrylist) | ||
176 | { | ||
177 | Store(ass); //Store my ass. This function will put it back in the dictionary if it fails | ||
178 | } | ||
179 | } | ||
180 | } | ||
181 | |||
182 | if (m_retryQueue.Count == 0) | ||
183 | { | ||
184 | //It might only be one tick per minute, but I have | ||
185 | //repented and abandoned my wasteful ways | ||
186 | m_retryCounter = 0; | ||
187 | m_retryTimer.Stop(); | ||
188 | } | ||
91 | } | 189 | } |
92 | 190 | ||
93 | protected void SetCache(IImprovedAssetCache cache) | 191 | protected void SetCache(IImprovedAssetCache cache) |
@@ -97,15 +195,13 @@ namespace OpenSim.Services.Connectors | |||
97 | 195 | ||
98 | public AssetBase Get(string id) | 196 | public AssetBase Get(string id) |
99 | { | 197 | { |
100 | // m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Synchronous get request for {0}", id); | 198 | string uri = MapServer(id) + "/assets/" + id; |
101 | |||
102 | string uri = m_ServerURI + "/assets/" + id; | ||
103 | 199 | ||
104 | AssetBase asset = null; | 200 | AssetBase asset = null; |
105 | if (m_Cache != null) | 201 | if (m_Cache != null) |
106 | asset = m_Cache.Get(id); | 202 | asset = m_Cache.Get(id); |
107 | 203 | ||
108 | if (asset == null) | 204 | if (asset == null || asset.Data == null || asset.Data.Length == 0) |
109 | { | 205 | { |
110 | asset = SynchronousRestObjectRequester. | 206 | asset = SynchronousRestObjectRequester. |
111 | MakeRequest<int, AssetBase>("GET", uri, 0, 30); | 207 | MakeRequest<int, AssetBase>("GET", uri, 0, 30); |
@@ -136,7 +232,7 @@ namespace OpenSim.Services.Connectors | |||
136 | return fullAsset.Metadata; | 232 | return fullAsset.Metadata; |
137 | } | 233 | } |
138 | 234 | ||
139 | string uri = m_ServerURI + "/assets/" + id + "/metadata"; | 235 | string uri = MapServer(id) + "/assets/" + id + "/metadata"; |
140 | 236 | ||
141 | AssetMetadata asset = SynchronousRestObjectRequester. | 237 | AssetMetadata asset = SynchronousRestObjectRequester. |
142 | MakeRequest<int, AssetMetadata>("GET", uri, 0); | 238 | MakeRequest<int, AssetMetadata>("GET", uri, 0); |
@@ -153,7 +249,7 @@ namespace OpenSim.Services.Connectors | |||
153 | return fullAsset.Data; | 249 | return fullAsset.Data; |
154 | } | 250 | } |
155 | 251 | ||
156 | RestClient rc = new RestClient(m_ServerURI); | 252 | RestClient rc = new RestClient(MapServer(id)); |
157 | rc.AddResourcePath("assets"); | 253 | rc.AddResourcePath("assets"); |
158 | rc.AddResourcePath(id); | 254 | rc.AddResourcePath(id); |
159 | rc.AddResourcePath("data"); | 255 | rc.AddResourcePath("data"); |
@@ -176,34 +272,25 @@ namespace OpenSim.Services.Connectors | |||
176 | return null; | 272 | return null; |
177 | } | 273 | } |
178 | 274 | ||
179 | public bool Get(string id, Object sender, AssetRetrieved handler) | 275 | private class QueuedAssetRequest |
180 | { | 276 | { |
181 | // m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Potentially asynchronous get request for {0}", id); | 277 | public string uri; |
278 | public string id; | ||
279 | } | ||
182 | 280 | ||
183 | string uri = m_ServerURI + "/assets/" + id; | 281 | private OpenMetaverse.BlockingQueue<QueuedAssetRequest> m_requestQueue = |
282 | new OpenMetaverse.BlockingQueue<QueuedAssetRequest>(); | ||
184 | 283 | ||
185 | AssetBase asset = null; | 284 | private void AssetRequestProcessor() |
186 | if (m_Cache != null) | 285 | { |
187 | asset = m_Cache.Get(id); | 286 | QueuedAssetRequest r; |
188 | 287 | ||
189 | if (asset == null) | 288 | while (true) |
190 | { | 289 | { |
191 | lock (m_AssetHandlers) | 290 | r = m_requestQueue.Dequeue(); |
192 | { | ||
193 | AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); | ||
194 | 291 | ||
195 | AssetRetrievedEx handlers; | 292 | string uri = r.uri; |
196 | if (m_AssetHandlers.TryGetValue(id, out handlers)) | 293 | string id = r.id; |
197 | { | ||
198 | // Someone else is already loading this asset. It will notify our handler when done. | ||
199 | handlers += handlerEx; | ||
200 | return true; | ||
201 | } | ||
202 | |||
203 | // Load the asset ourselves | ||
204 | handlers += handlerEx; | ||
205 | m_AssetHandlers.Add(id, handlers); | ||
206 | } | ||
207 | 294 | ||
208 | bool success = false; | 295 | bool success = false; |
209 | try | 296 | try |
@@ -214,13 +301,16 @@ namespace OpenSim.Services.Connectors | |||
214 | if (m_Cache != null) | 301 | if (m_Cache != null) |
215 | m_Cache.Cache(a); | 302 | m_Cache.Cache(a); |
216 | 303 | ||
217 | AssetRetrievedEx handlers; | 304 | List<AssetRetrievedEx> handlers; |
218 | lock (m_AssetHandlers) | 305 | lock (m_AssetHandlers) |
219 | { | 306 | { |
220 | handlers = m_AssetHandlers[id]; | 307 | handlers = m_AssetHandlers[id]; |
221 | m_AssetHandlers.Remove(id); | 308 | m_AssetHandlers.Remove(id); |
222 | } | 309 | } |
223 | handlers.Invoke(a); | 310 | foreach (AssetRetrievedEx h in handlers) |
311 | h.Invoke(a); | ||
312 | if (handlers != null) | ||
313 | handlers.Clear(); | ||
224 | }, 30); | 314 | }, 30); |
225 | 315 | ||
226 | success = true; | 316 | success = true; |
@@ -229,13 +319,57 @@ namespace OpenSim.Services.Connectors | |||
229 | { | 319 | { |
230 | if (!success) | 320 | if (!success) |
231 | { | 321 | { |
322 | List<AssetRetrievedEx> handlers; | ||
232 | lock (m_AssetHandlers) | 323 | lock (m_AssetHandlers) |
233 | { | 324 | { |
325 | handlers = m_AssetHandlers[id]; | ||
234 | m_AssetHandlers.Remove(id); | 326 | m_AssetHandlers.Remove(id); |
235 | } | 327 | } |
328 | if (handlers != null) | ||
329 | handlers.Clear(); | ||
236 | } | 330 | } |
237 | } | 331 | } |
238 | } | 332 | } |
333 | } | ||
334 | |||
335 | public bool Get(string id, Object sender, AssetRetrieved handler) | ||
336 | { | ||
337 | string uri = MapServer(id) + "/assets/" + id; | ||
338 | |||
339 | AssetBase asset = null; | ||
340 | if (m_Cache != null) | ||
341 | asset = m_Cache.Get(id); | ||
342 | |||
343 | if (asset == null || asset.Data == null || asset.Data.Length == 0) | ||
344 | { | ||
345 | lock (m_AssetHandlers) | ||
346 | { | ||
347 | AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); | ||
348 | |||
349 | // AssetRetrievedEx handlers; | ||
350 | List<AssetRetrievedEx> handlers; | ||
351 | if (m_AssetHandlers.TryGetValue(id, out handlers)) | ||
352 | { | ||
353 | // Someone else is already loading this asset. It will notify our handler when done. | ||
354 | // handlers += handlerEx; | ||
355 | handlers.Add(handlerEx); | ||
356 | return true; | ||
357 | } | ||
358 | |||
359 | // Load the asset ourselves | ||
360 | // handlers += handlerEx; | ||
361 | handlers = new List<AssetRetrievedEx>(); | ||
362 | handlers.Add(handlerEx); | ||
363 | |||
364 | m_AssetHandlers.Add(id, handlers); | ||
365 | } | ||
366 | |||
367 | QueuedAssetRequest request = new QueuedAssetRequest(); | ||
368 | request.id = id; | ||
369 | request.uri = uri; | ||
370 | |||
371 | m_requestQueue.Enqueue(request); | ||
372 | } | ||
239 | else | 373 | else |
240 | { | 374 | { |
241 | handler(id, sender, asset); | 375 | handler(id, sender, asset); |
@@ -246,38 +380,95 @@ namespace OpenSim.Services.Connectors | |||
246 | 380 | ||
247 | public string Store(AssetBase asset) | 381 | public string Store(AssetBase asset) |
248 | { | 382 | { |
249 | if (asset.Temporary || asset.Local) | 383 | // Have to assign the asset ID here. This isn't likely to |
384 | // trigger since current callers don't pass emtpy IDs | ||
385 | // We need the asset ID to route the request to the proper | ||
386 | // cluster member, so we can't have the server assign one. | ||
387 | if (asset.ID == string.Empty) | ||
250 | { | 388 | { |
251 | if (m_Cache != null) | 389 | if (asset.FullID == UUID.Zero) |
252 | m_Cache.Cache(asset); | 390 | { |
391 | asset.FullID = UUID.Random(); | ||
392 | } | ||
393 | asset.ID = asset.FullID.ToString(); | ||
394 | } | ||
395 | else if (asset.FullID == UUID.Zero) | ||
396 | { | ||
397 | UUID uuid = UUID.Zero; | ||
398 | if (UUID.TryParse(asset.ID, out uuid)) | ||
399 | { | ||
400 | asset.FullID = uuid; | ||
401 | } | ||
402 | else | ||
403 | { | ||
404 | asset.FullID = UUID.Random(); | ||
405 | } | ||
406 | } | ||
253 | 407 | ||
408 | if (m_Cache != null) | ||
409 | m_Cache.Cache(asset); | ||
410 | if (asset.Temporary || asset.Local) | ||
411 | { | ||
254 | return asset.ID; | 412 | return asset.ID; |
255 | } | 413 | } |
256 | 414 | ||
257 | string uri = m_ServerURI + "/assets/"; | 415 | string uri = MapServer(asset.FullID.ToString()) + "/assets/"; |
258 | 416 | ||
259 | string newID = string.Empty; | 417 | string newID = string.Empty; |
260 | try | 418 | try |
261 | { | 419 | { |
262 | newID = SynchronousRestObjectRequester. | 420 | newID = SynchronousRestObjectRequester. |
263 | MakeRequest<AssetBase, string>("POST", uri, asset); | 421 | MakeRequest<AssetBase, string>("POST", uri, asset, 25); |
422 | if (newID == null || newID == "") | ||
423 | { | ||
424 | newID = UUID.Zero.ToString(); | ||
425 | } | ||
264 | } | 426 | } |
265 | catch (Exception e) | 427 | catch (Exception e) |
266 | { | 428 | { |
267 | m_log.WarnFormat("[ASSET CONNECTOR]: Unable to send asset {0} to asset server. Reason: {1}", asset.ID, e.Message); | 429 | newID = UUID.Zero.ToString(); |
268 | } | 430 | } |
269 | 431 | ||
270 | if (newID != String.Empty) | 432 | if (newID == UUID.Zero.ToString()) |
271 | { | 433 | { |
272 | // Placing this here, so that this work with old asset servers that don't send any reply back | 434 | //The asset upload failed, put it in a queue for later |
273 | // SynchronousRestObjectRequester returns somethins that is not an empty string | 435 | asset.UploadAttempts++; |
274 | if (newID != null) | 436 | if (asset.UploadAttempts > 30) |
275 | asset.ID = newID; | 437 | { |
438 | //By this stage we've been in the queue for a good few hours; | ||
439 | //We're going to drop the asset. | ||
440 | m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString()); | ||
441 | } | ||
442 | else | ||
443 | { | ||
444 | if (!m_retryQueue.ContainsKey(asset.UploadAttempts)) | ||
445 | { | ||
446 | m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>()); | ||
447 | } | ||
448 | List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts]; | ||
449 | m_queue.Add(asset); | ||
450 | m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString()); | ||
451 | m_retryTimer.Start(); | ||
452 | } | ||
453 | } | ||
454 | else | ||
455 | { | ||
456 | if (asset.UploadAttempts > 0) | ||
457 | { | ||
458 | m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString()); | ||
459 | } | ||
460 | if (newID != String.Empty) | ||
461 | { | ||
462 | // Placing this here, so that this work with old asset servers that don't send any reply back | ||
463 | // SynchronousRestObjectRequester returns somethins that is not an empty string | ||
464 | if (newID != null) | ||
465 | asset.ID = newID; | ||
276 | 466 | ||
277 | if (m_Cache != null) | 467 | if (m_Cache != null) |
278 | m_Cache.Cache(asset); | 468 | m_Cache.Cache(asset); |
469 | } | ||
279 | } | 470 | } |
280 | return newID; | 471 | return asset.ID; |
281 | } | 472 | } |
282 | 473 | ||
283 | public bool UpdateContent(string id, byte[] data) | 474 | public bool UpdateContent(string id, byte[] data) |
@@ -298,7 +489,7 @@ namespace OpenSim.Services.Connectors | |||
298 | } | 489 | } |
299 | asset.Data = data; | 490 | asset.Data = data; |
300 | 491 | ||
301 | string uri = m_ServerURI + "/assets/" + id; | 492 | string uri = MapServer(id) + "/assets/" + id; |
302 | 493 | ||
303 | if (SynchronousRestObjectRequester. | 494 | if (SynchronousRestObjectRequester. |
304 | MakeRequest<AssetBase, bool>("POST", uri, asset)) | 495 | MakeRequest<AssetBase, bool>("POST", uri, asset)) |
@@ -313,7 +504,7 @@ namespace OpenSim.Services.Connectors | |||
313 | 504 | ||
314 | public bool Delete(string id) | 505 | public bool Delete(string id) |
315 | { | 506 | { |
316 | string uri = m_ServerURI + "/assets/" + id; | 507 | string uri = MapServer(id) + "/assets/" + id; |
317 | 508 | ||
318 | if (SynchronousRestObjectRequester. | 509 | if (SynchronousRestObjectRequester. |
319 | MakeRequest<int, bool>("DELETE", uri, 0)) | 510 | MakeRequest<int, bool>("DELETE", uri, 0)) |