aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--OpenSim/Framework/Monitoring/JobEngine.cs89
-rw-r--r--OpenSim/Framework/Monitoring/WorkManager.cs10
-rw-r--r--OpenSim/Framework/Util.cs10
-rw-r--r--OpenSim/Framework/WebUtil.cs20
4 files changed, 64 insertions, 65 deletions
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs
index a6a059d..115871e 100644
--- a/OpenSim/Framework/Monitoring/JobEngine.cs
+++ b/OpenSim/Framework/Monitoring/JobEngine.cs
@@ -57,7 +57,8 @@ namespace OpenSim.Framework.Monitoring
57 /// <remarks> 57 /// <remarks>
58 /// Will be null if no job is currently running. 58 /// Will be null if no job is currently running.
59 /// </remarks> 59 /// </remarks>
60 public Job CurrentJob { get; private set; } 60 private Job m_currentJob;
61 public Job CurrentJob { get { return m_currentJob;} }
61 62
62 /// <summary> 63 /// <summary>
63 /// Number of jobs waiting to be processed. 64 /// Number of jobs waiting to be processed.
@@ -82,16 +83,15 @@ namespace OpenSim.Framework.Monitoring
82 83
83 private CancellationTokenSource m_cancelSource; 84 private CancellationTokenSource m_cancelSource;
84 85
85 /// <summary> 86 private int m_timeout = -1;
86 /// Used to signal that we are ready to complete stop. 87
87 /// </summary> 88 private bool m_threadRunnig = false;
88 private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
89 89
90 public JobEngine(string name, string loggingName) 90 public JobEngine(string name, string loggingName, int timeout = -1)
91 { 91 {
92 Name = name; 92 Name = name;
93 LoggingName = loggingName; 93 LoggingName = loggingName;
94 94 m_timeout = timeout;
95 RequestProcessTimeoutOnStop = 5000; 95 RequestProcessTimeoutOnStop = 5000;
96 } 96 }
97 97
@@ -104,18 +104,9 @@ namespace OpenSim.Framework.Monitoring
104 104
105 IsRunning = true; 105 IsRunning = true;
106 106
107 m_finishedProcessingAfterStop.Reset();
108
109 m_cancelSource = new CancellationTokenSource(); 107 m_cancelSource = new CancellationTokenSource();
110 108 WorkManager.RunInThreadPool(ProcessRequests, null, Name, false);
111 WorkManager.StartThread( 109 m_threadRunnig = true;
112 ProcessRequests,
113 Name,
114 ThreadPriority.Normal,
115 false,
116 true,
117 null,
118 int.MaxValue);
119 } 110 }
120 } 111 }
121 112
@@ -131,20 +122,16 @@ namespace OpenSim.Framework.Monitoring
131 m_log.DebugFormat("[JobEngine] Stopping {0}", Name); 122 m_log.DebugFormat("[JobEngine] Stopping {0}", Name);
132 123
133 IsRunning = false; 124 IsRunning = false;
134 125 if(m_threadRunnig)
135 m_finishedProcessingAfterStop.Reset(); 126 {
136 if(m_jobQueue.Count <= 0)
137 m_cancelSource.Cancel(); 127 m_cancelSource.Cancel();
138 128 m_threadRunnig = false;
139 m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop); 129 }
140 m_finishedProcessingAfterStop.Close();
141 } 130 }
142 finally 131 finally
143 { 132 {
144 if(m_cancelSource != null) 133 if(m_cancelSource != null)
145 m_cancelSource.Dispose(); 134 m_cancelSource.Dispose();
146 if(m_finishedProcessingAfterStop != null)
147 m_finishedProcessingAfterStop.Dispose();
148 } 135 }
149 } 136 }
150 } 137 }
@@ -203,6 +190,18 @@ namespace OpenSim.Framework.Monitoring
203 /// </param> 190 /// </param>
204 public bool QueueJob(Job job) 191 public bool QueueJob(Job job)
205 { 192 {
193 lock(JobLock)
194 {
195 if(!IsRunning)
196 return false;
197
198 if(!m_threadRunnig)
199 {
200 WorkManager.RunInThreadPool(ProcessRequests, null, Name, false);
201 m_threadRunnig = true;
202 }
203 }
204
206 if (m_jobQueue.Count < m_jobQueue.BoundedCapacity) 205 if (m_jobQueue.Count < m_jobQueue.BoundedCapacity)
207 { 206 {
208 m_jobQueue.Add(job); 207 m_jobQueue.Add(job);
@@ -222,59 +221,53 @@ namespace OpenSim.Framework.Monitoring
222 221
223 m_warnOverMaxQueue = false; 222 m_warnOverMaxQueue = false;
224 } 223 }
225
226 return false; 224 return false;
227 } 225 }
228 } 226 }
229 227
230 private void ProcessRequests() 228 private void ProcessRequests(Object o)
231 { 229 {
232 while(IsRunning || m_jobQueue.Count > 0) 230 while(IsRunning)
233 { 231 {
234 try 232 try
235 { 233 {
236 CurrentJob = m_jobQueue.Take(m_cancelSource.Token); 234 if(!m_jobQueue.TryTake(out m_currentJob, m_timeout, m_cancelSource.Token))
237 }
238 catch(ObjectDisposedException e)
239 {
240 // If we see this whilst not running then it may be due to a race where this thread checks
241 // IsRunning after the stopping thread sets it to false and disposes of the cancellation source.
242 if(IsRunning)
243 throw e;
244 else
245 { 235 {
246 m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue", 236 lock(JobLock)
247 Name,m_jobQueue.Count); 237 m_threadRunnig = false;
248 break; 238 break;
249 } 239 }
250 } 240 }
241 catch(ObjectDisposedException e)
242 {
243 m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue",
244 Name,m_jobQueue.Count);
245 break;
246 }
251 catch(OperationCanceledException) 247 catch(OperationCanceledException)
252 { 248 {
253 break; 249 break;
254 } 250 }
255 251
256 if(LogLevel >= 1) 252 if(LogLevel >= 1)
257 m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,CurrentJob.Name); 253 m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,m_currentJob.Name);
258 254
259 try 255 try
260 { 256 {
261 CurrentJob.Action(); 257 m_currentJob.Action();
262 } 258 }
263 catch(Exception e) 259 catch(Exception e)
264 { 260 {
265 m_log.Error( 261 m_log.Error(
266 string.Format( 262 string.Format(
267 "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,CurrentJob.Name),e); 263 "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,m_currentJob.Name),e);
268 } 264 }
269 265
270 if(LogLevel >= 1) 266 if(LogLevel >= 1)
271 m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,CurrentJob.Name); 267 m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,m_currentJob.Name);
272 268
273 CurrentJob = null; 269 m_currentJob = null;
274 } 270 }
275
276 Watchdog.RemoveThread(false);
277 m_finishedProcessingAfterStop.Set();
278 } 271 }
279 272
280 public class Job 273 public class Job
diff --git a/OpenSim/Framework/Monitoring/WorkManager.cs b/OpenSim/Framework/Monitoring/WorkManager.cs
index 9d52f71..5d9b185 100644
--- a/OpenSim/Framework/Monitoring/WorkManager.cs
+++ b/OpenSim/Framework/Monitoring/WorkManager.cs
@@ -57,7 +57,7 @@ namespace OpenSim.Framework.Monitoring
57 57
58 static WorkManager() 58 static WorkManager()
59 { 59 {
60 JobEngine = new JobEngine("Non-blocking non-critical job engine", "JOB ENGINE"); 60 JobEngine = new JobEngine("Non-blocking non-critical job engine", "JOB ENGINE", 30000);
61 61
62 StatsManager.RegisterStat( 62 StatsManager.RegisterStat(
63 new Stat( 63 new Stat(
@@ -182,9 +182,9 @@ namespace OpenSim.Framework.Monitoring
182 /// <param name="callback"></param> 182 /// <param name="callback"></param>
183 /// <param name="obj"></param> 183 /// <param name="obj"></param>
184 /// <param name="name">The name of the job. This is used in monitoring and debugging.</param> 184 /// <param name="name">The name of the job. This is used in monitoring and debugging.</param>
185 public static void RunInThreadPool(System.Threading.WaitCallback callback, object obj, string name) 185 public static void RunInThreadPool(System.Threading.WaitCallback callback, object obj, string name, bool timeout = true)
186 { 186 {
187 Util.FireAndForget(callback, obj, name); 187 Util.FireAndForget(callback, obj, name, timeout);
188 } 188 }
189 189
190 /// <summary> 190 /// <summary>
@@ -231,10 +231,8 @@ namespace OpenSim.Framework.Monitoring
231 JobEngine.QueueJob(name, () => callback(obj)); 231 JobEngine.QueueJob(name, () => callback(obj));
232 else if (canRunInThisThread) 232 else if (canRunInThisThread)
233 callback(obj); 233 callback(obj);
234 else if (mustNotTimeout)
235 RunInThread(callback, obj, name, log);
236 else 234 else
237 Util.FireAndForget(callback, obj, name); 235 Util.FireAndForget(callback, obj, name, !mustNotTimeout);
238 } 236 }
239 237
240 private static void HandleControlCommand(string module, string[] args) 238 private static void HandleControlCommand(string module, string[] args)
diff --git a/OpenSim/Framework/Util.cs b/OpenSim/Framework/Util.cs
index f52a84c..9a1e348 100644
--- a/OpenSim/Framework/Util.cs
+++ b/OpenSim/Framework/Util.cs
@@ -2492,8 +2492,9 @@ namespace OpenSim.Framework
2492 public bool Running { get; set; } 2492 public bool Running { get; set; }
2493 public bool Aborted { get; set; } 2493 public bool Aborted { get; set; }
2494 private int started; 2494 private int started;
2495 public bool DoTimeout;
2495 2496
2496 public ThreadInfo(long threadFuncNum, string context) 2497 public ThreadInfo(long threadFuncNum, string context, bool dotimeout = true)
2497 { 2498 {
2498 ThreadFuncNum = threadFuncNum; 2499 ThreadFuncNum = threadFuncNum;
2499 this.context = context; 2500 this.context = context;
@@ -2501,6 +2502,7 @@ namespace OpenSim.Framework
2501 Thread = null; 2502 Thread = null;
2502 Running = false; 2503 Running = false;
2503 Aborted = false; 2504 Aborted = false;
2505 DoTimeout = dotimeout;
2504 } 2506 }
2505 2507
2506 public void Started() 2508 public void Started()
@@ -2571,7 +2573,7 @@ namespace OpenSim.Framework
2571 foreach (KeyValuePair<long, ThreadInfo> entry in activeThreads) 2573 foreach (KeyValuePair<long, ThreadInfo> entry in activeThreads)
2572 { 2574 {
2573 ThreadInfo t = entry.Value; 2575 ThreadInfo t = entry.Value;
2574 if (t.Running && !t.Aborted && (t.Elapsed() >= THREAD_TIMEOUT)) 2576 if (t.DoTimeout && t.Running && !t.Aborted && (t.Elapsed() >= THREAD_TIMEOUT))
2575 { 2577 {
2576 m_log.WarnFormat("Timeout in threadfunc {0} ({1}) {2}", t.ThreadFuncNum, t.Thread.Name, t.GetStackTrace()); 2578 m_log.WarnFormat("Timeout in threadfunc {0} ({1}) {2}", t.ThreadFuncNum, t.Thread.Name, t.GetStackTrace());
2577 t.Abort(); 2579 t.Abort();
@@ -2612,7 +2614,7 @@ namespace OpenSim.Framework
2612 FireAndForget(callback, obj, null); 2614 FireAndForget(callback, obj, null);
2613 } 2615 }
2614 2616
2615 public static void FireAndForget(System.Threading.WaitCallback callback, object obj, string context) 2617 public static void FireAndForget(System.Threading.WaitCallback callback, object obj, string context, bool dotimeout = true)
2616 { 2618 {
2617 Interlocked.Increment(ref numTotalThreadFuncsCalled); 2619 Interlocked.Increment(ref numTotalThreadFuncsCalled);
2618 2620
@@ -2634,7 +2636,7 @@ namespace OpenSim.Framework
2634 bool loggingEnabled = LogThreadPool > 0; 2636 bool loggingEnabled = LogThreadPool > 0;
2635 2637
2636 long threadFuncNum = Interlocked.Increment(ref nextThreadFuncNum); 2638 long threadFuncNum = Interlocked.Increment(ref nextThreadFuncNum);
2637 ThreadInfo threadInfo = new ThreadInfo(threadFuncNum, context); 2639 ThreadInfo threadInfo = new ThreadInfo(threadFuncNum, context, dotimeout);
2638 2640
2639 if (FireAndForgetMethod == FireAndForgetMethod.RegressionTest) 2641 if (FireAndForgetMethod == FireAndForgetMethod.RegressionTest)
2640 { 2642 {
diff --git a/OpenSim/Framework/WebUtil.cs b/OpenSim/Framework/WebUtil.cs
index 7b085d0..48078ad 100644
--- a/OpenSim/Framework/WebUtil.cs
+++ b/OpenSim/Framework/WebUtil.cs
@@ -1262,18 +1262,24 @@ namespace OpenSim.Framework
1262 { 1262 {
1263 if (hwr.StatusCode == HttpStatusCode.NotFound) 1263 if (hwr.StatusCode == HttpStatusCode.NotFound)
1264 return deserial; 1264 return deserial;
1265
1265 if (hwr.StatusCode == HttpStatusCode.Unauthorized) 1266 if (hwr.StatusCode == HttpStatusCode.Unauthorized)
1266 { 1267 {
1267 m_log.Error(string.Format( 1268 m_log.ErrorFormat("[SynchronousRestObjectRequester]: Web request {0} requires authentication",
1268 "[SynchronousRestObjectRequester]: Web request {0} requires authentication ", 1269 requestUrl);
1269 requestUrl)); 1270 }
1270 return deserial; 1271 else
1272 {
1273 m_log.WarnFormat("[SynchronousRestObjectRequester]: Web request {0} returned error: {1}",
1274 requestUrl, hwr.StatusCode);
1271 } 1275 }
1272 } 1276 }
1273 else 1277 else
1274 m_log.Error(string.Format( 1278 m_log.ErrorFormat(
1275 "[SynchronousRestObjectRequester]: WebException for {0} {1} {2} ", 1279 "[SynchronousRestObjectRequester]: WebException for {0} {1} {2} {3}",
1276 verb, requestUrl, typeof(TResponse).ToString()), e); 1280 verb, requestUrl, typeof(TResponse).ToString(), e.Message);
1281
1282 return deserial;
1277 } 1283 }
1278 } 1284 }
1279 catch (System.InvalidOperationException) 1285 catch (System.InvalidOperationException)