core/modules/monkeyjob/helpers/MonkeyJob.cs
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Text;
using System.Linq;
using System.Management.Automation;
using System.Management.Automation.Runspaces;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Represents a PowerShell job for Monkey365.
/// </summary>
/// <remarks>
/// Wraps a <see cref="PowerShell"/> instance and mirrors its invocation state onto the
/// <see cref="Job"/> state machine. The job's Verbose, Error, Debug and Warning streams are
/// shared with the inner PowerShell instance, so records written by the script surface on the job.
/// NOTE(review): the file avoids C# 6+ syntax (no <c>nameof</c>, no <c>?.</c>), presumably so it
/// can be compiled by Add-Type on Windows PowerShell; confirm before modernising.
/// </remarks>
public class MonkeyJob : System.Management.Automation.Job
{
    private System.Management.Automation.PowerShell _innerJob;
    private readonly object _lockObject = new object();
    private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
    private bool _failedOnUnblock = false;
    private string _status = "NotStarted";
    private System.IAsyncResult Handle;
    private bool _isDisposed = false;

    // PowerShell requires a public parameterless constructor for some scenarios.
    // A job created this way has no inner PowerShell instance and cannot be started.
    public MonkeyJob() : base()
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="MonkeyJob"/> class.
    /// </summary>
    /// <param name="powerShell">The PowerShell instance to run.</param>
    /// <param name="name">The job name.</param>
    /// <exception cref="ArgumentNullException"><paramref name="powerShell"/> is null.</exception>
    public MonkeyJob(PowerShell powerShell, string name) : base()
    {
        if (powerShell == null)
        {
            throw new ArgumentNullException("powerShell");
        }

        _innerJob = powerShell;
        this.PSJobTypeName = "MonkeyJob";
        this.Name = name;
        SetUpStreams(name);
    }

    /// <inheritdoc/>
    public override string Location
    {
        get { return "localhost"; }
    }

    /// <inheritdoc/>
    public override string StatusMessage
    {
        get { return _status; }
    }

    /// <inheritdoc/>
    public override bool HasMoreData
    {
        get
        {
            // Avoid LINQ for performance
            if (Output != null && Output.Count > 0) return true;
            if (Progress != null && Progress.Count > 0) return true;
            if (Error != null && Error.Count > 0) return true;
            if (Warning != null && Warning.Count > 0) return true;
            if (Verbose != null && Verbose.Count > 0) return true;
            if (Debug != null && Debug.Count > 0) return true;
            return false;
        }
    }

    /// <summary>
    /// Shares the job's Verbose, Error, Debug and Warning streams with the inner PowerShell
    /// instance and subscribes to its invocation state changes.
    /// </summary>
    /// <param name="name">Unused; the job name is already set by the constructor.</param>
    private void SetUpStreams(string name)
    {
        _innerJob.Streams.Verbose = this.Verbose;
        _innerJob.Streams.Error = this.Error;
        _innerJob.Streams.Debug = this.Debug;
        _innerJob.Streams.Warning = this.Warning;
        _innerJob.InvocationStateChanged += PowerShellInvocationStateChanged;
    }

    /// <summary>
    /// Mirrors terminal invocation states of the inner PowerShell instance onto the job.
    /// </summary>
    /// <remarks>
    /// Completed is routed through <see cref="Complete"/> so that a job which is already
    /// Stopped or Failed is not flipped back to Completed by a late event. Failed is handled
    /// so that a job started with <see cref="Start"/> does not stay in Running forever when the
    /// pipeline fails. Stopped is intentionally not handled here: <see cref="StopJob"/> and
    /// <see cref="ForceStop"/> set the final state themselves.
    /// </remarks>
    private void PowerShellInvocationStateChanged(object sender, PSInvocationStateChangedEventArgs e)
    {
        PSInvocationStateInfo info = e.InvocationStateInfo;
        switch (info.State)
        {
            case PSInvocationState.Completed:
                Complete();
                break;
            case PSInvocationState.Failed:
                FailFromPipeline(info.Reason);
                break;
        }
    }

    /// <inheritdoc/>
    protected override void Dispose(bool disposing)
    {
        if (!disposing)
        {
            return;
        }

        // Claim the disposal atomically so concurrent Dispose calls cannot both proceed.
        lock (_lockObject)
        {
            if (_isDisposed)
            {
                return;
            }

            _isDisposed = true;
        }

        // Use local variables for thread safety
        PowerShell innerJob = _innerJob;
        CancellationTokenSource cts = _cancellationTokenSource;
        try
        {
            if (!IsFailedOrCancelled(JobStateInfo.State))
            {
                try
                {
                    StopJob();
                }
                catch
                {
                    /* Ignore exceptions from StopJob in Dispose */
                }
            }

            foreach (Job job in ChildJobs)
            {
                job.Dispose();
            }

            if (innerJob != null)
            {
                // Detach event handler to avoid memory leaks
                innerJob.InvocationStateChanged -= PowerShellInvocationStateChanged;
                innerJob.Dispose();
                _innerJob = null;
            }

            if (cts != null)
            {
                cts.Dispose();
                _cancellationTokenSource = null;
            }
        }
        finally
        {
            base.Dispose(disposing);
        }
    }

    /// <summary>
    /// Checks if the job is completed, failed, or stopped.
    /// </summary>
    /// <param name="state">The job state to test.</param>
    /// <returns>True for Completed, Failed or Stopped; false for every other state.</returns>
    internal bool IsFailedOrCancelled(JobState state)
    {
        return state == JobState.Completed
            || state == JobState.Failed
            || state == JobState.Stopped;
    }

    /// <summary>
    /// Throws if the job is failed or cancelled.
    /// </summary>
    /// <exception cref="InvalidOperationException">The job is Completed, Failed or Stopped.</exception>
    protected void ThrowIfJobFailedOrCancelled()
    {
        lock (_lockObject)
        {
            if (IsFailedOrCancelled(JobStateInfo.State))
            {
                throw new InvalidOperationException("Monkey365 job is stopped or cancelled");
            }
        }
    }

    /// <inheritdoc/>
    public override void StopJob()
    {
        PowerShell innerJob = _innerJob;
        IAsyncResult handle = Handle;
        if (innerJob != null)
        {
            innerJob.Stop();
            if (handle != null)
            {
                try
                {
                    innerJob.EndInvoke(handle);
                }
                catch (PipelineStoppedException)
                {
                    // Ending an invocation that was just stopped is expected to report the
                    // stop as an exception; it must not prevent the state update below.
                }
            }
        }

        Cancel();
    }

    /// <summary>
    /// Forcibly stops the job and sets its state to Failed.
    /// </summary>
    /// <remarks>
    /// Stop and EndInvoke are blocking calls and run outside the lock: the invocation
    /// state handler takes the same lock from another thread, so holding it here could deadlock.
    /// </remarks>
    public void ForceStop()
    {
        PowerShell innerJob = _innerJob;
        IAsyncResult handle = Handle;
        if (innerJob != null)
        {
            try
            {
                innerJob.Stop();
            }
            catch
            {
                /* Ignore exceptions */
            }

            if (handle != null)
            {
                try
                {
                    innerJob.EndInvoke(handle);
                }
                catch
                {
                    /* Ignore exceptions */
                }
            }
        }

        Fail();
    }

    /// <summary>
    /// Starts the job.
    /// </summary>
    /// <remarks>
    /// The job is marked Running before the invocation begins; otherwise a pipeline that
    /// finishes immediately could report Completed first and then be overwritten with Running.
    /// If the invocation cannot be started the job is marked Failed and the exception is rethrown.
    /// </remarks>
    /// <exception cref="InvalidOperationException">The job has no inner PowerShell instance.</exception>
    public void Start()
    {
        PowerShell innerJob = RequireInnerJob();

        lock (_lockObject)
        {
            SetJobState(JobState.Running);
            _status = "Running";
        }

        try
        {
            Handle = innerJob.BeginInvoke<PSObject, PSObject>(null, Output);
        }
        catch
        {
            Fail();
            throw;
        }
    }

    /// <summary>
    /// Waits for the job to complete.
    /// </summary>
    /// <remarks>Returns immediately when the job was not started with <see cref="Start"/>.</remarks>
    public void WaitJob()
    {
        IAsyncResult handle = Handle;
        if (handle != null)
        {
            handle.AsyncWaitHandle.WaitOne();
        }
    }

    /// <summary>
    /// Waits for the job to complete or timeout.
    /// </summary>
    /// <param name="timeout">Maximum time to wait.</param>
    public void WaitJob(TimeSpan timeout)
    {
        IAsyncResult handle = Handle;
        if (handle != null)
        {
            handle.AsyncWaitHandle.WaitOne(timeout);
        }
    }

    /// <summary>
    /// Returns true if the job is finished.
    /// </summary>
    /// <returns>True when the job was started with <see cref="Start"/> and its async result is completed.</returns>
    public bool IsFinished()
    {
        IAsyncResult handle = Handle;
        return handle != null && handle.IsCompleted;
    }

    /// <summary>
    /// Gets the job status as a PSObject.
    /// </summary>
    /// <returns>
    /// An object with InstanceId, State, the inner stream collections and, when the job was
    /// started with <see cref="Start"/>, the Reason and AsyncState note properties.
    /// </returns>
    /// <exception cref="InvalidOperationException">The job has no inner PowerShell instance.</exception>
    public PSObject JobStatus()
    {
        PowerShell innerJob = RequireInnerJob();
        IAsyncResult handle = Handle;

        PSObject responseObject = new PSObject();
        responseObject.Members.Add(new PSNoteProperty("InstanceId", innerJob.InstanceId));
        responseObject.Members.Add(new PSNoteProperty("State", innerJob.InvocationStateInfo.State));
        if (handle != null)
        {
            // "Reason" carries the completion flag; the property name is kept for existing consumers.
            responseObject.Members.Add(new PSNoteProperty("Reason", handle.IsCompleted));
            responseObject.Members.Add(new PSNoteProperty("AsyncState", handle.AsyncState));
        }

        responseObject.Members.Add(new PSNoteProperty("Error", innerJob.Streams.Error));
        responseObject.Members.Add(new PSNoteProperty("Warning", innerJob.Streams.Warning));
        responseObject.Members.Add(new PSNoteProperty("Info", innerJob.Streams.Information));
        responseObject.Members.Add(new PSNoteProperty("Verbose", innerJob.Streams.Verbose));
        responseObject.Members.Add(new PSNoteProperty("Debug", innerJob.Streams.Debug));
        return responseObject;
    }

    /// <summary>
    /// Returns the inner PowerShell instance, or throws a descriptive exception when the job
    /// was created without one or has already been disposed.
    /// </summary>
    private PowerShell RequireInnerJob()
    {
        PowerShell innerJob = _innerJob;
        if (innerJob == null)
        {
            throw new InvalidOperationException("Monkey365 job has no PowerShell instance: it was not initialized or has been disposed");
        }

        return innerJob;
    }

    /// <summary>
    /// Moves the job to Running unless it is already Completed, Failed or Stopped.
    /// </summary>
    /// <returns>True when the job was moved to Running.</returns>
    private bool TryStart()
    {
        lock (_lockObject)
        {
            if (IsFailedOrCancelled(JobStateInfo.State))
            {
                return false;
            }

            _status = "Running";
            SetJobState(JobState.Running);
            return true;
        }
    }

    /// <summary>
    /// Unconditionally marks the job as Failed.
    /// </summary>
    private void Fail()
    {
        lock (_lockObject)
        {
            _status = "Failed";
            SetJobState(JobState.Failed);
        }
    }

    /// <summary>
    /// Marks the job as Failed because the inner pipeline failed, recording the failure
    /// reason on the error stream. Does nothing when the job already reached a final state.
    /// </summary>
    /// <param name="reason">The exception reported by the pipeline; may be null.</param>
    private void FailFromPipeline(Exception reason)
    {
        lock (_lockObject)
        {
            var stateInfo = JobStateInfo;
            if (stateInfo != null && IsFailedOrCancelled(stateInfo.State))
            {
                return;
            }

            if (reason != null && Error.IsOpen)
            {
                Error.Add(new ErrorRecord(reason, reason.Message, ErrorCategory.InvalidOperation, this));
            }

            _status = "Failed";
            SetJobState(JobState.Failed);
        }
    }

    /// <summary>
    /// Finalizes the job: Failed when an unexpected exception was recorded by
    /// <see cref="StartTask"/>, otherwise Completed unless a final state was already reached.
    /// </summary>
    private void Complete()
    {
        lock (_lockObject)
        {
            var stateInfo = JobStateInfo;
            if (_failedOnUnblock)
            {
                // Skip the redundant transition when the failure was already recorded.
                if (stateInfo == null || stateInfo.State != JobState.Failed)
                {
                    _status = "Failed";
                    SetJobState(JobState.Failed);
                }
            }
            else if (stateInfo != null && !IsFailedOrCancelled(stateInfo.State))
            {
                _status = "Completed";
                SetJobState(JobState.Completed);
            }
        }
    }

    /// <summary>
    /// Unconditionally marks the job as Stopped.
    /// </summary>
    private void Cancel()
    {
        lock (_lockObject)
        {
            _status = "Stopped";
            SetJobState(JobState.Stopped);
        }
    }

    /// <summary>
    /// Starts the job asynchronously and returns the results.
    /// </summary>
    /// <returns>
    /// The pipeline output, or an empty collection when the job could not be started or the
    /// invocation threw. Exceptions are written to the job's error stream instead of propagating.
    /// </returns>
    public async Task<PSDataCollection<PSObject>> StartTask()
    {
        if (!TryStart())
        {
            return new PSDataCollection<PSObject>();
        }

        try
        {
            // Capture the instance so a concurrent Dispose cannot null it between Begin and End.
            PowerShell innerJob = RequireInnerJob();
            return await Task<PSDataCollection<PSObject>>.Factory
                .FromAsync(innerJob.BeginInvoke(), asyncResult => innerJob.EndInvoke(asyncResult))
                .ConfigureAwait(false);
        }
        catch (PSInvalidOperationException ex)
        {
            WriteError(new ErrorRecord(ex, ex.Message, ErrorCategory.InvalidOperation, this));
        }
        catch (TaskCanceledException ex)
        {
            WriteError(new ErrorRecord(ex, ex.Message, ErrorCategory.OperationStopped, this));
        }
        catch (Exception ex)
        {
            WriteError(new ErrorRecord(ex, ex.Message, ErrorCategory.InvalidOperation, this));
            // Any other exception marks the job as Failed when Complete() runs below.
            _failedOnUnblock = true;
        }
        finally
        {
            Complete();
        }

        return new PSDataCollection<PSObject>();
    }

    /// <summary>
    /// Adds an error record to the job's error stream if possible.
    /// </summary>
    /// <remarks>
    /// The record is dropped when the job is already Completed, Failed or Stopped, or when the
    /// error stream is closed. The state check and the add happen under the same lock so a
    /// concurrent state change cannot slip in between them.
    /// </remarks>
    /// <param name="errorRecord">The record to add.</param>
    public void WriteError(ErrorRecord errorRecord)
    {
        lock (_lockObject)
        {
            if (IsFailedOrCancelled(JobStateInfo.State))
            {
                return;
            }

            if (Error.IsOpen)
            {
                Error.Add(errorRecord);
            }
        }
    }

    /// <summary>
    /// Disposes the inner PowerShell job's RunspacePool if available.
    /// </summary>
    /// <remarks>
    /// The pool is disposed only when it is not already Closed, Closing or Broken; exceptions
    /// raised while disposing are ignored. The pool reference is cleared afterwards either way.
    /// </remarks>
    public void DisposeInnerRunspacePool()
    {
        lock (_lockObject)
        {
            PowerShell innerJob = _innerJob;
            if (innerJob == null || innerJob.RunspacePool == null)
            {
                return;
            }

            try
            {
                RunspacePoolState poolState = innerJob.RunspacePool.RunspacePoolStateInfo.State;
                bool alreadyShutDown = poolState == RunspacePoolState.Closed
                    || poolState == RunspacePoolState.Broken
                    || poolState == RunspacePoolState.Closing;
                if (!alreadyShutDown)
                {
                    innerJob.RunspacePool.Dispose();
                }
            }
            catch
            {
                /* Ignore exceptions on close */
            }
            finally
            {
                innerJob.RunspacePool = null;
            }
        }
    }
}