I'm trying to upsert a document from Cascading, and I expect the update script to be executed. It does not run on the first attempt, when the document does not exist yet. When I run the job again (the document already exists), the script is executed.
//Setup
Properties props = new Properties();
props.setProperty(ConfigurationOptions.ES_NODES, properties.getProperty("es.nodes"));
props.setProperty(ConfigurationOptions.ES_WRITE_OPERATION, ConfigurationOptions.ES_OPERATION_UPSERT);
props.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT, "upsert-test");
props.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
props.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS_JSON, "{ param1 : 1.2 }");
Hfs inTap = new Hfs(new TextDelimited(false, "\n"), inputPath);
EsTap outTap = new EsTap("/test/test", Fields.ALL);
ScrubFunction scrubFunction = new ScrubFunction("id","test1","test2");
Pipe processPipe = new Each("processPipe", scrubFunction, Fields.RESULTS);
new Hadoop2MR1FlowConnector(props).connect(inTap, outTap, processPipe).complete();
Script (upsert-test.groovy):
import org.elasticsearch.common.logging.*;
ESLogger logger = ESLoggerFactory.getLogger('update-weights');
logger.info('Entering');
def test1 = ctx._source.'test1'
def test2 = ctx._source.'test2'
ctx._source.'test3' = test1 + test2
Log output on the first run:
[2016-03-10 12:42:56,407][INFO ][cluster.metadata ] [Synch] [test] update_mapping [test] (dynamic)
Result:
{
_index: "test",
_type: "test",
_id: "1",
_score: 1,
_source: {
test1: 1,
test2: 2
}
}
Log output on the second run:
[2016-03-10 12:52:53,833][INFO ][upsert-test ] Entering
Result:
{
_index: "test",
_type: "test",
_id: "1",
_score: 1,
_source: {
test1: 1,
test2: 2,
test3: 3
}
}
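For what it's worth, both results match how I understand the Elasticsearch update API to treat an upsert: if the document is missing, the upsert body is indexed as-is and the script is skipped, unless the request sets scripted_upsert to true; if the document exists, the script runs. Below is a minimal in-memory sketch of that rule in Java. It is not Elasticsearch or es-hadoop code, and it assumes the connector sends each record as the upsert body, which I have not verified. The class UpsertModel, its update method, SUM_SCRIPT and the scriptedUpsert flag are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical in-memory model of update-with-upsert semantics (an assumption,
// not real Elasticsearch code).
public class UpsertModel {
    // Stand-in for an index: document id -> _source.
    private final Map<String, Map<String, Object>> index = new HashMap<>();

    // Stand-in for upsert-test.groovy: test3 = test1 + test2.
    public static final Consumer<Map<String, Object>> SUM_SCRIPT =
        src -> src.put("test3", (Integer) src.get("test1") + (Integer) src.get("test2"));

    public void update(String id, Map<String, Object> upsertDoc,
                       Consumer<Map<String, Object>> script, boolean scriptedUpsert) {
        Map<String, Object> source = index.get(id);
        if (source == null) {
            // Document missing: the upsert body becomes the new document.
            source = new TreeMap<>(upsertDoc);
            index.put(id, source);
            if (!scriptedUpsert) {
                return; // script is skipped on the initial insert
            }
        }
        // Document exists (or scriptedUpsert is set): the script runs on _source.
        script.accept(source);
    }

    public Map<String, Object> get(String id) {
        return index.get(id);
    }

    public static void main(String[] args) {
        UpsertModel es = new UpsertModel();
        Map<String, Object> record = new TreeMap<>();
        record.put("test1", 1);
        record.put("test2", 2);

        es.update("1", record, SUM_SCRIPT, false);
        System.out.println(es.get("1")); // {test1=1, test2=2}

        es.update("1", record, SUM_SCRIPT, false);
        System.out.println(es.get("1")); // {test1=1, test2=2, test3=3}
    }
}
```

With scriptedUpsert set to true in this model, test3 is already present after the first call, which is the behavior I was expecting from the job.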