Grok filter

Hi,

I have a csv file which currently has three rows in it, as follows:

```
name,age,ip
A,12,10.11.1.12
B,13,10.11.1
```

Now I want to check, during data ingestion from this csv, whether the IP is in a valid format, using grok.
If it is in a valid format then the message field = IP;
else the message field should contain the improper IP and the text "ip is improper".
Can you please help me with the code?

I would use a csv filter to split [message] into three fields ([name], [age], [ip]).
You can then do

```
grok { match => { "ip" => "^%{IPV4}$" } }
```

and make a decision on what to do based on

if "_grokparsefailure" in [tags] { ...

Hi,

Here is my code. Please assist.

```
input {
  file {
    path => "/home/myuser/test/new.csv"
  }
}
filter {
  csv {
    columns => ["name","Age","IP"]
  }
  grok {
    match => {"%{IP:validIP}" }
  }
  if "_grokparsefailure" in [tags] {
    # want to create a new field like "is_value_correct" in each document with value "false"
  }

output {
  elasticsearch {
    hosts => ["http://x.x.x.:9200"]
    index => "mytest_index"
    user => "xxxxxx"
    password => "xxxxxx"
  }
}
```

Just use mutate with add_field, see here.
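
For example, inside the conditional (a minimal sketch):

```
if "_grokparsefailure" in [tags] {
  mutate {
    add_field => { "is_value_correct" => "false" }
  }
}
```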

Hi,

I tried the conf below:

```
input {
  file {
    path => "/home/myuser/test/new.csv"
    start_position => "beginning"
  }
}
filter {
  csv {
    columns => ["name","Age","IP"]
  }
  grok {
    match => {"%{IP:validIP}" }
  }
  if "_grokparsefailure" in [tags] {
    mutate {
      add_field => { "is_value_correct" => "false" }
    }
  }
  else {
    mutate {
      add_field => { "is_value_correct" => "true" }
    }
  }

output {
  elasticsearch {
    hosts => ["http://x.x.x.x:9200"]
    index => "mytest_index"
    user => "xxxx"
    password => "xxxxxx"
  }
}
```


Error:

```
[ERROR] 2021-04-06 06:20:33.936 [Converge PipelineAction::Create] agent - Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \t\r\n], "#", "=>" at line 16, column 27 (byte 166) after filter\n{\ncsv\n{\ncolumns => ["name","Age","IP"]\n}\ngrok \n{\nmatch => {"%{IP:validIP}" ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:184:in initialize'", "org/logstash/execution/JavaBasePipelineExt.java:69:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:367:in block in converge_state'"]}
[INFO ] 2021-04-06 06:20:34.169 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
[INFO ] 2021-04-06 06:20:39.231 [LogStash::Runner] runner - Logstash shut down.
```


Any help on this?

According to the documentation, match in the grok filter is "a hash that defines the mapping of where to look, and with which patterns."

You don't have any "where to look", as Badger showed you in the first response.

Hi,
I split the message into three columns using the csv filter and also defined the column names. After that I used the grok filter. Can you please suggest an edit to my code shown above, as I am not able to understand your answer.

The grok filter needs to be like this:

```
grok { match => { "IP" => "%{IP:validIP}" } }
```

Cad.

Hi,

After the changes you suggested, the code is as follows:

```
input {
  file {
    path => "/home/myuser/test/new.csv"
    start_position => "beginning"
  }
}
filter {
  csv {
    columns => ["name","Age","IP"]
  }
  grok {
    match => { "IP" => "%{IP:validIP}" }
  }
  if "_grokparsefailure" in [tags] {
    mutate {
      add_field => { "is_value_correct" => "false" }
    }
  }
  else {
    mutate {
      add_field => { "is_value_correct" => "true" }
    }
  }

output {
  elasticsearch {
    hosts => ["http://x.x.x.x:9200"]
    index => "mytest_index"
    user => "xxxx"
    password => "xxxxxx"
  }
}
```



Error:

```
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[INFO ] 2021-04-06 08:53:43.751 [main] runner - Starting Logstash
[WARN ] 2021-04-06 08:53:44.088 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
[ERROR] 2021-04-06 08:53:45.347 [Converge PipelineAction::Create<main>] agent - Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"=>\" at line 30, column 15 (byte 404) after filter\n{\ncsv\n{\ncolumns => [\"name\",\"Age\",\"IP\"]\n}\ngrok { match => {\"IP\" => \"%{IP:validIP}\" } }\nif \"_grokparsefailure\" in [tags] { \n\n      mutate {\n        add_field => { \"is_value_correct\" => \"false\" }\n      }\n}\nelse\n{\n      mutate\n {\n        add_field => { \"is_value_correct\" => \"true\" }\n      }\n}\n\noutput{\nelasticsearch ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:184:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:69:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:367:in `block in converge_state'"]}
[INFO ] 2021-04-06 08:53:45.630 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
[INFO ] 2021-04-06 08:53:50.489 [LogStash::Runner] runner - Logstash shut down.
```

Hi,

You did not close the filter block.
A } is missing before the output.
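
For reference, a sketch of the corrected pipeline with that brace added (hosts and credentials redacted as in your posts):

```
input {
  file {
    path => "/home/myuser/test/new.csv"
    start_position => "beginning"
  }
}
filter {
  csv {
    columns => ["name","Age","IP"]
  }
  grok {
    match => { "IP" => "%{IP:validIP}" }
  }
  if "_grokparsefailure" in [tags] {
    mutate { add_field => { "is_value_correct" => "false" } }
  } else {
    mutate { add_field => { "is_value_correct" => "true" } }
  }
}  # this closing brace was missing
output {
  elasticsearch {
    hosts => ["http://x.x.x.x:9200"]
    index => "mytest_index"
    user => "xxxx"
    password => "xxxxxx"
  }
}
```

With your sample rows, the event for A,12,10.11.1.12 should get is_value_correct => true, and the one for B,13,10.11.1 should get false, since "10.11.1" cannot match the four-octet IP pattern.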

Cad.


Bro,
You are great!

Thanks for this help.
