Hello,
I'm using Logstash to retrieve data from a SQL database to push into Elasticsearch.
I had some good result with around 1000 inputs per second into my cluster.
Next step was to put an id on my documents so that, when my pipeline restarts with the schedule parameter, it wouldn't send duplicates to my Cluster.
Since I added this document_id in the output, my pipeline is only sending 2 entries per second ... (I emptied my cluster before running again) it even stops at some points, around 250 docs.
Here's my conf file, all worked before I add the simple line document_id in output.
input {
jdbc {
jdbc_connection_string => "jdbc:jtds:sqlserver://ORCHESTRATOR1:60113;DatabaseName=ePO_ORCHESTRATOR1;domain=ORCHESTRATOR1;socketKeepAlive=true"
jdbc_user => "Administrator"
jdbc_password => "xxx"
jdbc_validate_connection => true
jdbc_driver_library => "/root/jtds-1.3.1.jar"
jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
statement => "SELECT * FROM dbo.EPOEvents WHERE DATEPART(yy, receivedutc) >= '2015'"
}
}
filter{
mutate{
convert => ["receivedutc", "string"]
}
date {
match => [ "receivedutc", "yyyy-MM-dd'T'HH:mm:ss.SSSZ" ]
}
ruby{
code =>
"event['o1'] = (event['sourceipv4']+2147483648)/16777216;
event['ip'] = (event['sourceipv4']+2147483648)%16777216;
event['o2'] = event['ip']/65536;
event['ip2'] = event['ip']%65536;
event['o3'] = event['ip2']/216;
event['o4'] = event['ip2']%216;
event['src_ipv4'] = (event['o1']).to_s+'.'+(event['o2']).to_s+'.'+(event['o3']).to_s+'.'+(event['o4']).to_s;
event['o1'] = (event['analyzeripv4']+2147483648)/16777216;
event['ip'] = (event['analyzeripv4']+2147483648)%16777216;
event['o2'] = event['ip']/65536;
event['ip2'] = event['ip']%65536;
event['o3'] = event['ip2']/216;
event['o4'] = event['ip2']%216;
event['analyzer_ipv4'] = (event['o1']).to_s+'.'+(event['o2']).to_s+'.'+(event['o3']).to_s+'.'+(event['o4']).to_s;
event['o1'] = (event['targetipv4']+2147483648)/16777216;
event['ip'] = (event['targetipv4']+2147483648)%16777216;
event['o2'] = event['ip']/65536;
event['ip2'] = event['ip']%65536;
event['o3'] = event['ip2']/216;
event['o4'] = event['ip2']%216;
event['target_ipv4'] = (event['o1']).to_s+'.'+(event['o2']).to_s+'.'+(event['o3']).to_s+'.'+(event['o4']).to_s"
remove_field => ["o1", "o2", "o3", "o4", "ip", "ip2"]
}
mutate {
remove_field => ["autoguid", "agentguid", "sourcehostname", "thetimestamp", "analyzeripv4", "sourceipv4", "targetipv4","analyzeripv6", "sourceipv6", "targetipv6"]
}
}
output {
elasticsearch {
hosts => ["xxx:9200"]
index => "orchestrator-event-%{+YYYY.MM.dd}"
document_id => "autoid"
}
}
Thank you in advance.