Currently I have the following config, which concatenates the FormId values together, grouped by SessionId.
input {
# Read the CSV export from the start of the file on every pipeline run;
# sincedb_path => /dev/null discards read-position state, so the whole
# file is re-ingested on each Logstash restart (useful for testing,
# causes duplicates in production).
file {
path => "/usr/share/logstash/data/ML/t_process_log_202005_test.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
  # Parse the pipe-delimited export into named fields.
  csv {
    separator => "|"
    #skip_header => "true"
    #skip_empty_rows => "true"
    columns => ["ApplicationId","UserId","ProcessId","FormId","Action","Status","Reference","TranRef","IP","CreationDate","SessionId"]
    skip_empty_columns => "true"
  }
  # Strip every line-ending variant from the raw message in a single
  # mutate instead of three separate filter invocations. gsub pairs are
  # applied in order, so "\r\n" is removed before the lone "\r" / "\n"
  # patterns — identical result to the original three mutates.
  mutate {
    gsub => [
      "message", "\r\n", "",
      "message", "\r", "",
      "message", "\n", ""
    ]
  }
  # Drop events whose message is empty after cleanup (e.g. blank lines).
  if ![message] {
    drop { }
  }
  mutate {
    add_field => ["Data_Source", "Web1" ]
  }
  # Re-parse CreationDate (ISO8601, Asia/Singapore) into the same field.
  date {
    match => ["CreationDate","ISO8601"]
    timezone => "Asia/Singapore"
    target => "CreationDate"
  }
  # Composite key used downstream as the Elasticsearch document_id, so
  # all rows of one user session collapse into a single document.
  mutate {
    add_field => {"keyfield" => "%{UserId}%{SessionId}" }
  }
}
output{
# Upsert each event into the per-session document identified by
# "keyfield": doc_as_upsert creates the document the first time a
# keyfield is seen; on subsequent events the painless script appends
# this event's FormId (space-separated) to the stored FormId value.
# NOTE(review): ssl_certificate_verification => false disables peer
# verification, which makes the cacert setting below effectively unused
# and allows man-in-the-middle — confirm whether this is intentional.
elasticsearch {
hosts => "https://myhost:9200"
index => "table_transform"
user => "elastic"
password => "password"
ssl => true
ssl_certificate_verification => false
cacert => "/etc/elasticsearch/certs/cert1.crt"
document_id => "%{keyfield}"
doc_as_upsert => true
script => 'ctx._source.FormId += " %{[FormId]}"'
action => "update"
}
# Echo events to stdout for debugging alongside the ES output.
stdout{}
}
I then tried to add the following elasticsearch filter plugin to read from the 'table_transform' index, with the idea of checking whether the current record already exists in that index; if it does, I add a new field to indicate its pattern. This new field is needed because (FundA,II) and (FundA,DD) belong to the same pattern, and the pattern value will be used as the term aggregation in a pie chart.
# Look up the already-accumulated session document for this event's key.
elasticsearch {
  hosts => ["https://myhost:9200"]
  index => "table_transform"
  user => "elastic"
  password => "password"
  ca_file => "/etc/elasticsearch/certs/cert1.cer"
  # Quote the interpolated value so special characters in keyfield don't
  # break the query-string syntax — a malformed query is one common
  # cause of the _elasticsearch_lookup_failure tag.
  query => 'keyfield:"%{[keyfield]}"'
  # The indexed documents carry the concatenated ids in "FormId" (that is
  # the field the output's update script appends to); there is no
  # "formidlist" field in the index, so the original mapping could never
  # copy anything. Map FormId from the hit into new_FormId on the event.
  fields => {"FormId" => "new_FormId"}
}
# NOTE(review): even with a correct query this lookup usually misses the
# *current* event's own data — filters run before outputs, and
# Elasticsearch only makes writes searchable after an index refresh
# (default ~1s), so events in the same batch/session cannot see each
# other's upserts. To classify a whole session you generally need either
# the aggregate filter (group in-pipeline by keyfield) or a second pass
# over the finished table_transform index.
However, I receive the following error when I run Logstash:
[0] "_elasticsearch_lookup_failure"
I assumed that Logstash would read one record, insert it, then read the next record and insert it, and so on — but somehow Logstash doesn't work the way I imagined.
How should I modify my code? Or how does Logstash actually work?