Logstash configuration input error with if statement

Hi everyone,

I'm trying to create a pipeline in Logstash that receives CEF events and other input from Filebeat. Since not all input is in CEF format, I added an if statement.

My logstash.conf looks like this:

input {
  beats {
    port => 5044
    if "forcepoint" in [tags] { codec => cef {} }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-%{[fields][flavor]}-%{+YYYY.MM.dd}"
  }
#  stdout { codec => rubydebug }
}

But when I check the config using --config.test_and_exit, I get the following error:

[FATAL] 2020-08-23 14:39:47.004 [LogStash::Runner] runner - The given configuration is invalid. Reason: Expected one of [ \t\r\n], "#", "=>" at line 6, column 14 (byte 129) after input {
  beats {
        port => 5044
[ERROR] 2020-08-23 14:39:47.006 [LogStash::Runner] Logstash - java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit

When I comment out the if statement and run the config test again, it says the config is OK. So I guess there must be a mistake in the formatting of the if statement.

Can someone please tell me what I need to change? Or is it even possible to use the codec plugin and conditional statements in the input section?

Any help is appreciated!

Please set log.level to debug and run again to get more detailed log output.

Sorry, some part of the error message was missing. I updated my post.

Try putting your condition statements in the filter block and not the input.

You cannot use a conditional like that inside an input configuration. See this thread for a way to do this using multiple pipelines.

Thanks for the suggestion! I tried this:

filter {
  if "forcepoint" in [tags] {
    codec => cef
  }
}

But it gives me an error again.

[FATAL] 2020-08-23 18:45:24.966 [LogStash::Runner] runner - The given configuration is invalid. Reason: Expected one of [ \t\r\n], "#", "{" at line 17, column 11 (byte 432) after filter {
  if "forcepoint" in [tags] {

Thank you for the help! I don't quite understand your solution. Do you basically send it from one pipeline to another? My problem is that the message field is in CEF format.
I'm sorry if the question sounds silly, I'm new to the Elastic Stack.

BTW: Do you have experience with the decode_cef processor in Filebeat? Could that also be an option?

A codec can only be used on an input or an output, not in a filter. The idea is that you accept the events using a beats input, then in the output section, if they have that "forcepoint" tag, you use a tcp output to send them to another pipeline that has a tcp input with a cef codec. That second pipeline can then send the decoded events to Elasticsearch.
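
A minimal sketch of that two-pipeline layout, assuming the two files below are registered as separate pipelines in pipelines.yml, each with its own pipeline.id and path.config. The file names, port 5144, the index name in the second pipeline, and the line codec on the tcp output are assumptions for illustration, not something taken from the linked thread. The line codec is there because, as far as I know, the tcp output encodes events as JSON by default, and the cef codec on the other side expects the raw CEF line.

```
# beats.conf (hypothetical file name): receives everything from Filebeat
input {
  beats { port => 5044 }
}
output {
  if "forcepoint" in [tags] {
    # Forward only the raw CEF line to the second pipeline.
    tcp {
      host => "localhost"
      port => 5144
      codec => line { format => "%{message}" }
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "logstash-%{[fields][flavor]}-%{+YYYY.MM.dd}"
    }
  }
}

# cef.conf (hypothetical file name): decodes the forwarded CEF events
input {
  tcp {
    port => 5144
    # The delimiter tells the codec where each event ends on the TCP stream.
    codec => cef { delimiter => "\n" }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-forcepoint-%{+YYYY.MM.dd}"
  }
}
```

Because only the message text crosses the TCP hop in this sketch, tags and fields set by Filebeat (such as [fields][flavor]) do not arrive in the second pipeline, which is why it uses a fixed index name here.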

Ok, got it now. Thank you!

Note that you could do it using a single pipeline, but this is a terrible idea: if you get anything wrong, it sets up an infinite loop, like this:

input {
    beats { port => 5044 }
    tcp { port => 5144 codec => cef {} }
}
output {
    if "forcepoint" in [tags] {
        # the tcp output also requires a host
        tcp { host => "localhost" port => 5144 }
    } else {
        elasticsearch { ... }
    }
}

Where you might get away with:

input {
    beats { port => 5044 }
    tcp { port => 5144 codec => cef {} tags => [ "decoded" ] }
}
output {
    if "forcepoint" in [tags] and "decoded" not in [tags] {
        # the tcp output also requires a host
        tcp { host => "localhost" port => 5144 }
    } else {
        elasticsearch { ... }
    }
}

Yes, that's not a good idea.
I think the solution with two pipelines is the only suitable one. I'm also thinking about dropping Filebeat and sending the logs from Forcepoint via TCP/UDP directly to Logstash. But I'm not sure if that's an option as I'm not responsible for the firewalls.

In case someone is having the same issues, I also found this solution: https://gist.github.com/mzpqnxow/85a95010bedf8915056582fcfa34c3e7 But I haven't tried it yet.
