Script to move data between clusters


(ppearcy) #1

Hey,
Was curious if anybody had already written scripts to copy data
between clusters by pulling docs from one cluster and indexing to
another. Preferably, something using the bulk APIs.

I started to write this against the Java node client until I realized
that this would not work between different versions, as my app must
choose to be compatible with either a 0.12-based cluster or a
0.13-based cluster depending on which ES jar I am pulling in. This is
the one minor drawback of using the node client. I am not a Java
expert, so maybe there is some way to pull both jar files into a
single app (my only thought is that I could hack one to have a slightly
different import path, but that seems very sketchy). The other
alternative in Java is two apps, one to do a data dump and the other
to import these files.

Currently, we are just re-flowing data from our backend storage, but
our backend data store is not that fast and there is a good amount of
overhead extracting content from docs (especially PDFs).

Thanks!
Paul


(Ryan Crumley) #2

You could use OSGi to load both versions into the same JVM. Here is an
example that loads multiple Groovy versions into the same JVM:

http://hamletdarcy.blogspot.com/2008/12/beginners-guide-to-osgi-on-desktop.html

OSGi itself can be painful... the tooling often does not work as well as you
hope.

Ryan
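
For anyone who wants to see the underlying idea without a full OSGi container: OSGi gives each bundle its own class loader, and a similar isolation can be approximated with plain JDK class loaders. The sketch below is not OSGi and is not taken from the linked article. It assumes two hypothetical jar locations and builds one `URLClassLoader` per version, so that classes with identical names do not collide. `IsolatedJars` and `isolatedLoader` are made-up names, and anything loaded this way would have to be driven through reflection or through JDK types that both loaders share.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;

class IsolatedJars {

    /**
     * Builds a class loader that sees only the given jars plus the JDK's own
     * classes. Its parent is the loader that sits above the application class
     * path, so nothing from the application class path leaks in.
     */
    static URLClassLoader isolatedLoader(String... jarPaths) {
        URL[] urls = new URL[jarPaths.length];
        for (int i = 0; i < jarPaths.length; i++) {
            try {
                urls[i] = Paths.get(jarPaths[i]).toUri().toURL();
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("Bad jar path: " + jarPaths[i], e);
            }
        }
        ClassLoader jdkOnly = ClassLoader.getSystemClassLoader().getParent();
        return new URLClassLoader(urls, jdkOnly);
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical locations; each loader would also need that version's
        // dependency jars listed alongside the main jar.
        try (URLClassLoader v012 = isolatedLoader("lib/es-0.12/elasticsearch.jar");
             URLClassLoader v013 = isolatedLoader("lib/es-0.13/elasticsearch.jar")) {
            // Classes loaded through v012 and v013 are unrelated types, even
            // when their fully qualified names are identical.
            System.out.println("separate loaders: " + (v012 != v013));
            System.out.println("shared parent:    " + (v012.getParent() == v013.getParent()));
        }
    }
}
```

The cost of this approach is that the application cannot reference either version's classes directly at compile time, which is part of why a container such as OSGi, or the two-app dump/import split, may be less work in practice.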


(ppearcy) #3

Cool, thanks Ryan. Will take a look at that approach.


(Clinton Gormley) #4

Hi Paul

> Was curious if anybody had already written scripts to copy data
> between clusters by pulling docs from one cluster and indexing to
> another. Preferably, something using the bulk APIs.

Here's a simple Perl script which will do what you want in parallel
(using fork).

It should be fairly self-explanatory. You would need to install these Perl
modules: ElasticSearch and Parallel::ForkManager (Data::Dumper ships with Perl).

#!/usr/bin/perl

use strict;
use warnings;
use ElasticSearch();
use Parallel::ForkManager();

use Data::Dumper;

our $Source_ES    = '127.0.0.1:9200';
our $Dest_ES      = '127.0.0.1:9201';
our $Source_Index = 'my_index';
our $Dest_Index   = $Source_Index;
our $Max_Kids     = 10;             # max number of child processes at once
our $Rows         = 1000;           # docs per search / bulk request
our $Per_Kid      = 10 * $Rows;     # docs handled by each child

my $source = ElasticSearch->new( servers => $Source_ES );
my $dest   = ElasticSearch->new( servers => $Dest_ES );
my $pm     = Parallel::ForkManager->new($Max_Kids);

$|++;    # Auto-flush STDOUT to see progress

main();

#===================================
sub main {
#===================================
    delete_index();
    create_index();
    put_mappings();
    my $total = total_docs();
    my $start = 0;
    while ( $start < $total ) {
        my $end = $start + $Per_Kid;
        if ( $pm->start ) {

            # parent: move on to the next range and fork again
            $start = $end;
            next;
        }

        # child: copy one range, then exit
        index_docs( $start, $end );
        $pm->finish;
    }
    $pm->wait_all_children;
    print "\n - Done - \n";
}

#===================================
sub delete_index {
#===================================
    print "Deleting index '$Dest_Index' in case it already exists\n";

    # eval: ignore the error if the index does not exist yet
    eval {
        $dest->delete_index( index => $Dest_Index );
        wait_for_es();
    };
}

#===================================
sub create_index {
#===================================
    print "Creating index '$Dest_Index'\n";
    $dest->create_index( index => $Dest_Index );
    wait_for_es();
}

#===================================
sub put_mappings {
#===================================
    print "Adding mappings\n";
    my ($mappings) = values %{ $source->mapping( index => $Source_Index ) };
    for my $type ( sort keys %$mappings ) {
        print " - $type\n";
        my $mapping = $mappings->{$type};
        $dest->put_mapping(
            index => $Dest_Index,
            type  => $type,
            map { $_ => $mapping->{$_} } qw(_all _source properties)
        );
    }
    wait_for_es();
}

#===================================
sub total_docs {
#===================================
    my $total = $source->count(
        index     => $Source_Index,
        match_all => {}
    )->{count};

    print "Indexing $total docs from '$Source_Index' to '$Dest_Index'\n";
    return $total;
}

#===================================
sub index_docs {
#===================================
    my $start = shift;
    my $end   = shift;

    while ( $start < $end ) {
        print ".";

        # Fetch one page from the source and turn each hit into a bulk action
        my @objects = map {
            create => {
                index => $Dest_Index,
                type  => $_->{_type},
                id    => $_->{_id},
                data  => $_->{_source}
            }
        },
        @{
            $source->search(
                index => $Source_Index,
                query => { match_all => {} },
                sort  => ['_id'],
                from  => $start,
                size  => $Rows
            )->{hits}{hits}
        };

        my $result = $dest->bulk( \@objects );
        die Dumper($result) if $result->{errors};

        # A short page means the source has no more docs
        last if @objects < $Rows;
        $start += $Rows;
    }
}

#===================================
sub wait_for_es {
#===================================
    $dest->cluster_health( wait_for_status => 'yellow' );
}

clint
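
For readers following along in Java, the way the script above divides the work can be sketched as below. It only mirrors the offset arithmetic (`$Per_Kid` docs per child, `$Rows` docs per request) and does not talk to a cluster; `SliceRanges`, `slices`, and `pageOffsets` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class SliceRanges {

    /** One [start, end) offset range per worker, mirroring $Per_Kid in the script. */
    static List<long[]> slices(long total, long perWorker) {
        if (perWorker <= 0) {
            throw new IllegalArgumentException("perWorker must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        for (long start = 0; start < total; start += perWorker) {
            // As in the script, the last range may run past the total; the
            // worker stops early when a page comes back short.
            ranges.add(new long[] { start, start + perWorker });
        }
        return ranges;
    }

    /** The "from" offsets a worker requests inside its range, mirroring $Rows. */
    static List<Long> pageOffsets(long start, long end, int rows) {
        if (rows <= 0) {
            throw new IllegalArgumentException("rows must be positive");
        }
        List<Long> offsets = new ArrayList<>();
        for (long from = start; from < end; from += rows) {
            offsets.add(from);
        }
        return offsets;
    }

    public static void main(String[] args) {
        for (long[] range : slices(25_000, 10_000)) {
            int pages = pageOffsets(range[0], range[1], 1_000).size();
            System.out.println(range[0] + " .. " + range[1] + " -> up to " + pages + " pages");
        }
    }
}
```

Because every page is requested with `from`/`size` over a sort on `_id`, pages deep into a large index are likely to cost the source cluster more than early ones, so the slice and page sizes are worth tuning on real data.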


(ppearcy) #5

Awesome, thanks! Very helpful.


(Shay Banon) #6

Hi,

Yeah, the way to go with that is using the REST-based interface, which is
less likely to change its underlying format :). The Perl client is
one possibility; another is using HTTP from Java.

Actually, there could be a feature in elasticsearch where you would say:

Reindex "indexX" into this HTTP endpoint. That can be implemented, and it should be
faster.

-shay.banon
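
As a rough illustration of the "HTTP in Java" route, the sketch below builds a request body in the newline-delimited format that the REST `_bulk` endpoint accepts (one action line, then one source line per document) and posts it with the JDK's `HttpURLConnection`. It is a minimal sketch, not a tested migration tool: `BulkBody`, `Doc`, `quote`, `build`, and `post` are hypothetical names, the URL in `main` is a placeholder, and each document's source is assumed to already be a single-line JSON string as returned by a search on the source cluster.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

class BulkBody {

    /** One document to copy: its type, id, and raw single-line JSON source. */
    static final class Doc {
        final String type;
        final String id;
        final String sourceJson;

        Doc(String type, String id, String sourceJson) {
            this.type = type;
            this.id = id;
            this.sourceJson = sourceJson;
        }
    }

    /** Minimal JSON string quoting for index names, types, and ids. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    /** Builds the bulk body: an action line followed by a source line per doc. */
    static String build(String index, List<Doc> docs) {
        StringBuilder sb = new StringBuilder();
        for (Doc doc : docs) {
            // "create" mirrors the Perl script; "_type" matches the releases
            // discussed in this thread (later releases dropped types).
            sb.append("{\"create\":{\"_index\":").append(quote(index))
              .append(",\"_type\":").append(quote(doc.type))
              .append(",\"_id\":").append(quote(doc.id))
              .append("}}\n");
            sb.append(doc.sourceJson).append('\n');
        }
        return sb.toString();
    }

    /** Posts the body to a bulk URL and returns the HTTP status code. */
    static int post(String bulkUrl, String body) throws IOException {
        HttpURLConnection conn =
            (HttpURLConnection) URI.create(bulkUrl).toURL().openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        // Newer releases insist on a JSON content type; sending it is harmless.
        conn.setRequestProperty("Content-Type", "application/x-ndjson");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode();
    }

    public static void main(String[] args) {
        List<Doc> docs = Arrays.asList(
            new Doc("doc", "1", "{\"title\":\"first\"}"),
            new Doc("doc", "2", "{\"title\":\"second\"}"));
        // Printing instead of posting, so this runs without a cluster.
        // With one available: post("http://127.0.0.1:9201/_bulk", body)
        System.out.print(build("my_index", docs));
    }
}
```

Since only JSON over HTTP crosses the wire, the same program can read from a cluster on one version and write to a cluster on another without linking either version's jar.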


(ppearcy) #7

+1 for that feature. It would provide a very easy upgrade path for
breaking changes in the gateway.

I opened a feature request:

Thanks!

