Infinite Loop Of Logs

Hi,

I'm using the ELK stack: Filebeat (v5.0.0-alpha3) collects my logs and sends them to Kafka (v0.8.2.1), then Logstash (v2.1.1) consumes those logs, parses/filters them, and outputs them to Elasticsearch (v2.1.1) (I use Kibana (v4.3.1) to view them).
In general, everything works flawlessly.
But with one specific index, I'm getting a bizarre situation where a bunch of logs get "trapped" in an infinite loop and are output to Elasticsearch with the same timestamp. It's like they get stuck.
Another weird thing is that each time we see the loop, the logs have different partition numbers.
It seems to happen after we restart Logstash, but we are not sure that's the trigger for this behaviour. We have Logstash installed in a Docker container, so every time we upgrade the container version, it's basically a restart.

What can cause such a situation? Why is it happening?

I'm also adding the filter configuration for this specific index:

mutate {
  remove_field => [ "input_type", "source" ]
}

mutate { gsub => [ "message", '\"', '""' ] }
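(In case it helps: as far as I understand, the csv filter is backed by Ruby's CSV parser, which expects embedded quotes to be doubled rather than backslash-escaped, which is why the gsub above is there. A quick standalone sketch with a made-up line:)

```ruby
require 'csv'

# Made-up sample line: with quotes doubled ("" instead of \"),
# Ruby's CSV parser recovers the embedded quote cleanly.
line = 'a,"he said ""hi""",b'
row = CSV.parse_line(line)
puts row.inspect  # the middle field comes back as: he said "hi"
```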

csv {
  columns => [
    "timestamp",
    "user_host",
    "query_time",
    "lock_time",
    "rows_sent",
    "rows_examined",
    "db_name",
    "last_insert_id",
    "insert_id",
    "server_id",
    "query",
    "thread_id"
  ]
  separator => ","
  remove_field => [ "message" ]
}

mutate {
  gsub => [
    "query", "\n", " ",
    "query", "\t", " ",
    "query", "\r", " "
  ]
}

mutate { convert => [ "rows_sent", "integer" ] }
mutate { convert => [ "rows_examined", "integer" ] }
mutate { convert => [ "last_insert_id", "integer" ] }
mutate { convert => [ "insert_id", "integer" ] }
mutate { convert => [ "server_id", "integer" ] }

grok {
  match => { "user_host" => "%{USER:query_user}(?:\[[^\]]+\])?\s+@\s+%{DATA:query_host}?\s+\[%{IP:query_ip}?\]" }
  remove_field => [ "user_host" ]
}
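(For reference, this is roughly the plain-Ruby equivalent of what I expect that grok pattern to match; the sample value is invented:)

```ruby
# Approximate plain-Ruby version of the user_host grok pattern above
# (the sample line is made up for illustration).
pattern = /(\w+)(?:\[[^\]]+\])?\s+@\s+(\S+)?\s+\[([\d.]+)?\]/
m = pattern.match('app_user[app_user] @ db-host-01 [10.0.0.5]')
puts m[1]  # query_user -> app_user
puts m[2]  # query_host -> db-host-01
puts m[3]  # query_ip   -> 10.0.0.5
```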

ruby { code => "event['query_time'] = event['query_time'] ? event['query_time'].split(':').inject(0){|a, m| a = a * 60 + m.to_i} : 0" }

ruby { code => "event['lock_time'] = event['lock_time'] ? event['lock_time'].split(':').inject(0){|a, m| a = a * 60 + m.to_i} : 0" }
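(Those two ruby blocks just fold an "H:M:S" string into a total number of seconds; here is the same logic standalone:)

```ruby
# Standalone version of the inject used in the ruby filters above:
# folds "H:M:S" (or "M:S") into total seconds, defaulting to 0 when nil.
def to_seconds(value)
  value ? value.split(':').inject(0) { |a, m| a * 60 + m.to_i } : 0
end

puts to_seconds('1:02:03')  # 3723
puts to_seconds(nil)        # 0
```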

date {
  match => [ "timestamp", "YYYY-MM-dd HH:mm:ss" ]
  target => "timestamp"
}

grok {
  match => { "query" => "(/\*%{GREEDYDATA:to_remove}\*/){0,}%{WORD:query_type}" }
  remove_field => [ "to_remove" ]
}
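(The intent of that grok is to skip an optional leading /* ... */ comment and grab the first word of the query; roughly this, in plain Ruby, with invented sample queries:)

```ruby
# Approximate plain-Ruby version of the query_type grok above:
# skip an optional leading /* ... */ comment, then capture the first word.
pattern = %r{\A(?:/\*.*\*/)?(\w+)}
puts pattern.match('/*app:checkout*/select 1')[1]  # select
puts pattern.match('UPDATE t SET x = 1')[1]        # UPDATE
```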

mutate { uppercase => [ "query_type" ] }

if [query_time] > 0 {
  mutate {
    update => { "type" => "mysql_slow_queries" }
  }
} else if ([query_user] =~ "^ps_" or [query_user] =~ "^dba_" or [query_user] =~ "^sl_" or [query_user] =~ "^bi_" or [query_user] =~ "^root") {
  mutate {
    update => { "type" => "mysql_queries_audit" }
  }
} else {
  drop {}
}

Thanks

Nitz