I have two Logstash pipelines. The first one uploads log lines to Elasticsearch using the csv filter; the columns are [timestamp, machine, application]. The other pipeline reads the data back from Elasticsearch, sorts it by timestamp, and exports it to CSV.
The only thing that doesn't work is sorting.
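For reference, the incoming messages are plain syslog-style lines, something like this (made-up example matching the grok pattern below):

Jan 15 10:23:45 webserver01 sshd: Accepted password for user root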
First pipeline:
input {
  beats {
    port => "5044"
  }
}
filter {
  if "messages" in [tags] {
    # parse the syslog header into separate fields
    grok {
      match => { "message" => "%{MONTH:month} +%{MONTHDAY:day} %{TIME:time} %{SYSLOGHOST:machine} %{DATA:application}: %{GREEDYDATA:msg}" }
    }
    # rebuild a full date string (the year is hard-coded to 2020)
    mutate {
      add_field => {
        "temp" => "%{month} %{day} %{time} 2020"
      }
      remove_field => ["month", "day", "time"]
    }
    date {
      match => ["temp", "MMM d HH:mm:ss yyyy"]
      target => "logdate"
    }
    # keep a numeric copy (epoch seconds) to sort on later
    ruby {
      code => 'event.set("timestamp", event.get("logdate").to_i)'
    }
  }
  csv {
    add_field => {
      "timestamp" => "%{timestamp}"
      "machine" => "%{machine}"
      "application" => "%{application}"
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
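If I understand my own ruby filter correctly, every indexed document ends up with a numeric timestamp field holding epoch seconds alongside the parsed fields, roughly like this (values made up):

{
  "timestamp": 1579083825,
  "machine": "webserver01",
  "application": "sshd"
}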
Second pipeline:
input {
  elasticsearch {
    hosts => "localhost"
  }
}
filter {
  elasticsearch {
    hosts => "localhost"
    index => "*"
    query => '{ "query": { "query_string": { "query": "*" } } }'
    sort => "timestamp:desc"
  }
  mutate {
    convert => {
      "lat" => "float"
      "lon" => "float"
      "weight" => "float"
    }
  }
}
output {
  csv {
    fields => ["timestamp", "machine", "application"]
    path => "output/output.csv"
  }
}
I can't figure out the right syntax for the elasticsearch input plugin and the elasticsearch filter plugin to accomplish the sorting.
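From reading the docs, my best guess is that the sort clause belongs inside the query DSL passed to the elasticsearch input, rather than in the filter, along the lines of the sketch below (the filebeat-* index pattern is just an assumption based on what my first pipeline writes):

input {
  elasticsearch {
    hosts => "localhost"
    # assumed index pattern; my first pipeline writes %{[@metadata][beat]}-... indices
    index => "filebeat-*"
    # sort goes inside the query DSL, since I don't see a separate sort option on the input plugin
    query => '{ "query": { "match_all": {} }, "sort": [ { "timestamp": { "order": "desc" } } ] }'
  }
}

But I'm not sure this is right, and even if it is, I suspect the events may get reordered inside Logstash unless pipeline.workers is set to 1.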