こんにちわ。
コマンドIDをキーとして、ログAとログBが1対1で紐づいている、ということでしょうか。
ということであれば、Logstashの中でOutputのフィルタリングにelasticsearchを指定しているかと思いますが、
ここで、 doc_as_upsertという設定ができるので、これをtrueにしてみてください。
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-doc_as_upsert
こうすることで、IDをキーに1つのドキュメントに対して、それぞれAとBのログをいれると、「コマンドID、コマンド名、コマンド応答」のようなドキュメントとなります。
AのあとにBが入っても、先にBがあって後からAでも、IDさえあればマージされたように見えるはずです。
以下こちらで試したところを貼っておきます。
テストデータ
2つのCSVを用意する。それぞれ、a.csv, b.csvとする。
- a.csv
ID,NAME
1,コマンド名A
2,コマンド名B
3,コマンドー
- b.csv
ID,VALUE
1,成功
2,失敗
3,成功
Logstashの設定
ポイントはOutputのところでUpsertを指定してところです。
document_idがなければinsert、あればupdateとなります。
既存の項目値はそのまま上書されてしまいますが、今回のようにNAMEとVALUEとかぶらないフィールドであればマージされたような状態となり期待した結果となるように思います。
input {
file {
path => "/tmp/a.csv"
start_position => "beginning"
tags => ["type_a"]
}
file {
path => "/tmp/b.csv"
start_position => "beginning"
tags => ["type_b"]
}
}
filter {
if "type_a" in [tags] {
csv {
columns => ["id","name"]
separator => ","
skip_header => true
}
} else {
csv {
columns => ["id","value"]
separator => ","
skip_header => true
}
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "forum0404"
document_id => "%{id}"
doc_as_upsert => true
action => "update"
}
}
結果
{
"_index" : "forum0404",
"_type" : "_doc",
"_id" : "3",
"_score" : 1.0,
"_source" : {
"@version" : "1",
"tags" : [
"type_b"
],
"@timestamp" : "2019-04-04T16:48:41.302Z",
"host" : "bd72215c114b",
"id" : "3",
"name" : "コマンドー",
"message" : "3,成功\r",
"path" : "/tmp/b.csv",
"value" : "成功"
}
}
確認した環境
- Elasticserch: 7.0.0-rc2 (on Docker)
- Logstash: 7.0.0-rc2 (on Docker)
- Kibana: 7.0.0-rc2 (on Docker)