Help with KV Plugin

I have a CEF log being sent to us, but I can't get the kv plugin to work. When I apply this config I don't get any errors, but I also don't see any logs from it, so it's breaking somewhere. Below is the config I'm using and an example log. There is a priority value first, followed by fields in key=value format. The message does get broken down into syslog_pri and syslog_message, but as soon as I apply any variation of kv {}, I don't get any logs. Any suggestions?

filter {
    if "syslog" in [tags] {
        if ([message] =~ /XG330/) {
            mutate {
                add_tag => [ "sophos" ]
            }
            grok {
                match => { "message" => "<%{POSINT:syslog_pri}>%{GREEDYDATA:syslog_message}" }
            }
            kv {
                source => "syslog_message"
            }
        }
    }
}

<160>device="SFW" date=2019-08-14 time=15:39:41 timezone="EDT" device_name="XG330" device_id=XX079CW3XXXXX log_id=0629106 log_type="Event" log_component="Firewall Authentication" log_subtype="Authentication" status="Successful" priority=Information user_name="user@company.com" usergroupname="VPNUsers" auth_client="SSLVPN" auth_mechanism="N/A" reason="" src_ip=10.0.0.4 src_mac="" start_time=1565781152 sent_bytes=0 recv_bytes=0 message="User user@company.com was logged out of firewall" name="user@company.com" timestamp=1565811581

If you use

output { stdout { codec => rubydebug { metadata => false } } }

what do you see? Your grok and kv parse the message for me...

        "log_id" => "0629106",
       "message" => "User user@company.com was logged out of firewall",
          "tags" => [
    [0] "sophos"
],
     "device_id" => "XX079CW3XXXXX",
"auth_mechanism" => "N/A",
     "timestamp" => "1565811581",
   "auth_client" => "SSLVPN",
[...]

I don't have direct console access to this box because we use a Salt master/minion setup to manage these hosts. I can write to a log file, and I have tried the config below, but I get the errors shown after it when trying to restart the logstash service. We already have another config running with an output statement, so it may be conflicting with that, but I can't disable that one to test.

filter {
    if "syslog" in [tags] {
        if ([message] =~ /XG330/) {
            mutate {
                add_tag => [ "sophos" ]
            }
            grok {
                match => { "message" => "<%{POSINT:syslog_pri}>%{GREEDYDATA:syslog_message}" }
            }
            kv {
                source => "syslog_message"
            }
        }
    }
}



output {
        file {
                path => /tmp/sophos_debug_output
                codec => rubydebug { metadata => false }
        }
}

Aug 15 14:31:24 sbnhsh11 logstash[6255]: [2019-08-15T14:31:24,220][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, ", ', -, [, { at line 112, column 25 (byte 4397) after output {\n file {\n path => ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:42:in `compile_imperative'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:50:in `compile_graph'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:12:in `block in compile_sources'", "org/jruby/RubyArray.java:2486:in `map'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:11:in `compile_sources'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:49:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:167:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:40:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:305:in `block in converge_state'"]}

You need double quotes around the filename.
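
That is, the output block should look like this:

output {
    file {
        path => "/tmp/sophos_debug_output"
        codec => rubydebug { metadata => false }
    }
}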

Thanks, that did it and the file gets created, but it grew too large (~100 MB per minute), so I added the same if statement from the filter to the output section, which left me with just these logs. When reviewing them I can see they are getting parsed, but I'm not sure why they aren't making it to our Kibana. If I remove the kv section, I do get them?!?
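
The output section now looks roughly like this (same path and codec as before, wrapped in the same conditional as the filter):

output {
    if "syslog" in [tags] {
        if ([message] =~ /XG330/) {
            file {
                path => "/tmp/sophos_debug_output"
                codec => rubydebug { metadata => false }
            }
        }
    }
}

Here is one of the events it wrote: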

             "@timestamp" => 2019-08-16T13:12:12.960Z,
           "in_interface" => "tun0",
             "recv_bytes" => "0",
               "src_port" => "137",
               "protocol" => "UDP",
               "timezone" => "EDT",
              "sent_pkts" => "0",
               "@version" => "1",
          "tran_dst_port" => "0",
            "policy_type" => "0",
               "dst_port" => "137",
             " recv_pkts" => "0",
               "log_type" => "Firewall",
             "sent_bytes" => "0",
               "priority" => "Information",
          "log_component" => "SSL VPN"

If you are not seeing them in Kibana then it could be because the events are not reaching elasticsearch (e.g. because of a mapping exception). Running a query in the Kibana developer console will tell you whether the events are in elasticsearch. If they are in elasticsearch but you are not seeing them in Kibana, then the query you are running (either the filter or the time picker) does not include them.
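
For example, something along these lines in Dev Tools (the index pattern here is just a guess; use whichever index your existing output writes to):

GET logstash-*/_search
{
  "size": 1,
  "query": {
    "match": { "tags": "sophos" }
  }
}

If that returns no hits while the file output is still seeing events, the logstash log will usually show the mapping errors coming back from the elasticsearch output.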

That was it. One of these new fields conflicted with the mapping of the index we were putting these logs into, and moving them to their own index resolved it. Thanks for all your help on this!
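
For reference, routing these events to their own index amounts to something like this in the output section (the hosts and index name here are placeholders):

output {
    if "sophos" in [tags] {
        elasticsearch {
            hosts => ["localhost:9200"]        # placeholder; point at your cluster
            index => "sophos-%{+YYYY.MM.dd}"   # dedicated index so the kv fields don't clash with existing mappings
        }
    }
}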
