Grok filter log


(Rouchad Rouchad) #1

hi every one i need your help please :
i want parse a log file with grok filter but it doesn't work i don't know why :
the log file :

Aug 24 14:26:49 192.168.0.61 firewall 514865 11:12:13:14:15:16

the logstash config :
input {
file {
path => "C:\elk\logstash\file.log"
start_position => "beginning"
sincedb_path => "nul"
}
}
filter {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:date} %{IP:ip} %{WORD:service} %{NUMBER:numero}" }
}
output {
elasticsearch {
hosts => "http://localhost:9200"
index => "firstlog"
}
stdout {}
}

what i get :
C:\elk\logstash\bin>logstash -f firslog.conf
Sending Logstash's logs to C:/elk/logstash/logs which is now configured via log4j2.properties
[2018-05-17T16:53:45,702][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"C:/elk/logstash/modules/fb_apache/configuration"}
[2018-05-17T16:53:45,743][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"C:/elk/logstash/modules/netflow/configuration"}
[2018-05-17T16:53:46,654][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-05-17T16:53:49,183][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.3"}
[2018-05-17T16:53:51,043][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-05-17T16:53:53,655][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, => at line 13, column 18 (byte 267) after filter {\n grok {\n match => { "message" => "%{SYSLOGTIMESTAMP:date} %{IP:ip} %{WORD:service} %{NUMBER:numero}" }\n}\noutput {\n elasticsearch ", :backtrace=>["C:/elk/logstash/logstash-core/lib/logstash/compiler.rb:42:in compile_imperative'", "C:/elk/logstash/logstash-core/lib/logstash/compiler.rb:50:incompile_graph'", "C:/elk/logstash/logstash-core/lib/logstash/compiler.rb:12:in block in compile_sources'", "org/jruby/RubyArray.java:2486:inmap'", "C:/elk/logstash/logstash-core/lib/logstash/compiler.rb:11:in compile_sources'", "C:/elk/logstash/logstash-core/lib/logstash/pipeline.rb:51:ininitialize'", "C:/elk/logstash/logstash-core/lib/logstash/pipeline.rb:169:in initialize'", "C:/elk/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:40:inexecute'", "C:/elk/logstash/logstash-core/lib/logstash/agent.rb:315:in block in converge_state'", "C:/elk/logstash/logstash-core/lib/logstash/agent.rb:141:inwith_pipelines'", "C:/elk/logstash/logstash-core/lib/logstash/agent.rb:312:in block in converge_state'", "org/jruby/RubyArray.java:1734:ineach'", "C:/elk/logstash/logstash-core/lib/logstash/agent.rb:299:in converge_state'", "C:/elk/logstash/logstash-core/lib/logstash/agent.rb:166:inblock in converge_state_and_update'", "C:/elk/logstash/logstash-core/lib/logstash/agent.rb:141:in with_pipelines'", "C:/elk/logstash/logstash-core/lib/logstash/agent.rb:164:inconverge_state_and_update'", "C:/elk/logstash/logstash-core/lib/logstash/agent.rb:90:in execute'", "C:/elk/logstash/logstash-core/lib/logstash/runner.rb:348:inblock in execute'", "C:/elk/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'"]}


#2

You have a } to close the grok, but you do not have one to close filter. Add a }.


(Rouchad Rouchad) #3

i dit what u tell me to do and i get this :
C:\elk\logstash\bin>logstash -f firslog.conf
Sending Logstash's logs to C:/elk/logstash/logs which is now configured via log4j2.properties
[2018-05-17T17:37:00,151][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"C:/elk/logstash/modules/fb_apache/configuration"}
[2018-05-17T17:37:00,318][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"C:/elk/logstash/modules/netflow/configuration"}
[2018-05-17T17:37:02,071][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-05-17T17:37:05,767][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.3"}
[2018-05-17T17:37:07,945][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-05-17T17:37:30,877][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-05-17T17:37:33,498][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2018-05-17T17:37:33,523][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2018-05-17T17:37:37,807][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2018-05-17T17:37:40,725][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
[2018-05-17T17:37:40,758][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the type event field won't be used to determine the document _type {:es_version=>6}
[2018-05-17T17:37:40,811][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2018-05-17T17:37:40,940][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"default"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2018-05-17T17:37:43,097][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://localhost:9200"]}
[2018-05-17T17:37:47,455][INFO ][logstash.pipeline ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0x561191aa run>"}
[2018-05-17T17:37:47,798][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}


#4

That looks like a normal startup. Did the contents of your file get to elasticsearch? After processing the contents, logstash will sit there tailing the file to see if anything is appended to it.


(Rouchad Rouchad) #5

heey Badger please i need ur help

here is another example of an email that I want to deal with with grok filter ..
the problem is that i can not apply the filter correctly i tested the filter in grok debugger and it worked well but when i run my logstash file and i see the email in kibana nothing has changed as if I did not do filters

there is the email :

Dear Sir/Madam,

We have detected abuse from the IP address ( 197.230.107.154 ), which according to a whois lookup is on your network. We would appreciate if you would investigate and take action as appropriate. Any feedback is welcome but not mandatory.

Log lines are given below, but please ask if you require any further information.

(If you are not the correct person to contact about this please accept our apologies - your e-mail address was extracted from the whois record by an automated process. This mail was generated by Fail2Ban.)

IP of the attacker: 197.230.107.154

You can contact us by using: abuse-reply@keyweb.de

Addresses to send to:
noc_isp@meditel.ma

==================== Excerpt from log for 197.230.107.154 ====================
Note: Local timezone is +0100 (CET)
Mar 7 20:44:05 shared07 sshd[5371]: Invalid user admin from 197.230.107.154
Mar 7 20:44:05 shared07 sshd[5371]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=197.230.107.154
Mar 7 20:44:07 shared07 sshd[5371]: Failed password for invalid user admin from 197.230.107.154 port 33040 ssh2
Mar 7 20:44:08 shared07 sshd[5371]: Connection closed by 197.230.107.154 port 33040 [preauth]

how i want to see the email in kibana after apply the grok filter :
"message": [
[
"Dear Sir/Madam",
"We have detected abuse from the IP address ( 197.230.107.154 ), which according to a whois lookup is on your network. We would appreciate if you would investigate and take action as appropriate. Any feedback is welcome but not mandatory",
"Log lines are given below, but please ask if you require any further information",
"(If you are not the correct person to contact about this please accept our apologies - your e-mail address was extracted from the whois record by an automated process. This mail was generated by Fail2Ban.",
"Addresses to send to",
"Excerpt from log for 197.230.107.154 ===================",
"Note: Local timezone is +0100 (CET)
"Attacker_IP": [
[
"197.230.107.15"
]
],
"sender": [
[
"abuse-reply@keyweb.d"
]
],
"receiver": [
[
"noc_isp@meditel.m"
]
],
"time_attack": [
[
"Mar 7 20:44:05",
"Mar 7 20:44:05",
"Mar 7 20:44:07",
"Mar 7 20:44:08"
]
],
"Attack": [
[
"Invalid user admin from 197.230.107.15",
"pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=197.230.107.15",
"Failed password for invalid user admin from 197.230.107.154 port 33040 ssh",
"Connection closed by 197.230.107.154 port 33040 [preauth]"
]
]

there is my config file logstash :
input {
imap {
host => "imap.gmail.com"
password => "xxxxxxx"
user => "rouchad767@gmail.com"
port => 993
secure => true
fetch_count => 15
check_interval => 10
strip_attachments => true
folder => "Inbox"
}
}
filter {
if [from] == "abuseorange47@gmail.com" {
grok {
match => { "message" => "%{GREEDYDATA:message}%{NOTSPACE}%{SPACE}%{GREEDYDATA:message}%{NOTSPACE}%{SPACE}%{GREEDYDATA:message}%{NOTSPACE}%{SPACE}%{GREEDYDATA:message}%{NOTSPACE}%{SPACE}%{CISCO_REASON}%{NOTSPACE}%{SPACE}%{GREEDYDATA:Attacker_IP}%{NOTSPACE}%{SPACE}%{CISCO_REASON}%{NOTSPACE}%{SPACE}%{GREEDYDATA:sender}%{NOTSPACE}%{SPACE}%{GREEDYDATA:message}%{NOTSPACE}%{SPACE}%{GREEDYDATA:receiver}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{GREEDYDATA:message}%{NOTSPACE}%{SPACE}%{GREEDYDATA:message}%{SPACE}%{NOTSPACE}%{SPACE}%{CISCOTIMESTAMP:time_attack}%{SPACE}%{WORD}%{SPACE}%{WORD}%{SPACE}%{NOTSPACE}%{WORD}%{SPACE}%{NOTSPACE}%{SPACE}%{GREEDYDATA:Attack}%{NOTSPACE}%{SPACE}%{CISCOTIMESTAMP:time_attack}%{SPACE}%{WORD}%{SPACE}%{WORD}%{SPACE}%{NOTSPACE}%{WORD}%{SPACE}%{NOTSPACE}%{SPACE}%{GREEDYDATA:Attack}%{NOTSPACE}%{SPACE}%{CISCOTIMESTAMP:time_attack}%{SPACE}%{WORD}%{SPACE}%{WORD}%{SPACE}%{NOTSPACE}%{WORD}%{SPACE}%{NOTSPACE}%{SPACE}%{GREEDYDATA:Attack}%{NOTSPACE}%{SPACE}%{CISCOTIMESTAMP:time_attack}%{SPACE}%{WORD}%{SPACE}%{WORD}%{SPACE}%{NOTSPACE}%{WORD}%{SPACE}%{NOTSPACE}%{SPACE}%{GREEDYDATA:Attack}" }
}
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
index => "golden"
document_type => "email"
hosts => "localhost:9200"
}

}


#6

Once again I wlil say that trying to match a pattern with a large number of DATA or GREEDYDATA is a terrible approach.

Anyways, if I had to process text like that I would use multiple smaller patterns. For example...

grok { match => [ "message", "==================== Excerpt from log for %{IPV4:attackip} ====================%{GREEDYDATA:attacklog}" ] }

(Rouchad Rouchad) #7

thank you very much Badger for ur answer ...
excuse me but I don't understand very well .... can I ask you a favor please?
please can you apply this type of filter to this entire email so that I can understand and see how things work properly ?

because i have to deal with seven types of messages if you help me understand this type........ i will be very grateful !


(Rouchad Rouchad) #8

Hi Blaireau , can you help me please ?


(system) #9

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.