Send FortiWeb syslog to Elasticsearch

hi all,
We have a FortiWeb 2000 S device and we want to send its logs to Elasticsearch.
As a first step, we wrote a Logstash config file which listens on UDP with type "json"; the FortiWeb log output is configured in JSON format. The Logstash config is as follows:


input {
  udp {
    port => 12345
    type => "json"
  }
}

filter {
  # FortiWeb prefixes each JSON event with a syslog priority, e.g. "<189>".
  # Strip it and keep only the JSON payload in [message].
  dissect {
    mapping => {
      "message" => "<%{}>%{message}"
    }
  }
  # Parse the JSON payload into top-level event fields.
  json {
    source => "message"
  }
}

output {
  elasticsearch {
    hosts => ["https://localhost:9200"]
    ssl_enabled => true
    # This setting is a list; prefer forward slashes even on Windows.
    ssl_certificate_authorities => ["...\elasticsearch-ca.pem"]
    # Must be a quoted string — a bare `none` is a config syntax error.
    # NOTE(review): "none" disables server certificate verification, which
    # makes the CA list above redundant; switch to "full" once the CA works.
    ssl_verification_mode => "none"
    user => "***"
    password => "****"
    # Daily index; Elasticsearch index names must be lowercase (this one is).
    index => "log_%{+yyyy.MM.dd}"
  }

  # Mirror events to stdout for debugging.
  stdout {
    codec => rubydebug
  }
}

When we run Logstash, it gathers the logs perfectly and the json filter parses all key-value pairs, but after a while the input no longer seems to be valid JSON — it appears to have been damaged — so the json filter cannot parse it. How can I handle this issue?
What is the best way to parse FortiWeb syslog?

Any advice would be much appreciated.

Can you show us the wrong events/messages? Maybe the device sends additional characters, or the "&lt;&gt;" prefix is missing somewhere. Please provide an error — it should carry the tag _dissectfailure or _jsonparsefailure.

thanks for your reply @Rios

event.original in documents are as following:

valid one:

event.original:

<189>{ "date": "2024-07-17", "time": "16:38:58", "log_id": "30001000", "msg_id": "44", "device_id": "adda", "eventtime": "1721221738449039646", "vd": "cz", "timezone": "(GMT)", "timezone_dayst": "GMT", "type": "s1", "subtype": "https", "sdf": "notice", "proto": "tcp", "service": "https", "status": "success", "reason": "none", "policy": "sdfdf", "dev_id": "sdffs", "cipher_suite": "none", "x509_cert_subject": "none" }

invalid one:

event.original:

<189>{ "date": "2024-07-17", "time": "16:38:58", "log_id": "30001000", "msg_id": "44", "device_id": "adda", "eventtime": "1721221738449039646", "vd": "cz", "timezone": "(GMT)", "timezone_dayst": "GMT", "type": "s1", "subtype": "https", "sdf": "notice", "proto": "tcp", "service": "https", "status": "success", "reason": "none", "policy": "sdfdf", "dev_id": "sdffs", "cipher_suite": "none", "x509_cert_subject": "none" }ozilla/5.0 (Windows ) Chrome", "http_session_id": "erw", "msg": "Parameter(refUrl) triggered signature ID 1221 of Signatures policy mail", "signature_subclass": "wererer", "signature_id": "65465
"client_level": "erewr", "x509_cert_subject": "none", "owasp_api_top10": "N/A", "match_location": "Parameter()" }
one", "owasp_api_top10": "N/A", "match_location": "Parameter()" }
}
cious", "x509_cert_subject": "none", "owasp_api_top10": "N/A", "match_location": "Parameter()" }
, "bot_info": "none"}
,"SignerCi)" }
12)" }
autoUnmask: !0 }), $("[id$=" + e + "]").inputmask({ mask: r, autoUnmask: !0 })) } function StringRefinement(e) { return "undefined" =

I am not sure where the data is being corrupted; it seems some additional characters are appended to the JSON.

You can handle malformed messages: always read up to the first "}" character; if corrupted data follows it, keep it in an additional field for inspection — or, in the simplified version below, just drop it.

# Tolerant parse: split the event at the first "}" so trailing garbage after
# the JSON object does not break the json filter; garbage is kept in [errorpart].
filter {

  # Drop the "<189>" syslog priority, capture everything up to the FIRST "}"
  # into [message], and stash whatever follows into [errorpart].
  # NOTE(review): this assumes the FortiWeb JSON is flat and its string values
  # never contain "}" — a nested object or a "}" inside a value would truncate
  # the captured JSON at the wrong brace. Confirm against real events.
  dissect {
    mapping => {
    "message" => "<%{}>%{message}}%{errorpart}"
    }
  }
  # dissect consumed the "}" delimiter, so re-append the closing brace to make
  # [message] a complete JSON object again before parsing.
  mutate {
    update => { "message" => "%{message}}" }
  }
   json {
     source => "message" 
   }

  # When the event had no trailing garbage, [errorpart] is empty — remove it.
  if [errorpart] == "" {
     mutate {  remove_field => [ "errorpart"] }
  }
}

Simplified version

# Simplified variant: same first-"}" split, but the trailing garbage is
# discarded (%{} is an unnamed dissect field) instead of kept in a field.
filter {

  # Drop the "<189>" syslog priority, keep text up to the FIRST "}" in
  # [message], and throw away anything after it.
  # NOTE(review): assumes flat JSON with no "}" inside string values — verify.
  dissect {
    mapping => {
    "message" => "<%{}>%{message}}%{}"
    }
  }
  # dissect consumed the "}" delimiter; restore the closing brace so the
  # json filter receives a complete object.
  mutate {
    update => { "message" => "%{message}}" }
  }
   json {
     source => "message" 
   }

}