ThreadingHelper.cs 8.5 KB


  1. using System;
  2. using System.Collections.Generic;
  3. using System.ComponentModel;
  4. using System.Linq;
  5. using System.Reflection;
  6. using System.Threading;
  7. using BepInEx.Logging;
  8. using UnityEngine;
  9. namespace BepInEx
  10. {
  /// <summary>
  /// Provides methods for running code on other threads and synchronizing with the main thread.
  /// </summary>
  public sealed class ThreadingHelper : MonoBehaviour, ISynchronizeInvoke
  {
      // Guards every assignment to _invokeList.
      private readonly object _invokeLock = new object();

      // Multicast delegate holding the callbacks queued for the next Update; null when nothing is queued.
      private Action _invokeList;

      // Thread that executed the first Update; null until then.
      private Thread _mainThread;

      /// <summary>
      /// Current instance of the helper.
      /// </summary>
      public static ThreadingHelper Instance { get; private set; }

      /// <summary>
      /// Gives methods for invoking delegates on the main unity thread, both synchronously and asynchronously.
      /// Can be used in many built-in framework types, for example <see cref="System.IO.FileSystemWatcher.SynchronizingObject"/>
      /// and <see cref="System.Timers.Timer.SynchronizingObject"/> to make their events fire on the main unity thread.
      /// </summary>
      public static ISynchronizeInvoke SynchronizingObject => Instance;

      /// <summary>
      /// Creates the persistent host GameObject, attaches the helper to it and stores it in <see cref="Instance"/>.
      /// </summary>
      internal static void Initialize()
      {
          var go = new GameObject("BepInEx_ThreadingHelper");
          DontDestroyOnLoad(go);
          Instance = go.AddComponent<ThreadingHelper>();
      }

      /// <summary>
      /// Queue the delegate to be invoked on the main unity thread. Use to synchronize your threads.
      /// </summary>
      /// <param name="action">Delegate to run during the next <c>Update</c> of this helper.</param>
      /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
      public void StartSyncInvoke(Action action)
      {
          if (action == null) throw new ArgumentNullException(nameof(action));

          lock (_invokeLock) _invokeList += action;
      }

      /// <summary>
      /// Drains the queue filled by <see cref="StartSyncInvoke"/> and runs every queued callback,
      /// logging (not propagating) any exception a callback throws.
      /// </summary>
      private void Update()
      {
          // The CurrentThread can change between Awake and later methods, it's safest to get it here.
          if (_mainThread == null)
              _mainThread = Thread.CurrentThread;

          // Safe to do outside of lock because nothing can remove callbacks, at worst we execute with 1 frame delay
          if (_invokeList == null) return;

          Action toRun;
          lock (_invokeLock)
          {
              toRun = _invokeList;
              _invokeList = null;
          }

          // Need to execute outside of the lock in case the callback itself calls Invoke we could deadlock
          // The invocation would also block any threads that call Invoke
          foreach (var action in toRun.GetInvocationList().Cast<Action>())
          {
              try
              {
                  action();
              }
              catch (Exception ex)
              {
                  // One failing callback must not prevent the remaining ones from running.
                  LogInvocationException(ex);
              }
          }
      }

      /// <summary>
      /// Queue the delegate to be invoked on a background thread. Use this to run slow tasks without affecting the game.
      /// NOTE: Most of Unity API can not be accessed while running on another thread!
      /// </summary>
      /// <param name="action">
      /// Task to be executed on another thread. Can optionally return an Action that will be executed on the main thread.
      /// You can use this action to return results of your work safely. Return null if this is not needed.
      /// </param>
      /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
      /// <exception cref="NotSupportedException">The work item could not be queued on the thread pool.</exception>
      public void StartAsyncInvoke(Func<Action> action)
      {
          // Fail fast on the calling thread, consistent with StartSyncInvoke, instead of
          // surfacing a logged NullReferenceException later on a pool thread.
          if (action == null) throw new ArgumentNullException(nameof(action));

          void DoWork(object _)
          {
              try
              {
                  var result = action();
                  if (result != null)
                      StartSyncInvoke(result);
              }
              catch (Exception ex)
              {
                  // Exceptions thrown by the background task are logged, never rethrown on the pool thread.
                  LogInvocationException(ex);
              }
          }

          if (!ThreadPool.QueueUserWorkItem(DoWork))
              throw new NotSupportedException("Failed to queue the action on ThreadPool");
      }

      /// <summary>
      /// Writes the exception, and separately its inner exception when present, to the log at error level.
      /// </summary>
      private static void LogInvocationException(Exception ex)
      {
          Logging.Logger.Log(LogLevel.Error, ex);
          if (ex.InnerException != null) Logging.Logger.Log(LogLevel.Error, "INNER: " + ex.InnerException);
      }

      #region ISynchronizeInvoke

      /// <summary>
      /// Runs <paramref name="method"/> right away when <see cref="InvokeRequired"/> is false,
      /// otherwise queues it through <see cref="StartSyncInvoke"/>. Any exception thrown by the
      /// delegate is captured in the returned result and rethrown by <c>EndInvoke</c>.
      /// </summary>
      IAsyncResult ISynchronizeInvoke.BeginInvoke(Delegate method, object[] args)
      {
          var result = new InvokeResult();

          // Returns either the delegate's return value or the exception it threw;
          // ExceptionThrown tells the two cases apart.
          object Invoke()
          {
              try
              {
                  return method.DynamicInvoke(args);
              }
              catch (Exception ex)
              {
                  result.ExceptionThrown = true;
                  return ex;
              }
          }

          if (!InvokeRequired)
              result.Finish(Invoke(), true);
          else
              StartSyncInvoke(() => result.Finish(Invoke(), false));

          return result;
      }

      /// <summary>
      /// Blocks until the invocation started by <c>BeginInvoke</c> has finished, then returns its
      /// result or rethrows the exception it captured.
      /// </summary>
      object ISynchronizeInvoke.EndInvoke(IAsyncResult result)
      {
          var invokeResult = (InvokeResult)result;
          invokeResult.AsyncWaitHandle.WaitOne();

          if (invokeResult.ExceptionThrown)
              throw (Exception)invokeResult.AsyncState;
          return invokeResult.AsyncState;
      }

      /// <summary>
      /// Synchronous invoke: <c>BeginInvoke</c> followed by <c>EndInvoke</c>.
      /// NOTE: because <see cref="InvokeRequired"/> is true until the first Update has run, calling this
      /// on the main thread before then queues the delegate and blocks waiting for an Update.
      /// </summary>
      object ISynchronizeInvoke.Invoke(Delegate method, object[] args)
      {
          var invokeResult = ((ISynchronizeInvoke)this).BeginInvoke(method, args);
          return ((ISynchronizeInvoke)this).EndInvoke(invokeResult);
      }

      /// <summary>
      /// False if current code is executing on the main unity thread, otherwise True.
      /// Warning: Will return true before the first frame finishes (i.e. inside plugin Awake and Start methods).
      /// </summary>
      /// <inheritdoc />
      public bool InvokeRequired => _mainThread == null || _mainThread != Thread.CurrentThread;

      /// <summary>
      /// Result object handed out by <c>BeginInvoke</c>. <see cref="AsyncState"/> carries the delegate's
      /// return value, or the thrown exception when <see cref="ExceptionThrown"/> is set.
      /// </summary>
      private sealed class InvokeResult : IAsyncResult
      {
          public InvokeResult()
          {
              AsyncWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
          }

          /// <summary>
          /// Stores the outcome and signals the wait handle so that waiters in EndInvoke are released.
          /// </summary>
          public void Finish(object result, bool completedSynchronously)
          {
              AsyncState = result;
              CompletedSynchronously = completedSynchronously;
              IsCompleted = true;

              // Signal last so that a released waiter observes the fields assigned above.
              ((EventWaitHandle)AsyncWaitHandle).Set();
          }

          public bool IsCompleted { get; private set; }
          public WaitHandle AsyncWaitHandle { get; }
          public object AsyncState { get; private set; }
          public bool CompletedSynchronously { get; private set; }

          // Set by BeginInvoke's wrapper when the delegate threw; AsyncState then holds the exception.
          internal bool ExceptionThrown;
      }

      #endregion
  }
  /// <summary>
  /// Convenience extensions for utilizing multiple threads and using the <see cref="ThreadingHelper"/>.
  /// </summary>
  public static class ThreadingExtensions
  {
      /// <inheritdoc cref="RunParallel{TIn,TOut}(IList{TIn},Func{TIn,TOut},int)"/>
      public static IEnumerable<TOut> RunParallel<TIn, TOut>(this IEnumerable<TIn> data, Func<TIn, TOut> work, int workerCount = -1)
      {
          // Materialize the sequence so it can be split by index, and forward workerCount
          // so the caller's requested degree of parallelism is honored.
          foreach (var result in RunParallel(data.ToList(), work, workerCount))
              yield return result;
      }

      /// <summary>
      /// Apply a function to a collection of data by spreading the work on multiple threads.
      /// Outputs of the functions are returned to the current thread and yielded one by one.
      /// The order of the outputs is not guaranteed to match the order of the inputs.
      /// </summary>
      /// <typeparam name="TIn">Type of the input values.</typeparam>
      /// <typeparam name="TOut">Type of the output values.</typeparam>
      /// <param name="data">Input values for the work function.</param>
      /// <param name="work">Function to apply to the data on multiple threads at once.</param>
      /// <param name="workerCount">
      /// Number of worker threads. If negative (the default), <see cref="Environment.ProcessorCount"/> is used, but never less than 2.
      /// </param>
      /// <exception cref="TargetInvocationException">
      /// An exception was thrown inside one of the threads, and the operation was aborted.
      /// It is raised after the results that were already produced have been yielded.
      /// </exception>
      /// <exception cref="ArgumentException">Need at least 1 workerCount.</exception>
      /// <exception cref="ArgumentNullException"><paramref name="data"/> or <paramref name="work"/> is null.</exception>
      public static IEnumerable<TOut> RunParallel<TIn, TOut>(this IList<TIn> data, Func<TIn, TOut> work, int workerCount = -1)
      {
          if (data == null) throw new ArgumentNullException(nameof(data));
          if (work == null) throw new ArgumentNullException(nameof(work));

          if (workerCount < 0)
              workerCount = Math.Max(2, Environment.ProcessorCount);
          else if (workerCount == 0)
              throw new ArgumentException("Need at least 1 worker", nameof(workerCount));

          // Integer ceiling division: going through float loses precision for very large
          // collections and could leave trailing items unprocessed.
          var itemCount = data.Count;
          var perThreadCount = itemCount / workerCount + (itemCount % workerCount == 0 ? 0 : 1);

          var doneCount = 0;
          var lockObj = new object();

          // Auto-reset so the consumer sleeps between batches instead of spinning on an
          // event that stays signaled after the first worker finishes.
          var resultsReady = new AutoResetEvent(false);

          IEnumerable<TOut> doneItems = null;
          Exception exceptionThrown = null;

          // Start threads to process the data
          for (var i = 0; i < workerCount; i++)
          {
              // When there are more workers than items the range is empty (last <= first)
              // and the worker only reports completion.
              int first = i * perThreadCount;
              int last = Math.Min(first + perThreadCount, itemCount);

              ThreadPool.QueueUserWorkItem(
                  _ =>
                  {
                      var results = new List<TOut>(perThreadCount);

                      try
                      {
                          for (int dataIndex = first; dataIndex < last; dataIndex++)
                          {
                              // Best-effort early exit once any worker has failed.
                              if (exceptionThrown != null) break;
                              results.Add(work(data[dataIndex]));
                          }
                      }
                      catch (Exception ex)
                      {
                          exceptionThrown = ex;
                      }

                      lock (lockObj)
                      {
                          doneItems = doneItems == null ? results : results.Concat(doneItems);
                          doneCount++;
                          // Signal inside the lock: once the consumer observes the final doneCount,
                          // no worker can still be about to touch the event.
                          resultsReady.Set();
                      }
                  });
          }

          // Main thread waits for results and returns them until all threads finish
          while (true)
          {
              resultsReady.WaitOne();

              IEnumerable<TOut> toOutput;
              bool isDone;
              lock (lockObj)
              {
                  toOutput = doneItems;
                  doneItems = null;
                  isDone = doneCount == workerCount;
              }

              if (toOutput != null)
              {
                  foreach (var doneItem in toOutput)
                      yield return doneItem;
              }

              if (isDone)
                  break;
          }

          // Every worker has finished (and signaled under the lock), so the handle can be released.
          resultsReady.Close();

          if (exceptionThrown != null)
              throw new TargetInvocationException("An exception was thrown inside one of the threads", exceptionThrown);
      }
  }
  250. }