Perl Script to insert data into ES ~13M to ~17M documents a day

Quick background on the system setup:
5 nodes (default JVM settings)
10 shards; 0 replicas per index (indexes based on date)
Cluster template for new indexes, based on the logstash template from git - works great

=== Template Start ===
{
    "template" : "al-logstash-*",
    "settings" : {
        "number_of_shards"   : 10,
        "number_of_replicas" : 0,
        "index" : {
            "query" : { "default_field" : "@message" },
            "store" : { "compress" : { "stored" : true, "tv" : true } }
        }
    },
    "mappings" : {
        "logs" : {
            "_all"    : { "enabled" : false },
            "_source" : { "compress" : true },
            "dynamic_templates" : [
                {
                    "string_template" : {
                        "match"              : "*",
                        "mapping"            : { "type" : "string", "index" : "analyzed" },
                        "match_mapping_type" : "string"
                    }
                }
            ],
            "properties" : {
                "@type"          : { "type" : "string",  "index" : "analyzed" },
                "@message"       : { "type" : "string",  "index" : "analyzed" },
                "@timestamp"     : { "type" : "date",    "index" : "analyzed" },
                "@ip"            : { "type" : "string",  "index" : "analyzed" },
                "@ident"         : { "type" : "string",  "index" : "analyzed" },
                "@authuser"      : { "type" : "string",  "index" : "analyzed" },
                "@protocol"      : { "type" : "string",  "index" : "analyzed" },
                "@method"        : { "type" : "string",  "index" : "analyzed" },
                "@request"       : { "type" : "string",  "index" : "analyzed" },
                "@cs_referer"    : { "type" : "string",  "index" : "analyzed" },
                "@user_agent"    : { "type" : "string",  "index" : "analyzed" },
                "@bytes"         : { "type" : "integer", "index" : "analyzed" },
                "@response_code" : { "type" : "integer", "index" : "analyzed" }
            }
        }
    }
}
=== Template End ===
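For reference, a template like this gets loaded with a PUT to the cluster's _template endpoint. A minimal Perl sketch using LWP::UserAgent, assuming the JSON above is saved as al-logstash-template.json and a node is listening on 127.0.0.1:9200 (both assumptions for illustration, not from the original setup):

    use strict;
    use warnings;
    use LWP::UserAgent;
    use HTTP::Request;

    # Read the template JSON from disk (file name is an assumption)
    open my $fh, '<', 'al-logstash-template.json' or die "open: $!";
    my $json = do { local $/; <$fh> };
    close $fh;

    # PUT the template so new al-logstash-* indexes pick it up automatically
    my $ua  = LWP::UserAgent->new;
    my $req = HTTP::Request->new(
        'PUT',
        'http://127.0.0.1:9200/_template/al-logstash',
        [ 'Content-Type' => 'application/json' ],
        $json,
    );
    my $res = $ua->request($req);
    print $res->status_line, "\n", $res->decoded_content, "\n";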

Perl Script:
=== Perl Script Start ===

#!/usr/bin/perl
#
# Parser for Logs
#
# Version 2.1
# Date: 2012-11-09
# Richard Pardue
#
# Notes:
#
# 2012-11-06
#   Removed the index call and replaced it with index_bulk
#   Changed index to bulk_index
#   Started adding error handling -> beta
#
# 2012-11-08
#   Moved the client connection object outside the loop
#
# 2012-11-09
#   Removed the server refresh after insert and the 1-sec system pause

# Load the ElasticSearch client module
use ElasticSearch;

# Create the ElasticSearch client
my $es = ElasticSearch->new(
    servers      => ['127.0.0.1:9200',
                     '127.0.0.1:9201',
                     '127.0.0.1:9202',
                     '127.0.0.1:9203',
                     '127.0.0.1:9204'],   # default '127.0.0.1:9200'
    transport    => 'http',               # default 'http'
    timeout      => 30,
    #max_requests => 10_000,              # default 10_000
    #trace_calls  => 'log_file.log',
    #no_refresh   => 0 | 1,
);

# Loop over the log lines piped in on STDIN
while (<>)
{
    # Data that comes in from the pipe (STDIN)
    my $data = $_;

    # Create the 1st set of array data by splitting on double quotes
    my @values = split('"', $data);

    # Main data set fields
    my @values0 = split(' ', $values[0]);   # IP/DTS plus ?
    my @values7 = split(' ', $values[7]);   # ??
    my @values2 = split(' ', $values[2]);   # response code and bytes
    my @values1 = split(' ', $values[1]);   # method, request, protocol

    # Fields
    my $field0  = $values0[0];                                        # client IP address
    my $field1  = $values0[1];                                        # -
    my $field2  = $values0[2];                                        # -
    my $field3  = substr(substr("$values0[3] $values0[4]",1),0,-1);   # DTS
    #my $field4 = $values7[7];                                        # CARTCOOKIEUUID
    #my $field5 = $values7[8];                                        # ASP.NET_SessionId
    my $field6  = $values[5];                                         # user agent
    my $field7  = $values2[0];                                        # response code
    my $field8  = $values2[1];                                        # bytes
    my $field9  = $values1[0];                                        # method
    my $field10 = $values1[1];                                        # request
    my $field11 = $values1[2];                                        # protocol
    my $field12 = $values0[1];                                        # ident
    my $field13 = $values0[2];                                        # authuser
    my $field14 = $values[3];                                         # cs(Referer)

    # Month-name to month-number lookup for the date conversion
    my %mo = (
        'Jan'=>'01', 'Feb'=>'02', 'Mar'=>'03', 'Apr'=>'04',
        'May'=>'05', 'Jun'=>'06', 'Jul'=>'07', 'Aug'=>'08',
        'Sep'=>'09', 'Oct'=>'10', 'Nov'=>'11', 'Dec'=>'12',
    );

    # Format the Apache log date/time as ISO format
    my $tmptime = substr($field3,7,4) . '-' . $mo{substr($field3,3,3)}
                . '-' . substr($field3,0,2) . 'T' . substr($field3,12);

    # Format the date for creating the logstash index name
    my $mylogdts = substr($field3,7,4) . '.' . $mo{substr($field3,3,3)}
                 . '.' . substr($field3,0,2);

    # Set the new log date/time to the field
    $field3 = substr($tmptime,0,-6);

    # Set the logstash index name string
    $mylogdts = "al-logstash-$mylogdts";

    # Forces a lookup of live nodes
    #$results = $es->refresh_servers();

    # Use the ElasticSearch client to bulk insert data into ElasticSearch
    $results = $es->bulk_index(
        index       => $mylogdts,
        type        => 'logs',
        refresh     => 1,
        #on_conflict => 'IGNORE',
        #on_error    => 'IGNORE',
        on_error    => sub { myError() },
        docs        => [
            {
                data => {
                    '@type'          => 'logs',
                    '@message'       => $data,
                    '@timestamp'     => $field3,
                    '@ip'            => $field0,
                    '@ident'         => $field12,
                    '@authuser'      => $field13,
                    '@protocol'      => $field11,
                    '@method'        => $field9,
                    '@request'       => $field10,
                    '@cs_referer'    => $field14,
                    '@user_agent'    => $field6,
                    '@bytes'         => $field8,
                    '@response_code' => $field7,
                },
            },
        ],
    );
}

# Error handler
sub myError {
    print "*** ERROR: ***";
    print $results;
    $results = $es->refresh_servers();
    print $results;
}

exit 0;

=== Perl Script End ===

All log files are compressed.

Then, using zcat * | perl my script.pl (~24 to 28 files), the script takes off
running... inserting ~1000+ documents per second based on the 'head' plugin
each time the refresh button is clicked - very fast. But over a period of
time, less than ~30 minutes, the insert rate drops to ~60 to 300 inserts per
click of the refresh button, and the inserts take over a day to two days to
finish.

Connections to each node (based on the BigDesk plugin):
Node 1 = ~8 to 10
Nodes 2..5 = ~2 to 4

The system CPU is running wide open... working great... queries still run.
I have also run multiple zcat pipelines and scripts at the same time.

Is there a better way to keep the inserts running at over 1000+ per second?
Is there a better way to change the script so the CPU stops running wide
open?

--

I use a Perl script with bulk_index too. I'd suggest posting your own mapping
template with '"index" : "not_analyzed"', and using the httplite transport. I
get ~5000+ per second on 2 nodes.


--

Hiya

Couple of notes on this:

my $es = ElasticSearch->new(
    servers      => ['127.0.0.1:9200',
                     '127.0.0.1:9201',
                     '127.0.0.1:9202',
                     '127.0.0.1:9203',
                     '127.0.0.1:9204'],              # default '127.0.0.1:9200'

Why are you running several nodes on the same machine? You won't get any
performance benefit out of this, and in fact you're probably hurting
performance.

     transport    => 'http',                         # default 'http'

For fastest performance, use the 'curl' backend.

# Forces a lookup of live nodes
#$results = $es->refresh_servers();

No need to refresh - this is automatic

    $results = $es->bulk_index(
        index       => $mylogdts,
        type        => 'logs',
        refresh     => 1,
        #on_conflict => 'IGNORE',
        #on_error    => 'IGNORE',
        on_error    => sub { myError() },
        docs        => [
            {
                data => {

You're using bulk, but only indexing one document at a time. Accumulate,
e.g., 1,000 documents in @docs, then pass all of them to bulk at the same
time.
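A rough sketch of that change against the loop in the original script; the 1,000-document batch size is illustrative, and it assumes the lines piped in for one run all land in the same daily index (otherwise flush whenever $mylogdts changes):

    my @docs;
    my $mylogdts;
    my $batch_size = 1000;   # illustrative batch size

    while (<>) {
        my $data = $_;

        # ... same parsing and date handling as the original script,
        #     setting $mylogdts, $field3, $field0, and so on ...

        push @docs, {
            data => {
                '@type'      => 'logs',
                '@message'   => $data,
                '@timestamp' => $field3,
                # ... remaining @fields exactly as in the original call ...
            },
        };

        # Send a full batch in a single bulk request
        if (@docs >= $batch_size) {
            $es->bulk_index(
                index => $mylogdts,
                type  => 'logs',
                docs  => \@docs,
            );
            @docs = ();
        }
    }

    # Flush whatever is left once STDIN is exhausted
    $es->bulk_index(index => $mylogdts, type => 'logs', docs => \@docs) if @docs;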

clint

--