Slow pipeline after adding document_id output parameter

Hello,

I'm using Logstash to retrieve data from a SQL database to push into Elasticsearch.

I was getting good results, around 1,000 events per second into my cluster.

The next step was to set an ID on my documents so that, when the pipeline restarts via the schedule parameter, it wouldn't send duplicates to my cluster.
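
To be clear, by schedule parameter I mean the cron-style schedule option on the jdbc input. It's not in the config below, but it would look roughly like this:

input {
  jdbc {
    # run the query once a minute (cron syntax)
    schedule => "* * * * *"
    # ... rest of the jdbc settings as below
  }
}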

Since I added document_id to the output, the pipeline only sends about 2 entries per second (I emptied the cluster before running again), and it even stalls at some point, around 250 docs.

Here's my config file; everything worked before I added the single document_id line to the output.

input {
  jdbc {
    jdbc_connection_string => "jdbc:jtds:sqlserver://ORCHESTRATOR1:60113;DatabaseName=ePO_ORCHESTRATOR1;domain=ORCHESTRATOR1;socketKeepAlive=true"
    jdbc_user => "Administrator"
    jdbc_password => "xxx"
    jdbc_validate_connection => true
    jdbc_driver_library => "/root/jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    statement => "SELECT * FROM dbo.EPOEvents WHERE DATEPART(yy, receivedutc) >= '2015'"
  }
}

filter {

  mutate {
    convert => ["receivedutc", "string"]
  }

  date {
    match => [ "receivedutc", "yyyy-MM-dd'T'HH:mm:ss.SSSZ" ]
  }

  # Convert the signed 32-bit integer IP fields to dotted-quad strings:
  # add 2^31 to shift into the unsigned range, then split into four octets.
  ruby {
    code =>
      "event['o1'] = (event['sourceipv4']+2147483648)/16777216;
      event['ip'] = (event['sourceipv4']+2147483648)%16777216;
      event['o2'] = event['ip']/65536;
      event['ip2'] = event['ip']%65536;
      event['o3'] = event['ip2']/256;
      event['o4'] = event['ip2']%256;
      event['src_ipv4'] = (event['o1']).to_s+'.'+(event['o2']).to_s+'.'+(event['o3']).to_s+'.'+(event['o4']).to_s;
      event['o1'] = (event['analyzeripv4']+2147483648)/16777216;
      event['ip'] = (event['analyzeripv4']+2147483648)%16777216;
      event['o2'] = event['ip']/65536;
      event['ip2'] = event['ip']%65536;
      event['o3'] = event['ip2']/256;
      event['o4'] = event['ip2']%256;
      event['analyzer_ipv4'] = (event['o1']).to_s+'.'+(event['o2']).to_s+'.'+(event['o3']).to_s+'.'+(event['o4']).to_s;
      event['o1'] = (event['targetipv4']+2147483648)/16777216;
      event['ip'] = (event['targetipv4']+2147483648)%16777216;
      event['o2'] = event['ip']/65536;
      event['ip2'] = event['ip']%65536;
      event['o3'] = event['ip2']/256;
      event['o4'] = event['ip2']%256;
      event['target_ipv4'] = (event['o1']).to_s+'.'+(event['o2']).to_s+'.'+(event['o3']).to_s+'.'+(event['o4']).to_s"

    remove_field => ["o1", "o2", "o3", "o4", "ip", "ip2"]
  }

  mutate {
    remove_field => ["autoguid", "agentguid", "sourcehostname", "thetimestamp", "analyzeripv4", "sourceipv4", "targetipv4", "analyzeripv6", "sourceipv6", "targetipv6"]
  }
}

output {
  elasticsearch {
    hosts => ["xxx:9200"]
    index => "orchestrator-event-%{+YYYY.MM.dd}"
    document_id => "autoid"
  }
}
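
Side note: I know the three copies of the conversion logic in the ruby filter could probably be collapsed into a single loop over the field names. An untested sketch, using the same event[] API and field names as above (it also skips events where the field is nil and avoids the temporary o1..o4/ip/ip2 fields):

ruby {
  code => "
    { 'sourceipv4'   => 'src_ipv4',
      'analyzeripv4' => 'analyzer_ipv4',
      'targetipv4'   => 'target_ipv4' }.each do |src, dst|
      next if event[src].nil?
      # shift the signed 32-bit value into the unsigned range 0..2^32-1
      n = event[src] + 2147483648
      # peel off the four octets and join them with dots
      event[dst] = [n / 16777216, (n / 65536) % 256, (n / 256) % 256, n % 256].join('.')
    end
  "
}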

Thank you in advance.

I just noticed there's only one document per index, which is weird; the autoid I set as document_id is unique for every event in the database.

Is the 'autoid' field populated by your database query? In order to have that field's value used as the document ID, you need to change the document_id setting to document_id => "%{autoid}". With the current configuration, every record gets the literal string 'autoid' as its key, which results in a very large number of updates to the same document.
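
In other words, the output section would become something like:

output {
  elasticsearch {
    hosts => ["xxx:9200"]
    index => "orchestrator-event-%{+YYYY.MM.dd}"
    # sprintf reference: resolves to each event's own autoid value
    document_id => "%{autoid}"
  }
}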