Would this Logstash configuration work for indexing UTM CSV logs into Elasticsearch?
input {
  # Tail every CSV file under /data2/logstash. "beginning" makes Logstash
  # read pre-existing file content on first discovery instead of only new lines.
  file {
    type           => "utm"
    path           => "/data2/logstash/*.csv"
    start_position => "beginning"
  }
}
filter {
  # Split each comma-separated line into the named UTM log fields.
  # NOTE: "," is also the csv filter's default separator; kept explicit for clarity.
  csv {
    separator => ","
    columns   => [
      "g_date", "g_hour", "g_hostname", "nsm_type",
      "s_ip", "t_ip", "n_ip", "t_port", "transport_proto",
      "in_interface", "out_interface", "group_id", "user_id",
      "object_name", "object_access", "http_hostname", "uri",
      "s_port", "s_bytes", "t_bytes", "duration", "g_timestamp"
    ]
  }
  # Normalise hostnames to lower case so queries match consistently.
  mutate {
    lowercase => [ "g_hostname" ]
  }
}
filter {
  # Tag firewall-authentication events with a static "service" field.
  # FIX 1: the two equality tests must be joined with a boolean operator;
  # the original omitted "or", which is a config syntax error at startup.
  if [nsm_type] == "fw.auth.allow" or [nsm_type] == "fw.auth.deny" {
    # FIX 2: grok requires a "match" setting and is the wrong tool for
    # simply adding a field; mutate's add_field does exactly this.
    mutate {
      add_field => { "service" => "fw" }
    }
  }
}
output {
# Index each event into the Elasticsearch cluster; index name varies by event date.
elasticsearch {
action => "index"
# All six cluster nodes listed; the plugin load-balances across them.
hosts => ["10.140.56.140:9700", "10.140.56.141:9700", "10.140.56.142:9700", "10.140.56.143:9700", "10.140.56.144:9700", "10.140.56.145:9700"]
# NOTE(review): if an event lacks g_date, the sprintf reference does not resolve
# and the literal index name "utm-%{g_date}" is used — verify events always carry
# g_date. Also confirm g_date values are lowercase: ES index names must be lowercase.
index => "utm-%{g_date}"
# NOTE(review): the per-output "workers" option is deprecated/removed in newer
# Logstash versions in favor of pipeline workers — confirm against your version.
workers => 12
}
# Also echo every event to the console for debugging; remove in production.
stdout {
codec => rubydebug
}
}