Optimal Map/Reduce Script

01/26/2022

Below is an example of how to combine the map and reduce stages so that each stage makes the most of its governance allowance: the map stage groups the incoming records into batches, and the reduce stage is then invoked once per batch.

Requires the following SuiteScript modules:

  • N/search
  • N/cache
  • N/runtime
  • N/error

Private Function:

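  • _checkBatchCache - The core of this implementation: it assigns each mapped record to a batch of up to 100 entries and returns the batch ID that the map stage uses as the reduce key.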


/**
 * @NApiVersion 2.1
 * @NScriptType MapReduceScript
 */

define(
[
    'N/cache',
    'N/error',
    'N/runtime',
    'N/search'
], (
    cache,
    error,
    runtime,
    search
) => {

    'use strict';
    
    //@method getInputData
    /**
     * getInputData stage entry point.
     *
     * Placeholder implementation: it hands a new, empty array to the next
     * stage. Replace the returned array with the real input data.
     *
     * @param {Object} context - Stage context supplied by the framework (not used here).
     * @returns {Array} A new empty array on every call.
     */
    function getInputData(context) {
        return [];
    }

    //@method map
    /**
     * map stage entry point.
     *
     * Parses the JSON string in `context.value`, asks `_checkBatchCache` which
     * batch the parsed record belongs to, and writes the record out under
     * that batch id.
     *
     * @param {Object} context - Map context; `context.value` must be a JSON string.
     * @returns {void}
     * @throws {SyntaxError} When `context.value` is not valid JSON.
     */
    function map(context) {
        const record = JSON.parse(context.value);

        // The batch id (not the original key) becomes the key of the written pair.
        const entry = {
            key: _checkBatchCache(record),
            value: record
        };

        context.write(entry);
    }

    //@method reduce
    /**
     * reduce stage entry point.
     *
     * Writes one pair per invocation: the incoming key together with the
     * number of values received for that key.
     *
     * @param {Object} context - Reduce context exposing `key`, `values` and `write`.
     * @returns {void}
     */
    function reduce(context) {
        const groupKey = context.key;
        const groupValues = context.values;

        // The individual values could be processed here, for example:
        //     for (const value of groupValues) {
        //         //process value here...
        //     }

        const entry = {
            key: groupKey,
            value: groupValues.length
        };
        context.write(entry);
    }

    //@method summarize
    /**
     * summarize stage entry point.
     *
     * Delegates all reporting to `handleSummary`; its return value is ignored.
     *
     * @param {Object} summary - Summary object supplied by the framework.
     * @returns {void}
     */
    function summarize(summary) {
        handleSummary(summary);
    }

    //@method handleError
    /**
     * Writes an error-level log entry for a failed stage.
     *
     * @param {*} err - Error (or error-like value) used as the log details.
     * @param {string} stage - Stage name used as the log title.
     * @returns {void}
     */
    function handleError(err, stage) {
        const entry = {
            title: stage,
            details: err
        };
        log.error(entry);
    }

    //@method handleSuccess
    /**
     * Writes the audit log entries for a stage that finished without errors.
     *
     * For the 'map' and 'reduce' stages the values of `summary.output` are
     * summed (each value is converted with `Number`) and reported as the total
     * number of processed records. For any other stage the output is not read
     * at all, because no total is reported for it.
     *
     * @param {string} stage - Stage name; used as the title of every log entry.
     * @param {Object} summary - Summary object exposing `isRestarted` and `output`.
     * @returns {void}
     */
    function handleSuccess(stage, summary) {
        if (summary.isRestarted) {
            // Titled with the stage so the entry can be found in the execution log.
            log.audit({title: stage, details: 'Summary stage is being restarted!'});
        }

        let processedMsg = '';
        if (['map', 'reduce'].includes(stage)) {
            let totalRecordsUpdated = 0;
            summary.output.iterator().each((key, value) => {
                totalRecordsUpdated += Number(value);
                return true; // keep iterating over every output pair
            });
            processedMsg = `Total records processed=${totalRecordsUpdated}`;
        }

        log.audit({title: stage, details: `Completed. ${processedMsg}`});
    }

    //@method handleSummary
    /**
     * Reports the outcome of every stage from the summarize stage.
     *
     * The input stage is reported first: when `summary.inputSummary.error` is
     * set, an 'INPUT_STAGE_FAILED' error is created and passed to
     * `handleError`; otherwise `handleSuccess` is called. The map and reduce
     * stages are then checked, in that order, through `handleErrorInStage`.
     *
     * @param {Object} summary - Summary object supplied to the summarize stage.
     * @returns {void}
     */
    function handleSummary(summary) {
        const inputError = summary.inputSummary.error;

        if (!inputError) {
            handleSuccess('getInputData', summary);
        } else {
            const failure = error.create({
                name: 'INPUT_STAGE_FAILED',
                message: inputError
            });
            handleError(failure, 'getInputData');
        }

        for (const stage of ['map', 'reduce']) {
            handleErrorInStage(stage, summary);
        }
    }

    //@method handleErrorInStage
    /**
     * Reports the outcome of a single stage ('map' or 'reduce').
     *
     * Walks `summary[`${stage}Summary`].errors` and builds one message per
     * failed key. If at least one error was recorded, a 'PROCESSING_FAILED'
     * error carrying the JSON-encoded list of messages is created and passed
     * to `handleError`; otherwise `handleSuccess` is called for the stage.
     *
     * @param {string} stage - Stage name; selects the `<stage>Summary` property.
     * @param {Object} summary - Summary object supplied to the summarize stage.
     * @returns {void}
     */
    function handleErrorInStage(stage, summary) {
        const errorMsg = [];
        summary[`${stage}Summary`].errors.iterator().each((key, value) => {
            let reason;
            try {
                // Prefer the `message` of the serialized error; show the raw
                // value when the parsed payload carries no message.
                reason = JSON.parse(value)?.message ?? value;
            } catch (parseFailure) {
                // The recorded value is not JSON. Report it verbatim instead of
                // aborting the summary and hiding every other stage error.
                reason = value;
            }
            errorMsg.push(`Failed to process ${key}. Error was ${reason}\n`);
            return true; // keep iterating over every recorded error
        });

        if (errorMsg.length > 0) {
            const err = error.create({
                name: 'PROCESSING_FAILED',
                message: JSON.stringify(errorMsg)
            });
            handleError(err, stage);
        } else {
            handleSuccess(stage, summary);
        }
    }

    //@method _checkBatchCache
    /**
     * Assigns `value` to a batch and returns the id of that batch.
     *
     * The state of the current batch is kept in the 'batch' cache under the
     * key 'data' as the JSON string `{ key: <batch id>, values: [...] }`.
     * While the cached batch holds fewer than `batchSize` values, `value` is
     * appended to it and its id is returned. Otherwise a new batch containing
     * only `value` is started; its id is the current time in milliseconds,
     * bumped when necessary so that it always differs from the previous id.
     *
     * @param {*} value - JSON-serializable entry to add to the current batch.
     * @param {number} [batchSize=100] - Maximum number of entries per batch.
     * @returns {number} The id of the batch that `value` was assigned to.
     */
    function _checkBatchCache(value, batchSize = 100) {
        const CACHE_KEY = 'data';
        const batchCache = cache.getCache({
            name: 'batch',
            scope: cache.Scope.PRIVATE
        });
        const cacheData = batchCache.get({
            key: CACHE_KEY
        });

        let batch = null;
        if (cacheData) {
            try {
                const parsed = JSON.parse(cacheData);
                batch = parsed !== null && typeof parsed === 'object' ? parsed : null;
            } catch (parseFailure) {
                // An unreadable payload must not fail the map stage: record the
                // problem and fall through to starting a fresh batch.
                log.error({title: '_checkBatchCache', details: parseFailure});
            }
        }

        const hasRoom = Array.isArray(batch?.values) && batch.values.length < batchSize;

        let batchId;
        let values;
        if (hasRoom) {
            batchId = batch.key;
            values = [...batch.values, value];
        } else {
            // A batch that starts in the same millisecond as the previous one
            // would otherwise reuse its id and be merged into it by the shuffle.
            const now = Date.now();
            const previousId = Number(batch?.key);
            batchId = previousId >= now ? previousId + 1 : now;
            values = [value];
        }

        if (cacheData) {
            // Drop the previous entry before writing the updated state.
            batchCache.remove({
                key: CACHE_KEY
            });
        }
        batchCache.put({
            key: CACHE_KEY,
            value: JSON.stringify({
                key: batchId,
                values
            })
        });

        return batchId;
    }

    return {
        config:{
            retryCount: 3,
            exitOnError: true
        },
        getInputData,
        map,
        reduce,
        summarize
    };
});