Below is an example of how to use the map and reduce stages optimally, so that each stage makes use of its governance points up to the maximum possible limit.
Requires the following SuiteScript modules:
- N/search
- N/cache
- N/runtime
- N/error
Private Function:
- _checkBatchCache - This is the important part of this implementation.
/**
* @NApiVersion 2.1
* @NScriptType MapReduceScript
*/
define(
[
'N/cache',
'N/error',
'N/runtime',
'N/search'
], (
cache,
error,
runtime,
search
) => {
'use strict';
//@method getInputData
/**
 * Input stage entry point.
 *
 * Placeholder implementation: always returns a new, empty array.
 *
 * @param {Object} context - Input-stage context (not used here).
 * @returns {Array} An empty array.
 */
function getInputData(context) {
    return [];
}
//@method map
/**
 * Map stage entry point.
 *
 * Parses the incoming JSON value, asks _checkBatchCache which batch the
 * parsed value belongs to, and writes the parsed value out under that
 * batch id.
 *
 * @param {Object} context - Map-stage context.
 * @param {string} context.value - JSON text of the entry to process.
 * @param {Function} context.write - Emits a key/value pair.
 */
function map(context) {
    const record = JSON.parse(context.value);
    const batchKey = _checkBatchCache(record);

    context.write({
        key: batchKey,
        value: record
    });
}
//@method reduce
/**
 * Reduce stage entry point.
 *
 * Writes a single pair per invocation: the incoming key together with the
 * number of values received for that key.
 *
 * @param {Object} context - Reduce-stage context.
 * @param {*} context.key - Key the values were grouped under.
 * @param {Array} context.values - Values grouped under the key.
 * @param {Function} context.write - Emits a key/value pair.
 */
function reduce(context) {
    // Individual values could be processed here, for example:
    //   for (const value of context.values) { /* process value here... */ }
    const groupKey = context.key;
    const groupSize = context.values.length;

    context.write({
        key: groupKey,
        value: groupSize
    });
}
//@method summarize
/**
 * Summarize stage entry point.
 *
 * Delegates all reporting to handleSummary and returns nothing.
 *
 * @param {Object} summary - Summary object for the finished run.
 */
function summarize(summary) {
    handleSummary(summary);
}
//@method handleError
/**
 * Writes an error-level log entry for a stage.
 *
 * @param {*} err - Error to record; logged as the entry's details.
 * @param {string} stage - Stage name; logged as the entry's title.
 */
function handleError(err, stage) {
    const entry = { title: stage, details: err };
    log.error(entry);
}
//@method handleSuccess
/**
 * Writes the audit-level "Completed" log entry for a stage.
 *
 * For the 'map' and 'reduce' stages the numeric values in summary.output
 * are summed and reported as the number of records processed. For any
 * other stage the output is not read at all, because the total would not
 * be reported anyway.
 *
 * @param {string} stage - Stage name; used as the log title.
 * @param {Object} summary - Summary object for the run.
 * @param {boolean} [summary.isRestarted] - When truthy, an extra audit
 *     entry notes that the summary stage is being restarted.
 * @param {Object} summary.output - Exposes iterator().each(callback);
 *     only read for the 'map' and 'reduce' stages.
 */
function handleSuccess(stage, summary) {
    if (summary?.isRestarted) {
        log.audit({details: 'Summary stage is being restarted!'});
    }

    const reportsTotal = ['map', 'reduce'].includes(stage);
    let totalRecordsUpdated = 0;

    // Walking the output is only worthwhile when the total ends up in the
    // log message below.
    if (reportsTotal) {
        summary.output.iterator().each((key, value) => {
            totalRecordsUpdated += Number(value);
            return true; // keep iterating
        });
    }

    const processedMsg = reportsTotal ? `Total records processed=${totalRecordsUpdated}` : '';
    log.audit({title: stage, details: `Completed. ${processedMsg}`});
}
//@method handleSummary
/**
 * Reports the outcome of every stage of the run.
 *
 * The input stage is reported first: if summary.inputSummary.error is
 * set, it is wrapped in an INPUT_STAGE_FAILED error and passed to
 * handleError; otherwise handleSuccess is called. The map and reduce
 * stages are then reported, in that order, through handleErrorInStage.
 *
 * @param {Object} summary - Summary object for the finished run.
 */
function handleSummary(summary) {
    const inputError = summary.inputSummary.error;

    if (!inputError) {
        handleSuccess('getInputData', summary);
    } else {
        const wrapped = error.create({
            name: 'INPUT_STAGE_FAILED',
            message: inputError
        });
        handleError(wrapped, 'getInputData');
    }

    for (const stage of ['map', 'reduce']) {
        handleErrorInStage(stage, summary);
    }
}
//@method handleErrorInStage
/**
 * Reports the outcome of a single stage ('map' or 'reduce').
 *
 * Every entry in summary[`${stage}Summary`].errors becomes one line of
 * the form "Failed to process <key>. Error was <message>". If at least
 * one line was collected, the lines are wrapped in a PROCESSING_FAILED
 * error (created through the N/error module, which must be loaded by the
 * enclosing define call) and passed to handleError. Otherwise
 * handleSuccess is called for the stage.
 *
 * @param {string} stage - Stage name; selects summary[`${stage}Summary`].
 * @param {Object} summary - Summary object for the finished run.
 */
function handleErrorInStage(stage, summary) {
    const errorMsg = [];

    summary[`${stage}Summary`].errors.iterator().each((key, value) => {
        let reason;
        try {
            reason = JSON.parse(value).message;
        } catch (parseError) {
            // The entry is not a JSON-encoded object: report it verbatim
            // rather than letting one odd entry abort the whole summary.
            reason = value;
        }
        errorMsg.push(`Failed to process ${key}. Error was ${reason}\n`);
        return true; // keep iterating
    });

    if (errorMsg.length === 0) {
        handleSuccess(stage, summary);
        return;
    }

    const err = error.create({
        name: 'PROCESSING_FAILED',
        message: JSON.stringify(errorMsg)
    });
    handleError(err, stage);
}
//@method _checkBatchCache
/**
 * Assigns a value to a batch and returns that batch's id.
 *
 * The currently open batch is kept under the key 'data' in the private
 * cache named 'batch', as JSON of the form { key: <batch id>, values: [...] }.
 * If an open batch exists and holds fewer than batchSize values, the value
 * is appended to it and its id is returned. Otherwise a new batch is
 * started, with the current time in milliseconds as its id and the value
 * as its only member.
 *
 * Only the length of the stored values array is consulted; the stored
 * values themselves are never read back by this function.
 *
 * NOTE(review): this relies on Cache.put replacing an existing entry, so no
 * remove is issued before the put — confirm against the N/cache reference.
 * NOTE(review): the get followed by put is not atomic; if invocations run
 * concurrently, batch sizes are approximate — confirm whether that matters
 * for the deployment.
 *
 * @param {*} value - JSON-serializable value to add to a batch.
 * @param {number} [batchSize=100] - Maximum number of values per batch.
 * @returns {number} Id (millisecond timestamp) of the batch the value was
 *     assigned to.
 */
function _checkBatchCache(value, batchSize = 100) {
    const CACHE_KEY = 'data';
    const batchCache = cache.getCache({
        name: 'batch',
        scope: cache.Scope.PRIVATE
    });

    const cached = batchCache.get({ key: CACHE_KEY });
    const openBatch = cached ? JSON.parse(cached) : null;
    const hasRoom = Array.isArray(openBatch?.values) && openBatch.values.length < batchSize;

    // Either extend the open batch or start a fresh one; both cases are
    // persisted through the single put below.
    const batch = hasRoom
        ? { key: openBatch.key, values: [...openBatch.values, value] }
        : { key: Date.now(), values: [value] };

    batchCache.put({
        key: CACHE_KEY,
        value: JSON.stringify(batch)
    });

    return batch.key;
}
// Object returned from the define callback: the four stage entry points
// plus a config object.
return {
// NOTE(review): presumably Map/Reduce framework settings (number of
// retries, and whether to exit on error) — confirm against the
// SuiteScript Map/Reduce documentation.
config:{
retryCount: 3,
exitOnError: true
},
getInputData,
map,
reduce,
summarize
};
});