Hi all,
I'm trying to collect information about account logins on a website. Currently I'm able to save, for instance, the last 10 login attempts using the array datatype.
To do that I use Filebeat, which sends messages to Logstash, where they are parsed with grok.
Is there a way to limit the array's size?
This is the logstash.conf I'm using right now:
input {
beats {
client_inactivity_timeout => 600
port => 5044
}
}
filter {
grok {
break_on_match => false
patterns_dir => [".\patterns"]
match => [
"message", ".*\s%{TT:timestamp}.*login\=\"%{USERNAME:username}\""
]
}
date {
match => ["timestamp", "yyyyMMdd HHmmss"]
}
mutate {
add_field => {
"logins" => ["%{@timestamp}"]
}
remove_field => ["timestamp"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "ftaudit"
document_id => "%{[username]}_%{[beat][name]}"
}
}
Unfortunately, when add_field is performed it overwrites the previous value. Can anyone help me?
@magnusbaeck maybe you know the answer?
Please don't ping people who are not yet involved in the thread.
I moved your question to #logstash.
Sorry, @dadoonet.
In the end I modified the output section of my logstash.conf like this:
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "ftaudit"
document_id => "%{[username]}_%{[beat][name]}"
doc_as_upsert => true
action => "update"
script => 'ctx._source.logins += ";""%{@timestamp}"'
}
}
But I'm getting this error:
[2018-11-30T19:37:46,744][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["update", {:_id=>"NoTmanaGe_CE", :_index=>"ftaudit", :_type=>"doc", :_routing=>nil, :_retry_on_conflict=>1}, #LogStash::Event:0x28acefed], :response=>{"update"=>{"_index"=>"ftaudit", "_type"=>"doc", "_id"=>"NoTmanaGe_CE_dxfbft03", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [logins] of type [date]", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Invalid format: "2018-11-30T16:42:10.000Z2018-11-30T16:41:30.000Z" is malformed at "2018-11-30T16:41:30.000Z""}}}}}
So basically my index only contains the first document for each document_id, and I don't understand how to manage my logins field, which is composed of several dates.
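Looking at the error message, the root cause seems to be the string concatenation in the script: each new timestamp gets glued onto the previous value, so Elasticsearch ends up trying to parse "2018-11-30T16:42:10.000Z2018-11-30T16:41:30.000Z" as a single date and fails. A field mapped as date can hold an array of dates, so a more promising direction (just a sketch, untested) is to keep logins as a list and append to it instead of concatenating strings, for example:
script => 'ctx._source.logins.add("%{@timestamp}")'
The interpolated timestamp has to stay inside quotes so that the Painless source the plugin generates is still valid.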
Just an update: I changed the output section of logstash.conf like this:
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "ftaudit"
document_id => "%{[username]}_%{[beat][name]}"
doc_as_upsert => true
action => "update"
script => 'ctx._source.logins.add(%{@timestamp}); if(ctx._source.logins.length > 10) { ctx._source.logins.remove(0); }'
}
}
It seems to be the right approach, but I'm getting this error:
[2018-12-04T17:48:29,014][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["update", {:_id=>"NoTmanaGe_CE_dxfbft03", :_index=>"ftaudit", :_type=>"doc", :_routing=>nil, :_retry_on_conflict=>1}, #LogStash::Event:0x3ec54e10], :response=>{"update"=>{"_index"=>"ftaudit", "_type"=>"doc", "_id"=>"NoTmanaGe_CE_ftp", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to execute script", "caused_by"=>{"type"=>"script_exception", "reason"=>"compile error", "script_stack"=>["... ource.list.add(2018-11-30T16:52:41.000Z)", " ^---- HERE"], "script"=>"ctx._source.list.add(2018-11-30T16:52:41.000Z)", "lang"=>"painless", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"invalid sequence of tokens near ['T16'].", "caused_by"=>{"type"=>"no_viable_alt_exception", "reason"=>nil}}}}}}}
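If I read this one correctly, two separate things go wrong (my interpretation, not verified): %{@timestamp} is substituted into the script source without quotes, so Painless sees the bare token 2018-11-30T16:52:41.000Z and fails to compile, and Painless lists use size() rather than length. A corrected sketch of the script, with a guard because ctx._source.logins may come back as a single scalar when the document was first created with only one login:
script => '
  if (ctx._source.logins == null) {
    ctx._source.logins = new ArrayList();
  } else if (!(ctx._source.logins instanceof List)) {
    // a single stored value is returned as a scalar, so wrap it in a list first
    def first = ctx._source.logins;
    ctx._source.logins = new ArrayList();
    ctx._source.logins.add(first);
  }
  ctx._source.logins.add("%{@timestamp}");
  if (ctx._source.logins.size() > 10) {
    ctx._source.logins.remove(0);
  }
'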
Here is also the mapping of my index:
{
"ftaudit" : {
"mappings" : {
"doc" : {
"properties" : {
"@timestamp" : {
"type" : "date"
},
"@version" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"software" : {
"properties" : {
"message" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
},
"beat" : {
"properties" : {
"hostname" : {
"type" : "keyword"
},
"name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"version" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
},
"client" : {
"properties" : {
"ip" : {
"type" : "keyword"
},
"port" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
},
"filebeat_source" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"host" : {
"properties" : {
"name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
},
"logins" : {
"type" : "date"
},
"message" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"offset" : {
"type" : "long"
},
"server" : {
"properties" : {
"ip" : {
"type" : "keyword"
},
"port" : {
"type" : "keyword"
}
}
},
"tags" : {
"type" : "keyword"
},
"username" : {
"type" : "keyword"
}
}
}
}
}
}
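Since logins is mapped as date, the mapping itself should be fine: an Elasticsearch field can hold an array of values of its mapped type, so the only question is how the values are appended. For completeness, here is a hedged sketch of the whole output section, assuming the plugin's scripted_upsert option so the script also runs for the first event of a given user/host, and assuming the event is exposed to the script as params.event (the default script_var_name), which avoids recompiling the script for every event:
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "ftaudit"
    document_id => "%{[username]}_%{[beat][name]}"
    action => "update"
    # let the script handle document creation as well
    scripted_upsert => true
    script_lang => "painless"
    script_type => "inline"
    script => '
      if (ctx._source.logins == null) {
        ctx._source.logins = new ArrayList();
      }
      ctx._source.logins.add(params.event.get("@timestamp"));
      if (ctx._source.logins.size() > 10) {
        ctx._source.logins.remove(0);
      }
    '
  }
}
One caveat: with an empty upsert document only the fields the script sets end up in the index, so if the other event fields (client, server, and so on) should be kept as well, they would need to be copied in the script or supplied through the upsert / doc_as_upsert options.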
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.