Hi,
I have a CSV file which currently has three rows in it, as follows:

```
name,age,ip
A,12,10.11.1.12
B,13,10.11.1
```

Now, during data ingestion via this CSV, I want to check whether the IP is in the proper format or not using grok.
If it is in the proper format, then the message field should be the IP;
else the message field should contain the improper IP and the text "ip is improper".
Can you please help me with the code?
Badger
April 4, 2021, 6:54pm
I would use a csv filter to split [message] into three fields ([name], [age], [ip]).
You can then do

```
grok { match => { "ip" => "^%{IPV4}$" } }
```

and make a decision on what to do based on

```
if "_grokparsefailure" in [tags] { ...
```
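Putting those pieces together, a minimal filter sketch might look like this (the message rewriting is my reading of the requirement above, so treat it as illustrative rather than definitive):

```
filter {
  # split the raw CSV line into named columns
  csv {
    columns => ["name", "age", "ip"]
  }
  # anchored pattern, so a partial address like 10.11.1 fails to match
  grok {
    match => { "ip" => "^%{IPV4}$" }
  }
  # grok adds "_grokparsefailure" to [tags] when the match fails
  if "_grokparsefailure" in [tags] {
    mutate { replace => { "message" => "%{ip} ip is improper" } }
  } else {
    mutate { replace => { "message" => "%{ip}" } }
  }
}
```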
Hi,
Here is my code. Please assist.
```
input {
  file {
    path => "/home/myuser/test/new.csv"
  }
}
filter {
  csv {
    columns => ["name","Age","IP"]
  }
  grok {
    match => {"%{IP:validIP}" }
  }
  if "_grokparsefailure" in [tags] {
    # want to create a new field like "is_value_correct" in each document with value "false"
  }
output {
  elasticsearch {
    hosts => ["http://x.x.x.:9200"]
    index => "mytest_index"
    user => "xxxxxx"
    password => "xxxxxx"
  }
}
```
stephenb
(Stephen Brown)
April 5, 2021, 2:33pm
Just use mutate and add_field, see here.
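One way this could look, inside the conditional (field name taken from your annotation above):

```
if "_grokparsefailure" in [tags] {
  mutate {
    add_field => { "is_value_correct" => "false" }
  }
}
```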
Hi,
I tried the below conf:

```
input {
  file {
    path => "/home/myuser/test/new.csv"
    start_position => "beginning"
  }
}
filter {
  csv {
    columns => ["name","Age","IP"]
  }
  grok {
    match => {"%{IP:validIP}" }
  }
  if "_grokparsefailure" in [tags] {
    mutate {
      add_field => { "is_value_correct" => "false" }
    }
  }
  else {
    mutate {
      add_field => { "is_value_correct" => "true" }
    }
  }
output {
  elasticsearch {
    hosts => ["http://x.x.x.x:9200"]
    index => "mytest_index"
    user => "xxxx"
    password => "xxxxxx"
  }
}
```
Error:

```
[ERROR] 2021-04-06 06:20:33.936 [Converge PipelineAction::Create<main>] agent - Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"=>\" at line 16, column 27 (byte 166) after filter\n{\ncsv\n{\ncolumns => [\"name\",\"Age\",\"IP\"]\n}\ngrok \n{\nmatch => {\"%{IP:validIP}\" ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:184:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:69:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:367:in `block in converge_state'"]}
[INFO ] 2021-04-06 06:20:34.169 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
[INFO ] 2021-04-06 06:20:39.231 [LogStash::Runner] runner - Logstash shut down.
```
Any help on this?
Cad
April 6, 2021, 7:58am
According to the documentation:
match in the grok filter is "A hash that defines the mapping of where to look, and with which patterns."
You don't have any "where to look", as Badger showed you in the first response.
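In sketch form, the hash needs a source field on the left-hand side (the field name here is a placeholder):

```
grok {
  # "some_field" is where to look; the pattern is what to look for
  match => { "some_field" => "%{IPV4:valid_ip}" }
}
```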
Hi,
I split the message into three columns using the csv filter and even defined the names of the columns. After that, I used the grok filter. Can you please suggest an edit to my code shown above, as I am not able to understand your answer?
Cad
April 6, 2021, 8:23am
The grok filter needs to be like this:

```
grok { match => { "IP" => "%{IP:validIP}" } }
```

Cad.
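One caveat (an assumption on my part, following Badger's earlier ^%{IPV4}$ suggestion): without anchors, %{IP} will also match an IP-looking substring inside a longer value, so for strict validation you may want:

```
# anchored so the whole field must be a valid IP, not just contain one
grok { match => { "IP" => "^%{IP:validIP}$" } }
```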
Hi,
After the changes you suggested, the code is as follows:

```
input {
  file {
    path => "/home/myuser/test/new.csv"
    start_position => "beginning"
  }
}
filter {
  csv {
    columns => ["name","Age","IP"]
  }
  grok { match => { "IP" => "%{IP:validIP}" } }
  if "_grokparsefailure" in [tags] {
    mutate {
      add_field => { "is_value_correct" => "false" }
    }
  }
  else {
    mutate {
      add_field => { "is_value_correct" => "true" }
    }
  }
output {
  elasticsearch {
    hosts => ["http://x.x.x.x:9200"]
    index => "mytest_index"
    user => "xxxx"
    password => "xxxxxx"
  }
}
```
Error:

```
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[INFO ] 2021-04-06 08:53:43.751 [main] runner - Starting Logstash
[WARN ] 2021-04-06 08:53:44.088 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
[ERROR] 2021-04-06 08:53:45.347 [Converge PipelineAction::Create<main>] agent - Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"=>\" at line 30, column 15 (byte 404) after filter\n{\ncsv\n{\ncolumns => [\"name\",\"Age\",\"IP\"]\n}\ngrok { match => {\"IP\" => \"%{IP:validIP}\" } }\nif \"_grokparsefailure\" in [tags] { \n\n mutate {\n add_field => { \"is_value_correct\" => \"false\" }\n }\n}\nelse\n{\n mutate\n {\n add_field => { \"is_value_correct\" => \"true\" }\n }\n}\n\noutput{\nelasticsearch ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:184:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:69:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:367:in `block in converge_state'"]}
[INFO ] 2021-04-06 08:53:45.630 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
[INFO ] 2021-04-06 08:53:50.489 [LogStash::Runner] runner - Logstash shut down.
```
Cad
April 6, 2021, 10:02am
Hi,
You do not close the filter block: a } is missing before the output.
Cad.
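For reference, the corrected config from this thread would then look like this (hosts and credentials masked as in the posts above):

```
input {
  file {
    path => "/home/myuser/test/new.csv"
    start_position => "beginning"
  }
}
filter {
  csv {
    columns => ["name","Age","IP"]
  }
  grok { match => { "IP" => "%{IP:validIP}" } }
  if "_grokparsefailure" in [tags] {
    mutate {
      add_field => { "is_value_correct" => "false" }
    }
  } else {
    mutate {
      add_field => { "is_value_correct" => "true" }
    }
  }
} # <- this closing brace for the filter block was missing
output {
  elasticsearch {
    hosts => ["http://x.x.x.x:9200"]
    index => "mytest_index"
    user => "xxxx"
    password => "xxxxxx"
  }
}
```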
Bro,
You are great!
Thanks for this help.
system
(system)
Closed
May 4, 2021, 10:55am
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.