
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;

namespace Kubectl {
    /// <summary>
    /// A synchronisation context that runs all calls scheduled on it (via <see cref="SynchronizationContext.Post"/>) on a single thread.
    /// </summary>
    /// <remarks>
    /// With thanks to Stephen Toub.
    /// </remarks>
    public sealed class ThreadAffinitiveSynchronizationContext
        : SynchronizationContext, IDisposable {
        /// <summary>
        /// A blocking collection (effectively a queue) of work items to execute, consisting of callback delegates and their callback state (if any).
        /// </summary>
        BlockingCollection<KeyValuePair<SendOrPostCallback, object>> _workItemQueue = new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

        /// <summary>
        /// Create a new thread-affinitive synchronisation context.
        /// </summary>
        ThreadAffinitiveSynchronizationContext() {

        /// <summary>
        /// Dispose of resources being used by the synchronisation context.
        /// </summary>
        void IDisposable.Dispose() {
            if (_workItemQueue != null) {
                _workItemQueue = null;

        /// <summary>
        /// Check if the synchronisation context has been disposed.
        /// </summary>
        void CheckDisposed() {
            if (_workItemQueue == null)
                throw new ObjectDisposedException(GetType().Name);

        /// <summary>
        /// Run the message pump for the callback queue on the current thread.
        /// </summary>
        void RunMessagePump() {

            KeyValuePair<SendOrPostCallback, object> workItem;
            while (_workItemQueue.TryTake(out workItem, Timeout.InfiniteTimeSpan)) {

                // Has the synchronisation context been disposed?
                if (_workItemQueue == null)

        /// <summary>
        /// Terminate the message pump once all callbacks have completed.
        /// </summary>
        void TerminateMessagePump() {


        /// <summary>
        /// Dispatch an asynchronous message to the synchronization context.
        /// </summary>
        /// <param name="callback">
        /// The <see cref="SendOrPostCallback"/> delegate to call in the synchronisation context.
        /// </param>
        /// <param name="callbackState">
        /// Optional state data passed to the callback.
        /// </param>
        /// <exception cref="InvalidOperationException">
        /// The message pump has already been started, and then terminated by calling <see cref="TerminateMessagePump"/>.
        /// </exception>
        public override void Post(SendOrPostCallback callback, object callbackState) {
            if (callback == null)
                throw new ArgumentNullException(nameof(callback));


            try {
                    new KeyValuePair<SendOrPostCallback, object>(
                        key: callback,
                        value: callbackState
            } catch (InvalidOperationException eMessagePumpAlreadyTerminated) {
                throw new InvalidOperationException(
                    "Cannot enqueue the specified callback because the synchronisation context's message pump has already been terminated.",

        /// <summary>
        /// Run an asynchronous operation using the current thread as its synchronisation context.
        /// </summary>
        /// <param name="asyncOperation">
        /// A <see cref="Func{TResult}"/> delegate representing the asynchronous operation to run.
        /// </param>
        public static void RunSynchronized(Func<Task> asyncOperation) {
            if (asyncOperation == null)
                throw new ArgumentNullException(nameof(asyncOperation));

            SynchronizationContext savedContext = Current;
            try {
                using (ThreadAffinitiveSynchronizationContext synchronizationContext = new ThreadAffinitiveSynchronizationContext()) {

                    Task rootOperationTask = asyncOperation();
                    if (rootOperationTask == null)
                        throw new InvalidOperationException("The asynchronous operation delegate cannot return null.");

                        operationTask =>


                    try {
                    } catch (AggregateException eWaitForTask) // The TPL will almost always wrap an AggregateException around any exception thrown by the async operation.
                        // Is this just a wrapped exception?
                        AggregateException flattenedAggregate = eWaitForTask.Flatten();
                        if (flattenedAggregate.InnerExceptions.Count != 1)
                            throw; // Nope, genuine aggregate.

                        // Yep, so rethrow (preserving original stack-trace).
            } finally {

        /// <summary>
        /// Run an asynchronous operation using the current thread as its synchronisation context.
        /// </summary>
        /// <typeparam name="TResult">
        /// The operation result type.
        /// </typeparam>
        /// <param name="asyncOperation">
        /// A <see cref="Func{TResult}"/> delegate representing the asynchronous operation to run.
        /// </param>
        /// <returns>
        /// The operation result.
        /// </returns>
        public static TResult RunSynchronized<TResult>(Func<Task<TResult>> asyncOperation) {
            if (asyncOperation == null)
                throw new ArgumentNullException(nameof(asyncOperation));

            SynchronizationContext savedContext = Current;
            try {
                using (ThreadAffinitiveSynchronizationContext synchronizationContext = new ThreadAffinitiveSynchronizationContext()) {

                    Task<TResult> rootOperationTask = asyncOperation();
                    if (rootOperationTask == null)
                        throw new InvalidOperationException("The asynchronous operation delegate cannot return null.");

                        operationTask =>


                    try {
                    } catch (AggregateException eWaitForTask) // The TPL will almost always wrap an AggregateException around any exception thrown by the async operation.
                        // Is this just a wrapped exception?
                        AggregateException flattenedAggregate = eWaitForTask.Flatten();
                        if (flattenedAggregate.InnerExceptions.Count != 1)
                            throw; // Nope, genuine aggregate.

                        // Yep, so rethrow (preserving original stack-trace).

                        throw; // Never reached.
            } finally {