Prepend a text string to json output over tcp


(Mark Edwards) #1

Hi All,

We have a basic configuration file up and running adding some enrichment data to a log source,

For the output the receiving platform is expecting to have the existing text string prepending the json to identify the log source.

output {
stdout{
codec=>rubydebug
}
tcp {
codec => json_lines
port => 1514
host => "127.0.0.1"
}
}

the output is valid json but what we need to to is prepend a static string in fronot of that in the output as an identifier so

Identity_String{"name":"xxxxxx","version":"xxx"}\n

instead of

{"name":"xxxxxx","version":"xxx"}\n

that json_lines produces.

I can't get my head around how to do it.

thanks!


(Magnus B├Ąck) #2

You can use the json_encode filter to create a JSON string that you store in a field, then use the line codec with the tcp output and set its message format to "Identity_String%{name-of-json-field}".


(Mark Edwards) #3

Thanks Magnus,

I did try that and got it working on a field by field basis but i need to encode all the field pairs at the root level in the same way that json_lines does and then prepend the text string.

I'm wondering if short or writing a codec plugin that extends json_lines if we could use the ruby plugin to iterate over all the fields once we have done the enrichment and build the json output like that and then store that in a field and use the line codec as you describe, but that feels a bit hacky. The fields extracted in the initial message will vary a little depending on the source so I can't hard code the output.

I've put the wip config file below to give the question some context esentialy we are using logstash to recieve a message, enrich it then forward it on for further procesing but it needs to have the same header prepended that we strip off in the initial filter to parse the json.

input {
  tcp {
    port => 10514
  }
}

filter {
  #Input data is JSON with "XXX xxxx" in front, so strip that off and load json into new var.
  grok {
    match => { "message" => "XXX xxxx%{GREEDYDATA:dest_json}"}
  }

  #parse out data into separate fields from JSON
  json {
    source => "dest_json"
    remove_field => "dest_json"
  }
  
  #create a new blank field for holding the resolved hostname
  mutate {
    add_field => {"name_to_resolve"=>"unresolved"}
  }

  #use ruby to ping the ip in the "src" field
  ruby {
    #"src" is the only field name that will be resolved at this point,
    # if more are needed we can add addtional code
    code => "ip=event.get('src'); event.set('name_to_resolve',`ping -n 1 -a #{ip}`)"
  }

  #parse out ping returned response (will return full DNS)
  grok{
    match => { name_to_resolve => "Pinging %{HOSTNAME:hostname_resolved}" }
  }
  
  #If hostname is not resolved, set to "unresolved"
  if [hostname_resolved] == [src] {
    mutate { 
      replace => {"hostname_resolved" => "unresolved" }
    }
  }
  
  #strip down to just the first part of the resolved name
  else {
    grok{
      match => {hostname_resolved => "%{WORD:netbiosname_resolved}."}
    }
  }
  
  #remove extra fields that logstash added so that we don't add any extra data to what is sent on
  mutate {
    remove_field => "name_to_resolve"
    remove_field => "@timestamp"
    remove_field => "@version"
    remove_field => "host"
    remove_field => "port"
    remove_field => "message"
  }
}

output {
  stdout{
    codec=>rubydebug
  }

  tcp {
    codec => json_lines
    port => 1514
    host => "127.0.0.1"
  }
}

(system) #4

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.