Aggregate filter

Hi,

I'm using the latest aggregate filter plugin with the latest Logstash version, and for some reason I'm having issues indexing the result into Elasticsearch (latest version as well).

This is the Logstash config file:

input {
  file {
     path => "/path/to/log/file"
  }
}
filter {
  json {
    source => "message"
  }

  aggregate {
    task_id => "%{transactionId}"
    code => ""
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "transactionId"
    timeout => 60
    timeout_tags => ["aggregate"]
  }

  if "aggregate" not in [tags] {
    drop {}
  }
}
#output {
#	stdout {
#		codec => rubydebug
#	}
#}
output {
    elasticsearch {
      hosts => ["localhost:9200"]
      manage_template => false
      index => "agregatedindex"
      document_type => "agregatedindextype"
    }
}
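(Side note: my `code` block is empty on purpose, since I only want the map pushed on timeout. My understanding is that the `code` block is normally where fields get copied into the map, something like the sketch below — the two field names are just examples picked from my log format. Maybe that's relevant here?)

```
aggregate {
  task_id => "%{transactionId}"
  code => "
    map['SRI_sent'] ||= event.get('SRI_sent')
    map['FSM_got'] ||= event.get('FSM_got')
  "
  push_map_as_event_on_timeout => true
  timeout_task_id_field => "transactionId"
  timeout => 60
  timeout_tags => ["aggregate"]
}
```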

The results vary depending on the output (this is where the issue is):

  • when stdout is the output, everything works great: merged/aggregated docs are emitted when the timeout expires.
  • when Elasticsearch is the output, everything is aggregated into one single document instead of one document per transactionId, as I would expect.

As you can see I have no start/end events, so I had to base the aggregation on the timeout.

The log file looks something like this:
{"MedGotMsgFromApi":"","authCBGot":"","accountInfoGot":"","lnpInfoGot":"","blackListCBGot":"","msgSentToJMS_MT":"","SRI_sent":"","SRI_got":"","FSM_sent":"","FSM_got":"170904104510.540","medGotMsg_MO":"","medSentToUser_MO":"","operatorCodeFrom":"","operatorCodeTo":"","userName":"","message":"","medHostName":"smscA","MTtags":"","sending_IP":"","packType":"","packName":"","isExpires":"","isBroker":"","messageID":"","dlr_url":"","schedule":"","medGotDLR_MT":"","medSentDLR_MT":"","medGotAck_MT":"","SMPP_sent":"","SMPP_got":"","medGotFromUser_MO":"","medSentToJMS_MO":"","smscGotAck_MO":"","smscSentAck_MO":"","medSentRespToApi":"","lrn":"","gt":"","reason":"return error code -1","accountID":"","statusCode":"","chargeTo":"","nodeId":"0","cdrType":"smsc","recordType":"MT","callingNumber":"97224874449","callingImsi":"425240204546146","callingMsc":"","billable":"","calledNumber":"972523991529","calledImsi":"425240204546146","calledMsc":"972723800004","calledMscTon":"4","calledMscNpi":"1","msgSubmissionTime":"170904103408.516","clientId":"user@testAccount","gmt1":"2","msgDeliveryTime":"170904104410.470","originatingProtocol":"HTTP","gmt2":"2","campignId":"","channel":"","destinationProtocol":"MAP","terminationCause":"FAILED","transactionId":"19db41f8-bee9-453d-be4a-7b1c0e72f79b","msgLength":"3","concatenated":"FALSE","concatenatedFrom":"1","concatenatedFromTotal":"","sequence":"0","priority":"","deferred":"","numOfAttemp":"4","sequenceNumber":"832","receiptReqFlag":"1"}

with obviously more entries, and with transactionIds that are sometimes identical (hence the need to aggregate) :slight_smile:
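To make concrete what I expect per transactionId, here's a rough sketch in plain Python of the merge I'm after (log lines trimmed to a couple of fields; values are made up):

```python
import json
from collections import defaultdict

# Two trimmed log lines sharing a transactionId, plus one with a different id
lines = [
    '{"transactionId": "19db41f8", "SRI_sent": "170904104459.100"}',
    '{"transactionId": "19db41f8", "FSM_got": "170904104510.540"}',
    '{"transactionId": "deadbeef", "SRI_sent": "170904104501.200"}',
]

# Merge all fields of events that share a transactionId into one document
merged = defaultdict(dict)
for line in lines:
    event = json.loads(line)
    merged[event["transactionId"]].update(event)

# One merged document per transactionId is what I expect in Elasticsearch
print(len(merged))  # 2
```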

Can anyone give me some insight on this? Any help is greatly appreciated. Thanks in advance :sunny:
