Array datatype limit size?

Hi all,
I'm trying to collect information about login of accounts on a website. Currently I'm able to save, for instance, the last 10 login attempts using the array datatype.
To do that I use Filebeat that sends messages to Logstash to grok strings.
Is there a way to limit the array's size?

This is the logstash.conf used right now:

input { 
	beats {
		client_inactivity_timeout => 600
		port => 5044
	}
}

filter {
 grok {
        break_on_match => false
		patterns_dir => [".\patterns"]
        match => [
            "message", ".*\s%{TT:timestamp}.*login\=\"%{USERNAME:username}\""
        ]
    }
 date {
   match => ["timestamp", "yyyyMMdd HHmmss"]
 }

mutate {
    add_field => {
	 "logins" => ["%{@timestamp}"]
	}
	remove_field => ["timestamp"]
}
}

output {
  elasticsearch { 		
   hosts => ["localhost:9200"]
   index => "ftaudit"
   document_id => "%{[username]}_%{[beat][name]}"
  }
 }

unfortunately, when add_field is performed, it will overwrite the previous value. can anyone help me?
@magnusbaeck maybe you could know the answer?

Please don't ping people who are not yet involved in the thread.
I moved your question to #logstash.

sorry @dadoonet.
at the end I modified the output section of my logstash.conf in this way:

 output {
  elasticsearch { 		
   hosts => ["localhost:9200"]
   index => "ftaudit"
   document_id => "%{[username]}_%{[beat][name]}"
   doc_as_upsert => true
   action => "update"
   script => 'ctx._source.logins += ";""%{@timestamp}"'
  }
 }

but I'm facing this error:

[2018-11-30T19:37:46,744][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["update", {:_id=>"NoTmanaGe_CE", :_index=>"ftaudit", :_type=>"doc", :_routing=>nil, :_retry_on_conflict=>1}, #LogStash::Event:0x28acefed], :response=>{"update"=>{"_index"=>"ftaudit", "_type"=>"doc", "_id"=>"NoTmanaGe_CE_dxfbft03", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [logins] of type [date]", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Invalid format: "2018-11-30T16:42:10.000Z2018-11-30T16:41:30.000Z" is malformed at "2018-11-30T16:41:30.000Z""}}}}}

so basically, in my index there is only the first document_id of each record. I'm not understanding how to manage my login field, composed by several date

just an update. I changed the output section of logstash.conf in this way:

output {
  elasticsearch { 		
   hosts => ["localhost:9200"]
   index => "ftaudit"
   document_id => "%{[username]}_%{[beat][name]}"
   doc_as_upsert => true
   action => "update"
   script => 'ctx._source.logins.add(%{@timestamp}); if(ctx._source.logins.length > 10) { ctx._source.logins.remove(0); }'
  }

it seems to be the correct way but I'm facing this error:

[2018-12-04T17:48:29,014][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["update", {:_id=>"NoTmanaGe_CE_dxfbft03", :_index=>"ftaudit", :_type=>"doc", :_routing=>nil, :_retry_on_conflict=>1}, #LogStash::Event:0x3ec54e10], :response=>{"update"=>{"_index"=>"ftaudit", "_type"=>"doc", "_id"=>"NoTmanaGe_CE_ftp", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to execute script", "caused_by"=>{"type"=>"script_exception", "reason"=>"compile error", "script_stack"=>["... ource.list.add(2018-11-30T16:52:41.000Z)", " ^---- HERE"], "script"=>"ctx._source.list.add(2018-11-30T16:52:41.000Z)", "lang"=>"painless", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"invalid sequence of tokens near ['T16'].", "caused_by"=>{"type"=>"no_viable_alt_exception", "reason"=>nil}}}}}}}

here it is also the mapping of my index:

{
  "ftaudit" : {
    "mappings" : {
      "doc" : {
        "properties" : {
          "@timestamp" : {
            "type" : "date"
          },
          "@version" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "software" : {
            "properties" : {
              "message" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              }
            }
          },
          "beat" : {
            "properties" : {
              "hostname" : {
                "type" : "keyword"
              },
              "name" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "version" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              }
            }
          },
          "client" : {
            "properties" : {
              "ip" : {
                "type" : "keyword"
              },
              "port" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              }
            }
          },
          "filebeat_source" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "host" : {
            "properties" : {
              "name" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              }
            }
          },
          "logins" : {
            "type" : "date"
          },
          "message" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "name" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "offset" : {
            "type" : "long"
          },
          "server" : {
            "properties" : {
              "ip" : {
                "type" : "keyword"
              },
              "port" : {
                "type" : "keyword"
              }
            }
          },
          "tags" : {
            "type" : "keyword"
          },
          "username" : {
            "type" : "keyword"
          }
        }
      }
    }
  }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.