Parsing data from multiple application servers through logstash

Hi Team,

I have an application running on 2 servers, and the application logs are written on both servers, so I want to parse the logs from both.

Filebeat is installed on the two application servers,
Logstash is installed on two separate servers, and
Elasticsearch is installed on three servers (2 of which are also the Logstash servers).

filebeat.yml is as below:

Application Server1 -

filebeat.inputs:
      -  type: log
         fields_under_root: true
         fields:
           log_type:  federate_server1
           app_id: pf
         multiline.pattern: ^[[:space:]]+(at|\.{3})\b|^Caused by:|^java|^...|^-
         multiline.negate: true
         multiline.match: after
         paths:
           - /opt/federate-0.2.0/federate/log/*

output.logstash:
   hosts: ["logstash1:5044", "logstash2:5044"]
   loadbalance: true

Application Server 2 -

filebeat.inputs:
      -  type: log
         fields_under_root: true
         fields:
           log_type:  federate_server2
           app_id: pf
         multiline.pattern: ^[[:space:]]+(at|\.{3})\b|^Caused by:|^java|^...|^-
         multiline.negate: true
         multiline.match: after
         paths:
           - /opt/federate-0.2.0/federate/log/*

output.logstash:
   hosts: ["logstash1:5044", "logstash2:5044"]
   loadbalance: true

logstash.yml -

logstash server1

input {
  beats {
    port => 5044
  }
}

filter {
if [log_type] == "federate_server" and [app_id] == "pf"
  {
    mutate { gsub => ["message","\|"," "] } grok { patterns_dir => ["/etc/logstash/patterns"] match => { "message" => "%{MY_DATE_PATTERN:timestamp}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{UUID:ConsentID}%{SPACE}%{WORD:TransactionID}%{SPACE}%{WORD:TraceID}%{SPACE}%{GREEDYDATA:messagetext}" } }
    mutate {
             replace => {
               "[type]" => "federate_server"
             }
           }
  }
}

output {
  if [log_type] == "federate_server" {
  elasticsearch {
    hosts => ['http://es1:9200', 'http://es2:9200', 'http://es3:9200']
        user => elastic
    password => "${es_pwd}"
     index => "federate"
     template_name => "federate"
     template_overwrite => "false"
      }
 }
  elasticsearch {
    hosts => ['http://es1:9200', 'http://es2:9200', 'http://es3:9200']
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM}"
    user => elastic
    password => "${es_pwd}"
  }
}

logstash server 2

input {
  beats {
    port => 5044
  }
}

filter {
if [log_type] == "federate_server" and [app_id] == "pf"
  {
    mutate { gsub => ["message","\|"," "] } grok { patterns_dir => ["/etc/logstash/patterns"] match => { "message" => "%{MY_DATE_PATTERN:timestamp}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{UUID:ConsentID}%{SPACE}%{WORD:TransactionID}%{SPACE}%{WORD:TraceID}%{SPACE}%{GREEDYDATA:messagetext}" } }
    mutate {
             replace => {
               "[type]" => "federate_server"
             }
           }
  }
}
output {
  if [log_type] == "federate_server" {
  elasticsearch {
    hosts => ['http://es1:9200', 'http://es2:9200', 'http://es3:9200']
        user => elastic
    password => "${es_pwd}"
     index => "federate"
     template_name => "federate"
     template_overwrite => "false"
      }
 }
elasticsearch {
    hosts => ['http://es1:9200', 'http://es2:9200', 'http://es3:9200']
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM}"
    user => elastic
    password => "${es_pwd}"
  }
}

I know that currently the log_type in both filebeat.yml files does not match the log_type in either Logstash config.

Since both Logstash servers are listed in filebeat.yml and loadbalance is true, Filebeat will send events to either of the Logstash servers at a time. To receive events from both Filebeat instances on the Logstash side, how can I add the other log_type to the Logstash config? Currently only one is specified.

  1. i.e. Can I change log_type as below in both servers' Logstash configs to receive events from both application servers' Filebeat?

filter {
if [log_type] == "federate_server1" or if [log_type] == "federate_server2" and [app_id] == "pf"

output {
if [log_type] == "federate_server" or if [log_type] == "federate_server2" {
Elasticsearch {

Is the above or condition correct? If yes, what should go in the [type] => line below?

 mutate {
             replace => {
               "[type]" => "federate_server"
             }
           }

I just want to parse the logs from both application servers, which Filebeat will send to either Logstash server, but the current config above seems to be incorrect, since only one log_type will match, as only one if condition is specified.

  1. Do we need to mention all ES hosts in the output section (as above, where 3 ES hosts are listed), or is one enough, which will then forward requests to the other two ES nodes in the cluster?

Thanks,

Hi All,

I just tried adding the or condition in the Logstash config, but the config validation command gives an error, and because of this the Logstash service also keeps restarting again and again.

filter {
if [log_type] == "federate_server1" or if [log_type] == "federate_server2" and [app_id] == "pf"
  {
    mutate { gsub => ["message","\|"," "] } grok { patterns_dir => ["/etc/logstash/patterns"] match => { "message" => "%{MY_DATE_PATTERN:timestamp}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{UUID:ConsentID}%{SPACE}%{WORD:TransactionID}%{SPACE}%{WORD:TraceID}%{SPACE}%{GREEDYDATA:messagetext}" } }
    mutate {
             replace => {
               "[type]" => "federate_server"
             }
           }
  }
 output {
  if [log_type] == "federate_server1" or if [log_type] == "federate_server2" {
  elasticsearch {
    hosts => ['http://es1:9200', 'http://es2:9200', 'http://es3:9200']
        user => elastic
    password => "${es_pwd}"
     index => "federate"
     template_name => "federate"
     template_overwrite => "false"
      }
}

The error shown by the Logstash config validation command is due to the config above.

if [log_type] == "federate_server1" or if
[2021-09-24T17:29:28,070][FATAL][org.logstash.Logstash    ] Logstash stopped processing because of an error: (SystemExit) exit
org.jruby.exceptions.SystemExit: (SystemExit) exit
        at org.jruby.RubyKernel.exit(org/jruby/RubyKernel.java:747) ~[jruby-complete-9.2.19.0.jar:?]
        at org.jruby.RubyKernel.exit(org/jruby/RubyKernel.java:710) ~[jruby-complete-9.2.19.0.jar:?]
        at usr.share.logstash.lib.bootstrap.environment.<main>(/usr/share/logstash/lib/bootstrap/environment.rb:89) ~[?:?]

logstash logs -

```
[2021-09-24T17:32:16,364][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \t\r\n], "#", "(" at line 73, column 50 (byte 2916) after filter {\nif [log_type] == "developer-portal-api_app_server" and [app_id] == "node"\n {\n grok { match => { "message" => "%{SYSLOGBASE} %{GREEDYDATA:json_message}" } } json { source => "json_message" }\n mutate {\n replace => {\n "[type]" => "developer-portal-api_app_server"\n }\n }\n }\nif [log_type] == "developer-portal-spa_app_server" and [app_id] == "node"\n {\n grok { match => { "message" => "%{SYSLOGBASE} %{GREEDYDATA:json_message}" } } json { source => "json_message" }\n mutate {\n replace => {\n "[type]" => "developer-portal-spa_app_server"\n }\n }\n }\nif [log_type] == "ob-admin-api_app_server" and [app_id] == "node"\n {\n grok { match => { "message" => "%{SYSLOGBASE} %{GREEDYDATA:json_message}" } } json { source => "json_message" }\n mutate {\n replace => {\n "[type]" => "ob-admin-api_app_server"\n }\n }\n }\nif [log_type] == "ob-admin-spa_app_server" and [app_id] == "node"\n {\n grok { match => { "message" => "%{SYSLOGBASE} %{GREEDYDATA:json_message}" } } json { source => "json_message" }\n mutate {\n replace => {\n "[type]" => "ob-admin-spa_app_server"\n }\n }\n }\nif [log_type] == "consent-spa_app_server" and [app_id] == "node"\n {\n grok { match => { "message" => "%{SYSLOGBASE} %{GREEDYDATA:json_message}" } } json { source => "json_message" }\n mutate {\n replace => {\n "[type]" => "consent-spa_app_server"\n }\n }\n }\nif [log_type] == "obie-api_app_server" and [app_id] == "app"\n {\n mutate { gsub => ["message","\|"," "] } grok { patterns_dir => ["/etc/logstash/patterns"] match => { "message" => "%{MY_DATE_PATTERN:timestamp}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{UUID:ConsentID}%{SPACE}%{WORD:TraceID}%{SPACE}%{WORD:TransactionID}%{SPACE}%{GREEDYDATA:messagetext}" } }\n mutate {\n replace => 
{\n "[type]" => "obie-api_app_server"\n }\n }\n }\nif [log_type] == "access_server" and [app_id] == "pa"\n {\n grok { match => { "message" => "%{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:%{MINUTE}(?::?%{SECOND})\| %{USERNAME:exchangeId}\| %{DATA:trackingId}\| %{NUMBER:RoundTrip:int}%{SPACE}ms\| %{NUMBER:ProxyRoundTrip:int}%{SPACE}ms\| %{NUMBER:UserInfoRoundTrip:int}%{SPACE}ms\| %{DATA:Resource}\| %{DATA:subject}\| %{DATA:authmech}\| %{DATA:scopes}\| %{IPV4:Client}\| %{WORD:method}\| %{DATA:Request_URI}\| %{INT:response_code}\| %{DATA:failedRuleType}\| %{DATA:failedRuleName}\| %{DATA:APP_Name}\| %{DATA:Resource_Name}\| %{DATA:Path_Prefix}" } }\n mutate {\n replace => {\n "[type]" => "access_server"\n }\n }\n }\nif [log_type] == "federate_server1" or if ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:187:in initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:391:in block in converge_state'"]}
.
.
.
.

[2021-09-24T17:32:18,079][INFO ][logstash.javapipeline ][.monitoring-logstash] Starting pipeline {:pipeline_id=>".monitoring-logstash", "pipeline.workers"=>1, "pipeline.batch.size"=>2, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>2, "pipeline.sources"=>["monitoring pipeline"], :thread=>"#<Thread:0x7796125 run>"}
[2021-09-24T17:32:18,895][INFO ][logstash.javapipeline ][.monitoring-logstash] Pipeline Java execution initialization time {"seconds"=>0.81}
[2021-09-24T17:32:18,928][INFO ][logstash.javapipeline ][.monitoring-logstash] Pipeline started {"pipeline.id"=>".monitoring-logstash"}
[2021-09-24T17:32:20,978][INFO ][logstash.javapipeline ][.monitoring-logstash] Pipeline terminated {"pipeline.id"=>".monitoring-logstash"}
[2021-09-24T17:32:21,153][INFO ][logstash.runner ] Logstash shut down.
```

The Logstash service keeps restarting continuously, and no docs are getting indexed due to this problem.

green  open   federate-000001               FTjBayPLQreqk3PMtb4LJg   2   1          0            0       832b           416b

Can someone please point out how to correctly do this?

Thanks,

After removing the second if (after the or), the validation command gave an OK output. I can also see docs getting indexed and the index growing in size, but I am not sure how to confirm that it is parsing logs from both app servers and not just one.

since you already add a type field for each server, you can verify whether logs come from both servers by filtering on each type. you might need to remove this config though, since it replaces the identifier

Hi @ptamba,

Thanks for your reply.

You mean to say it will replace federate_server1 or federate_server2 with just federate_server?

Can you please check and confirm once again.

I can see the value of type is log (type: log), so it will replace log with federate_server, right?

The intention is to parse the logs from both servers so that, when they are indexed into ES, it can be identified which server they came from.

Thanks,

you already have this in your filebeat. i assume server2 will have a log_type value of federate_server2

if you don't need to do anything else to the logs in logstash, then just send them to the output and search for the log_type field in ES. you should see federate_server1 and federate_server2 if logs are coming from both servers
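that check can be sketched as a terms aggregation (assuming the index is named federate and log_type is mapped as a keyword — adjust the index and field names to your template):

```
GET federate/_search
{
  "size": 0,
  "aggs": {
    "servers": {
      "terms": { "field": "log_type" }
    }
  }
}
```

a non-empty bucket for each of federate_server1 and federate_server2 confirms both servers are shipping logs.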

if you need to do different things based on the source, then in logstash:

if [log_type] == "federate_server1" {
  # do something to logs from server1
}

if [log_type] == "federate_server2" {
  # do something to logs from server2
}

in the logstash output, unless you want logs to go to a different index or a different output, there is no need to use conditionals. but if you have other logstash config in place you can do

output {
  if [log_type] == "federate_server1" or [log_type] == "federate_server2" {
     #output config 
  }

}

@ptamba, Thanks for your reply and sorry for the delay.

Yes, as you can see in the very first comment above, under the Application Server 2 output.

I do not want to do anything other than parse the logs from both application servers through Logstash and be able to identify them in Kibana, i.e. which log came from which of the two application servers.

So in this case, is the below correct? But as you said, the mutate { replace config below is removing the identifier.

I still don't get this: my identifiers are log_type, not type, yet the mutate config replaces type. Don't you think that either:
i) it should be log_type there (if the intent is to overwrite the identifier), or
ii) as you said, the config needs to be removed from mutate, since it's removing the identifier?

filter {
if [log_type] == "federate_server1" or [log_type] == "federate_server2" and [app_id] == "pf"
  {
    mutate { gsub => ["message","\|"," "] } grok { patterns_dir => ["/etc/logstash/patterns"] match => { "message" => "%{MY_DATE_PATTERN:timestamp}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{UUID:ConsentID}%{SPACE}%{WORD:TransactionID}%{SPACE}%{WORD:TraceID}%{SPACE}%{GREEDYDATA:messagetext}" } }
    mutate {
             replace => {
               "[type]" => "federate_server"
             }
           }
  }

Here also I do not want to do anything other than create the index based on the log_type, apply the index template, and ultimately send the logs to Elasticsearch.

If I correctly understand your reply on the use of conditionals: the if statements are there because there are different applications' logs (see the output below) getting parsed through Logstash (though I have only mentioned one above); maybe that is why you are saying not to use conditionals.

Below is an example of three applications' config in Logstash.

filter {
if [log_type] == "portal-api_app_server" and [app_id] == "node"
  {
    grok { match => { "message" => "%{SYSLOGBASE} %{GREEDYDATA:json_message}"  } } json { source =>  "json_message" }
    mutate {
             replace => {
               "[type]" => "portal-api_app_server"
             }
           }
  }
if [log_type] == "federate_ping_server" and [app_id] == "pf"
  {
    mutate { gsub => ["message","\|"," "] } grok { patterns_dir => ["/etc/logstash/patterns"] match => { "message" => "%{MY_DATE_PATTERN:timestamp}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{UUID:ConsentID}%{SPACE}%{WORD:TransactionID}%{SPACE}%{WORD:TraceID}%{SPACE}%{GREEDYDATA:messagetext}" } }
    mutate {
             replace => {
               "[type]" => "federate_ping_server"
             }
           }
  }
if [log_type] == "directory_ping_server" and [app_id] == "pd"
  {
    mutate { gsub => ["message","\|"," "] } grok { patterns_dir => ["/etc/logstash/patterns"] match => { "message" => "%{MY_DATE_PATTERN:timestamp}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{UUID:ConsentID}%{SPACE}%{WORD:TransactionID}%{SPACE}%{WORD:TraceID}%{SPACE}%{GREEDYDATA:messagetext}" } }
    mutate {
             replace => {
               "[type]" => "directory_ping_server"
             }
           }
  }
}  
output {
 if [log_type] == "portal-api_app_server" {
  elasticsearch {
    hosts => ['http://10.10.10.242:9200']
        user => elastic
    password => "${es_pwd}"
     index => "portal-api"
     template_name => "portal-api"
     template_overwrite => "false"
      }
 }
   if [log_type] == "federate_ping_server" {
  elasticsearch {
    hosts => ['http://10.10.10.242:9200']
        user => elastic
    password => "${es_pwd}"
     index => "federate"
     template_name => "federate"
     template_overwrite => "false"
      }
 }
 if [log_type] == "directory_ping_server" {
  elasticsearch {
    hosts => ['http://10.10.10.242:9200']
        user => elastic
    password => "${es_pwd}"
     index => "directory"
     template_name => "directory"
     template_overwrite => "false"
      }
 }
 elasticsearch {
    hosts => ['http://10.10.10.242:9200']
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM}"
    user => elastic
    password => "${es_pwd}"
  }
}

Thanks,

Hi @ptamba,

Could you please confirm.

Thanks,

if i understand correctly

on input server1

  • log_type: federate_server1
  • app_id: pf

on input server2

  • log_type: federate_server2
  • app_id: pf

why do you have

  • if [log_type] == "federate_server" ?

based on that input i will do

filter {
  if [app_id] == "pf" {
    # parse pf logs
  }
}

output {
  if [app_id] == "pf" {
    # output config for pf
  }
}

which will match all logs with [app_id] == "pf" regardless of the source
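putting it together, a minimal pipeline sketch based on the grok pattern and output settings from your earlier posts (treat it as a sketch, not a drop-in config) could be:

```
input {
  beats {
    port => 5044
  }
}

filter {
  if [app_id] == "pf" {
    mutate { gsub => ["message", "\|", " "] }
    grok {
      patterns_dir => ["/etc/logstash/patterns"]
      match => { "message" => "%{MY_DATE_PATTERN:timestamp}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{UUID:ConsentID}%{SPACE}%{WORD:TransactionID}%{SPACE}%{WORD:TraceID}%{SPACE}%{GREEDYDATA:messagetext}" }
    }
    # log_type (federate_server1 / federate_server2) is left untouched,
    # so it stays usable as the per-server identifier
  }
}

output {
  if [app_id] == "pf" {
    elasticsearch {
      hosts => ["http://es1:9200", "http://es2:9200", "http://es3:9200"]
      user => elastic
      password => "${es_pwd}"
      index => "federate"
      template_name => "federate"
      template_overwrite => false
    }
  }
}
```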

Hi @ptamba,

Thanks for your reply.

Got your point on using app_id: pf instead of log_type: federate_server1 and log_type: federate_server2.
app_id: pf will match logs coming from both application servers anyway.

In this case, can you please update the mutate config? Is it still required?

mutate {
  replace => {
    "[type]" => "federate_server"
  }
}

depends on what you want to do with that filter
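if the goal is only to tell the two servers apart in Kibana, one option (a sketch, assuming you still want a shared type value for your index template) is to keep the replace but rely on the untouched log_type field as the identifier:

```
mutate {
  # shared value for both servers, e.g. for index/template routing
  replace => { "[type]" => "federate_server" }
  # log_type (federate_server1 / federate_server2) is not modified,
  # so per-server filtering still works
}
```

if you don't need a shared type at all, you can simply drop this mutate block.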