Data filter input

Hello

For a technical constraint we want to change the format of the data in the logs processed by filebeat .
On the logs there are time fields that will be modified to milliseconds in input, in this case we want to apply the modification in the logstash pipeline.

time_in
time_out
time

Here is the production logstash configuration.

input {
  beats {
    port => 5044
    type => "filebeat"
  }
}

filter {
  mutate {
    rename => { "[fields][env]" => "env" }
  }
  mutate {
    rename => { "[fields][app]" => "app" }
  }
  if [app] == "provin" {
    dissect {
      mapping => { "message" => "%{fluxid}|%{time_in}|%{time_out}|%{time}|%{typeflux}|%{typeflux_desc}|%{bu}|%{reforigin}|%{contractoidval}|%{useroidval}|%{status}|%{errorcode}|%{errormessage}|%{host_app}" }
    }
    ruby {
      code => "event.set('typeflux_desc', event.get('typeflux_desc').to_s.gsub(/[\]\[]/, '').split(','))"
    }
    date {
      match => [ "time_in", "YYYYMMddHHmmss" ]
      target => "time_in"
    }
    date {
      match => [ "time_out", "YYYYMMddHHmmss" ]
      target => "time_out"
    }
  } else if [app] == "router" {
    dissect {
      mapping => { "message" => "%{fluxid}|%{time_in}|%{time_out}|%{time}|%{typeflux}|%{typeflux_desc}|%{messageid_backend}|%{reforigin}|%{contractoidval}|%{useroidval}|%{host_app}" }
    }
    ruby {
      code => "
      event.set('backend', event.get('messageid_backend').to_s.gsub(/(\[\d*)|]/, '').gsub(/^,/, '').split(','))
      event.set('backend_count', event.get('backend').length)
      "
    }
    date {
      match => [ "time_in", "YYYYMMddHHmmss" ]
      target => "time_in"
    }
    date {
      match => [ "time_out", "YYYYMMddHHmmss" ]
      target => "time_out"
    }
  } else if [app] == "provout" {
    dissect {
      mapping => { "message" => "%{messageid}|%{time_in}|%{time_out}|%{time}|%{typeflux}|%{typeflux_desc}|%{reforigin}|%{backend}|%{contractoidval}|%{useroidval}|%{status}|%{errorcode}|%{errormessage}" }
    }
    ruby {
      code => "
      event.set('typeflux', event.get('typeflux').to_s.gsub(/_[A-Z]*$/, ''))
      event.set('backend', event.get('backend').to_s.gsub(/[A-Z]*$/, ''))
      "
    }
    date {
      match => [ "time_in", "YYYYMMddHHmmssSSS" ]
      target => "time_in"
    }
    date {
      match => [ "time_out", "YYYYMMddHHmmssSSS" ]
      target => "time_out"
    }
  }
  mutate {
    rename => { "[beat][hostname]" => "host" }
  }
  mutate {
    rename => { "[fields][bu]" => "bu" }
  }
  mutate {
    remove_tag => ["beats_input_codec_plain_applied"]
  }
  mutate {
    remove_field => [ "path", "message", "tags" ]
  }
}

output {
  if ![app] {
    elasticsearch {
      hosts => [ "localhost:9200" ]
      index => "error-filebeat"
    }
  } else {
    elasticsearch {
      hosts => [ "localhost:9200" ]
      index => "app-%{env}-%{app}-%{+YYYY.MM.dd}"
    }
  }
}

Thank you for your feedback

sai

It is unclear what you are asking.

the question is how we can consider this elements in Unix Timestamps .

time_in
time_out
time

regards

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.