Logstash if else

I am pushing some example lines through Logstash to check my if/else conditionals, but it's not working as expected. Here is my configuration:

input {
	stdin{}
}

filter {
	if "cdrs" in [tags] {
		json {
			source => "message"
		}
		csv {
			separator => "|"
			columns => ["ActualDateTime","RecordType","SMSCAddress","ShortCode","SystemID","FailureReason","Priority","DataCodingScheme","MessageSize","UDHIndicator","SegmentNumber","MessageClass","MessageValidityPeriod","DifferedDelivery","DeliveryReportRequestFlag","ReadReplyRequested","TransactionType","PLMN","ChargedParty","AmountCharged","InternalTransactionID","ExternalTransactionID","RespTransactionID","MessageSubmissionTime","MessageDeliveryTime","CallingParty","CalledParty","FromAddress","OrigInterface","DestInterface","OrigPrepaidFlag","DestPrepaidFlag","INIndicator","TransactionStatus","Keyword","HTTPPUSHURL","HTTPPULLURL","MsgType","DNDCategoryId","ServiceCenterTimeStamp","OriginationIMSI","OrigVisitedMSCId","ServiceCenterAddress","DischargeTime","Connection-ID","Orginator","TotalSegments","ActualSenderAddress","ActualDestAddress","SourceTON","Source NPI","DestTON","Dest NPI","DNDCategoryEnabled","SourceModified","DestModified","TextMatched","TextReplaced","ActionTakenOnMessage","AppliedPolicyInfo","ErrorCode","VirusScanResult","SpamFloodFeaturesAppliedAndActionsTaken","DndType","EsmeShortCode","EsmeSystemId","VipNumber"]
			autogenerate_column_names => true
		}
		prune {
			whitelist_names => ["@timestamp","^ActualDateTime$","^RecordType$","^FailureReason$","^Priority$","^MessageSubmissionTime$","^MessageDeliveryTime$","^tags$"]
		}
	}
	if "logs" in [tags] {
		json {
			source => "message"
		}
		grok {
			named_captures_only => true
			patterns_dir => ["./patterns"]
			match => {
				"message" => "%{WORD:debug}:%{LOGDATE:date}:%{LOGFILENAME:process}:%{WORD}:%{GREEDYDATA:content}"
			}
		}
		prune {
			whitelist_names => ["@timestamp","^debug$","^date$","^process$","^content$","^tags$","^MessageDeliveryTime$"]
		}
	}
}

output {
	if "cdrs" in [tags] {
		elasticsearch {
			hosts => ["172.16.23.14:9200"]
			index => 'logstash_testing'
		}
	}
	if "logs" in [tags] {
		elasticsearch {
			hosts => ["172.16.23.14:9200"]
			index => 'log_analysis'
		}
	}
	stdout { codec => rubydebug }
}

My input:

{"@timestamp":"2017-03-30T11:32:36.328Z","@version":"1","message":"{"@timestamp":"2017-03-30T11:32:34.738Z","beat":{"hostname":"gems.smsc.net","name":"gems.smsc.net","version":"5.3.0"},"input_type":"log","message":"Thu Mar 30 17:02:34 2017|DeliveryReceipt||||Sequence Number is missing|least|||||||no|N|N|Res||||567177888|||Thu Mar 30 17:02:34.460 2017|Thu Mar 30 17:02:34.460 2017||||SMPP-3.4|SMPP-3.4|N|N||FAIL|||||||||||00|SMSC|||||||||||||||1290|0||O|||||||||03||||||0","offset":5028812,"source":"/home/smsgw/SMSCGW-2.0.2.0/cdrs/SMSGW_CDR_POSTPAID_SMSGW_CDR_172.31.2.236_20170330170000.log","tags":["cdrs"],"type":"log"}"}
{"@timestamp":"2017-03-30T11:32:36.328Z","@version":"1","message":"{"@timestamp":"2017-03-30T11:32:34.738Z","beat":{"hostname":"gems.smsc.net","name":"gems.smsc.net","version":"5.3.0"},"input_type":"log","message":"DBG5:28-Mar-2017 12:11:49.548778:IOMultiplexer.C:26:In IOMultiplexer default constructor","offset":5027312,"source":"/home/smsgw/SMSCGW-2.0.2.0/logs/SMSC_debug.log","tags":["logs"],"type":"log"}"}

Expected output:

{
          "ActualDateTime": "Thu Mar 30 17:08:36 2017",
          "OrigInterface": "HTTP",
          "Priority": "least",
          "tags": [
            "cdrs"
          ],
          "FailureReason": "Message Delivered",
          "MessageSubmissionTime": "Thu Mar 30 17:08:35.985 2017",
          "MessageDeliveryTime": "Thu Mar 30 17:08:36.034 2017",
          "RecordType": "SMPPSubmitSM",
          "DestInterface": "SMPP-3.4",
          "@timestamp": "2017-03-30T11:38:36.181Z"
        }

Actual output:

{
    "@timestamp" => 2017-03-30T11:51:57.464Z,
      "@version" => "1",
          "host" => "0.0.0.0",
       "message" => "{\"@timestamp\":\"2017-03-30T11:32:36.328Z\",\"@version\":\"1\",\"message\":\"{\"@timestamp\":\"2017-03-30T11:32:34.738Z\",\"beat\":{\"hostname\":\"gems.smsc.net\",\"name\":\"gems.smsc.net\",\"version\":\"5.3.0\"},\"input_type\":\"log\",\"message\":\"Thu Mar 30 17:02:34 2017|DeliveryReceipt||||Sequence Number is missing|least|||||||no|N|N|Res||||567177888|||Thu Mar 30 17:02:34.460 2017|Thu Mar 30 17:02:34.460 2017||||SMPP-3.4|SMPP-3.4|N|N||FAIL|||||||||||00|SMSC|||||||||||||||1290|0||O|||||||||03||||||0\",\"offset\":5028812,\"source\":\"/home/smsgw/SMSCGW-2.0.2.0/cdrs/SMSGW_CDR_POSTPAID_SMSGW_CDR_172.31.2.236_20170330170000.log\",\"tags\":[\"cdrs\"],\"type\":\"log\"}\"}"
}

When if "cdrs" in [tags] { is evaluated, the event doesn't have any tags yet. The tags are extracted by the json filter, which sits inside the conditional and therefore never runs.

Thanks for the reply. Now I understand that the error is in the condition. But how can I change that?

Move the json filter outside the conditional or use a json_lines codec for your stdin input?
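For the codec option, a minimal sketch of the input section could look like the following. It assumes every line on stdin is one complete, valid JSON object; fields at the top level of that object (including tags, if present there) are then available before the filter conditionals run. If the tags are nested inside a JSON string in the message field, as in the sample input above, a json filter on message placed outside the conditionals would still be needed.

```
input {
	stdin {
		# Decode each newline-terminated line as one JSON document
		# instead of storing the raw text in [message].
		codec => json_lines
	}
}
```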

I tried exactly that before replying to you, but it's showing:

 "tags" => [
        [0] "_jsonparsefailure"
    ]

I changed my filter section like this:

filter {
	json {
		source => "message"
	}
	if "cdrs" in [tags] {
		csv {
			separator => "|"
			columns => ["ActualDateTime","RecordType","SMSCAddress","ShortCode","SystemID","FailureReason","Priority","DataCodingScheme","MessageSize","UDHIndicator","SegmentNumber","MessageClass","MessageValidityPeriod","DifferedDelivery","DeliveryReportRequestFlag","ReadReplyRequested","TransactionType","PLMN","ChargedParty","AmountCharged","InternalTransactionID","ExternalTransactionID","RespTransactionID","MessageSubmissionTime","MessageDeliveryTime","CallingParty","CalledParty","FromAddress","OrigInterface","DestInterface","OrigPrepaidFlag","DestPrepaidFlag","INIndicator","TransactionStatus","Keyword","HTTPPUSHURL","HTTPPULLURL","MsgType","DNDCategoryId","ServiceCenterTimeStamp","OriginationIMSI","OrigVisitedMSCId","ServiceCenterAddress","DischargeTime","Connection-ID","Orginator","TotalSegments","ActualSenderAddress","ActualDestAddress","SourceTON","Source NPI","DestTON","Dest NPI","DNDCategoryEnabled","SourceModified","DestModified","TextMatched","TextReplaced","ActionTakenOnMessage","AppliedPolicyInfo","ErrorCode","VirusScanResult","SpamFloodFeaturesAppliedAndActionsTaken","DndType","EsmeShortCode","EsmeSystemId","VipNumber"]
			autogenerate_column_names => true
		}
		prune {
			whitelist_names => ["@timestamp","^ActualDateTime$","^RecordType$","^FailureReason$","^Priority$","^MessageSubmissionTime$","^MessageDeliveryTime$","^tags$"]
		}
	}
	if "logs" in [tags] {
		
		grok {
			named_captures_only => true
			patterns_dir => ["./patterns"]
			match => {
				"message" => "%{WORD:debug}:%{LOGDATE:date}:%{LOGFILENAME:process}:%{WORD}:%{GREEDYDATA:content}"
			}
		}
		prune {
			whitelist_names => ["@timestamp","^debug$","^date$","^process$","^content$","^tags$","^MessageDeliveryTime$"]
		}
	}
}

Well, looking more closely it's evident that the input isn't valid JSON:

{"@timestamp":"2017-03-30T11:32:36.328Z","@version":"1","message":"{"@timestamp":"2017-03-30T11:32:34.738Z","beat":{"hostname":"gems.smsc.net","name":"gems.smsc.net","version":"5.3.0"},"input_type":"log","message":"Thu Mar 30 17:02:34 2017|DeliveryReceipt||||Sequence Number is missing|least|||||||no|N|N|Res||||567177888|||Thu Mar 30 17:02:34.460 2017|Thu Mar 30 17:02:34.460 2017||||SMPP-3.4|SMPP-3.4|N|N||FAIL|||||||||||00|SMSC|||||||||||||||1290|0||O|||||||||03||||||0","offset":5028812,"source":"/home/smsgw/SMSCGW-2.0.2.0/cdrs/SMSGW_CDR_POSTPAID_SMSGW_CDR_172.31.2.236_20170330170000.log","tags":["cdrs"],"type":"log"}"}

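It's "message":"{"@timestamp":" that's problematic: the double quotes inside the message string are not escaped, so the string ends right after the opening brace and the rest of the line can't be parsed.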
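One way to exercise the conditionals with a well-formed test line is sketched below. It is only an illustration: the shortened sample line is made up, and it assumes config.support_escapes is left at its default (disabled), so that the backslashes in the single-quoted string reach the codec unchanged.

```
input {
	generator {
		# Emit the sample line once instead of repeating it forever.
		count => 1
		# Hypothetical, shortened test line; the inner quotes are escaped as \"
		lines => ['{"@version":"1","message":"{\"message\":\"DBG5:sample text\",\"tags\":[\"logs\"]}"}']
		# Decode the outer JSON object.
		codec => json
	}
}

filter {
	# Decode the inner JSON document held in [message]; this creates [tags].
	json {
		source => "message"
	}
}

output {
	stdout { codec => rubydebug }
}
```

With the inner document decoded outside any conditional, a later if "logs" in [tags] { block has a tags field to test against.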

This is my actual message. Filebeat pushes it into Kafka and I read it from Kafka. This is the output I get when I read it:

{
    "@timestamp" => 2017-03-30T12:47:56.617Z,
      "@version" => "1",
       "message" => "{\"@timestamp\":\"2017-03-30T11:32:53.753Z\",\"beat\":{\"hostname\":\"gems.smsc.net\",\"name\":\"gems.smsc.net\",\"version\":\"5.3.0\"},\"input_type\":\"log\",\"message\":\"Thu Mar 30 17:02:53 2017|SMPPSubmitSM|172.31.2.236|54389|smsgwh1|Message Delivered|least|SMSC Default Alphabet|76|||||no|N|N|Res||||400223201|400223201|2463510000746680|Thu Mar 30 17:02:52.967 2017|Thu Mar 30 17:02:53.005 2017|Telenor|959772752790|Telenor|HTTP|SMPP-3.4|N|N||SUCCESS|||||||||||00|HTTP||||5|1|1|1||||||||0|0||O|||||Thu Mar 30 17:02:53.001 2017|Thu Mar 30 17:02:53.005 2017|||03|TeleDNA|||||0\",\"offset\":5806461,\"source\":\"/home/smsgw/SMSCGW-2.0.2.0/cdrs/SMSGW_CDR_POSTPAID_SMSGW_CDR_172.31.2.236_20170330170000.log\",\"tags\":[\"cdrs\"],\"type\":\"log\"}"
}

I believe this is valid JSON.

Thanks, I resolved it by changing the JSON.
