Hi Paul
Was curious if anybody had already written scripts to copy data
between clusters by pulling docs from one cluster and indexing to
another. Preferably, something using the bulk APIs.
Here's a simple Perl script which will do what you want in parallel
(using fork).
Should be fairly self-explanatory. You would need to install these Perl
modules: Elasticsearch and Parallel::ForkManager (Data::Dumper ships with Perl).
#!/usr/bin/perl
use strict;
use warnings;
use Elasticsearch();
use Parallel::ForkManager();
use Data::Dumper;
# --- Configuration -------------------------------------------------------
# Address of the cluster that documents are read from.
our $Source_ES = '127.0.0.1:9200';
# Address of the cluster that documents are written to.
our $Dest_ES = '127.0.0.1:9201';
# Index to read on the source cluster.
our $Source_Index = 'my_index';
# Index to (re)create on the destination; defaults to the source's name.
our $Dest_Index = $Source_Index;
# Passed to Parallel::ForkManager->new: cap on concurrent child processes.
our $Max_Kids = 10;
# Number of documents requested per search page (see index_docs).
our $Rows = 1000;
# Width of the document range handed to each child process (see main).
our $Per_Kid = 10 * $Rows;
# --- Shared handles ------------------------------------------------------
# Client handles for the two clusters, and the fork manager used by main.
my $source = Elasticsearch->new( servers => $Source_ES );
my $dest = Elasticsearch->new( servers => $Dest_ES );
my $pm = Parallel::ForkManager->new($Max_Kids);
$|++; # Auto-flush STDOUT to see progress
# Run the copy; every sub it uses is defined further down the file.
main();
#===================================
sub main {
#===================================
    # Entry point.  Rebuilds the destination index (delete, create, copy
    # mappings), then forks child processes that each copy one
    # $Per_Kid-wide slice of the source documents, and finally waits for
    # all children before printing the completion message.

    # delete_index() below drops $Dest_Index on the destination.  If that
    # is the very index being read, the data would be gone before it could
    # be copied, so refuse to run.  This is a plain string comparison:
    # different spellings of the same host are not detected.
    die "Source and destination are the same index ('$Source_Index' on "
        . "$Source_ES) - refusing to delete it\n"
        if $Source_ES eq $Dest_ES && $Source_Index eq $Dest_Index;

    delete_index();
    create_index();
    put_mappings();

    my $total = total_docs();
    my $start = 0;
    while ( $start < $total ) {
        my $end = $start + $Per_Kid;
        if ( $pm->start ) {
            # parent: a child now owns [$start, $end); queue the next slice
            $start = $end;
            next;
        }

        # child: copy this slice, then exit via finish()
        index_docs( $start, $end );
        $pm->finish;
    }

    $pm->wait_all_children;
    print "\n - Done - \n";
}
#===================================
sub delete_index {
#===================================
    # Drops $Dest_Index on the destination cluster so the copy starts from
    # a clean slate.
    print "Deleting index '$Dest_Index' in case it already exists\n";

    # Best effort: any exception raised by the delete or by the health
    # wait is discarded by the eval, so an index that is not there yet
    # does not abort the run.
    my %target = ( index => $Dest_Index );
    eval {
        $dest->delete_index(%target);
        wait_for_es();
    };
}
#===================================
sub create_index {
#===================================
    # Creates $Dest_Index on the destination cluster, then waits via
    # wait_for_es() before returning.  Errors are not trapped here.
    print "Creating index '$Dest_Index'\n";

    my %target = ( index => $Dest_Index );
    $dest->create_index(%target);

    return wait_for_es();
}
#===================================
sub put_mappings {
#===================================
    # Copies the type mappings of $Source_Index onto $Dest_Index, one
    # put_mapping call per type (in sorted type-name order), then waits
    # via wait_for_es().
    print "Adding mappings\n";

    # The mapping response is treated as a hash whose first value holds
    # the per-type mappings.
    my ($mappings) = values %{ $source->mapping( index => $Source_Index ) };

    foreach my $type ( sort keys %$mappings ) {
        print " - $type\n";
        my $mapping = $mappings->{$type};

        # Only these three sections of each type mapping are carried over.
        my @sections = map { ( $_ => $mapping->{$_} ) }
            qw(_all _source properties);

        $dest->put_mapping(
            index => $Dest_Index,
            type  => $type,
            @sections
        );
    }

    wait_for_es();
}
#===================================
sub total_docs {
#===================================
    # Asks the source cluster for the document count of $Source_Index,
    # announces the copy on STDOUT and returns that count.
    my $response = $source->count(
        index     => $Source_Index,
        match_all => {}
    );
    my $total = $response->{count};

    # The closing quote after the destination index name used to be
    # missing from this message.
    print "Indexing $total docs from '$Source_Index' to '$Dest_Index'\n";

    return $total;
}
#===================================
sub index_docs {
#===================================
    # Worker run inside a child process: copies the source documents at
    # offsets [$start, $end) to $Dest_Index, one page per bulk request.
    #
    #   $start - offset of the first document to copy
    #   $end   - offset one past the last document to copy
    #
    # Prints a "." per page as a progress marker.  Dies with a dump of the
    # bulk response if the destination reports errors.
    my $start = shift;
    my $end   = shift;

    while ( $start < $end ) {
        print ".";

        # Never read past $end: if the slice width is not a multiple of
        # $Rows, a full-size last page would overlap the next child's
        # slice and its 'create' actions would collide.
        my $remaining = $end - $start;
        my $size = $remaining < $Rows ? $remaining : $Rows;

        my $page = $source->search(
            index => $Source_Index,
            query => { match_all => {} },
            sort  => ['_id'],
            from  => $start,
            size  => $size
        );

        # One 'create' action per hit, keeping the original type and id.
        my @objects = map {
            +{  create => {
                    index => $Dest_Index,
                    type  => $_->{_type},
                    id    => $_->{_id},
                    data  => $_->{_source}
                }
            }
        } @{ $page->{hits}{hits} };

        # The slice may extend past the last document; an empty page means
        # there is nothing left, so skip the pointless empty bulk request.
        last unless @objects;

        my $result = $dest->bulk( \@objects );
        die Dumper($result) if $result->{errors};

        # A short page means the source ran out of documents.
        last if @objects < $size;
        $start += $size;
    }
}
#===================================
sub wait_for_es {
#===================================
    # Issues a cluster-health request against the destination cluster,
    # asking it to wait for 'yellow' status, and returns the response.
    my %wanted = ( wait_for_status => 'yellow' );
    return $dest->cluster_health(%wanted);
}
clint