Manual Reference Source

src/Job.js

import superagent from 'superagent';
import saNoCache from 'superagent-no-cache';
import EventEmitter from 'events';
import utils from './utils/utils.js';
import GSF_API from './utils/GSF_API';
import EVENTS from './utils/EVENTS';

// Cache-busting superagent plugin shared by the requests in this file.  The
// default superagent-no-cache plugin is used unless utils.isIE() reports
// true, in which case its withQueryStrings variant is used instead.
const nocache = !utils.isIE() ? saNoCache : saNoCache.withQueryStrings;

/**
 * Builds a superagent `.end()` callback that settles a promise.
 * A response whose `ok` flag is truthy resolves with `pickValue(res)`; anything
 * else rejects with `failureMessage`, followed by the error status and the
 * response text when they are available.
 * @private
 * @param {function} resolve - The promise resolve function.
 * @param {function} reject - The promise reject function.
 * @param {string} failureMessage - Prefix of the rejection message.
 * @param {function(res: Object): *} pickValue - Extracts the resolved value from the response.
 * @return {function(err: Object, res: Object)} The callback to hand to `.end()`.
 */
function settleWith(resolve, reject, failureMessage, pickValue) {
  return (err, res) => {
    if (res && res.ok) {
      resolve(pickValue(res));
      return;
    }
    const status = (err && err.status) ? ': ' + err.status : '';
    const text = (err && err.response && err.response.text) ?
      ': ' + err.response.text : '';
    // Rejections stay plain strings; callers may depend on that.
    reject(failureMessage + status + text);
  };
}

/**
 * The Job class is used for job operations.
 */
class Job extends EventEmitter {
  /**
   * @param {Client} client - The GSF Client object.
   * @param {string} jobId - The jobId.
   * @param {function(info: JobProgressInfo)} [progressCallback] - The callback to handle job progress.
   * @param {function(info: JobStartedInfo)} [startedCallback] - The callback that is called when the job starts.
   *  For more reliable job started information, listen to the GSF JobStarted
   *  events as this callback may not always get called.  In some cases the job
   *  can start before the callback is registered.
   * @emits {JobFailed}
   * @emits {JobSucceeded}
   * @emits {JobCompleted}
   * @emits {JobStarted}
   * @emits {JobAccepted}
   * @emits {JobProgress}
   */
  constructor(client, jobId, progressCallback, startedCallback) {
    // Init EventEmitter superclass.
    super();

    /**
     * The job Id.
     * @type {string}
     */
    this.jobId = jobId;

    // Client object that owns this job.
    this._client = client;

    // Job endpoint: <rootURL>/<jobs path>/<jobId>.
    this._jobURL = [this._client.rootURL, GSF_API.JOBS_PATH,
      this.jobId].join('/');

    // Allow infinite listeners.
    this.setMaxListeners(0);

    // Store promise for wait() function if called.
    this._waiting = null;

    // Register progress and started callbacks if supplied to constructor.
    if (progressCallback) {
      this.on(EVENTS.progress, progressCallback);
    }
    if (startedCallback) {
      this.on(EVENTS.started, startedCallback);
    }

    // Function to handle events coming from the client.
    this._handler = (eventName, data) => {
      // Ignore events without a payload and events for other jobs.
      if (!data || data.jobId !== this.jobId) return;

      // Re-emit the rest of the events on this job.
      this.emit(eventName, data);
    };

    // Listen for every known event on the client and pass it to the
    // handler together with its event name.
    // NOTE(review): these client listeners are never removed, so each Job
    // instance stays registered on the client for the client's lifetime.
    Object.keys(EVENTS).forEach((key) => {
      const eventName = EVENTS[key];
      this._client.on(eventName, (data) => {
        this._handler(eventName, data);
      });
    });
  }

  /**
   * Waits for the job to complete.  The same promise is returned on every call.
   * @return {Promise<JobResults, error>} Returns a promise that is resolved when a job is
   *  successful, returning the job results object.
   *  If a job fails, the promise is rejected with an error message.  The promise
   *  is also rejected if the job information cannot be retrieved.
   */
  wait() {
    if (!this._waiting) {
      this._waiting = new Promise((resolve, reject) => {
        // Drop both event listeners once the outcome is known so they do
        // not linger on the emitter.
        const stopListening = () => {
          this.removeListener(EVENTS.succeeded, onSucceeded);
          this.removeListener(EVENTS.failed, onFailed);
        };

        // The event only signals completion; the results and the error
        // are read from the job info.  A failing info request rejects the
        // promise instead of leaving it pending forever.
        const onSucceeded = () => {
          stopListening();
          this.info().then((info) => {
            resolve(info.jobResults);
          }).catch(reject);
        };
        const onFailed = () => {
          stopListening();
          this.info().then((info) => {
            reject(info.jobError);
          }).catch(reject);
        };

        // Listen to job events.
        this.once(EVENTS.succeeded, onSucceeded);
        this.once(EVENTS.failed, onFailed);

        // Check to make sure it hasn't already completed.
        // NOTE(review): this assumes the jobStatus strings reported by the
        // server are identical to EVENTS.succeeded / EVENTS.failed - confirm
        // against utils/EVENTS.
        this.info().then((info) => {
          if (info.jobStatus === EVENTS.succeeded) {
            stopListening();
            resolve(info.jobResults);
          } else if (info.jobStatus === EVENTS.failed) {
            stopListening();
            reject(info.jobError);
          }
        }).catch((err) => {
          stopListening();
          reject(err);
        });
      });
    }

    return this._waiting;
  }

  /**
   * The JobInfo object contains information about a job.
   * @typedef {Object} JobInfo
   * @property {string} serviceName - The name of the service.
   * @property {string} taskName - The name of the task.
   * @property {JobOptions} [jobOptions] - Processing directives to submit along with the job.
   * @property {Object} [inputParameters] - The input parameters.
   * @property {string} jobId - The job id.
   * @property {number} [jobProgress] - The percentage of job completion.
   * @property {string} [jobMessage] - A status message that is sent with progress updates.
   * @property {string} jobStatus - The status of the job. It can be Accepted,
   *  Started, Succeeded, or Failed.
   * @property {JobResults} [jobResults] - The job output results.
   * @property {string} [jobSubmitted] - Time the job was submitted.
   * @property {string} [jobStart] - Time the job started processing.
   * @property {string} [jobEnd] - Time the job finished processing.
   * @property {string} [jobError] - An error from the job, if there was one.
   * @property {NodeInfo} [nodeInfo] - Provides information about the node on which the job ran.
   */

  /**
   * Provides information about the node on which the job ran.
   * @typedef {Object} NodeInfo
   * @property {string} nodeAddress - The address of the machine that ran the job.
   * @property {number} nodePort - The port of the server that ran the job.
   * @property {number} workerID - The ID of the worker that ran the job.
   */

  /**
   * The job output results.
   * @typedef {Object} JobResults
   * @property {*} <parameterName>.best - Result from the first parameter mapper which
   * was able to reverse translate the output value.
   * @property {*} <parameterName>.raw - The raw output value returned by the task.
   */

  /**
   * Retrieves the job information.
   * @return {Promise<JobInfo, error>} Returns a promise to a JobInfo object.
   *  If the request fails, the promise is rejected with an error message.
   */
  info() {
    return new Promise((resolve, reject) => {
      // Get job status.
      superagent
        .get(this._jobURL)
        .use(nocache) // Prevents caching of *only* this request
        .set(this._client.headers)
        .end(settleWith(resolve, reject, 'Error requesting job info',
          (res) => res.body));
    });
  }

  /**
   * Cancels the job.
   * @param {boolean} force - If true, the job will force cancel.  Please note that
   *  setting force to true may be unsafe depending on the type of job
   *  as it may not be able to properly shut down or clean up.
   * @return {Promise<true, error>} Returns a promise when cancel is submitted.  If request
   *  is successfully submitted, the promise will be resolved with a value of true.
   *  If the request fails, the promise will be rejected with an error message.
   *  Note that this only represents the success of the request made to the server,
   *  not the cancellation itself.  Use the Job.info() function (or Job events)
   *  to retrieve the status of the job and to learn when it is actually cancelled.
   */
  cancel(force) {
    return new Promise((resolve, reject) => {
      // The force flag selects which status is requested from the server.
      const requestStatus = force ? 'KillRequested' : 'CancelRequested';
      // Cancel job.
      superagent
        .put(this._jobURL)
        .set('Content-Type', 'application/json')
        .send(JSON.stringify({'jobStatus': requestStatus}))
        .use(nocache) // Prevents caching of *only* this request
        .set(this._client.headers)
        .end(settleWith(resolve, reject, 'Error cancelling job', () => true));
    });
  }

  /**
   * Deletes the job.
   * @return {Promise<true, error>} Returns a promise when delete is submitted.  If request
   *  is successfully submitted, the promise will be resolved with a value of true.
   *  If the request fails, the promise will be rejected with an error message.
   *  Note that this only represents the success of the request made to the server,
   *  not the deletion itself.  Use the Job.info() function (or Job events)
   *  to retrieve the status of the job and to learn when it is actually deleted.
   */
  delete() {
    return new Promise((resolve, reject) => {
      // Delete job.
      superagent
        .delete(this._jobURL)
        .set('Content-Type', 'application/json')
        .use(nocache) // Prevents caching of *only* this request
        .set(this._client.headers)
        .end(settleWith(resolve, reject, 'Error deleting job', () => true));
    });
  }

  /**
   * Retrieves a list of the workspace files.
   * @return {Promise<Object[], error>} Returns a promise to an array of fs.stat objects.
   *  If the request fails, the promise is rejected with an error message.
   */
  workspace() {
    return new Promise((resolve, reject) => {
      // List workspace files.
      superagent
        .get(`${this._jobURL}/workspace`)
        .use(nocache) // Prevents caching of *only* this request
        .set(this._client.headers)
        .end(settleWith(resolve, reject, 'Error requesting job workspace',
          (res) => res.body.workspace));
    });
  }

  /**
   * Retrieves a workspace file.
   * @param {string} fileName - The name of the file, relative to the job workspace.
   * @return {Promise<arraybuffer, error>} Returns a promise to an ArrayBuffer of the file contents.
   *  If the request fails, the promise is rejected with an error message.
   */
  file(fileName) {
    return new Promise((resolve, reject) => {
      // Get file as arraybuffer.  Like every other request of this class,
      // it carries the client headers and bypasses the cache.
      superagent
        .get(`${this._jobURL}/workspace/${fileName}`)
        .use(nocache) // Prevents caching of *only* this request
        .set(this._client.headers)
        .parse(superagent.parse['application/octet-stream'])
        .responseType('arraybuffer')
        .then((res) => {
          resolve(res.body);
        }).catch((err) => {
          const status = ((err && err.status) ? ': ' + err.status : '');
          reject('Error requesting file' + status);
        });
    });
  }
}

export default Job;