aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
diff options
context:
space:
mode:
authorMelanie2012-05-10 00:42:10 +0100
committerMelanie2012-05-10 00:42:10 +0100
commita90822f4b8ce8b10f8d38e09adf45a1e8e4ed86a (patch)
treee14d17475f8dcf643f4153f882f92e89b869a686 /OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
parentMerge branch 'master' into careminster (diff)
parentWhere necessary, rename OpenSim/Services/Connectors/*.cs files to reflect the... (diff)
downloadopensim-SC_OLD-a90822f4b8ce8b10f8d38e09adf45a1e8e4ed86a.zip
opensim-SC_OLD-a90822f4b8ce8b10f8d38e09adf45a1e8e4ed86a.tar.gz
opensim-SC_OLD-a90822f4b8ce8b10f8d38e09adf45a1e8e4ed86a.tar.bz2
opensim-SC_OLD-a90822f4b8ce8b10f8d38e09adf45a1e8e4ed86a.tar.xz
Merge branch 'master' into careminster
Diffstat (limited to 'OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs')
-rw-r--r--OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs463
1 files changed, 463 insertions, 0 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
new file mode 100644
index 0000000..2882906
--- /dev/null
+++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs
@@ -0,0 +1,463 @@
1/*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSimulator Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28using log4net;
29using System;
30using System.Collections.Generic;
31using System.IO;
32using System.Reflection;
33using System.Timers;
34using Nini.Config;
35using OpenSim.Framework;
36using OpenSim.Framework.Console;
37using OpenSim.Framework.Communications;
38using OpenSim.Services.Interfaces;
39using OpenMetaverse;
40
41namespace OpenSim.Services.Connectors
42{
43 public class AssetServicesConnector : IAssetService
44 {
45 private static readonly ILog m_log =
46 LogManager.GetLogger(
47 MethodBase.GetCurrentMethod().DeclaringType);
48
49 private string m_ServerURI = String.Empty;
50 private IImprovedAssetCache m_Cache = null;
51 private int m_retryCounter;
52 private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>();
53 private Timer m_retryTimer;
54 private delegate void AssetRetrievedEx(AssetBase asset);
55
56 // Keeps track of concurrent requests for the same asset, so that it's only loaded once.
57 // Maps: Asset ID -> Handlers which will be called when the asset has been loaded
58 private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>();
59 private Dictionary<string, string> m_UriMap = new Dictionary<string, string>();
60
61 public AssetServicesConnector()
62 {
63 }
64
65 public AssetServicesConnector(string serverURI)
66 {
67 m_ServerURI = serverURI.TrimEnd('/');
68 }
69
70 public AssetServicesConnector(IConfigSource source)
71 {
72 Initialise(source);
73 }
74
75 public virtual void Initialise(IConfigSource source)
76 {
77 IConfig assetConfig = source.Configs["AssetService"];
78 if (assetConfig == null)
79 {
80 m_log.Error("[ASSET CONNECTOR]: AssetService missing from OpenSim.ini");
81 throw new Exception("Asset connector init error");
82 }
83
84 string serviceURI = assetConfig.GetString("AssetServerURI",
85 String.Empty);
86
87 m_ServerURI = serviceURI;
88
89 if (serviceURI == String.Empty)
90 {
91 m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService");
92 throw new Exception("Asset connector init error");
93 }
94
95
96 m_retryTimer = new Timer();
97 m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
98 m_retryTimer.Interval = 60000;
99
100 Uri serverUri = new Uri(m_ServerURI);
101
102 string groupHost = serverUri.Host;
103
104 for (int i = 0 ; i < 256 ; i++)
105 {
106 string prefix = i.ToString("x2");
107 groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost);
108
109 m_UriMap[prefix] = groupHost;
110 //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix);
111 }
112 }
113
114 private string MapServer(string id)
115 {
116 UriBuilder serverUri = new UriBuilder(m_ServerURI);
117
118 string prefix = id.Substring(0, 2).ToLower();
119
120 string host = m_UriMap[prefix];
121
122 serverUri.Host = host;
123
124 // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix);
125
126 return serverUri.Uri.AbsoluteUri;
127 }
128
129 protected void retryCheck(object source, ElapsedEventArgs e)
130 {
131 m_retryCounter++;
132 if (m_retryCounter > 60) m_retryCounter -= 60;
133 List<int> keys = new List<int>();
134 foreach (int a in m_retryQueue.Keys)
135 {
136 keys.Add(a);
137 }
138 foreach (int a in keys)
139 {
140 //We exponentially fall back on frequency until we reach one attempt per hour
141 //The net result is that we end up in the queue for roughly 24 hours..
142 //24 hours worth of assets could be a lot, so the hope is that the region admin
143 //will have gotten the asset connector back online quickly!
144
145 int timefactor = a ^ 2;
146 if (timefactor > 60)
147 {
148 timefactor = 60;
149 }
150
151 //First, find out if we care about this timefactor
152 if (timefactor % a == 0)
153 {
154 //Yes, we do!
155 List<AssetBase> retrylist = m_retryQueue[a];
156 m_retryQueue.Remove(a);
157
158 foreach(AssetBase ass in retrylist)
159 {
160 Store(ass); //Store my ass. This function will put it back in the dictionary if it fails
161 }
162 }
163 }
164
165 if (m_retryQueue.Count == 0)
166 {
167 //It might only be one tick per minute, but I have
168 //repented and abandoned my wasteful ways
169 m_retryCounter = 0;
170 m_retryTimer.Stop();
171 }
172 }
173
174 protected void SetCache(IImprovedAssetCache cache)
175 {
176 m_Cache = cache;
177 }
178
179 public AssetBase Get(string id)
180 {
181 string uri = MapServer(id) + "/assets/" + id;
182
183 AssetBase asset = null;
184 if (m_Cache != null)
185 asset = m_Cache.Get(id);
186
187 if (asset == null || asset.Data == null || asset.Data.Length == 0)
188 {
189 asset = SynchronousRestObjectRequester.
190 MakeRequest<int, AssetBase>("GET", uri, 0);
191
192 if (m_Cache != null)
193 m_Cache.Cache(asset);
194 }
195 return asset;
196 }
197
198 public AssetBase GetCached(string id)
199 {
200// m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Cache request for {0}", id);
201
202 if (m_Cache != null)
203 return m_Cache.Get(id);
204
205 return null;
206 }
207
208 public AssetMetadata GetMetadata(string id)
209 {
210 if (m_Cache != null)
211 {
212 AssetBase fullAsset = m_Cache.Get(id);
213
214 if (fullAsset != null)
215 return fullAsset.Metadata;
216 }
217
218 string uri = MapServer(id) + "/assets/" + id + "/metadata";
219
220 AssetMetadata asset = SynchronousRestObjectRequester.
221 MakeRequest<int, AssetMetadata>("GET", uri, 0);
222 return asset;
223 }
224
225 public byte[] GetData(string id)
226 {
227 if (m_Cache != null)
228 {
229 AssetBase fullAsset = m_Cache.Get(id);
230
231 if (fullAsset != null)
232 return fullAsset.Data;
233 }
234
235 RestClient rc = new RestClient(MapServer(id));
236 rc.AddResourcePath("assets");
237 rc.AddResourcePath(id);
238 rc.AddResourcePath("data");
239
240 rc.RequestMethod = "GET";
241
242 Stream s = rc.Request();
243
244 if (s == null)
245 return null;
246
247 if (s.Length > 0)
248 {
249 byte[] ret = new byte[s.Length];
250 s.Read(ret, 0, (int)s.Length);
251
252 return ret;
253 }
254
255 return null;
256 }
257
258 public bool Get(string id, Object sender, AssetRetrieved handler)
259 {
260 string uri = MapServer(id) + "/assets/" + id;
261
262 AssetBase asset = null;
263 if (m_Cache != null)
264 asset = m_Cache.Get(id);
265
266 if (asset == null || asset.Data == null || asset.Data.Length == 0)
267 {
268 lock (m_AssetHandlers)
269 {
270 AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
271
272 AssetRetrievedEx handlers;
273 if (m_AssetHandlers.TryGetValue(id, out handlers))
274 {
275 // Someone else is already loading this asset. It will notify our handler when done.
276 handlers += handlerEx;
277 return true;
278 }
279
280 // Load the asset ourselves
281 handlers += handlerEx;
282 m_AssetHandlers.Add(id, handlers);
283 }
284
285 bool success = false;
286 try
287 {
288 AsynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0,
289 delegate(AssetBase a)
290 {
291 if (m_Cache != null)
292 m_Cache.Cache(a);
293
294 AssetRetrievedEx handlers;
295 lock (m_AssetHandlers)
296 {
297 handlers = m_AssetHandlers[id];
298 m_AssetHandlers.Remove(id);
299 }
300 handlers.Invoke(a);
301 });
302
303 success = true;
304 }
305 finally
306 {
307 if (!success)
308 {
309 lock (m_AssetHandlers)
310 {
311 m_AssetHandlers.Remove(id);
312 }
313 }
314 }
315 }
316 else
317 {
318 handler(id, sender, asset);
319 }
320
321 return true;
322 }
323
324 public string Store(AssetBase asset)
325 {
326 // Have to assign the asset ID here. This isn't likely to
327 // trigger since current callers don't pass emtpy IDs
328 // We need the asset ID to route the request to the proper
329 // cluster member, so we can't have the server assign one.
330 if (asset.ID == string.Empty)
331 {
332 if (asset.FullID == UUID.Zero)
333 {
334 asset.FullID = UUID.Random();
335 }
336 asset.ID = asset.FullID.ToString();
337 }
338 else if (asset.FullID == UUID.Zero)
339 {
340 UUID uuid = UUID.Zero;
341 if (UUID.TryParse(asset.ID, out uuid))
342 {
343 asset.FullID = uuid;
344 }
345 else
346 {
347 asset.FullID = UUID.Random();
348 }
349 }
350
351 if (m_Cache != null)
352 m_Cache.Cache(asset);
353 if (asset.Temporary || asset.Local)
354 {
355 return asset.ID;
356 }
357
358 string uri = MapServer(asset.FullID.ToString()) + "/assets/";
359
360 string newID = string.Empty;
361 try
362 {
363 newID = SynchronousRestObjectRequester.
364 MakeRequest<AssetBase, string>("POST", uri, asset, 25);
365 if (newID == null || newID == "")
366 {
367 newID = UUID.Zero.ToString();
368 }
369 }
370 catch (Exception e)
371 {
372 newID = UUID.Zero.ToString();
373 }
374
375 if (newID == UUID.Zero.ToString())
376 {
377 //The asset upload failed, put it in a queue for later
378 asset.UploadAttempts++;
379 if (asset.UploadAttempts > 30)
380 {
381 //By this stage we've been in the queue for a good few hours;
382 //We're going to drop the asset.
383 m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString());
384 }
385 else
386 {
387 if (!m_retryQueue.ContainsKey(asset.UploadAttempts))
388 {
389 m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>());
390 }
391 List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts];
392 m_queue.Add(asset);
393 m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString());
394 m_retryTimer.Start();
395 }
396 }
397 else
398 {
399 if (asset.UploadAttempts > 0)
400 {
401 m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString());
402 }
403 if (newID != String.Empty)
404 {
405 // Placing this here, so that this work with old asset servers that don't send any reply back
406 // SynchronousRestObjectRequester returns somethins that is not an empty string
407 if (newID != null)
408 asset.ID = newID;
409
410 if (m_Cache != null)
411 m_Cache.Cache(asset);
412 }
413 }
414 return asset.ID;
415 }
416
417 public bool UpdateContent(string id, byte[] data)
418 {
419 AssetBase asset = null;
420
421 if (m_Cache != null)
422 asset = m_Cache.Get(id);
423
424 if (asset == null)
425 {
426 AssetMetadata metadata = GetMetadata(id);
427 if (metadata == null)
428 return false;
429
430 asset = new AssetBase(metadata.FullID, metadata.Name, metadata.Type, UUID.Zero.ToString());
431 asset.Metadata = metadata;
432 }
433 asset.Data = data;
434
435 string uri = MapServer(id) + "/assets/" + id;
436
437 if (SynchronousRestObjectRequester.
438 MakeRequest<AssetBase, bool>("POST", uri, asset))
439 {
440 if (m_Cache != null)
441 m_Cache.Cache(asset);
442
443 return true;
444 }
445 return false;
446 }
447
448 public bool Delete(string id)
449 {
450 string uri = MapServer(id) + "/assets/" + id;
451
452 if (SynchronousRestObjectRequester.
453 MakeRequest<int, bool>("DELETE", uri, 0))
454 {
455 if (m_Cache != null)
456 m_Cache.Expire(id);
457
458 return true;
459 }
460 return false;
461 }
462 }
463}