I'm struggling to pass documents through an ingest pipeline during bulk indexing, and I wonder whether anyone has made this work with the CPAN Search::Elasticsearch Perl client. I use it for all Elasticsearch operations, from index creation to document indexing and searches.
For bulk indexing I use the bulk_helper() method to get a bulk instance, which lets me call add_action() for every document I want to index in a single bulk operation:
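    use Search::Elasticsearch;

    my $es   = Search::Elasticsearch->new( %host_conf );
    my $bulk = $es->bulk_helper(
        %bulk_conf,
        on_success => sub { ... },
        on_error   => sub { ... },
    );
    # Queue one index action per document
    for my $d (@docs) {
        $bulk->add_action( index => { index => $d->{index}, type => $d->{type}, source => $d->{body} } );
    }
    $bulk->flush;    # send any actions still left in the buffer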
The Perl documentation says "Each action can include the same parameters that you would pass to the equivalent index() ...", so I looked up the docs for the regular index() method, which list "Query string parameters: op_type, parent, pipeline, ...". So it seems I should be able to add a pipeline query string parameter to the bulk operation. But where do I put it? I've already tried:
    $es->bulk_helper( ..., pipeline => $my_pipe );              # fails silently
    $bulk->add_action( index => { ... }, pipeline => $my_pipe );  # throws "Unrecognised action" exception
    $bulk->add_action( index => { ..., pipeline => $my_pipe } );  # throws "Unknown params " exception
I'm now running out of ideas for how to add the pipeline parameter to the bulk operation.
Refs:
http://search.cpan.org/dist/Search-Elasticsearch/lib/Search/Elasticsearch/Client/5_0/Direct.pm#bulk()
http://search.cpan.org/~drtech/Search-Elasticsearch-5.01/lib/Search/Elasticsearch/Client/5_0/Direct.pm#index()