aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs')
-rw-r--r--OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs487
1 files changed, 487 insertions, 0 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
new file mode 100644
index 0000000..45ebf3a
--- /dev/null
+++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
@@ -0,0 +1,487 @@
1/*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSimulator Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28using log4net;
29using System;
30using System.Collections.Generic;
31using System.IO;
32using System.Reflection;
33using System.Timers;
34using Nini.Config;
35using OpenSim.Framework;
36using OpenSim.Framework.Console;
37using OpenSim.Framework.Communications;
38using OpenSim.Services.Interfaces;
39using OpenMetaverse;
40
41namespace OpenSim.Services.Connectors
42{
43 public class AssetServicesConnector : IAssetService
44 {
45 private static readonly ILog m_log =
46 LogManager.GetLogger(
47 MethodBase.GetCurrentMethod().DeclaringType);
48
49 private string m_ServerURI = String.Empty;
50 private IImprovedAssetCache m_Cache = null;
51 private int m_retryCounter;
52 private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>();
53 private Timer m_retryTimer;
54 private delegate void AssetRetrievedEx(AssetBase asset);
55
56 // Keeps track of concurrent requests for the same asset, so that it's only loaded once.
57 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded
58// private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>();
59
60 private Dictionary<string, List<AssetRetrievedEx>> m_AssetHandlers = new Dictionary<string, List<AssetRetrievedEx>>();
61
62 private Dictionary<string, string> m_UriMap = new Dictionary<string, string>();
63
64 public AssetServicesConnector()
65 {
66 }
67
68 public AssetServicesConnector(string serverURI)
69 {
70 m_ServerURI = serverURI.TrimEnd('/');
71 }
72
73 public AssetServicesConnector(IConfigSource source)
74 {
75 Initialise(source);
76 }
77
78 public virtual void Initialise(IConfigSource source)
79 {
80 IConfig assetConfig = source.Configs["AssetService"];
81 if (assetConfig == null)
82 {
83 m_log.Error("[ASSET CONNECTOR]: AssetService missing from OpenSim.ini");
84 throw new Exception("Asset connector init error");
85 }
86
87 string serviceURI = assetConfig.GetString("AssetServerURI",
88 String.Empty);
89
90 m_ServerURI = serviceURI;
91
92 if (serviceURI == String.Empty)
93 {
94 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService");
95 throw new Exception("Asset connector init error");
96 }
97
98
99 m_retryTimer = new Timer();
100 m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
101 m_retryTimer.Interval = 60000;
102
103 Uri serverUri = new Uri(m_ServerURI);
104
105 string groupHost = serverUri.Host;
106
107 for (int i = 0 ; i < 256 ; i++)
108 {
109 string prefix = i.ToString("x2");
110 groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost);
111
112 m_UriMap[prefix] = groupHost;
113 //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix);
114 }
115 }
116
117 private string MapServer(string id)
118 {
119 UriBuilder serverUri = new UriBuilder(m_ServerURI);
120
121 string prefix = id.Substring(0, 2).ToLower();
122
123 string host = m_UriMap[prefix];
124
125 serverUri.Host = host;
126
127 // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix);
128
129 return serverUri.Uri.AbsoluteUri;
130 }
131
132 protected void retryCheck(object source, ElapsedEventArgs e)
133 {
134 m_retryCounter++;
135 if (m_retryCounter > 60) m_retryCounter -= 60;
136 List<int> keys = new List<int>();
137 foreach (int a in m_retryQueue.Keys)
138 {
139 keys.Add(a);
140 }
141 foreach (int a in keys)
142 {
143 //We exponentially fall back on frequency until we reach one attempt per hour
144 //The net result is that we end up in the queue for roughly 24 hours..
145 //24 hours worth of assets could be a lot, so the hope is that the region admin
146 //will have gotten the asset connector back online quickly!
147
148 int timefactor = a ^ 2;
149 if (timefactor > 60)
150 {
151 timefactor = 60;
152 }
153
154 //First, find out if we care about this timefactor
155 if (timefactor % a == 0)
156 {
157 //Yes, we do!
158 List<AssetBase> retrylist = m_retryQueue[a];
159 m_retryQueue.Remove(a);
160
161 foreach(AssetBase ass in retrylist)
162 {
163 Store(ass); //Store my ass. This function will put it back in the dictionary if it fails
164 }
165 }
166 }
167
168 if (m_retryQueue.Count == 0)
169 {
170 //It might only be one tick per minute, but I have
171 //repented and abandoned my wasteful ways
172 m_retryCounter = 0;
173 m_retryTimer.Stop();
174 }
175 }
176
177 protected void SetCache(IImprovedAssetCache cache)
178 {
179 m_Cache = cache;
180 }
181
182 public AssetBase Get(string id)
183 {
184 string uri = MapServer(id) + "/assets/" + id;
185
186 AssetBase asset = null;
187 if (m_Cache != null)
188 asset = m_Cache.Get(id);
189
190 if (asset == null || asset.Data == null || asset.Data.Length == 0)
191 {
192 asset = SynchronousRestObjectRequester.
193 MakeRequest<int, AssetBase>("GET", uri, 0);
194
195 if (m_Cache != null)
196 m_Cache.Cache(asset);
197 }
198 return asset;
199 }
200
201 public AssetBase GetCached(string id)
202 {
203// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Cache request for {0}", id);
204
205 if (m_Cache != null)
206 return m_Cache.Get(id);
207
208 return null;
209 }
210
211 public AssetMetadata GetMetadata(string id)
212 {
213 if (m_Cache != null)
214 {
215 AssetBase fullAsset = m_Cache.Get(id);
216
217 if (fullAsset != null)
218 return fullAsset.Metadata;
219 }
220
221 string uri = MapServer(id) + "/assets/" + id + "/metadata";
222
223 AssetMetadata asset = SynchronousRestObjectRequester.
224 MakeRequest<int, AssetMetadata>("GET", uri, 0);
225 return asset;
226 }
227
228 public byte[] GetData(string id)
229 {
230 if (m_Cache != null)
231 {
232 AssetBase fullAsset = m_Cache.Get(id);
233
234 if (fullAsset != null)
235 return fullAsset.Data;
236 }
237
238 RestClient rc = new RestClient(MapServer(id));
239 rc.AddResourcePath("assets");
240 rc.AddResourcePath(id);
241 rc.AddResourcePath("data");
242
243 rc.RequestMethod = "GET";
244
245 Stream s = rc.Request();
246
247 if (s == null)
248 return null;
249
250 if (s.Length > 0)
251 {
252 byte[] ret = new byte[s.Length];
253 s.Read(ret, 0, (int)s.Length);
254
255 return ret;
256 }
257
258 return null;
259 }
260
261 public bool Get(string id, Object sender, AssetRetrieved handler)
262 {
263 string uri = MapServer(id) + "/assets/" + id;
264
265 AssetBase asset = null;
266 if (m_Cache != null)
267 asset = m_Cache.Get(id);
268
269 if (asset == null || asset.Data == null || asset.Data.Length == 0)
270 {
271 lock (m_AssetHandlers)
272 {
273 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
274
275// AssetRetrievedEx handlers;
276 List<AssetRetrievedEx> handlers;
277 if (m_AssetHandlers.TryGetValue(id, out handlers))
278 {
279 // Someone else is already loading this asset. It will notify our handler when done.
280// handlers += handlerEx;
281 handlers.Add(handlerEx);
282 return true;
283 }
284
285 // Load the asset ourselves
286// handlers += handlerEx;
287 handlers = new List<AssetRetrievedEx>();
288 handlers.Add(handlerEx);
289
290 m_AssetHandlers.Add(id, handlers);
291 }
292
293 bool success = false;
294 try
295 {
296 AsynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0,
297 delegate(AssetBase a)
298 {
299 if (m_Cache != null)
300 m_Cache.Cache(a);
301/*
302 AssetRetrievedEx handlers;
303 lock (m_AssetHandlers)
304 {
305 handlers = m_AssetHandlers[id];
306 m_AssetHandlers.Remove(id);
307 }
308
309 handlers.Invoke(a);
310*/
311 List<AssetRetrievedEx> handlers;
312 lock (m_AssetHandlers)
313 {
314 handlers = m_AssetHandlers[id];
315 m_AssetHandlers.Remove(id);
316 }
317 foreach (AssetRetrievedEx h in handlers)
318 h.Invoke(a);
319 if (handlers != null)
320 handlers.Clear();
321 });
322
323 success = true;
324 }
325 finally
326 {
327 if (!success)
328 {
329 List<AssetRetrievedEx> handlers;
330 lock (m_AssetHandlers)
331 {
332 handlers = m_AssetHandlers[id];
333 m_AssetHandlers.Remove(id);
334 }
335 if (handlers != null)
336 handlers.Clear();
337 }
338 }
339 }
340 else
341 {
342 handler(id, sender, asset);
343 }
344
345 return true;
346 }
347
348 public string Store(AssetBase asset)
349 {
350 // Have to assign the asset ID here. This isn't likely to
351 // trigger since current callers don't pass emtpy IDs
352 // We need the asset ID to route the request to the proper
353 // cluster member, so we can't have the server assign one.
354 if (asset.ID == string.Empty)
355 {
356 if (asset.FullID == UUID.Zero)
357 {
358 asset.FullID = UUID.Random();
359 }
360 asset.ID = asset.FullID.ToString();
361 }
362 else if (asset.FullID == UUID.Zero)
363 {
364 UUID uuid = UUID.Zero;
365 if (UUID.TryParse(asset.ID, out uuid))
366 {
367 asset.FullID = uuid;
368 }
369 else
370 {
371 asset.FullID = UUID.Random();
372 }
373 }
374
375 if (m_Cache != null)
376 m_Cache.Cache(asset);
377 if (asset.Temporary || asset.Local)
378 {
379 return asset.ID;
380 }
381
382 string uri = MapServer(asset.FullID.ToString()) + "/assets/";
383
384 string newID = string.Empty;
385 try
386 {
387 newID = SynchronousRestObjectRequester.
388 MakeRequest<AssetBase, string>("POST", uri, asset, 25);
389 if (newID == null || newID == "")
390 {
391 newID = UUID.Zero.ToString();
392 }
393 }
394 catch (Exception e)
395 {
396 newID = UUID.Zero.ToString();
397 }
398
399 if (newID == UUID.Zero.ToString())
400 {
401 //The asset upload failed, put it in a queue for later
402 asset.UploadAttempts++;
403 if (asset.UploadAttempts > 30)
404 {
405 //By this stage we've been in the queue for a good few hours;
406 //We're going to drop the asset.
407 m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString());
408 }
409 else
410 {
411 if (!m_retryQueue.ContainsKey(asset.UploadAttempts))
412 {
413 m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>());
414 }
415 List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts];
416 m_queue.Add(asset);
417 m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString());
418 m_retryTimer.Start();
419 }
420 }
421 else
422 {
423 if (asset.UploadAttempts > 0)
424 {
425 m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString());
426 }
427 if (newID != String.Empty)
428 {
429 // Placing this here, so that this work with old asset servers that don't send any reply back
430 // SynchronousRestObjectRequester returns somethins that is not an empty string
431 if (newID != null)
432 asset.ID = newID;
433
434 if (m_Cache != null)
435 m_Cache.Cache(asset);
436 }
437 }
438 return asset.ID;
439 }
440
441 public bool UpdateContent(string id, byte[] data)
442 {
443 AssetBase asset = null;
444
445 if (m_Cache != null)
446 asset = m_Cache.Get(id);
447
448 if (asset == null)
449 {
450 AssetMetadata metadata = GetMetadata(id);
451 if (metadata == null)
452 return false;
453
454 asset = new AssetBase(metadata.FullID, metadata.Name, metadata.Type, UUID.Zero.ToString());
455 asset.Metadata = metadata;
456 }
457 asset.Data = data;
458
459 string uri = MapServer(id) + "/assets/" + id;
460
461 if (SynchronousRestObjectRequester.
462 MakeRequest<AssetBase, bool>("POST", uri, asset))
463 {
464 if (m_Cache != null)
465 m_Cache.Cache(asset);
466
467 return true;
468 }
469 return false;
470 }
471
472 public bool Delete(string id)
473 {
474 string uri = MapServer(id) + "/assets/" + id;
475
476 if (SynchronousRestObjectRequester.
477 MakeRequest<int, bool>("DELETE", uri, 0))
478 {
479 if (m_Cache != null)
480 m_Cache.Expire(id);
481
482 return true;
483 }
484 return false;
485 }
486 }
487}