diff options
Diffstat (limited to 'OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs')
-rw-r--r-- | OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs | 463 |
1 files changed, 463 insertions, 0 deletions
diff --git a/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs new file mode 100644 index 0000000..2882906 --- /dev/null +++ b/OpenSim/Services/Connectors/Asset/AssetServicesConnector.cs | |||
@@ -0,0 +1,463 @@ | |||
1 | /* | ||
2 | * Copyright (c) Contributors, http://opensimulator.org/ | ||
3 | * See CONTRIBUTORS.TXT for a full list of copyright holders. | ||
4 | * | ||
5 | * Redistribution and use in source and binary forms, with or without | ||
6 | * modification, are permitted provided that the following conditions are met: | ||
7 | * * Redistributions of source code must retain the above copyright | ||
8 | * notice, this list of conditions and the following disclaimer. | ||
9 | * * Redistributions in binary form must reproduce the above copyright | ||
10 | * notice, this list of conditions and the following disclaimer in the | ||
11 | * documentation and/or other materials provided with the distribution. | ||
12 | * * Neither the name of the OpenSimulator Project nor the | ||
13 | * names of its contributors may be used to endorse or promote products | ||
14 | * derived from this software without specific prior written permission. | ||
15 | * | ||
16 | * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY | ||
17 | * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED | ||
18 | * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | ||
19 | * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY | ||
20 | * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES | ||
21 | * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; | ||
22 | * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND | ||
23 | * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
24 | * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | ||
25 | * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
26 | */ | ||
27 | |||
28 | using log4net; | ||
29 | using System; | ||
30 | using System.Collections.Generic; | ||
31 | using System.IO; | ||
32 | using System.Reflection; | ||
33 | using System.Timers; | ||
34 | using Nini.Config; | ||
35 | using OpenSim.Framework; | ||
36 | using OpenSim.Framework.Console; | ||
37 | using OpenSim.Framework.Communications; | ||
38 | using OpenSim.Services.Interfaces; | ||
39 | using OpenMetaverse; | ||
40 | |||
41 | namespace OpenSim.Services.Connectors | ||
42 | { | ||
43 | public class AssetServicesConnector : IAssetService | ||
44 | { | ||
45 | private static readonly ILog m_log = | ||
46 | LogManager.GetLogger( | ||
47 | MethodBase.GetCurrentMethod().DeclaringType); | ||
48 | |||
49 | private string m_ServerURI = String.Empty; | ||
50 | private IImprovedAssetCache m_Cache = null; | ||
51 | private int m_retryCounter; | ||
52 | private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>(); | ||
53 | private Timer m_retryTimer; | ||
54 | private delegate void AssetRetrievedEx(AssetBase asset); | ||
55 | |||
56 | // Keeps track of concurrent requests for the same asset, so that it's only loaded once. | ||
57 | // Maps: Asset ID -> Handlers which will be called when the asset has been loaded | ||
58 | private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>(); | ||
59 | private Dictionary<string, string> m_UriMap = new Dictionary<string, string>(); | ||
60 | |||
61 | public AssetServicesConnector() | ||
62 | { | ||
63 | } | ||
64 | |||
65 | public AssetServicesConnector(string serverURI) | ||
66 | { | ||
67 | m_ServerURI = serverURI.TrimEnd('/'); | ||
68 | } | ||
69 | |||
70 | public AssetServicesConnector(IConfigSource source) | ||
71 | { | ||
72 | Initialise(source); | ||
73 | } | ||
74 | |||
75 | public virtual void Initialise(IConfigSource source) | ||
76 | { | ||
77 | IConfig assetConfig = source.Configs["AssetService"]; | ||
78 | if (assetConfig == null) | ||
79 | { | ||
80 | m_log.Error("[ASSET CONNECTOR]: AssetService missing from OpenSim.ini"); | ||
81 | throw new Exception("Asset connector init error"); | ||
82 | } | ||
83 | |||
84 | string serviceURI = assetConfig.GetString("AssetServerURI", | ||
85 | String.Empty); | ||
86 | |||
87 | m_ServerURI = serviceURI; | ||
88 | |||
89 | if (serviceURI == String.Empty) | ||
90 | { | ||
91 | m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService"); | ||
92 | throw new Exception("Asset connector init error"); | ||
93 | } | ||
94 | |||
95 | |||
96 | m_retryTimer = new Timer(); | ||
97 | m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck); | ||
98 | m_retryTimer.Interval = 60000; | ||
99 | |||
100 | Uri serverUri = new Uri(m_ServerURI); | ||
101 | |||
102 | string groupHost = serverUri.Host; | ||
103 | |||
104 | for (int i = 0 ; i < 256 ; i++) | ||
105 | { | ||
106 | string prefix = i.ToString("x2"); | ||
107 | groupHost = assetConfig.GetString("AssetServerHost_"+prefix, groupHost); | ||
108 | |||
109 | m_UriMap[prefix] = groupHost; | ||
110 | //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix); | ||
111 | } | ||
112 | } | ||
113 | |||
114 | private string MapServer(string id) | ||
115 | { | ||
116 | UriBuilder serverUri = new UriBuilder(m_ServerURI); | ||
117 | |||
118 | string prefix = id.Substring(0, 2).ToLower(); | ||
119 | |||
120 | string host = m_UriMap[prefix]; | ||
121 | |||
122 | serverUri.Host = host; | ||
123 | |||
124 | // m_log.DebugFormat("[ASSET]: Using {0} for host name for prefix {1}", host, prefix); | ||
125 | |||
126 | return serverUri.Uri.AbsoluteUri; | ||
127 | } | ||
128 | |||
129 | protected void retryCheck(object source, ElapsedEventArgs e) | ||
130 | { | ||
131 | m_retryCounter++; | ||
132 | if (m_retryCounter > 60) m_retryCounter -= 60; | ||
133 | List<int> keys = new List<int>(); | ||
134 | foreach (int a in m_retryQueue.Keys) | ||
135 | { | ||
136 | keys.Add(a); | ||
137 | } | ||
138 | foreach (int a in keys) | ||
139 | { | ||
140 | //We exponentially fall back on frequency until we reach one attempt per hour | ||
141 | //The net result is that we end up in the queue for roughly 24 hours.. | ||
142 | //24 hours worth of assets could be a lot, so the hope is that the region admin | ||
143 | //will have gotten the asset connector back online quickly! | ||
144 | |||
145 | int timefactor = a ^ 2; | ||
146 | if (timefactor > 60) | ||
147 | { | ||
148 | timefactor = 60; | ||
149 | } | ||
150 | |||
151 | //First, find out if we care about this timefactor | ||
152 | if (timefactor % a == 0) | ||
153 | { | ||
154 | //Yes, we do! | ||
155 | List<AssetBase> retrylist = m_retryQueue[a]; | ||
156 | m_retryQueue.Remove(a); | ||
157 | |||
158 | foreach(AssetBase ass in retrylist) | ||
159 | { | ||
160 | Store(ass); //Store my ass. This function will put it back in the dictionary if it fails | ||
161 | } | ||
162 | } | ||
163 | } | ||
164 | |||
165 | if (m_retryQueue.Count == 0) | ||
166 | { | ||
167 | //It might only be one tick per minute, but I have | ||
168 | //repented and abandoned my wasteful ways | ||
169 | m_retryCounter = 0; | ||
170 | m_retryTimer.Stop(); | ||
171 | } | ||
172 | } | ||
173 | |||
174 | protected void SetCache(IImprovedAssetCache cache) | ||
175 | { | ||
176 | m_Cache = cache; | ||
177 | } | ||
178 | |||
179 | public AssetBase Get(string id) | ||
180 | { | ||
181 | string uri = MapServer(id) + "/assets/" + id; | ||
182 | |||
183 | AssetBase asset = null; | ||
184 | if (m_Cache != null) | ||
185 | asset = m_Cache.Get(id); | ||
186 | |||
187 | if (asset == null || asset.Data == null || asset.Data.Length == 0) | ||
188 | { | ||
189 | asset = SynchronousRestObjectRequester. | ||
190 | MakeRequest<int, AssetBase>("GET", uri, 0); | ||
191 | |||
192 | if (m_Cache != null) | ||
193 | m_Cache.Cache(asset); | ||
194 | } | ||
195 | return asset; | ||
196 | } | ||
197 | |||
198 | public AssetBase GetCached(string id) | ||
199 | { | ||
200 | // m_log.DebugFormat("[ASSET SERVICE CONNECTOR]: Cache request for {0}", id); | ||
201 | |||
202 | if (m_Cache != null) | ||
203 | return m_Cache.Get(id); | ||
204 | |||
205 | return null; | ||
206 | } | ||
207 | |||
208 | public AssetMetadata GetMetadata(string id) | ||
209 | { | ||
210 | if (m_Cache != null) | ||
211 | { | ||
212 | AssetBase fullAsset = m_Cache.Get(id); | ||
213 | |||
214 | if (fullAsset != null) | ||
215 | return fullAsset.Metadata; | ||
216 | } | ||
217 | |||
218 | string uri = MapServer(id) + "/assets/" + id + "/metadata"; | ||
219 | |||
220 | AssetMetadata asset = SynchronousRestObjectRequester. | ||
221 | MakeRequest<int, AssetMetadata>("GET", uri, 0); | ||
222 | return asset; | ||
223 | } | ||
224 | |||
225 | public byte[] GetData(string id) | ||
226 | { | ||
227 | if (m_Cache != null) | ||
228 | { | ||
229 | AssetBase fullAsset = m_Cache.Get(id); | ||
230 | |||
231 | if (fullAsset != null) | ||
232 | return fullAsset.Data; | ||
233 | } | ||
234 | |||
235 | RestClient rc = new RestClient(MapServer(id)); | ||
236 | rc.AddResourcePath("assets"); | ||
237 | rc.AddResourcePath(id); | ||
238 | rc.AddResourcePath("data"); | ||
239 | |||
240 | rc.RequestMethod = "GET"; | ||
241 | |||
242 | Stream s = rc.Request(); | ||
243 | |||
244 | if (s == null) | ||
245 | return null; | ||
246 | |||
247 | if (s.Length > 0) | ||
248 | { | ||
249 | byte[] ret = new byte[s.Length]; | ||
250 | s.Read(ret, 0, (int)s.Length); | ||
251 | |||
252 | return ret; | ||
253 | } | ||
254 | |||
255 | return null; | ||
256 | } | ||
257 | |||
258 | public bool Get(string id, Object sender, AssetRetrieved handler) | ||
259 | { | ||
260 | string uri = MapServer(id) + "/assets/" + id; | ||
261 | |||
262 | AssetBase asset = null; | ||
263 | if (m_Cache != null) | ||
264 | asset = m_Cache.Get(id); | ||
265 | |||
266 | if (asset == null || asset.Data == null || asset.Data.Length == 0) | ||
267 | { | ||
268 | lock (m_AssetHandlers) | ||
269 | { | ||
270 | AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); | ||
271 | |||
272 | AssetRetrievedEx handlers; | ||
273 | if (m_AssetHandlers.TryGetValue(id, out handlers)) | ||
274 | { | ||
275 | // Someone else is already loading this asset. It will notify our handler when done. | ||
276 | handlers += handlerEx; | ||
277 | return true; | ||
278 | } | ||
279 | |||
280 | // Load the asset ourselves | ||
281 | handlers += handlerEx; | ||
282 | m_AssetHandlers.Add(id, handlers); | ||
283 | } | ||
284 | |||
285 | bool success = false; | ||
286 | try | ||
287 | { | ||
288 | AsynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, | ||
289 | delegate(AssetBase a) | ||
290 | { | ||
291 | if (m_Cache != null) | ||
292 | m_Cache.Cache(a); | ||
293 | |||
294 | AssetRetrievedEx handlers; | ||
295 | lock (m_AssetHandlers) | ||
296 | { | ||
297 | handlers = m_AssetHandlers[id]; | ||
298 | m_AssetHandlers.Remove(id); | ||
299 | } | ||
300 | handlers.Invoke(a); | ||
301 | }); | ||
302 | |||
303 | success = true; | ||
304 | } | ||
305 | finally | ||
306 | { | ||
307 | if (!success) | ||
308 | { | ||
309 | lock (m_AssetHandlers) | ||
310 | { | ||
311 | m_AssetHandlers.Remove(id); | ||
312 | } | ||
313 | } | ||
314 | } | ||
315 | } | ||
316 | else | ||
317 | { | ||
318 | handler(id, sender, asset); | ||
319 | } | ||
320 | |||
321 | return true; | ||
322 | } | ||
323 | |||
324 | public string Store(AssetBase asset) | ||
325 | { | ||
326 | // Have to assign the asset ID here. This isn't likely to | ||
327 | // trigger since current callers don't pass emtpy IDs | ||
328 | // We need the asset ID to route the request to the proper | ||
329 | // cluster member, so we can't have the server assign one. | ||
330 | if (asset.ID == string.Empty) | ||
331 | { | ||
332 | if (asset.FullID == UUID.Zero) | ||
333 | { | ||
334 | asset.FullID = UUID.Random(); | ||
335 | } | ||
336 | asset.ID = asset.FullID.ToString(); | ||
337 | } | ||
338 | else if (asset.FullID == UUID.Zero) | ||
339 | { | ||
340 | UUID uuid = UUID.Zero; | ||
341 | if (UUID.TryParse(asset.ID, out uuid)) | ||
342 | { | ||
343 | asset.FullID = uuid; | ||
344 | } | ||
345 | else | ||
346 | { | ||
347 | asset.FullID = UUID.Random(); | ||
348 | } | ||
349 | } | ||
350 | |||
351 | if (m_Cache != null) | ||
352 | m_Cache.Cache(asset); | ||
353 | if (asset.Temporary || asset.Local) | ||
354 | { | ||
355 | return asset.ID; | ||
356 | } | ||
357 | |||
358 | string uri = MapServer(asset.FullID.ToString()) + "/assets/"; | ||
359 | |||
360 | string newID = string.Empty; | ||
361 | try | ||
362 | { | ||
363 | newID = SynchronousRestObjectRequester. | ||
364 | MakeRequest<AssetBase, string>("POST", uri, asset, 25); | ||
365 | if (newID == null || newID == "") | ||
366 | { | ||
367 | newID = UUID.Zero.ToString(); | ||
368 | } | ||
369 | } | ||
370 | catch (Exception e) | ||
371 | { | ||
372 | newID = UUID.Zero.ToString(); | ||
373 | } | ||
374 | |||
375 | if (newID == UUID.Zero.ToString()) | ||
376 | { | ||
377 | //The asset upload failed, put it in a queue for later | ||
378 | asset.UploadAttempts++; | ||
379 | if (asset.UploadAttempts > 30) | ||
380 | { | ||
381 | //By this stage we've been in the queue for a good few hours; | ||
382 | //We're going to drop the asset. | ||
383 | m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString()); | ||
384 | } | ||
385 | else | ||
386 | { | ||
387 | if (!m_retryQueue.ContainsKey(asset.UploadAttempts)) | ||
388 | { | ||
389 | m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>()); | ||
390 | } | ||
391 | List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts]; | ||
392 | m_queue.Add(asset); | ||
393 | m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString()); | ||
394 | m_retryTimer.Start(); | ||
395 | } | ||
396 | } | ||
397 | else | ||
398 | { | ||
399 | if (asset.UploadAttempts > 0) | ||
400 | { | ||
401 | m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString()); | ||
402 | } | ||
403 | if (newID != String.Empty) | ||
404 | { | ||
405 | // Placing this here, so that this work with old asset servers that don't send any reply back | ||
406 | // SynchronousRestObjectRequester returns somethins that is not an empty string | ||
407 | if (newID != null) | ||
408 | asset.ID = newID; | ||
409 | |||
410 | if (m_Cache != null) | ||
411 | m_Cache.Cache(asset); | ||
412 | } | ||
413 | } | ||
414 | return asset.ID; | ||
415 | } | ||
416 | |||
417 | public bool UpdateContent(string id, byte[] data) | ||
418 | { | ||
419 | AssetBase asset = null; | ||
420 | |||
421 | if (m_Cache != null) | ||
422 | asset = m_Cache.Get(id); | ||
423 | |||
424 | if (asset == null) | ||
425 | { | ||
426 | AssetMetadata metadata = GetMetadata(id); | ||
427 | if (metadata == null) | ||
428 | return false; | ||
429 | |||
430 | asset = new AssetBase(metadata.FullID, metadata.Name, metadata.Type, UUID.Zero.ToString()); | ||
431 | asset.Metadata = metadata; | ||
432 | } | ||
433 | asset.Data = data; | ||
434 | |||
435 | string uri = MapServer(id) + "/assets/" + id; | ||
436 | |||
437 | if (SynchronousRestObjectRequester. | ||
438 | MakeRequest<AssetBase, bool>("POST", uri, asset)) | ||
439 | { | ||
440 | if (m_Cache != null) | ||
441 | m_Cache.Cache(asset); | ||
442 | |||
443 | return true; | ||
444 | } | ||
445 | return false; | ||
446 | } | ||
447 | |||
448 | public bool Delete(string id) | ||
449 | { | ||
450 | string uri = MapServer(id) + "/assets/" + id; | ||
451 | |||
452 | if (SynchronousRestObjectRequester. | ||
453 | MakeRequest<int, bool>("DELETE", uri, 0)) | ||
454 | { | ||
455 | if (m_Cache != null) | ||
456 | m_Cache.Expire(id); | ||
457 | |||
458 | return true; | ||
459 | } | ||
460 | return false; | ||
461 | } | ||
462 | } | ||
463 | } | ||