One logstash pipeline listening multiple formats (same input)

Hi -

I am working on a new requirement to parse one file containing messages in two different formats and send them to different Elasticsearch indices.

Filebeat (one file containing messages in two different formats) -> Logstash (beats input) -> send each message to one of two Elasticsearch indices, depending on its format.

Eg:

Input file:

01/03/2022 This is a DB Log
01/03/2022 This is a Appserver log
01/03/2022 This is a DB Log
01/03/2022 This is a DB Log
01/03/2022 This is a DB Log
01/03/2022 This is a Appserver log
01/03/2022 This is a Appserver log

Logstash config:

input {
  beats {
       port => 5001
       client_inactivity_timeout => "86400"
 }
}
filter {
<parse differently for DB log vs App server log>
}
output {
if <DB log> -> Send to DB index
else if <App server log> -> Send to App server index
}

This is a high-volume log file, and I am trying to find the most efficient way of doing this. Any ideas are highly appreciated.

You will need to use conditionals in both the filter and output blocks. You also need something in the message that identifies which lines are DB logs and which are App logs.

Basically you need something like this for the filter and output block.

filter {
    if "something that identifies a DB log" in [message] {
        parse for DB logs
        mutate {
            add_tag => ["db-logs"]
        }
    }
    if "something that identifies a App log" in [message] {
        parse for App logs
        mutate {
            add_tag => ["app-logs"]
        }
    }
}
output {
    if "db-logs" in [tags] {
        output for db index
    }
    if "app-logs" in [tags] {
        output for app index
    }
}

It is common to remove the message field after parsing it, so the original text may no longer be available in the output block. Adding tags with the mutate filter gives you something to test in the output conditionals instead.

Great, thanks Leandro! Will give it a try.

I tried to implement this but see the below error:

Failed to execute action ..
message=>"Expected one of [ \\t\\r\\n], \"#\", \"in\", \"not \", \"==\", \"!=\", \"<=\", \">=\", \"<\", \">\", \"=~\", \"!~\", \"and\", \"or\", \"xor\", \"nand\", \"{\" at line 18, column 28 (byte 302) after filter {\n\n  if \"\\\\\\\" in [source] {\n\n        json{\n                source => \"", ...

Here is my config file:

input {
   file {
         path => "/tmp/companycom.json"
         start_position => "beginning"
         sincedb_path => "NUL"
        }
}

filter {

  if "\\\" in [source] {

        json{
                source => "message"
        }

        if "beats_input_codec_plain_applied" in [tags] {
        mutate {
            remove_tag => "beats_input_codec_plain_applied"
        }
    }

        mutate {
                gsub => ["message","\\"",""]
        }

        mutate {
                remove_field => [ "[agent][id]","[agent][type]","[agent][hostname]","[agent][ephemeral_id]","[agent][version]","[ecs][version]","[log][offset]","[input][type]","[beat][name]","[beat][hostname]","[beat][version]","[prospector][type]","offset","sessionId","host"]
                lowercase => [ "logLevel" ]
    }

        mutate {
                rename => { "machineName" => "jvm" }
                rename => { "SourceContext" => "loggerName" }
                rename => { "url" => "requestUri" }
        }

        mutate {
                add_field => {"logFilePath" => "%{[log][file][path]}"}
                add_tag => ["file-based"]
        }

        mutate{
                update => { "responseBody" => "" }
              }

        mutate {
                remove_field => ["source"]
     }
        mutate {
                add_tag => ["company2mod_calls"]
               }
}
    else if "\\\" not in [source]
     {
        json{
                source => "message"
        }

        if "beats_input_codec_plain_applied" in [tags] {
        mutate {
            remove_tag => "beats_input_codec_plain_applied"
        }
    }

     grok {
       match => [ "message", "%{TIMESTAMP_ISO8601}\s*.*?%{HOUR}:%{MINUTE}\s*\[%{DATA:Info}\]\s*%{GREEDYDATA:Data}" ]
       overwrite => [ "message" ]
      }

           json{
                source => "Data"
        }

        mutate {
                remove_field => ["source"]
    }

        mutate {
                add_tag => ["companysql_calls"]
               }

     }

}
output {
  if ["company2mod_calls"] in tags
  {
    elasticsearch
    {
        hosts => ["https://host1:9200" , "https://host2:9200"]
        ssl => true
        cacert => "/path/logstash/certs/company_Root.crt"
        ssl_certificate_verification => true
        user => logstash_user
        password => "${logstash.user.password}"
        sniffing => false
        manage_template => false
        index => "dev-com-logs-%{+YYYY.MM.dd}"
        ilm_enabled => false
    }
  }
  if ["companysql_calls"] in tags
  {
    {
        hosts => ["https://host1:9200" , "https://host2:9200"]
        ssl => true
        cacert => "xz.crt"
        ssl_certificate_verification => true
        user => logstash_user
        password => "${logstash.user.password}"
        sniffing => false
        manage_template => false
        index => "dev-sql-logs-%{+YYYY.MM.dd}"
        ilm_enabled => false
    }
}

Am I missing something?

You cannot escape a backslash at the end of a string. It is interpreted as escaping the quote, so logstash thinks

"\\\" in [source] {

    json{
            source => "

is a quoted string and then it comes upon message and cannot parse it.

In some cases you can use a regular expression containing a character group that contains a backslash. For example, to check if [source] contains a backslash you could use

if [source] =~ "[\]" {

I am uncertain what you intended to test using if "\\\" in. Can you explain what you were trying to do?

Thanks Badger. The reason I was looking for "\\\" in the source message is that one of the log types has three backslashes in it while the other log type does not. So I am trying to look for three backslashes, apply different filters depending on whether they are present, and send the events to two different indices in the output section. So, if I follow what you mentioned, I should be using something like this?

 if source =~ "[\\\]" {

For three consecutive backslashes you could use

if [source] =~ "[\]{3}" {

I tried with this configuration:

filter {

  if "[\]{3}" in [source] {

        json{
                source => "message"
        } ...
        mutate {
                add_tag => ["company2mod_calls"]
               } }
    else if "[\]{3}" inot in [source] {
        json{
                source => "message"
        }
        mutate {
                add_tag => ["companysql_calls"]
               }
}
output {
  if ["company2mod_calls"] in tags
  {
    elasticsearch
    {
        hosts => ["https://host1:9200" , "https://host2:9200"]
        ssl => true
        cacert => "/path/logstash/certs/company_Root.crt"
        ssl_certificate_verification => true
        user => logstash_user
        password => "${logstash.user.password}"
        sniffing => false
        manage_template => false
        index => "dev-com-logs-%{+YYYY.MM.dd}"
        ilm_enabled => false
    }
  }
  if ["companysql_calls"] in tags
  {
    {
        hosts => ["https://host1:9200" , "https://host2:9200"]
        ssl => true
        cacert => "xz.crt"
        ssl_certificate_verification => true
        user => logstash_user
        password => "${logstash.user.password}"
        sniffing => false
        manage_template => false
        index => "dev-sql-logs-%{+YYYY.MM.dd}"
        ilm_enabled => false
    }
}

It looks like it got past the filter section fine, but it is now failing at the output section:

message=>"Expected one of [ \\t\\r\\n], \"#\", \"(\" at line 130, column 3 (byte 3260) after output {\n  if [\"company2mod_calls\"] in tags\n  "

I think that should be if "company2mod_calls" in [tags]

Now this error:

message=>"Expected one of [ \\t\\r\\n], \"#\", \"(\" at line 130, column 3 (byte 3258) after output {\n  if \"company2mod_calls\" in tags ", 

Config:

output {
  if "company2mod_calls" in tags
  {
    elasticsearch { ...}
  }
 if "companysql_calls" in tags
  {
    elasticsearch { ...}
  }
}

You need square brackets around the field reference -- [tags], not tags.

A little progress. I think it's now erroring on the second if condition:

output {
  if "company2mod_calls" in [tags]
  {
    elasticsearch { ...}
  }
 if "companysql_calls" in [tags]
  {
    elasticsearch { ...}
  }
}

Also, tried with

output {
  if "company2mod_calls" in [tags]
  {
    elasticsearch { ...}
  }
 else if "companysql_calls" in [tags]
  {
    elasticsearch { ...}
  }
}

Error:

after output {\n  if \"company2mod_calls\" in [tags]\n  { \n    elasticsearch\n    {\n        hosts => [\"https://host1:9200\" , \"https://host2:9200\"]\n        ssl => true\n        cacert => \"path/logstash/certs/company_Root.crt\"\n        ssl_certificate_verification => true\n        user => logstash_user\n        password => \"${logstash.user.password}\"\n        sniffing => false\n        manage_template => false\n        index => \"dev-companycom-logs-%{+YYYY.MM.dd}\"\n        ilm_enabled => false \n        #ilm_rollover_alias => \"dev-mod-logs\"\n        #ilm_pattern => \"{now/d}-000001\"\n        #ilm_policy => \"DEV_mod_ILM_Policy\"\n    }\n  }\n  else if \"companysql_calls\" in [tags]\n  {\n    ", :backtrace

I cannot explain that error.

Can you share the full error? Just part of it does not help, but it looks like another configuration error.

Share your full output configuration as well, it looks like something is wrong in it.

Here's the full error:

[2022-01-06T13:10:34,532][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:logs-companycom-filebeat-1-dev01-01, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"if\", [A-Za-z0-9_-], '\"', \"'\", \"}\" at line 150, column 5 (byte 3922) after output {\n  if \"company2mod_calls\" in [tags]\n  { \n    elasticsearch\n    {\n        hosts => [\"https://dc-es09.companydm.com:9200\" , \"https://dc-es10.companydm.com:9200\"]\n        ssl => true\n        cacert => \"path/logstash/certs/company_Root.crt\"\n        ssl_certificate_verification => true\n        user => logstash_user\n        password => \"${logstash.user.password}\"\n        sniffing => false\n        manage_template => false\n        index => \"dev-companycom-logs-%{+YYYY.MM.dd}\"\n        ilm_enabled => false \n        #ilm_rollover_alias => \"dev-mod-logs\"\n        #ilm_pattern => \"{now/d}-000001\"\n        #ilm_policy => \"DEV_mod_ILM_Policy\"\n    }\n  }\n  else if \"companysql_calls\" in [tags]\n  {\n    ", :backtrace=>["path/logstash/logstash-7.13.2/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:187:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in `initialize'", "path/logstash/logstash-7.13.2/logstash-core/lib/logstash/java_pipeline.rb:47:in `initialize'", "path/logstash/logstash-7.13.2/logstash-core/lib/logstash/pipeline_action/create.rb:52:in `execute'", "path/logstash/logstash-7.13.2/logstash-core/lib/logstash/agent.rb:389:in `block in converge_state'"]}

And here's the full config:

input {
#  beats {
#        port => 5356
#        client_inactivity_timeout => "86400"
#  }
   file {
         path => "/tmp/companycom.json"
         start_position => "beginning"
         sincedb_path => "NUL"
        }
}

filter {

  if "[\]{3}" in [source] {

        json{
                source => "message"
        }

        if "beats_input_codec_plain_applied" in [tags] {
        mutate {
            remove_tag => "beats_input_codec_plain_applied"
        }
    }

        mutate {
                gsub => ["message","\\"",""]
        }

        mutate {
                remove_field => [ "[agent][id]","[agent][type]","[agent][hostname]","[agent][ephemeral_id]","[agent][version]","[ecs][version]","[log][offset]","[input][type]","[beat][name]","[beat][hostname]","[beat][version]","[prospector][type]","offset","sessionId","host"]
                lowercase => [ "logLevel" ]
    }

        mutate {
                rename => { "machineName" => "jvm" }
                rename => { "SourceContext" => "loggerName" }
                rename => { "url" => "requestUri" }
        }

        mutate {
                add_field => {"logFilePath" => "%{[log][file][path]}"}
                add_tag => ["file-based"]
        }

        mutate{
                update => { "responseBody" => "" }
              }


    if "_" in [coorelationId] {
            mutate {
                        add_field => ["tempcid", "%{coorelationId}"]
                    }
            mutate{
                        split => ["tempcid", "_"]

                        add_field => {"sessionId" => "%{[tempcid][0]}"}
                        add_field => {"GUID" => "%{[tempcid][1]}"}
                        add_field => {"endPointIdentifier" => "%{[tempcid][2]}"}
                    }
                mutate{
                        remove_field => ["tempcid"]
                }
    }

        if "companydm" in [hostname]
        {
                mutate {
                        add_field => ["tempHostName", "%{hostname}"]
                }

                mutate{
                        split => ["tempHostName", "-"]

                        add_field => {"dataCenter" => "%{[tempHostName][0]}"}
                }

                mutate{
                        remove_field => ["tempHostName"]
                }

                mutate{
                        lowercase => ["dataCenter"]
                }
        }

        mutate {
                remove_field => ["source"]
     }
        mutate {
                add_tag => ["company2mod_calls"]
               }
}
    else if "[\]{3}" not in [source] {

        json{
                source => "message"
        }

        if "beats_input_codec_plain_applied" in [tags] {
        mutate {
            remove_tag => "beats_input_codec_plain_applied"
        }
    }

     grok {
       match => [ "message", "%{TIMESTAMP_ISO8601}\s*.*?%{HOUR}:%{MINUTE}\s*\[%{DATA:Info}\]\s*%{GREEDYDATA:Data}" ]
       overwrite => [ "message" ]
      }

           json{
                source => "Data"
        }

        mutate {
                remove_field => ["source"]
    }

        mutate {
                add_tag => ["companysql_calls"]
               }

     }

}
output {
  if "company2mod_calls" in [tags]
  {
    elasticsearch
    {
        hosts => ["https://dc-es09.companydm.com:9200" , "https://dc-es10.companydm.com:9200"]
        ssl => true
        cacert => "path/logstash/certs/company_Root.crt"
        ssl_certificate_verification => true
        user => logstash_user
        password => "${logstash.user.password}"
        sniffing => false
        manage_template => false
        index => "dev-companycom-logs-%{+YYYY.MM.dd}"
        ilm_enabled => false
        #ilm_rollover_alias => "dev-mod-logs"
        #ilm_pattern => "{now/d}-000001"
        #ilm_policy => "DEV_mod_ILM_Policy"
    }
  }
  else if "companysql_calls" in [tags]
  {
    {
        hosts => ["https://dc-es09.companydm.com:9200" , "https://dc-es10.companydm.com:9200"]
        ssl => true
        cacert => "path/logstash/certs/company_Root.crt"
        ssl_certificate_verification => true
        user => logstash_user
        password => "${logstash.user.password}"
        sniffing => false
        manage_template => false
        index => "dev-companysql-logs-%{+YYYY.MM.dd}"
        ilm_enabled => false
        #ilm_rollover_alias => "dev-mod-logs"
        #ilm_pattern => "{now/d}-000001"
        #ilm_policy => "DEV_mod_ILM_Policy"
    }
}
}

You are missing the output plugin name (elasticsearch) before the opening brace in the second branch. It needs to be

else if "companysql_calls" in [tags]
{
  elasticsearch
  {
      hosts => ["https://dc-es09.companydm.com:9200"

ah my bad, I see that now. It's working!! Thanks much for the help!!! You guys are amazing.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.