How to write a custom grok pattern for multiple JSON values?

Below is a sample log. I was able to write a custom grok pattern for it and capture the 'onDPriceDetail' value with GREEDYDATA, but I also want to parse the multiple JSON-like child objects inside that value, each with its own field names.

2018-06-08 16:46:36,124 INFO c.t.t.r.s.i.BookingPaxReportServiceImpl.lambda$generateAndSavePNRReport$1 [9be0c71f-938a-444a-b728-537dac09da1d] [127.0.0.1] [cust1] [http-nio-8080-exec-9] BookingPaxReport : [pnrNo=MTOF1U, pnrBkdDateTime=2018-06-08T11:15:49.399, pnrChannel=WEB, pnrOndOrg=LOS, pnrReqFName=Harsh, pnrReqLName=Sharma, pnrReqEmailId=harsh@yahoo.com, onDPriceDetail=[{category=ANCILLARY, subCategory=Seat, template=CRJ300 - A-ECONOMY-Y-SSS, itemcode=1A, cost=0.0, source=BOOKING}, {category=ANCILLARY, subCategory=SSR, template=ST-SPECIAL SERVICES-123, itemcode=FG01, cost=34.0, source=BOOKING}, {category=FARE, subCategory=BASEPRICE, template=-, itemcode=-, cost=120.0, source=BOOKING}]]

I really need your help with handling this log format so that I can work with the multiple JSON objects in Kibana too.
Thanks in advance!

I would start with something like this

    mutate {
        gsub => [
            "message", "\[{", "",
            "message", "}]", "",
            "message", "}, {", "|"
        ]
    }
    mutate { split => { "message" => "|" } }
    kv { source => "[message][0]" target => "[foo][0]" field_split => "," }
    kv { source => "[message][1]" target => "[foo][1]" field_split => "," }
    kv { source => "[message][2]" target => "[foo][2]" field_split => "," }
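
For reference, applied to the sample log above, the gsub strips the wrapping `[{` and `}]` and turns each `}, {` separator into a `|`, so after the split, `[message]` is a three-element array roughly like this (the first element keeps everything before onDPriceDetail, shortened here with `...`, and the trailing `]` on the last element is left over from the log line's outer bracket):

    [0] "2018-06-08 16:46:36,124 INFO ... onDPriceDetail=category=ANCILLARY, subCategory=Seat, template=CRJ300 - A-ECONOMY-Y-SSS, itemcode=1A, cost=0.0, source=BOOKING"
    [1] "category=ANCILLARY, subCategory=SSR, template=ST-SPECIAL SERVICES-123, itemcode=FG01, cost=34.0, source=BOOKING"
    [2] "category=FARE, subCategory=BASEPRICE, template=-, itemcode=-, cost=120.0, source=BOOKING]"

Each element is then a flat key=value list that the kv filters can parse.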

Hello Badger,

Thanks for your help. There is one complication: this sample log happens to contain 3 JSON objects, but in my real logs the count is not static. Sometimes I get 2, 5, or 8 objects; it can be any number of JSON objects in the **onDPriceDetail** value.
How can I handle this dynamic nature?

Thanks

OK, well it turns out kv handles an array on input, so you can do

    kv { source => "[message]" target => "[foo]" field_split => "," }

If you then want to have the arrays inverted, you could do that in ruby using something like this.
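
As a sketch of such a ruby filter (assuming the `[foo]` target from the kv example above, so `[foo]` holds an array of hashes; this inverts it into a hash of arrays, one array per key):

    ruby {
        code => '
            items = event.get("foo")
            if items.is_a?(Array)
                inverted = {}
                items.each do |item|
                    item.each { |k, v| (inverted[k] ||= []) << v }
                end
                event.set("foo", inverted)
            end
        '
    }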

Hi Badger,

I tried to implement it, but I'm getting an error. Below is my conf file. Could you please check whether I'm doing anything wrong?

    input {
        file {
            path => "/data/log/newadmin.log"
            type => "log"
        }
    }

    filter {
        grok {
match => { "message" => "%{DATESTAMP:date1}\ %{DATA:LogLevel}\ %{DATA:msg1}\ %{DATA:requestTrackID}\ %{IP:sourceIP}\ %{DATA:tenant}\ %{DATA:thread}\ pnrNo=%{DATA:pnrNo}, pnrBkdDateTime=%{TIMESTAMP_ISO8601:pnrBkdDateTime}, pnrChannel=%{DATA:pnrChannel}, pnrOndOrg=%{DATA:pnrOndOrg}, pnrOndDest=%{DATA:pnrOndDest}, pnrAgent=%{DATA:pnrAgent}, pnrStaff=%{DATA:pnrStaff}, pnrRegion=%{DATA:pnrRegion}, pnrCountry=%{DATA:pnrCountry}, pnrTerritory=%{DATA:pnrTerritory}, pnrCity=%{DATA:pnrCity}, pnrFareBasis=%{DATA:pnrFareBasis}, pnrBaseCurrency=%{DATA:pnrBaseCurrency}, pnrPaxCount=%{NUMBER:pnrPaxCount}, pnrExchangeRate=%{BASE10NUM:pnrExchangeRate}, pnrMasterAgent=%{DATA:pnrMasterAgent}, pnrSelectedCurrency=%{DATA:pnrSelectedCurrency}, pnrPaymentStatus=%{DATA:pnrPaymentStatus}, pnrCurrentStatus=%{DATA:pnrCurrentStatus}, paxTitle=%{DATA:paxTitle}, paxName=%{DATA:paxName}, paxEmailId=%{DATA:paxEmailId}, paxContactNo=%{NUMBER:paxContactNo}, paxDOB=%{DATA:paxDOB}, paxPassportNo=%{DATA:paxPassportNo}, paxLtvId=%{DATA:paxLtvId}, paxTicketNo=%{DATA:paxTicketNo}, paxSeatNo=%{DATA:paxSeatNo}, paxType=%{DATA:paxType}, paxAddress=%{DATA:paxAddress}, paxPassportValidity=%{DATA:paxPassportValidity}, paxNationality=%{DATA:paxNationality}, paxCategory=%{DATA:paxCategory}, paxIpAddress=%{DATA:paxIpAddress}, sectorAircraftType=%{DATA:sectorAircraftType}, sectorFltNo=%{DATA:sectorFltNo}, sectorDepDateTime=%{DATA:sectorDepDateTime}, sectorArrivalDateTime=%{TIMESTAMP_ISO8601:sectorArrivalDateTime}, sectorOrg=%{DATA:sectorOrg}, sectorDest=%{DATA:sectorDest}, sectorCabinClass=%{DATA:sectorCabinClass}, sectorLogicalClass=%{DATA:sectorLogicalClass}, sectorRbd=%{DATA:sectorRbd}, sectorFltStop=%{DATA:sectorFltStop}, sectorFltStopStn=%{DATA:sectorFltStopStn}, sectorSeqNo=%{NUMBER:sectorSeqNo}, sectorBookingStatus=%{DATA:sectorBookingStatus}, sectorDcsStatus=%{DATA:sectorDcsStatus}, sectorInstanceId=%{DATA:sectorInstanceId}, sectorDistance=%{NUMBER:sectorDistance}, 
pricingTotBasefare=%{BASE10NUM:pricingTotBasefare}, pricingTotSurcharge=%{BASE10NUM:pricingTotSurcharge}, pricingTotTaxes=%{BASE10NUM:pricingTotTaxes}, pricingTotFees=%{BASE10NUM:pricingTotFees}, pricingTotAncillary=%{BASE10NUM:pricingTotAncillary}, pricingTotDiscount=%{BASE10NUM:pricingTotDiscount}, pricingNetCanCharge=%{BASE10NUM:pricingNetCanCharge}, pricingTotModCharge=%{BASE10NUM:pricingTotModCharge}, pricingRefundAmount=%{BASE10NUM:pricingRefundAmount}, pricingExtraServiceFee=%{BASE10NUM:pricingExtraServiceFee}, pricingNetAmount=%{BASE10NUM:pricingNetAmount}, pricingTotMeal=%{BASE10NUM:pricingTotMeal}, pricingTotBaggage=%{BASE10NUM:pricingTotBaggage}, pricingTotSeat=%{BASE10NUM:pricingTotSeat}, pricingTotService=%{BASE10NUM:pricingTotService}, pricingTotVendors=%{BASE10NUM:pricingTotVendors}, pricingTotAdditionalFees=%{BASE10NUM:pricingTotAdditionalFees}, pricingTotGenDiscount=%{BASE10NUM:pricingTotGenDiscount}, pricingTotLtvDiscount=%{BASE10NUM:pricingTotLtvDiscount}, pricinngTotAdditionalFees=%{BASE10NUM:pricinngTotAdditionalFees}, pricingTotDcsMeals=%{BASE10NUM:pricingTotDcsMeals}, pricingTotDcsSeat=%{BASE10NUM:pricingTotDcsSeat}, pricingTotDcsBaggage=%{BASE10NUM:pricingTotDcsBaggage}, pricingTotDcsServices=%{BASE10NUM:pricingTotDcsServices}, dcsPaymentStatus=%{DATA:dcsPaymentStatus}, onDPriceDetails=%{GREEDYDATA:onDPriceDetails}" }

            add_tag => ["pnr"]
        }

        mutate {
            kv { source => "[onDPriceDetails]" target => "[child]" field_split => "," }
        }
    }

    output {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "adminnew-log-%{+YYYY.MM.dd}"
        }
    }

What error are you getting?

It looks like you have a few fields followed by a long key-value list. Why do you not parse out the entire key-value list into a single field and then apply the kv filter to it? I suspect it would be a lot neater and also be able to handle the case when fields in the list are missing or change order.
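
As a sketch of that approach (the shortened grok pattern and the kvlist field name here are illustrative only, not tested against the real format):

    grok {
        match => { "message" => "%{DATESTAMP:date1} %{DATA:LogLevel} %{DATA:msg1} %{DATA:requestTrackID} %{IP:sourceIP} %{DATA:tenant} %{DATA:thread} %{GREEDYDATA:kvlist}" }
    }
    kv {
        source => "kvlist"
        field_split_pattern => ", "
        value_split => "="
    }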

It gives the configuration error below. When I remove the mutate block, it works fine.

    # tail -f /var/log/logstash/logstash-plain.log

[2018-06-17T03:09:57,891][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, => at line 23, column 5 (byte 3669) after filter {\n\n grok {\n match => { "message" => "%{DATESTAMP:date1}\ %{DATA:LogLevel}\ %{DATA:msg1}\ %{DATA:requestTrackID}\ %{IP:sourceIP}\ %{DATA:tenant}\ %{DATA:thread}\ pnrNo\=%{DATA:pnrNo}\, pnrBkdDateTime\=%{TIMESTAMP_ISO8601:pnrBkdDateTime}\, pnrChannel\=%{DATA:pnrChannel}\, pnrOndOrg\=%{DATA:pnrOndOrg}\, pnrOndDest\=%{DATA:pnrOndDest}\, pnrAgent\=%{DATA:pnrAgent}\, pnrStaff\=%{DATA:pnrStaff}\, pnrRegion\=%{DATA:pnrRegion}\, pnrCountry\=%{DATA:pnrCountry}\, pnrTerritory\=%{DATA:pnrTerritory}\, pnrCity\=%{DATA:pnrCity}\, pnrFareBasis\=%{DATA:pnrFareBasis}\, pnrBaseCurrency\=%{DATA:pnrBaseCurrency}\, pnrPaxCount\=%{NUMBER:pnrPaxCount}\, pnrExchangeRate\=%{BASE10NUM:pnrExchangeRate}\, pnrMasterAgent\=%{DATA:pnrMasterAgent}\, pnrSelectedCurrency\=%{DATA:pnrSelectedCurrency}\, pnrPaymentStatus\=%{DATA:pnrPaymentStatus}\, pnrCurrentStatus\=%{DATA:pnrCurrentStatus}\, paxTitle\=%{DATA:paxTitle}\, paxName\=%{DATA:paxName}\, paxEmailId\=%{DATA:paxEmailId}\, paxContactNo\=%{NUMBER:paxContactNo}\, paxDOB\=%{DATA:paxDOB}\, paxPassportNo\=%{DATA:paxPassportNo}\, paxLtvId\=%{DATA:paxLtvId}\, paxTicketNo\=%{DATA:paxTicketNo}\, paxSeatNo\=%{DATA:paxSeatNo}\, paxType\=%{DATA:paxType}\, paxAddress\=%{DATA:paxAddress}\, paxPassportValidity\=%{DATA:paxPassportValidity}\, paxNationality\=%{DATA:paxNationality}\, paxCategory\=%{DATA:paxCategory}\, paxIpAddress\=%{DATA:paxIpAddress}\, sectorAircraftType\=%{DATA:sectorAircraftType}\, sectorFltNo\=%{DATA:sectorFltNo}\, sectorDepDateTime\=%{DATA:sectorDepDateTime}\, sectorArrivalDateTime\=%{TIMESTAMP_ISO8601:sectorArrivalDateTime}\, sectorOrg\=%{DATA:sectorOrg}\, sectorDest\=%{DATA:sectorDest}\, sectorCabinClass\=%{DATA:sectorCabinClass}\, sectorLogicalClass\=%{DATA:sectorLogicalClass}\, 
sectorRbd\=%{DATA:sectorRbd}\, sectorFltStop\=%{DATA:sectorFltStop}\, sectorFltStopStn\=%{DATA:sectorFltStopStn}\, sectorSeqNo\=%{NUMBER:sectorSeqNo}\, sectorBookingStatus\=%{DATA:sectorBookingStatus}\, sectorDcsStatus\=%{DATA:sectorDcsStatus}\, sectorInstanceId\=%{DATA:sectorInstanceId}\, sectorDistance\=%{NUMBER:sectorDistance}\, pricingTotBasefare\=%{BASE10NUM:pricingTotBasefare}\, pricingTotSurcharge\=%{BASE10NUM:pricingTotSurcharge}\, pricingTotTaxes\=%{BASE10NUM:pricingTotTaxes}\, pricingTotFees\=%{BASE10NUM:pricingTotFees}\, pricingTotAncillary\=%{BASE10NUM:pricingTotAncillary}\, pricingTotDiscount\=%{BASE10NUM:pricingTotDiscount}\, pricingNetCanCharge\=%{BASE10NUM:pricingNetCanCharge}\, pricingTotModCharge\=%{BASE10NUM:pricingTotModCharge}\, pricingRefundAmount\=%{BASE10NUM:pricingRefundAmount}\, pricingExtraServiceFee\=%{BASE10NUM:pricingExtraServiceFee}\, pricingNetAmount\=%{BASE10NUM:pricingNetAmount}\, pricingTotMeal\=%{BASE10NUM:pricingTotMeal}\, pricingTotBaggage\=%{BASE10NUM:pricingTotBaggage}\, pricingTotSeat\=%{BASE10NUM:pricingTotSeat}\, pricingTotService\=%{BASE10NUM:pricingTotService}\, pricingTotVendors\=%{BASE10NUM:pricingTotVendors}\, pricingTotAdditionalFees\=%{BASE10NUM:pricingTotAdditionalFees}\, pricingTotGenDiscount\=%{BASE10NUM:pricingTotGenDiscount}\, pricingTotLtvDiscount\=%{BASE10NUM:pricingTotLtvDiscount}\, pricinngTotAdditionalFees\=%{BASE10NUM:pricinngTotAdditionalFees}\, pricingTotDcsMeals\=%{BASE10NUM:pricingTotDcsMeals}\, pricingTotDcsSeat\=%{BASE10NUM:pricingTotDcsSeat}\, pricingTotDcsBaggage\=%{BASE10NUM:pricingTotDcsBaggage}\, pricingTotDcsServices\=%{BASE10NUM:pricingTotDcsServices}\, dcsPaymentStatus\=%{DATA:dcsPaymentStatus}\, onDPriceDetails\=%{GREEDYDATA:onDPriceDetails}" }\n\nadd_tag => ["pnr"]\n\n }\n\tmutate {\n\tkv ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:42:in `compile_imperative'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:50:in `compile_graph'", 
"/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:12:in `block in compile_sources'", "org/jruby/RubyArray.java:2486:in `map'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:11:in `compile_sources'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:51:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:169:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:40:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:315:in `block in converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:312:in `block in converge_state'", "org/jruby/RubyArray.java:1734:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:299:in `converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:166:in `block in converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:164:in `converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:90:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/runner.rb:348:in `block in execute'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'"]}

kv is a separate filter and cannot be nested within a mutate filter. If you can show what a raw event looks like, we might be able to help.
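
Concretely, the kv block would move out of mutate and sit at the same level as grok, e.g.:

    filter {
        grok {
            # ... match and add_tag as before ...
        }
        kv { source => "[onDPriceDetails]" target => "[child]" field_split => "," }
    }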

Hi,

Yes, please see the sample log below:

2018-05-03 18:26:12,981 INFO c.t.t.r.s.i.BookingPaxReportServiceImpl.lambda$generateAndSavePNRReport$1 313db314-4788-4e64-bf09-b483ef0e8243 127.0.0.1 cust1.local http-nio-8080-exec-3 pnrNo=EGFYZJ, pnrBkdDateTime=2018-05-03T12:55:57.391, pnrChannel=WEB, pnrOndOrg=DXB, pnrOndDest=MCT, pnrAgent=, pnrStaff=superuser, pnrRegion=null, pnrCountry=, pnrTerritory=null, pnrCity=, pnrFareBasis=DFLT, pnrBaseCurrency=OMR, pnrPaxCount=2, pnrExchangeRate=1.0, pnrMasterAgent=, pnrSelectedCurrency=OMR, pnrPaymentStatus=INITIATED, pnrCurrentStatus=INITIATED, paxTitle=Mr, paxName=aaaaa aaaa, paxEmailId=aaaaa@aaaa.aaa, paxContactNo=24234234234, paxDOB=2018-05-03, paxPassportNo=null, paxLtvId=null, paxTicketNo=OV-876-9310-163213, paxSeatNo=, paxType=ADULT, paxAddress=, paxPassportValidity=null, paxNationality=null, paxCategory=null, paxIpAddress=null, sectorAircraftType=A320-TYPE-A, sectorFltNo=OV502, sectorDepDateTime=2018-05-07T04:30, sectorArrivalDateTime=2018-05-07T05:40, sectorOrg=DXB, sectorDest=MCT, sectorCabinClass=ECONOMY, sectorLogicalClass=Y, sectorRbd=YS1, sectorFltStop=0, sectorFltStopStn=, sectorSeqNo=1, sectorBookingStatus=INITIATED, sectorDcsStatus=INITIATED, sectorInstanceId=2c9f98ac62cd32940162cdec6540042e, sectorDistance=349, pricingTotBasefare=74.0, pricingTotSurcharge=1.0, pricingTotTaxes=0.0, pricingTotFees=2.0, pricingTotAncillary=0.0, pricingTotDiscount=0.0, pricingNetCanCharge=0.0, pricingTotModCharge=0.0, pricingRefundAmount=0.0, pricingExtraServiceFee=0.0, pricingNetAmount=77.0, pricingTotMeal=0.0, pricingTotBaggage=0.0, pricingTotSeat=0.0, pricingTotService=0.0, pricingTotVendors=0.0, pricingTotAdditionalFees=0.0, pricingTotGenDiscount=0.0, pricingTotLtvDiscount=0.0, pricinngTotAdditionalFees=0.0, pricingTotDcsMeals=0.0, pricingTotDcsSeat=0.0, pricingTotDcsBaggage=0.0, pricingTotDcsServices=0.0, dcsPaymentStatus=null, onDPriceDetails={category=FARE, subCategory=BASEPRICE, template=-, itemcode=-, cost=74.0, source=BOOKING}, {category=FARE, 
subCategory=SURCHARGE, template=-, itemcode=-, cost=1.0, source=BOOKING}, {category=TAXESFEES, subCategory=FEES, template=-, itemcode=-, cost=2.0, source=BOOKING}

Most of the key-value list can be parsed easily using the kv filter with field_split_pattern set to ", ", but the list of objects contained in the onDPriceDetails is causing some problems. I am not sure how to best handle that field.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.