Quick background on the system setup:
5 nodes (default JVM settings)
10 shards, 0 replicas per index (indexes are date-based)
Cluster template for new indexes, based on the logstash template from GitHub - works great
=== Template Start ===
{
  "template": "al-logstash-*",
  "settings": {
    "number_of_shards": 10,
    "number_of_replicas": 0,
    "index": {
      "query": { "default_field": "@message" },
      "store": { "compress": { "stored": true, "tv": true } }
    }
  },
  "mappings": {
    "logs": {
      "_all": { "enabled": false },
      "_source": { "compress": true },
      "dynamic_templates": [
        {
          "string_template": {
            "match": "*",
            "match_mapping_type": "string",
            "mapping": { "type": "string", "index": "analyzed" }
          }
        }
      ],
      "properties": {
        "@type": { "type": "string", "index": "analyzed" },
        "@message": { "type": "string", "index": "analyzed" },
        "@timestamp": { "type": "date" },
        "@ip": { "type": "string", "index": "analyzed" },
        "@ident": { "type": "string", "index": "analyzed" },
        "@authuser": { "type": "string", "index": "analyzed" },
        "@protocol": { "type": "string", "index": "analyzed" },
        "@method": { "type": "string", "index": "analyzed" },
        "@request": { "type": "string", "index": "analyzed" },
        "@cs_referer": { "type": "string", "index": "analyzed" },
        "@user_agent": { "type": "string", "index": "analyzed" },
        "@bytes": { "type": "integer" },
        "@response_code": { "type": "integer" }
      }
    }
  }
}
=== Template End ===
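With this template in place, any new index whose name matches al-logstash-* should pick up the settings and mappings automatically the first time a document is indexed into it; for example (a hypothetical one-off test, using the same ElasticSearch.pm client handle the script below creates):

    # First insert into a matching index name creates the index from the template
    $es->index(
        index => 'al-logstash-2012.11.09',
        type  => 'logs',
        data  => { '@message' => 'test' },
    );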
Perl script:
=== Perl Script Start ===
#!/usr/bin/perl
# Parser for logs
# Version 2.1
# Date: 2012-11-09
# Richard Pardue
#
# Notes:
# 2012-11-06
#   Removed the index call and replaced it with index_bulk
#   Changed index to bulk_index
#   Started adding error handling -> beta
# 2012-11-08
#   Moved the client connection object outside the loop
# 2012-11-09
#   Removed the server refresh after insert and the 1-sec system pause

# Set Perl to use ElasticSearch
use ElasticSearch;

# Set up the ElasticSearch client
my $es = ElasticSearch->new(
    servers   => ['127.0.0.1:9200',
                  '127.0.0.1:9201',
                  '127.0.0.1:9202',
                  '127.0.0.1:9203',
                  '127.0.0.1:9204'],   # default '127.0.0.1:9200'
    transport => 'http',               # default 'http'
    timeout   => 30,
    #max_requests => 10_000,           # default 10_000
    #trace_calls  => 'log_file.log',
    #no_refresh   => 0 | 1,
);
# Start the loop reading lines from STDIN
while (<>)
{
    # Data that comes in from the STDIN pipe
    my $data = $_;

    # Create the first array of fields by splitting the raw line on double quotes
    my @values = split('"', $data);
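    # Worked example, assuming a combined-format line such as:
    #   1.2.3.4 - - [09/Nov/2012:13:55:36 -0500] "GET /index.html HTTP/1.1" 200 2326 "http://example.com/start" "Mozilla/5.0 ..."
    # the split on '"' gives:
    #   $values[0] = '1.2.3.4 - - [09/Nov/2012:13:55:36 -0500] '
    #   $values[1] = 'GET /index.html HTTP/1.1'
    #   $values[2] = ' 200 2326 '
    #   $values[3] = 'http://example.com/start'
    #   $values[5] = 'Mozilla/5.0 ...'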
    # Main data-set fields
    my @values0 = split(' ', $values[0]);   # IP / date-time stamp, plus?
    my @values7 = split(' ', $values[7]);   # ??
    my @values2 = split(' ', $values[2]);   # response code and bytes
    my @values1 = split(' ', $values[1]);   # method, request, protocol

    # Fields
    my $field0  = $values0[0];               # client IP address
    my $field1  = $values0[1];               # -
    my $field2  = $values0[2];               # -
    my $field3  = substr(substr("$values0[3] $values0[4]", 1), 0, -1);   # date/time stamp
    #my $field4 = $values7[7];               # CARTCOOKIEUUID
    #my $field5 = $values7[8];               # ASP.NET_SessionId
    my $field6  = $values[5];                # user agent
    my $field7  = $values2[0];               # response code
    my $field8  = $values2[1];               # bytes
    my $field9  = $values1[0];               # method
    my $field10 = $values1[1];               # request
    my $field11 = $values1[2];               # protocol
    my $field12 = $values0[1];               # ident
    my $field13 = $values0[2];               # authuser
    my $field14 = $values[3];                # cs(Referer)
    # Month-name-to-number map for converting the date
    my %mo = (
        'Jan' => '01', 'Feb' => '02', 'Mar' => '03', 'Apr' => '04',
        'May' => '05', 'Jun' => '06', 'Jul' => '07', 'Aug' => '08',
        'Sep' => '09', 'Oct' => '10', 'Nov' => '11', 'Dec' => '12',
    );
    # Convert the log date/time from the Apache format to ISO format
    my $tmptime = substr($field3, 7, 4) . '-' . $mo{substr($field3, 3, 3)}
                . '-' . substr($field3, 0, 2) . 'T' . substr($field3, 12);

    # Format the date for the logstash index name
    my $mylogdts = substr($field3, 7, 4) . '.' . $mo{substr($field3, 3, 3)}
                 . '.' . substr($field3, 0, 2);

    # Set the new log date/time field, dropping the trailing time-zone offset
    $field3 = substr($tmptime, 0, -6);

    # Set the logstash index name string
    $mylogdts = "al-logstash-$mylogdts";
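    # Worked example, assuming the timestamp "[09/Nov/2012:13:55:36 -0500]":
    #   $field3   starts as '09/Nov/2012:13:55:36 -0500' (brackets stripped above)
    #   $tmptime  -> '2012-11-09T13:55:36 -0500'
    #   $field3   -> '2012-11-09T13:55:36'
    #   $mylogdts -> 'al-logstash-2012.11.09'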
    # Forces a lookup of live nodes
    #$results = $es->refresh_servers();

    # Use the ElasticSearch client to bulk-insert the document into ElasticSearch
    $results = $es->bulk_index(
        index   => $mylogdts,
        type    => 'logs',
        refresh => 1,
        #on_conflict => 'IGNORE',
        #on_error    => 'IGNORE',
        on_error => sub { myError() },
        docs => [
            {
                data => {
                    '@type'          => 'logs',
                    '@message'       => $data,
                    '@timestamp'     => $field3,
                    '@ip'            => $field0,
                    '@ident'         => $field12,
                    '@authuser'      => $field13,
                    '@protocol'      => $field11,
                    '@method'        => $field9,
                    '@request'       => $field10,
                    '@cs_referer'    => $field14,
                    '@user_agent'    => $field6,
                    '@bytes'         => $field8,
                    '@response_code' => $field7,
                },
            },
        ]
    );
}
# Error handler for bulk_index failures
sub myError {
    print "*** ERROR: ***\n";
    print $results;
    $results = $es->refresh_servers();
    print $results;
}
exit 0;
=== Perl Script End ===
All of the log files are compressed, so I run zcat * | perl myscript.pl
over them (~24 to 28 files). The script takes off running, inserting
1,000+ documents per second based on the 'head' plugin each time I click
its refresh button - very fast. But within ~30 minutes the rate drops to
~60 to 300 inserts per click of the refresh button, and the full load
takes one to two days to finish.
Connections to each node (based on the BigDesk plugin):
Node 1 = ~8 to 10
Nodes 2-5 = ~2 to 4
The system CPU runs wide open the whole time, but everything keeps
working - I can still run queries while the load is going. I have also
run multiple zcat-plus-script pipelines at the same time.

Is there a better way to keep the inserts running at 1,000+ per second?
Is there a better way to change the script so the CPU stops running wide
open?
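For what it's worth, the direction I've been thinking about is buffering
the parsed documents and sending them in larger batches with the
per-request refresh turned off. A rough sketch (untested at this scale;
parse_line() is a stand-in for the parsing code above, and the batch size
is a guess):

=== Batch Sketch Start ===
#!/usr/bin/perl
# Sketch: send documents to bulk_index in batches of a few thousand instead
# of one call per line, and drop refresh => 1 so ES refreshes on its own
# interval. parse_line() is a stand-in for the field parsing above; it
# returns the index name and the { data => {...} } hashref for one line.
use ElasticSearch;

my $es = ElasticSearch->new(
    servers   => ['127.0.0.1:9200'],
    transport => 'http',
    timeout   => 30,
);

my $batch_size = 2_000;   # guess - tune against heap use and document size
my %batches;              # index name => arrayref of pending docs

while (<>) {
    my ($index, $doc) = parse_line($_);
    push @{ $batches{$index} }, $doc;
    flush_batch($index) if @{ $batches{$index} } >= $batch_size;
}

# Flush whatever is left in each index's batch at EOF
flush_batch($_) for keys %batches;

sub flush_batch {
    my ($index) = @_;
    return unless $batches{$index} && @{ $batches{$index} };
    $es->bulk_index(
        index => $index,
        type  => 'logs',
        docs  => $batches{$index},   # note: no refresh => 1 here
    );
    $batches{$index} = [];
}
=== Batch Sketch End ===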
--