You can use the json_encode filter to create a JSON string that you store in a field, then use the line codec with the tcp output and set its message format to "Identity_String%{name-of-json-field}".
I did try that and got it working on a field by field basis but i need to encode all the field pairs at the root level in the same way that json_lines does and then prepend the text string.
I'm wondering if short or writing a codec plugin that extends json_lines if we could use the ruby plugin to iterate over all the fields once we have done the enrichment and build the json output like that and then store that in a field and use the line codec as you describe, but that feels a bit hacky. The fields extracted in the initial message will vary a little depending on the source so I can't hard code the output.
I've put the wip config file below to give the question some context esentialy we are using logstash to recieve a message, enrich it then forward it on for further procesing but it needs to have the same header prepended that we strip off in the initial filter to parse the json.
input {
tcp {
port => 10514
}
}
filter {
#Input data is JSON with "XXX xxxx" in front, so strip that off and load json into new var.
grok {
match => { "message" => "XXX xxxx%{GREEDYDATA:dest_json}"}
}
#parse out data into separate fields from JSON
json {
source => "dest_json"
remove_field => "dest_json"
}
#create a new blank field for holding the resolved hostname
mutate {
add_field => {"name_to_resolve"=>"unresolved"}
}
#use ruby to ping the ip in the "src" field
ruby {
#"src" is the only field name that will be resolved at this point,
# if more are needed we can add addtional code
code => "ip=event.get('src'); event.set('name_to_resolve',`ping -n 1 -a #{ip}`)"
}
#parse out ping returned response (will return full DNS)
grok{
match => { name_to_resolve => "Pinging %{HOSTNAME:hostname_resolved}" }
}
#If hostname is not resolved, set to "unresolved"
if [hostname_resolved] == [src] {
mutate {
replace => {"hostname_resolved" => "unresolved" }
}
}
#strip down to just the first part of the resolved name
else {
grok{
match => {hostname_resolved => "%{WORD:netbiosname_resolved}."}
}
}
#remove extra fields that logstash added so that we don't add any extra data to what is sent on
mutate {
remove_field => "name_to_resolve"
remove_field => "@timestamp"
remove_field => "@version"
remove_field => "host"
remove_field => "port"
remove_field => "message"
}
}
output {
stdout{
codec=>rubydebug
}
tcp {
codec => json_lines
port => 1514
host => "127.0.0.1"
}
}
Apache, Apache Lucene, Apache Hadoop, Hadoop, HDFS and the yellow elephant
logo are trademarks of the
Apache Software Foundation
in the United States and/or other countries.