plugins/network/s3_client.js

const { Context } = require('../../core/context');
const fs = require('fs');
const Minio = require('minio');
const { Plugin } = require('../plugin');
const { S3Transfer } = require('./s3_transfer');

const MAX_ATTEMPTS = 10;

/**
 * This is a subset of the list of S3 errors at
 * https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList.
 *
 * These errors apply to PUT/POST requests (uploads) and should be
 * considered fatal, causing the S3Client to stop attempting the
 * current upload job.
 *
 * While the S3Client does retry on transient network errors, it will
 * not retry on fatal errors that will be just as fatal in the next
 * attempt. For example, an InvalidAccessKeyId error indicates that
 * we're using bad credentials and we should not retry with the same
 * bad credentials.
 */
const FATAL_UPLOAD_ERRORS = [
    'AccessDenied',
    'AccountProblem',
    'AllAccessDisabled',
    'EntityTooSmall',
    'EntityTooLarge',
    'InvalidAccessKeyId',
    'InvalidArgument',
    'InvalidBucketName',
    'InvalidEncryptionAlgorithmError',
    'InvalidLocationConstraint',
    'InvalidObjectState',
    'InvalidPayer',
    'InvalidSecurity',
    'InvalidStorageClass',
    'InvalidURI',
    'KeyTooLongError',
    'MaxMessageLengthExceeded',
    'MaxPostPreDataLengthExceededError',
    'MetadataTooLarge',
    'MethodNotAllowed',
    'NoSuchBucket',
    'NotImplemented',
    'NotSignedUp',
    'PreconditionFailed'
];

/**
 * S3Client provides access to S3 REST services that conforms to the
 * DART network client interface.
 *
 */
class S3Client extends Plugin {
    /**
     * Creates a new S3Client.
     *
     * @param {StorageService} storageService - A StorageService record that
     * includes information about how to connect to a remote S3 service.
     * This record includes the host URL, default bucket, and connection
     * credentials.
     */
    constructor(storageService) {
        super();
        this.storageService = storageService;
    }

    /**
     * Returns a {@link PluginDefinition} object describing this plugin.
     *
     * @returns {PluginDefinition}
     */
    static description() {
        return {
            id: '23a8f0af-a03a-418e-89a4-6d07799882b6',
            name: 'S3Client',
            description: 'Built-in DART S3 network client',
            version: '0.1',
            readsFormats: [],
            writesFormats: [],
            implementsProtocols: ['s3'],
            talksToRepository: [],
            setsUp: []
        };
    }

    /**
     * Uploads a file to the remote bucket. The name of the remote bucket is
     * determined by the {@link StorageService} passed in to this class'
     * constructor.
     *
     * @param {string} filepath - The path to the local file to be uploaded
     * to S3.
     *
     * @param {string} keyname - This name of the key to put into the remote
     * bucket. This parameter is optional. If not specified, it defaults to
     * the basename of filepath. That is, /path/to/bagOfPhotos.tar would
     * default to bagOfPhotos.tar.
     *
     */
    upload(filepath, keyname) {
        if (!filepath) {
            throw new Error('Param filepath is required for upload.');
        }
        if (!keyname) {
            keyname = path.basename(filepath);
        }
        var xfer = this._initXferRecord('upload', filepath, keyname);
        try {
            if (xfer.localStat == null || !(xfer.localStat.isFile() || xfer.localStat.isSymbolicLink())) {
                xfer.result.finish(Context.y18n.__('%s is not a file', filepath));
                this.emit('error', xfer.result);
                return;
            }
            Context.logger.info(Context.y18n.__('Starting upload'));
            this._upload(xfer);
        } catch (err) {
            xfer.result.finish(err.toString());
            this.emit('error', xfer.result);
        }
    }

    /**
     * Downloads a file from the remote bucket. The name of the remote bucket is
     * determined by the {@link StorageService} passed in to this class'
     * constructor.
     *
     * @param {string} filepath - The local path to which we should save the
     * downloaded file.
     *
     * @param {string} keyname - This name of the key (object) to download from
     * the S3 bucket.
     *
     */
    download(filepath, keyname) {
        var s3Client = this;
        var minioClient = s3Client._getClient();
        var xfer = this._initXferRecord('download', filepath, keyname);
        xfer.result.info = `Downloading ${xfer.host} ${xfer.bucket}/${xfer.key} to ${xfer.localPath}`;
        this.emit('start', xfer.result);
        // TODO: Build in retries?
        minioClient.fGetObject(xfer.bucket, xfer.key, xfer.localPath, function(err) {
            if (err) {
                xfer.result.finish(err.toString());
                s3Client.emit('error', xfer.result);
                xfer.result.finish(err.toString());
                return;
            } else {
                xfer.localStat = fs.statSync(filepath);
                xfer.result.filesize = xfer.localStat.size;
                xfer.result.finish();
            }
            Context.logger.info(Context.y18n.__('Finished download'));
            s3Client.emit('finish', xfer.result);
        });
    }

    /**
     * Lists files in a remote S3 bucket. NOT YET IMPLEMENTED.
     *
     */
    list() {
        throw 'S3Client.list() is not yet implemented.';
        // var minioClient = this.getClient();
        // var stream = minioClient.listObjects(this.storageService.bucket, '', false);
        // stream.on('data', function(obj) { console.log(obj) } )
        // stream.on('error', function(err) { console.log("Error: " + err) } )
    }

    /**
     * Checks to see whether a file already exists on the storage provider.
     * NOT YET IMPLEMENTED.
     *
     * @param {string} key - The key whose existence you want to check.
     *
     * @returns {bool} - True if the file exists.
     */
    exists(key) {
        throw 'S3Client.exists() is not yet implemented.';
        try {
            // TODO: Write me
        } catch (err) {
            // TODO: Write me
        }
        return trueOrFalse;
    }

    /**
     * Creates the S3Transfer record used internally by this client
     * to record details of an S3 upload or download.
     *
     * @param {string} operation - Should be 'upload' or 'download'.
     *
     * @param {string} filepath - Path of the file on the local file system
     * to upload (if operation is uplaod). Path on the local file system where
     * we should write the contents of the S3 object we download (if operation
     * is download).
     *
     * @param {string} key - The S3 key to upload or download.
     *
     * @private
     * @returns {S3Transfer}
     */
    _initXferRecord(operation, filepath, key) {
        var xfer = new S3Transfer(operation, S3Client.description().name);
        xfer.host = this.storageService.host;
        xfer.port = this.storageService.port;
        xfer.localPath = filepath;
        xfer.bucket = this.storageService.bucket;
        xfer.key = key;
        xfer.result.start();
        if (operation === 'upload') {
            xfer.localStat = fs.lstatSync(filepath);
            xfer.result.filesize = xfer.localStat.size;
            xfer.result.filepath = filepath;
            xfer.result.fileMtime = xfer.localStat.mtime;
        }
        if (operation === 'download') {
            xfer.result.remoteURL = xfer.getRemoteUrl();
        }
        return xfer;
    }

    /**
     * Uploads a file to S3.
     *
     * @param {S3Transfer} xfer - An object describing what to upload
     * and where it should go.
     *
     * @private
     */
    _upload(xfer) {
        var s3Client = this;
        var minioClient = s3Client._getClient();
        // Metadata works with fPutObject, but not with putObject,
        // so this is commented out for now. We can delete it if we
        // stick with putObject (which allows our progress bar to work, sort of).
        // var metadata = {
        //     'Uploaded-By': `${Context.dartVersion()}`,
        //     'Original-Path': xfer.localPath,
        //     'Size': xfer.localStat.size
        // };
        xfer.result.info = `Uploading ${xfer.localPath} to ${xfer.host} ${xfer.bucket}/${xfer.key}`;
        this.emit('start', xfer.result)
        let fileStream = fs.createReadStream(xfer.localPath);
        fileStream.on('data', (chunk) => {
            xfer.bytesTransferred += chunk.length;
            s3Client.emit('status', xfer);
        });
        try {
            minioClient.putObject(xfer.bucket, xfer.key, fileStream, xfer.localStat.size, function(err, etag) {
                if (err) {
                    s3Client._handleError(err, xfer);
                    return;
                }
                // Note: Buckets must allow GetObject or you'll get
                // "valid credentials required" error from remote.
                xfer.remoteChecksum = etag;
                minioClient.statObject(xfer.bucket, xfer.key, function(err, remoteStat) {
                    if (err) {
                        Context.logger.error('Error getting object info after upload: %s', err.toString());
                        xfer.result.finish(err.toString());
                        s3Client.emit('error', xfer.result);
                        return;
                    }
                    xfer.remoteStat = remoteStat;
                    s3Client._verifyRemote(xfer);
                });
            });
        } catch (err) {
            Context.logger.error('Upload error: %s', err.toString());
            xfer.result.finish(err.toString());
            s3Client.emit('error', xfer.result);
        }
    }

    /**
     * Ensures that the file we uploaded to S3 has the correct size
     * and that no errors occurred.
     *
     * @param {S3Transfer} xfer - An object describing what to upload
     * and where it should go.
     *
     * @private
     */
    _verifyRemote(xfer) {
        var message = null;
        if (xfer.error) {
            message = `After upload, could not get object stats. ${xfer.error.toString()}`;
        }
        if (!xfer.error && xfer.remoteStat.size != xfer.localStat.size) {
            message = `Object was not correctly uploaded. Local size is ${xfer.localStat.size}, remote size is ${xfer.remoteStat.size}`;
        } else {
            xfer.result.remoteURL = xfer.getRemoteUrl();
            xfer.result.remoteChecksum = xfer.remoteStat.etag;
        }
        xfer.result.finish(message);
        Context.logger.info(Context.y18n.__('Finished upload'));
        this.emit('finish', xfer.result);
    }

    /**
     * Handles an S3 upload error by either retrying or quitting and recording
     * the error message.
     *
     * @param {Error} err - An optional error object, caught during the upload
     * attempt.
     *
     * @param {S3Transfer} xfer - An object describing what to upload
     * and where it should go.
     *
     * @private
     */
    _handleError(err, xfer) {
        if (xfer.result.attempt < MAX_ATTEMPTS && !FATAL_UPLOAD_ERRORS.includes(err.code)) {
            // ECONNRESET: Connection reset by peer is common on large uploads.
            // Minio client is smart enough to pick up where it left off.
            // Log a warning, wait 5 seconds, then try again.
            xfer.result.warning = `Got error ${err.code} (request id ${err.requestid}) on attempt number ${xfer.result.attempt} while attempting to send ${xfer.result.filepath} to ${this.storageService.host}. Will try again in 1.5 seconds.`;
            this.emit('warning', xfer.result);
            let s3Client = this;
            setTimeout(function() {
                xfer.result.attempt += 1;
                Context.logger.info('Reattempting upload');
                s3Client._upload(xfer);
            }, 1500);
        } else {
            Context.logger.error('Too many failed upload attempts');
            xfer.result.finish(err.toString());
            this.emit('error', xfer.result);
        }
    }

    /**
     * Returns a Minio S3 client.
     *
     * @private
     */
    _getClient() {
        var minioClient = new Minio.Client({
            endPoint:  this.storageService.host,
            port: this.storageService.port || 443,
            accessKey: this.storageService.getValue('login'),
            secretKey: this.storageService.getValue('password'),
            useSSL: this.storageService.host != 'localhost' && this.storageService.host != '127.0.0.1',
        });
        // TODO: This is too specialized to go in a general-use client.
        // Where should this go?
        if (this.storageService.host == 's3.amazonaws.com' && this.storageService.bucket.startsWith('aptrust.')) {
            minioClient.region = 'us-east-1';
        }
        return minioClient;
    }

    /**
     * @event S3Client#start
     * @type {string} A message indicating that the upload or download is starting.
     *
     * @event S3Client#warning
     * @type {string} A warning message describing why the S3Client is retrying
     * an upload or download operation.
     *
     * @event S3Client#error
     * @type {OperationResult} Contains information about what went wrong during
     * an upload or download operation.
     *
     * @event S3Client#finish
     * @type {OperationResult} Contains information about the outcome of
     * an upload or download operation.
     */

}

// Use because declaring module.exports above cause jsdoc to screw up.
module.exports = S3Client;