/**
Pipelines are a general mechanism for managing ordered steps of async
actions.

They provide for smart retries, per-step error handling.
 */
export const STEP_STATE_PENDING = 'pending';
export const STEP_STATE_ERROR = 'error';
export const STEP_STATE_COMPLETE = 'complete';

export class Step {
  constructor(runFn, onError) {
    this.state = STEP_STATE_PENDING;
    this.run = runFn;
    // onError handlers shoud either:
    // - resolve & retry current step by returning
    // - or throw error if cannot be resolved
    this.onError = onError;
    this.meta = {
      retryCount: 0
    };
  }
}

export default class Pipeline {
  constructor(steps, initResults, config) {
    this.steps = steps || [];
    this.config = config || {};
    this.results = initResults || {};
    this.pending = null;
    this.currentStep = null;
  }

  async run(data) {
    if (!this.steps.length) throw new Error('No steps found');
    if (this.pending) return this.pending;
    this.remainingSteps = this.steps.concat();
    this.currentStep = null;
    this.pending = this.runStep(data);
    return this.pending;
  }

  async runStep(data) {
    if (!this.remainingSteps.length) {
      this.pending = null;
      return this.results;
    }

    // Either we're trying the next step, or this call is
    // re-trying the same step
    if (!this.currentStep) {
      this.currentStep = this.remainingSteps.shift();
    } else {
      this.currentStep.meta.retryCount += 1;
    }

    // Skip if the step has already been completed
    if (this.currentStep.state === STEP_STATE_COMPLETE) {
      this.currentStep = null;
      return this.runStep(data);
    }

    try {
      await this.currentStep.run(
        data,
        this.results,
        this.config,
        { step: this.currentStep, pipeline: this }
      );
    } catch (e) {
      this.currentStep.state = STEP_STATE_ERROR;

      // If the step fails (throws), it can be handled by the step
      // to auto-correct failures, customize error behaviour, etc.
      if (this.currentStep.onError) {
        await this.currentStep.onError(e, data, this.results, this.config, { step: this.currentStep, pipeline: this });
        return this.runStep(data);
      }

      this.pending = null;
      this.currentStep = null;
      throw e;
    }

    this.currentStep.state = STEP_STATE_COMPLETE;
    this.currentStep = null;
    return this.runStep(data);
  }
}
