I am building a Logstash pipeline that reads input from a Kafka topic, converts each message into the specific JSON format that a webhook accepts (via filter.conf), and uses the Logstash http output plugin to send the event to the webhook URL. I am having trouble producing that JSON format and posting it as an event to the HTTP webhook. Any help is appreciated.
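For context, the real input is a Kafka topic rather than a file; a minimal sketch of that input (the broker address and topic name below are placeholders, not my real values) would be:
input {
  kafka {
    bootstrap_servers => "kafka-broker:9092"   # placeholder broker address
    topics => ["webhook-events"]               # placeholder topic name
    codec => "json"                            # parse each Kafka message as JSON
  }
}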
Input.conf (using a local file instead of the Kafka topic for testing)
input {
  file {
    path => "/var/log/vicuu_test.log"
    start_position => "beginning"
    codec => "json"
  }
}
Sample input file
{"@timestamp":"2020-05-26T11:45:02.217Z","beat":{"hostname”:”xx”,”name”:”xx”,”version":"1.2.1"},"input_type":"log","log_dc”:”Dallas”,”log_dc_type":"rg","message":"1347\t1561593662400\t{\"deviceId\":null,\"eventTypeId\":\"107\",\"eventLevel\":\"error\",\"message\”:\”Hi test 1\”,\”eventArguments\":{\"backupType\":\"testautomatic\",\"backupLocation\":\"testftp server\",\"rootCause\":\"The backup failed. Please check your configuration. [Could not back up to FTP server: No route to host (Host unreachable)]\"}}","offset":123,"source":"/var/lib/xxxx”,”type":"local-dev-vergil"}
{"@timestamp":"2020-05-26T11:43:02.217Z","beat":{"hostname”:”xx”,”name”:”xx”,”version":"1.2.1"},"input_type":"log","log_dc”:”Dallas”,”log_dc_type":"rg","message":"1347\t1561593662400\t{\"deviceId\":null,\"eventTypeId\":\"282\",\"eventLevel\":\"error\",\"message\”:\”Hi test 2\”,\”eventArguments\":{\"backupType\":\"testautomatic\",\"backupLocation\":\"testftp server\",\"rootCause\":\"The backup failed. Please check your configuration. [Could not back up to FTP server: No route to host (Host unreachable)]\"}}","offset":765,"source":"/var/lib/dd”,”type":"local-dev-vergil"}
Filter.conf
filter {
  json {
    source => "message"
  }
  if [eventTypeId] == "107" {
    mutate {
      add_field => {
        "situation" => "cosupgrade"
        "tip_msg_type" => "create.notice"
      }
    }
  }
  else if [eventTypeId] == "184" {
    mutate {
      add_field => {
        "situation" => "cosupgrade"
        "tip_msg_type" => "close.notice"
      }
    }
  }
  else if [eventTypeId] == "282" {
    mutate {
      add_field => {
        "situation" => "cos500"
        "tip_msg_type" => "create.notice"
      }
    }
  }
  else if [eventTypeId] == "283" {
    mutate {
      add_field => {
        "situation" => "cos500"
        "tip_msg_type" => "close.notice"
      }
    }
  }
  # Now use the ruby filter to create a new event, copy the fields we need onto it,
  # push it onto the pipeline, and cancel the original at the end; the entire new
  # event should then be posted as a JSON message to the TIP endpoint.
  mutate {
    add_field => {
      "[crn][location]" => "%{log_dc}"
      "[crn][resource]" => "%{[beat][hostname]}"
      "[crn][cname]" => "bluemix"
      "[crn][ctype]" => "public"
      "[crn][service_name]" => "cloud-object-storage"
      "[crn][version]" => "v1"
      "version" => "1.0"
      "tribe_name" => "Storage"
      "short_description" => "%{[beat][hostname]}-%{message}"
      "alert_id" => "testalertid"
    }
    remove_field => [ "log_dc_type", "type", "source", "deviceId", "path", "eventLevel", "@version", "offset", "input_type", "eventTypeId", "tags", "@timestamp", "[eventArguments]", "[beat]", "log_dc" ]
  }
  ruby {
    code => '
      newevent = LogStash::Event.new # create a new event
      newevent.set("[crn][location]", event.get("[crn][location]"))
      newevent.set("[crn][resource]", event.get("[crn][resource]"))
      newevent.set("[crn][cname]", event.get("[crn][cname]"))
      newevent.set("[crn][ctype]", event.get("[crn][ctype]"))
      newevent.set("[crn][service_name]", event.get("[crn][service_name]"))
      newevent.set("[crn][version]", event.get("[crn][version]"))
      newevent.set("[crn][scope]", "testscope")
      newevent.set("[crn][resource_type]", "testresroucetype")
      newevent.set("customer_impacting", "false")
      newevent.set("runbook_url", "urllink")
      newevent.set("severity", 3)
      newevent.set("version", event.get("version"))
      newevent.set("tribe_name", event.get("tribe_name"))
      newevent.set("short_description", event.get("short_description"))
      newevent.set("source", event.get("source"))
      newevent.set("alert_id", event.get("alert_id"))
      newevent.set("situation", event.get("situation"))
      newevent.set("tip_msg_type", event.get("tip_msg_type"))
      newevent.set("source", "Logstash")
      new_event_block.call(newevent) # push it onto the queue
    '
  }
}
Output.conf
output {
  http {
    url => "https://xxxx/hooks/tip-alert"
    http_method => "post"
    format => "json"
    message => "%{newevent}"
    headers => ["Authorization", "<token>"]
  }
}
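If it helps, I can also add a stdout output next to the http output while testing, just to print each event and inspect the fields (debugging only, not part of the real pipeline):
output {
  stdout { codec => rubydebug }   # print every event to the console in a readable form
}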
Logstash.log shows
[2020-05-28T20:49:37,163][DEBUG][logstash.pipeline ] output received {"event"=>{"short_description"=>"xx", "tip_msg_type"=>"create.notice", "message"=>"Upgrade done", "version"=>"1.0", "alert_id"=>"testalertid", "host"=>"ossxx", "tribe_name"=>"Storage", "situation"=>"cosupgrade", "crn"=>{"cname"=>"bluemix", "location"=>"Dallas", "ctype"=>"public", "resource"=>"xx", "version"=>"v1", "service_name"=>"cloud"}}}
[2020-05-28T20:49:37,164][DEBUG][logstash.pipeline ] output received {"event"=>{"short_description"=>"xx", "tip_msg_type"=>"create.notice", "message"=>"Upgrade done", "version"=>"1.0", "alert_id"=>"testalertid", "host"=>"ossmlbadc0101a", "tribe_name"=>"Storage", "situation"=>"cos500", "crn"=>{"cname"=>"bluemix", "location"=>"Dallas", "ctype"=>"public", "resource"=>"xx", "version"=>"v1", "service_name"=>"cloud"}}}
[2020-05-28T20:49:37,164][DEBUG][logstash.pipeline ] output received {"event"=>{"severity"=>3, "short_description"=>"xx", "customer_impacting"=>"false", "tip_msg_type"=>"create.notice", "source"=>"Logstash", "version"=>"1.0", "@timestamp"=>2020-05-29T01:49:37.149Z, "runbook_url"=>"urllink", "alert_id"=>"testalertid", "@version"=>"1", "tribe_name"=>"Storage", "crn"=>{"ctype"=>"public", "resource"=>"xx", "service_name"=>"cloud", "scope"=>"testscope", "cname"=>"bluemix", "resource_type"=>"testresroucetype", "location"=>"Dallas", "version"=>"v1"}, "situation"=>"cosupgrade"}}
[2020-05-28T20:49:37,164][DEBUG][logstash.pipeline ] output received {"event"=>{"severity"=>3, "short_description"=>"xx", "customer_impacting"=>"false", "tip_msg_type"=>"create.notice", "source"=>"Logstash", "version"=>"1.0", "@timestamp"=>2020-05-29T01:49:37.162Z, "runbook_url"=>"urllink", "alert_id"=>"testalertid", "@version"=>"1", "tribe_name"=>"Storage", "crn"=>{"ctype"=>"public", "resource"=>"xx", "service_name"=>"cloud-object-storage", "scope"=>"testscope", "cname"=>"bluemix", "resource_type"=>"testresroucetype", "location"=>"mon01", "version"=>"v1"}, "situation"=>"cos500"}}
[2020-05-28T20:49:41,262][DEBUG][logstash.pipeline ] Pushing flush onto pipeline
[2020-05-28T20:49:46,268][DEBUG][logstash.pipeline ] Pushing flush onto pipeline
I am not sure why there are 4 "output received" blocks in the log file when there are only 2 lines in the input file.
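My own comment in filter.conf says to cancel the original event at the end, but I never actually do that, so maybe each input line produces both the original event and the new event at the output. If I understand the ruby filter correctly, something like this (untested) would drop the original and keep only the new event:
ruby {
  code => '
    newevent = LogStash::Event.new
    # ... copy the fields onto newevent as in filter.conf above ...
    new_event_block.call(newevent)   # push the new event onto the pipeline
    event.cancel                     # cancel the original event so only the new one reaches the output
  '
}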
Also, how do I post the event in the format that the webhook accepts? Below is a sample of the JSON format the webhook expects.
{
  "alert_id": " abc.test",
  "crn": {
    "version": "v1",
    "cname": "staging",
    "ctype": "dedicated",
    "service_name": "foo.doc",
    "location": "eu-gb",
    "scope": "p/12345",
    "service_instance": "1234-5678-9012-3456",
    "resource_type": "object",
    "resource": "foo.yml"
  },
  "customer_impacting": "false",
  "runbook_url": "https://test/foo.md",
  "severity": 3,
  "short_description": "Something bad happened",
  "situation": "FOOBAR_TEST_NCO_HEARTBEAT_OVERDUE",
  "source": "nagios",
  "timestamp": "2017-07-18T20:48:36Z",
  "tip_msg_type": "create.notice",
  "tribe_name": "FOO_TRIBE",
  "version": "1.0"
}
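From reading the http output plugin docs, I think format => "json" already serializes the whole event as the request body, so the message setting is probably not needed at all; this is roughly what I plan to try next (the content_type line and the hash form of headers are my assumptions from the docs, and the URL and token are placeholders):
output {
  http {
    url => "https://xxxx/hooks/tip-alert"
    http_method => "post"
    format => "json"                              # send the entire event serialized as JSON
    content_type => "application/json"
    headers => { "Authorization" => "<token>" }
  }
}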
I don't see any return code logged from the webhook, so I am confused about what's wrong. Thanks!