aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Services/Connectors/Asset
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Services/Connectors/Asset')
-rw-r--r--OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs235
1 files changed, 198 insertions, 37 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
index e4c3eaf..daf38bc 100644
--- a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
+++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
@@ -30,6 +30,7 @@ using System;
30using System.Collections.Generic; 30using System.Collections.Generic;
31using System.IO; 31using System.IO;
32using System.Reflection; 32using System.Reflection;
33using System.Timers;
33using Nini.Config; 34using Nini.Config;
34using OpenSim.Framework; 35using OpenSim.Framework;
35using OpenSim.Framework.Console; 36using OpenSim.Framework.Console;
@@ -47,13 +48,18 @@ namespace OpenSim.Services.Connectors
47 48
48 private string m_ServerURI = String.Empty; 49 private string m_ServerURI = String.Empty;
49 private IImprovedAssetCache m_Cache = null; 50 private IImprovedAssetCache m_Cache = null;
50 51 private int m_retryCounter;
52 private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>();
53 private Timer m_retryTimer;
51 private delegate void AssetRetrievedEx(AssetBase asset); 54 private delegate void AssetRetrievedEx(AssetBase asset);
52 55
53 // Keeps track of concurrent requests for the same asset, so that it's only loaded once. 56 // Keeps track of concurrent requests for the same asset, so that it's only loaded once.
54 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded 57 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded
55 private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); 58// private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>();
59
60 private Dictionary<string, List<AssetRetrievedEx>> m_AssetHandlers = new Dictionary<string, List<AssetRetrievedEx>>();
56 61
62 private Dictionary<string, string> m_UriMap = new Dictionary<string, string>();
57 63
58 public AssetServicesConnector() 64 public AssetServicesConnector()
59 { 65 {
@@ -81,13 +87,94 @@ namespace OpenSim.Services.Connectors
81 string serviceURI = assetConfig.GetString("AssetServerURI", 87 string serviceURI = assetConfig.GetString("AssetServerURI",
82 String.Empty); 88 String.Empty);
83 89
90 m_ServerURI = serviceURI;
91
84 if (serviceURI == String.Empty) 92 if (serviceURI == String.Empty)
85 { 93 {
86 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); 94 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService");
87 throw new Exception("Asset connector init error"); 95 throw new Exception("Asset connector init error");
88 } 96 }
89 97
90 m_ServerURI = serviceURI; 98
99 m_retryTimer = new Timer();
100 m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
101 m_retryTimer.Interval = 60000;
102
103 Uri serverUri = new Uri(m_ServerURI);
104
105 string groupHost = serverUri.Host;
106
107 for (int i = 0 ; i < 256 ; i++)
108 {
109 string prefix = i.ToString("x2");
110 groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost);
111
112 m_UriMap[prefix] = groupHost;
113 //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix);
114 }
115 }
116
117 private string MapServer(string id)
118 {
119 UriBuilder serverUri = new UriBuilder(m_ServerURI);
120
121 string prefix = id.Substring(0, 2).ToLower();
122
123 string host = m_UriMap[prefix];
124
125 serverUri.Host = host;
126
127 // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix);
128
129 string ret = serverUri.Uri.AbsoluteUri;
130 if (ret.EndsWith("/"))
131 ret = ret.Substring(0, ret.Length - 1);
132 return ret;
133 }
134
135 protected void retryCheck(object source, ElapsedEventArgs e)
136 {
137 m_retryCounter++;
138 if (m_retryCounter > 60) m_retryCounter -= 60;
139 List<int> keys = new List<int>();
140 foreach (int a in m_retryQueue.Keys)
141 {
142 keys.Add(a);
143 }
144 foreach (int a in keys)
145 {
146 //We exponentially fall back on frequency until we reach one attempt per hour
147 //The net result is that we end up in the queue for roughly 24 hours..
148 //24 hours worth of assets could be a lot, so the hope is that the region admin
149 //will have gotten the asset connector back online quickly!
150
151 int timefactor = a ^ 2;
152 if (timefactor > 60)
153 {
154 timefactor = 60;
155 }
156
157 //First, find out if we care about this timefactor
158 if (timefactor % a == 0)
159 {
160 //Yes, we do!
161 List<AssetBase> retrylist = m_retryQueue[a];
162 m_retryQueue.Remove(a);
163
164 foreach(AssetBase ass in retrylist)
165 {
166 Store(ass); //Store my ass. This function will put it back in the dictionary if it fails
167 }
168 }
169 }
170
171 if (m_retryQueue.Count == 0)
172 {
173 //It might only be one tick per minute, but I have
174 //repented and abandoned my wasteful ways
175 m_retryCounter = 0;
176 m_retryTimer.Stop();
177 }
91 } 178 }
92 179
93 protected void SetCache(IImprovedAssetCache cache) 180 protected void SetCache(IImprovedAssetCache cache)
@@ -97,18 +184,16 @@ namespace OpenSim.Services.Connectors
97 184
98 public AssetBase Get(string id) 185 public AssetBase Get(string id)
99 { 186 {
100// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Synchronous get request for {0}", id); 187 string uri = MapServer(id) + "/assets/" + id;
101
102 string uri = m_ServerURI + "/assets/" + id;
103 188
104 AssetBase asset = null; 189 AssetBase asset = null;
105 if (m_Cache != null) 190 if (m_Cache != null)
106 asset = m_Cache.Get(id); 191 asset = m_Cache.Get(id);
107 192
108 if (asset == null) 193 if (asset == null || asset.Data == null || asset.Data.Length == 0)
109 { 194 {
110 asset = SynchronousRestObjectRequester. 195 asset = SynchronousRestObjectRequester.
111 MakeRequest<int, AssetBase>("GET", uri, 0); 196 MakeRequest<int, AssetBase>("GET", uri, 0, 30);
112 197
113 if (m_Cache != null) 198 if (m_Cache != null)
114 m_Cache.Cache(asset); 199 m_Cache.Cache(asset);
@@ -136,7 +221,7 @@ namespace OpenSim.Services.Connectors
136 return fullAsset.Metadata; 221 return fullAsset.Metadata;
137 } 222 }
138 223
139 string uri = m_ServerURI + "/assets/" + id + "/metadata"; 224 string uri = MapServer(id) + "/assets/" + id + "/metadata";
140 225
141 AssetMetadata asset = SynchronousRestObjectRequester. 226 AssetMetadata asset = SynchronousRestObjectRequester.
142 MakeRequest<int, AssetMetadata>("GET", uri, 0); 227 MakeRequest<int, AssetMetadata>("GET", uri, 0);
@@ -153,7 +238,7 @@ namespace OpenSim.Services.Connectors
153 return fullAsset.Data; 238 return fullAsset.Data;
154 } 239 }
155 240
156 RestClient rc = new RestClient(m_ServerURI); 241 RestClient rc = new RestClient(MapServer(id));
157 rc.AddResourcePath("assets"); 242 rc.AddResourcePath("assets");
158 rc.AddResourcePath(id); 243 rc.AddResourcePath(id);
159 rc.AddResourcePath("data"); 244 rc.AddResourcePath("data");
@@ -178,30 +263,33 @@ namespace OpenSim.Services.Connectors
178 263
179 public bool Get(string id, Object sender, AssetRetrieved handler) 264 public bool Get(string id, Object sender, AssetRetrieved handler)
180 { 265 {
181// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Potentially asynchronous get request for {0}", id); 266 string uri = MapServer(id) + "/assets/" + id;
182
183 string uri = m_ServerURI + "/assets/" + id;
184 267
185 AssetBase asset = null; 268 AssetBase asset = null;
186 if (m_Cache != null) 269 if (m_Cache != null)
187 asset = m_Cache.Get(id); 270 asset = m_Cache.Get(id);
188 271
189 if (asset == null) 272 if (asset == null || asset.Data == null || asset.Data.Length == 0)
190 { 273 {
191 lock (m_AssetHandlers) 274 lock (m_AssetHandlers)
192 { 275 {
193 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); 276 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
194 277
195 AssetRetrievedEx handlers; 278// AssetRetrievedEx handlers;
279 List<AssetRetrievedEx> handlers;
196 if (m_AssetHandlers.TryGetValue(id, out handlers)) 280 if (m_AssetHandlers.TryGetValue(id, out handlers))
197 { 281 {
198 // Someone else is already loading this asset. It will notify our handler when done. 282 // Someone else is already loading this asset. It will notify our handler when done.
199 handlers += handlerEx; 283// handlers += handlerEx;
284 handlers.Add(handlerEx);
200 return true; 285 return true;
201 } 286 }
202 287
203 // Load the asset ourselves 288 // Load the asset ourselves
204 handlers += handlerEx; 289// handlers += handlerEx;
290 handlers = new List<AssetRetrievedEx>();
291 handlers.Add(handlerEx);
292
205 m_AssetHandlers.Add(id, handlers); 293 m_AssetHandlers.Add(id, handlers);
206 } 294 }
207 295
@@ -213,15 +301,27 @@ namespace OpenSim.Services.Connectors
213 { 301 {
214 if (m_Cache != null) 302 if (m_Cache != null)
215 m_Cache.Cache(a); 303 m_Cache.Cache(a);
216 304/*
217 AssetRetrievedEx handlers; 305 AssetRetrievedEx handlers;
218 lock (m_AssetHandlers) 306 lock (m_AssetHandlers)
219 { 307 {
220 handlers = m_AssetHandlers[id]; 308 handlers = m_AssetHandlers[id];
221 m_AssetHandlers.Remove(id); 309 m_AssetHandlers.Remove(id);
222 } 310 }
311
223 handlers.Invoke(a); 312 handlers.Invoke(a);
224 }); 313*/
314 List<AssetRetrievedEx> handlers;
315 lock (m_AssetHandlers)
316 {
317 handlers = m_AssetHandlers[id];
318 m_AssetHandlers.Remove(id);
319 }
320 foreach (AssetRetrievedEx h in handlers)
321 h.Invoke(a);
322 if (handlers != null)
323 handlers.Clear();
324 }, 30);
225 325
226 success = true; 326 success = true;
227 } 327 }
@@ -229,10 +329,14 @@ namespace OpenSim.Services.Connectors
229 { 329 {
230 if (!success) 330 if (!success)
231 { 331 {
332 List<AssetRetrievedEx> handlers;
232 lock (m_AssetHandlers) 333 lock (m_AssetHandlers)
233 { 334 {
335 handlers = m_AssetHandlers[id];
234 m_AssetHandlers.Remove(id); 336 m_AssetHandlers.Remove(id);
235 } 337 }
338 if (handlers != null)
339 handlers.Clear();
236 } 340 }
237 } 341 }
238 } 342 }
@@ -246,38 +350,95 @@ namespace OpenSim.Services.Connectors
246 350
247 public string Store(AssetBase asset) 351 public string Store(AssetBase asset)
248 { 352 {
249 if (asset.Temporary || asset.Local) 353 // Have to assign the asset ID here. This isn't likely to
354 // trigger since current callers don't pass emtpy IDs
355 // We need the asset ID to route the request to the proper
356 // cluster member, so we can't have the server assign one.
357 if (asset.ID == string.Empty)
250 { 358 {
251 if (m_Cache != null) 359 if (asset.FullID == UUID.Zero)
252 m_Cache.Cache(asset); 360 {
361 asset.FullID = UUID.Random();
362 }
363 asset.ID = asset.FullID.ToString();
364 }
365 else if (asset.FullID == UUID.Zero)
366 {
367 UUID uuid = UUID.Zero;
368 if (UUID.TryParse(asset.ID, out uuid))
369 {
370 asset.FullID = uuid;
371 }
372 else
373 {
374 asset.FullID = UUID.Random();
375 }
376 }
253 377
378 if (m_Cache != null)
379 m_Cache.Cache(asset);
380 if (asset.Temporary || asset.Local)
381 {
254 return asset.ID; 382 return asset.ID;
255 } 383 }
256 384
257 string uri = m_ServerURI + "/assets/"; 385 string uri = MapServer(asset.FullID.ToString()) + "/assets/";
258 386
259 string newID = string.Empty; 387 string newID = string.Empty;
260 try 388 try
261 { 389 {
262 newID = SynchronousRestObjectRequester. 390 newID = SynchronousRestObjectRequester.
263 MakeRequest<AssetBase, string>("POST", uri, asset); 391 MakeRequest<AssetBase, string>("POST", uri, asset, 25);
392 if (newID == null || newID == "")
393 {
394 newID = UUID.Zero.ToString();
395 }
264 } 396 }
265 catch (Exception e) 397 catch (Exception e)
266 { 398 {
267 m_log.WarnFormat("[ASSET CONNECTOR]: Unable to send asset {0} to asset server. Reason: {1}", asset.ID, e.Message); 399 newID = UUID.Zero.ToString();
268 } 400 }
269 401
270 if (newID != String.Empty) 402 if (newID == UUID.Zero.ToString())
271 { 403 {
272 // Placing this here, so that this work with old asset servers that don't send any reply back 404 //The asset upload failed, put it in a queue for later
273 // SynchronousRestObjectRequester returns somethins that is not an empty string 405 asset.UploadAttempts++;
274 if (newID != null) 406 if (asset.UploadAttempts > 30)
275 asset.ID = newID; 407 {
408 //By this stage we've been in the queue for a good few hours;
409 //We're going to drop the asset.
410 m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString());
411 }
412 else
413 {
414 if (!m_retryQueue.ContainsKey(asset.UploadAttempts))
415 {
416 m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>());
417 }
418 List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts];
419 m_queue.Add(asset);
420 m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString());
421 m_retryTimer.Start();
422 }
423 }
424 else
425 {
426 if (asset.UploadAttempts > 0)
427 {
428 m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString());
429 }
430 if (newID != String.Empty)
431 {
432 // Placing this here, so that this work with old asset servers that don't send any reply back
433 // SynchronousRestObjectRequester returns somethins that is not an empty string
434 if (newID != null)
435 asset.ID = newID;
276 436
277 if (m_Cache != null) 437 if (m_Cache != null)
278 m_Cache.Cache(asset); 438 m_Cache.Cache(asset);
439 }
279 } 440 }
280 return newID; 441 return asset.ID;
281 } 442 }
282 443
283 public bool UpdateContent(string id, byte[] data) 444 public bool UpdateContent(string id, byte[] data)
@@ -298,7 +459,7 @@ namespace OpenSim.Services.Connectors
298 } 459 }
299 asset.Data = data; 460 asset.Data = data;
300 461
301 string uri = m_ServerURI + "/assets/" + id; 462 string uri = MapServer(id) + "/assets/" + id;
302 463
303 if (SynchronousRestObjectRequester. 464 if (SynchronousRestObjectRequester.
304 MakeRequest<AssetBase, bool>("POST", uri, asset)) 465 MakeRequest<AssetBase, bool>("POST", uri, asset))
@@ -313,7 +474,7 @@ namespace OpenSim.Services.Connectors
313 474
314 public bool Delete(string id) 475 public bool Delete(string id)
315 { 476 {
316 string uri = m_ServerURI + "/assets/" + id; 477 string uri = MapServer(id) + "/assets/" + id;
317 478
318 if (SynchronousRestObjectRequester. 479 if (SynchronousRestObjectRequester.
319 MakeRequest<int, bool>("DELETE", uri, 0)) 480 MakeRequest<int, bool>("DELETE", uri, 0))
@@ -326,4 +487,4 @@ namespace OpenSim.Services.Connectors
326 return false; 487 return false;
327 } 488 }
328 } 489 }
329} \ No newline at end of file 490}