aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs')
-rw-r--r--OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs228
1 files changed, 193 insertions, 35 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
index e4c3eaf..45ebf3a 100644
--- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
+++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
@@ -30,6 +30,7 @@ using System;
30using System.Collections.Generic; 30using System.Collections.Generic;
31using System.IO; 31using System.IO;
32using System.Reflection; 32using System.Reflection;
33using System.Timers;
33using Nini.Config; 34using Nini.Config;
34using OpenSim.Framework; 35using OpenSim.Framework;
35using OpenSim.Framework.Console; 36using OpenSim.Framework.Console;
@@ -47,13 +48,18 @@ namespace OpenSim.Services.Connectors
47 48
48 private string m_ServerURI = String.Empty; 49 private string m_ServerURI = String.Empty;
49 private IImprovedAssetCache m_Cache = null; 50 private IImprovedAssetCache m_Cache = null;
50 51 private int m_retryCounter;
52 private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>();
53 private Timer m_retryTimer;
51 private delegate void AssetRetrievedEx(AssetBase asset); 54 private delegate void AssetRetrievedEx(AssetBase asset);
52 55
53 // Keeps track of concurrent requests for the same asset, so that it's only loaded once. 56 // Keeps track of concurrent requests for the same asset, so that it's only loaded once.
54 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded 57 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded
55 private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); 58// private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>();
59
60 private Dictionary<string, List<AssetRetrievedEx>> m_AssetHandlers = new Dictionary<string, List<AssetRetrievedEx>>();
56 61
62 private Dictionary<string, string> m_UriMap = new Dictionary<string, string>();
57 63
58 public AssetServicesConnector() 64 public AssetServicesConnector()
59 { 65 {
@@ -81,13 +87,91 @@ namespace OpenSim.Services.Connectors
81 string serviceURI = assetConfig.GetString("AssetServerURI", 87 string serviceURI = assetConfig.GetString("AssetServerURI",
82 String.Empty); 88 String.Empty);
83 89
90 m_ServerURI = serviceURI;
91
84 if (serviceURI == String.Empty) 92 if (serviceURI == String.Empty)
85 { 93 {
86 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); 94 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService");
87 throw new Exception("Asset connector init error"); 95 throw new Exception("Asset connector init error");
88 } 96 }
89 97
90 m_ServerURI = serviceURI; 98
99 m_retryTimer = new Timer();
100 m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
101 m_retryTimer.Interval = 60000;
102
103 Uri serverUri = new Uri(m_ServerURI);
104
105 string groupHost = serverUri.Host;
106
107 for (int i = 0 ; i < 256 ; i++)
108 {
109 string prefix = i.ToString("x2");
110 groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost);
111
112 m_UriMap[prefix] = groupHost;
113 //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix);
114 }
115 }
116
117 private string MapServer(string id)
118 {
119 UriBuilder serverUri = new UriBuilder(m_ServerURI);
120
121 string prefix = id.Substring(0, 2).ToLower();
122
123 string host = m_UriMap[prefix];
124
125 serverUri.Host = host;
126
127 // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix);
128
129 return serverUri.Uri.AbsoluteUri;
130 }
131
132 protected void retryCheck(object source, ElapsedEventArgs e)
133 {
134 m_retryCounter++;
135 if (m_retryCounter > 60) m_retryCounter -= 60;
136 List<int> keys = new List<int>();
137 foreach (int a in m_retryQueue.Keys)
138 {
139 keys.Add(a);
140 }
141 foreach (int a in keys)
142 {
143 //We exponentially fall back on frequency until we reach one attempt per hour
144 //The net result is that we end up in the queue for roughly 24 hours..
145 //24 hours worth of assets could be a lot, so the hope is that the region admin
146 //will have gotten the asset connector back online quickly!
147
148 int timefactor = a ^ 2;
149 if (timefactor > 60)
150 {
151 timefactor = 60;
152 }
153
154 //First, find out if we care about this timefactor
155 if (timefactor % a == 0)
156 {
157 //Yes, we do!
158 List<AssetBase> retrylist = m_retryQueue[a];
159 m_retryQueue.Remove(a);
160
161 foreach(AssetBase ass in retrylist)
162 {
163 Store(ass); //Store my ass. This function will put it back in the dictionary if it fails
164 }
165 }
166 }
167
168 if (m_retryQueue.Count == 0)
169 {
170 //It might only be one tick per minute, but I have
171 //repented and abandoned my wasteful ways
172 m_retryCounter = 0;
173 m_retryTimer.Stop();
174 }
91 } 175 }
92 176
93 protected void SetCache(IImprovedAssetCache cache) 177 protected void SetCache(IImprovedAssetCache cache)
@@ -97,15 +181,13 @@ namespace OpenSim.Services.Connectors
97 181
98 public AssetBase Get(string id) 182 public AssetBase Get(string id)
99 { 183 {
100// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Synchronous get request for {0}", id); 184 string uri = MapServer(id) + "/assets/" + id;
101
102 string uri = m_ServerURI + "/assets/" + id;
103 185
104 AssetBase asset = null; 186 AssetBase asset = null;
105 if (m_Cache != null) 187 if (m_Cache != null)
106 asset = m_Cache.Get(id); 188 asset = m_Cache.Get(id);
107 189
108 if (asset == null) 190 if (asset == null || asset.Data == null || asset.Data.Length == 0)
109 { 191 {
110 asset = SynchronousRestObjectRequester. 192 asset = SynchronousRestObjectRequester.
111 MakeRequest<int, AssetBase>("GET", uri, 0); 193 MakeRequest<int, AssetBase>("GET", uri, 0);
@@ -136,7 +218,7 @@ namespace OpenSim.Services.Connectors
136 return fullAsset.Metadata; 218 return fullAsset.Metadata;
137 } 219 }
138 220
139 string uri = m_ServerURI + "/assets/" + id + "/metadata"; 221 string uri = MapServer(id) + "/assets/" + id + "/metadata";
140 222
141 AssetMetadata asset = SynchronousRestObjectRequester. 223 AssetMetadata asset = SynchronousRestObjectRequester.
142 MakeRequest<int, AssetMetadata>("GET", uri, 0); 224 MakeRequest<int, AssetMetadata>("GET", uri, 0);
@@ -153,7 +235,7 @@ namespace OpenSim.Services.Connectors
153 return fullAsset.Data; 235 return fullAsset.Data;
154 } 236 }
155 237
156 RestClient rc = new RestClient(m_ServerURI); 238 RestClient rc = new RestClient(MapServer(id));
157 rc.AddResourcePath("assets"); 239 rc.AddResourcePath("assets");
158 rc.AddResourcePath(id); 240 rc.AddResourcePath(id);
159 rc.AddResourcePath("data"); 241 rc.AddResourcePath("data");
@@ -178,30 +260,33 @@ namespace OpenSim.Services.Connectors
178 260
179 public bool Get(string id, Object sender, AssetRetrieved handler) 261 public bool Get(string id, Object sender, AssetRetrieved handler)
180 { 262 {
181// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Potentially asynchronous get request for {0}", id); 263 string uri = MapServer(id) + "/assets/" + id;
182
183 string uri = m_ServerURI + "/assets/" + id;
184 264
185 AssetBase asset = null; 265 AssetBase asset = null;
186 if (m_Cache != null) 266 if (m_Cache != null)
187 asset = m_Cache.Get(id); 267 asset = m_Cache.Get(id);
188 268
189 if (asset == null) 269 if (asset == null || asset.Data == null || asset.Data.Length == 0)
190 { 270 {
191 lock (m_AssetHandlers) 271 lock (m_AssetHandlers)
192 { 272 {
193 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); 273 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
194 274
195 AssetRetrievedEx handlers; 275// AssetRetrievedEx handlers;
276 List<AssetRetrievedEx> handlers;
196 if (m_AssetHandlers.TryGetValue(id, out handlers)) 277 if (m_AssetHandlers.TryGetValue(id, out handlers))
197 { 278 {
198 // Someone else is already loading this asset. It will notify our handler when done. 279 // Someone else is already loading this asset. It will notify our handler when done.
199 handlers += handlerEx; 280// handlers += handlerEx;
281 handlers.Add(handlerEx);
200 return true; 282 return true;
201 } 283 }
202 284
203 // Load the asset ourselves 285 // Load the asset ourselves
204 handlers += handlerEx; 286// handlers += handlerEx;
287 handlers = new List<AssetRetrievedEx>();
288 handlers.Add(handlerEx);
289
205 m_AssetHandlers.Add(id, handlers); 290 m_AssetHandlers.Add(id, handlers);
206 } 291 }
207 292
@@ -213,14 +298,26 @@ namespace OpenSim.Services.Connectors
213 { 298 {
214 if (m_Cache != null) 299 if (m_Cache != null)
215 m_Cache.Cache(a); 300 m_Cache.Cache(a);
216 301/*
217 AssetRetrievedEx handlers; 302 AssetRetrievedEx handlers;
218 lock (m_AssetHandlers) 303 lock (m_AssetHandlers)
219 { 304 {
220 handlers = m_AssetHandlers[id]; 305 handlers = m_AssetHandlers[id];
221 m_AssetHandlers.Remove(id); 306 m_AssetHandlers.Remove(id);
222 } 307 }
308
223 handlers.Invoke(a); 309 handlers.Invoke(a);
310*/
311 List<AssetRetrievedEx> handlers;
312 lock (m_AssetHandlers)
313 {
314 handlers = m_AssetHandlers[id];
315 m_AssetHandlers.Remove(id);
316 }
317 foreach (AssetRetrievedEx h in handlers)
318 h.Invoke(a);
319 if (handlers != null)
320 handlers.Clear();
224 }); 321 });
225 322
226 success = true; 323 success = true;
@@ -229,10 +326,14 @@ namespace OpenSim.Services.Connectors
229 { 326 {
230 if (!success) 327 if (!success)
231 { 328 {
329 List<AssetRetrievedEx> handlers;
232 lock (m_AssetHandlers) 330 lock (m_AssetHandlers)
233 { 331 {
332 handlers = m_AssetHandlers[id];
234 m_AssetHandlers.Remove(id); 333 m_AssetHandlers.Remove(id);
235 } 334 }
335 if (handlers != null)
336 handlers.Clear();
236 } 337 }
237 } 338 }
238 } 339 }
@@ -246,38 +347,95 @@ namespace OpenSim.Services.Connectors
246 347
247 public string Store(AssetBase asset) 348 public string Store(AssetBase asset)
248 { 349 {
249 if (asset.Temporary || asset.Local) 350 // Have to assign the asset ID here. This isn't likely to
351 // trigger since current callers don't pass emtpy IDs
352 // We need the asset ID to route the request to the proper
353 // cluster member, so we can't have the server assign one.
354 if (asset.ID == string.Empty)
250 { 355 {
251 if (m_Cache != null) 356 if (asset.FullID == UUID.Zero)
252 m_Cache.Cache(asset); 357 {
358 asset.FullID = UUID.Random();
359 }
360 asset.ID = asset.FullID.ToString();
361 }
362 else if (asset.FullID == UUID.Zero)
363 {
364 UUID uuid = UUID.Zero;
365 if (UUID.TryParse(asset.ID, out uuid))
366 {
367 asset.FullID = uuid;
368 }
369 else
370 {
371 asset.FullID = UUID.Random();
372 }
373 }
253 374
375 if (m_Cache != null)
376 m_Cache.Cache(asset);
377 if (asset.Temporary || asset.Local)
378 {
254 return asset.ID; 379 return asset.ID;
255 } 380 }
256 381
257 string uri = m_ServerURI + "/assets/"; 382 string uri = MapServer(asset.FullID.ToString()) + "/assets/";
258 383
259 string newID = string.Empty; 384 string newID = string.Empty;
260 try 385 try
261 { 386 {
262 newID = SynchronousRestObjectRequester. 387 newID = SynchronousRestObjectRequester.
263 MakeRequest<AssetBase, string>("POST", uri, asset); 388 MakeRequest<AssetBase, string>("POST", uri, asset, 25);
389 if (newID == null || newID == "")
390 {
391 newID = UUID.Zero.ToString();
392 }
264 } 393 }
265 catch (Exception e) 394 catch (Exception e)
266 { 395 {
267 m_log.WarnFormat("[ASSET CONNECTOR]: Unable to send asset {0} to asset server. Reason: {1}", asset.ID, e.Message); 396 newID = UUID.Zero.ToString();
268 } 397 }
269 398
270 if (newID != String.Empty) 399 if (newID == UUID.Zero.ToString())
271 { 400 {
272 // Placing this here, so that this work with old asset servers that don't send any reply back 401 //The asset upload failed, put it in a queue for later
273 // SynchronousRestObjectRequester returns somethins that is not an empty string 402 asset.UploadAttempts++;
274 if (newID != null) 403 if (asset.UploadAttempts > 30)
275 asset.ID = newID; 404 {
405 //By this stage we've been in the queue for a good few hours;
406 //We're going to drop the asset.
407 m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString());
408 }
409 else
410 {
411 if (!m_retryQueue.ContainsKey(asset.UploadAttempts))
412 {
413 m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>());
414 }
415 List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts];
416 m_queue.Add(asset);
417 m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString());
418 m_retryTimer.Start();
419 }
420 }
421 else
422 {
423 if (asset.UploadAttempts > 0)
424 {
425 m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString());
426 }
427 if (newID != String.Empty)
428 {
429 // Placing this here, so that this work with old asset servers that don't send any reply back
430 // SynchronousRestObjectRequester returns somethins that is not an empty string
431 if (newID != null)
432 asset.ID = newID;
276 433
277 if (m_Cache != null) 434 if (m_Cache != null)
278 m_Cache.Cache(asset); 435 m_Cache.Cache(asset);
436 }
279 } 437 }
280 return newID; 438 return asset.ID;
281 } 439 }
282 440
283 public bool UpdateContent(string id, byte[] data) 441 public bool UpdateContent(string id, byte[] data)
@@ -298,7 +456,7 @@ namespace OpenSim.Services.Connectors
298 } 456 }
299 asset.Data = data; 457 asset.Data = data;
300 458
301 string uri = m_ServerURI + "/assets/" + id; 459 string uri = MapServer(id) + "/assets/" + id;
302 460
303 if (SynchronousRestObjectRequester. 461 if (SynchronousRestObjectRequester.
304 MakeRequest<AssetBase, bool>("POST", uri, asset)) 462 MakeRequest<AssetBase, bool>("POST", uri, asset))
@@ -313,7 +471,7 @@ namespace OpenSim.Services.Connectors
313 471
314 public bool Delete(string id) 472 public bool Delete(string id)
315 { 473 {
316 string uri = m_ServerURI + "/assets/" + id; 474 string uri = MapServer(id) + "/assets/" + id;
317 475
318 if (SynchronousRestObjectRequester. 476 if (SynchronousRestObjectRequester.
319 MakeRequest<int, bool>("DELETE", uri, 0)) 477 MakeRequest<int, bool>("DELETE", uri, 0))
@@ -326,4 +484,4 @@ namespace OpenSim.Services.Connectors
326 return false; 484 return false;
327 } 485 }
328 } 486 }
329} \ No newline at end of file 487}