Hi Guys,
I noticed that @timestamp field, which is correctly defined by filebeat, is changed automatically by logstash and its value is replaced with a log timestamp value (field name is a_timestamp).
Here is part of logstash debug log:
[2017-07-18T11:55:03,598][DEBUG][logstash.pipeline ] filter received {"event"=>{"**@**timestamp"=>2017-07-18T09:54:53.507Z, "offset"=>498, "@version"=>"1", "input_type"=>"log", "beat"=>{"hostname"=>"centos-ea", "name"=>"filebeat_shipper_kp", "version"=>"5.5.0"}, "host"=>"centos-ea", "source"=>"/home/elastic/ELASTIC_NEW/log_bw/test.log", "message"=>"2017-06-05 19:02:46.779 INFO [bwEngThread:In-Memory Process Worker-4] psg.logger - a_applicationName="PieceProxy", a_processName="piece.PieceProxy", a_jobId="bw0a10ao", a_processInstanceId="bw0a10ao", a_level="Info", a_phase="ProcessStart", a_activityName="SetAndLog", a_timeStamp="2017-06-05T19:02:46.779", a_sessionId="", a_sender="PCS", a_cruid="37d7e225-bbe5-425b-8abc-f4b44a5a1560", a_MachineCode="CFDM7757", a_correlationId="fa10f", a_trackingId="9d3b8", a_message="START=piece.PieceProxy"", "type"=>"log", "tags"=>["beats_input_codec_plain_applied"]}}
[2017-07-18T11:55:03,629][DEBUG][logstash.pipeline ] output received {"event"=>{"a_message"=>"START=piece.PieceProxy", "log"=>"INFO ", "bwthread"=>"[bwEngThread:In-Memory Process Worker-4]", "logger"=>"psg.logger ", "a_correlationId"=>"fa10f", "source"=>"/home/elastic/ELASTIC_NEW/log_bw/test.log", "a_trackingId"=>"9d3b8", "type"=>"log", "a_sessionId"=>"""", "a_sender"=>"PCS", "@version"=>"1", "beat"=>{"hostname"=>"centos-ea", "name"=>"filebeat_shipper_kp", "version"=>"5.5.0"}, "host"=>"centos-ea", "a_level"=>"Info", "a_processName"=>"piece.PieceProxy", "a_cruid"=>"37d7e225-bbe5-425b-8abc-f4b44a5a1560", "a_activityName"=>"SetAndLog", "offset"=>498, "a_MachineCode"=>"CFDM7757", "input_type"=>"log", "message"=>"2017-06-05 19:02:46.779 INFO [bwEngThread:In-Memory Process Worker-4] psg.logger - a_applicationName="PieceProxy", a_processName="piece.PieceProxy", a_jobId="bw0a10ao", a_processInstanceId="bw0a10ao", a_level="Info", a_phase="ProcessStart", a_activityName="SetAndLog", a_timeStamp="2017-06-05T19:02:46.779", a_sessionId="", a_sender="PCS", a_cruid="37d7e225-bbe5-425b-8abc-f4b44a5a1560", a_MachineCode="CFDM7757", a_correlationId="fa10f", a_trackingId="9d3b8", a_message="START=piece.PieceProxy"", "a_phase"=>"ProcessStart", "tags"=>["beats_input_codec_plain_applied", "_dateparsefailure", "kv_ok", "taskStarted"], "a_processInstanceId"=>"bw0a10ao", "@timestamp"=>2017-06-05T17:02:46.779Z, "my_index"=>"bw_logs", "a_timeStamp"=>"2017-06-05T19:02:46.779", "a_jobId"=>"bw0a10ao", "a_applicationName"=>"PieceProxy", "TMS"=>"2017-06-05 19:02:46.779"}}
NB:
- I noticed that this doesn't happen with a simple pipeline (without grok, kv and other plugins I use in my custom pipeline)
 - I changed filebeat's property json.overwrite_keys to TRUE but with no success.
 
Can you explain me why and what happens with @_timestamp changing? I don't expect it to be done automatically (I saw many posts of people asking how to do that) because @timestamp is a system field.. What's wrong with that?
Here is my pipeline:
input {
beats {
port => "5043"
type => json
}
}
filter {   	
#date {
#  match => [ "@timestamp", "ISO8601" ]
#  target => "@timestamp"
#}
if "log_bw" in [source] {
			grok {
				patterns_dir => ["/home/elastic/ELASTIC_NEW/logstash-5.5.0/config/patterns/extrapatterns"]
				match => { "message" => "%{CUSTOM_TMS:TMS}\s*%{CUSTOM_LOGLEVEL:log}\s*%{CUSTOM_THREAD:bwthread}\s*%{CUSTOM_LOGGER:logger}-%{CUSTOM_TEXT:text}" }	
				tag_on_failure => ["no_match"]
			}
			
			if "no_match" not in [tags] {
				
				if "Payload for Request is" in [text] {
					mutate {
						add_field => { "my_index" => "json_request" }
					}										
					
					grok {
						patterns_dir => ["/home/elastic/ELASTIC_NEW/logstash-5.5.0/config/patterns/extrapatterns"]
						match => { "text" => "%{CUSTOM_JSON:json_message}" }
					}
					
					json {
						source => "json_message"
						tag_on_failure => ["errore_parser_json"]
						target => "json_request"
					}
					
					mutate {
						remove_field => [ "json_message", "text" ]
					}
				}
				else {
					
					mutate {
						add_field => { "my_index" => "bw_logs" }
					}
					
					kv {
						source => "text"
						trim_key => "\s"
						field_split => ","
						add_tag => [ "kv_ok" ]
					}
					
					if "kv_ok" not in [tags] {
						drop { }
					}
					
					else {
						
						mutate {
							remove_field => [ "text" ]
						}
						
						if "ProcessStart" in [a_phase] {
							mutate {
								add_tag => [ "taskStarted" ]
							}
						}
						
						if "ProcessEnd" in [a_phase] {
							mutate {
								add_tag => [ "taskTerminated" ]
							}
						}
						
						date {
							match => [ "a_timeStamp", "yyyy'-'MM'-'dd'T'HH:mm:ss.SSS" ]
						}
						
						elapsed {
							start_tag => "taskStarted"
							end_tag => "taskTerminated"
							unique_id_field => "a_cruid"
						}
					}
				}		
			}
}
else {
	
	mutate {
		add_field => { "my_index" => "other_products" } 
	}
}
}
output {
	elasticsearch { 
		index => "%{my_index}"
		hosts => ["localhost:9200"] 
	}
	
	stdout { codec => rubydebug }
	file {
		path => "/tmp/loggata.tx"
		codec => json
	}
}
Thank you very much,
Andrea