Elasticsearch内のデータの結合について


#1

初心者の者です。
Aというログには、「コマンドID、コマンド名」Bというログには、「コマンドID、コマンド応答」が多数記載されています。
それぞれのログをlogstashから取り込んで、「コマンドID、コマンド名、コマンド応答」
というドキュメントにしたい場合、どのようにすればよいのでしょうか?
ログを取り込む前に、kibanaのdevtoolsに何か設定しておいてうまくできるのでしょうか?
基本的なことで申し訳ありませんがご教授ください。


(tsgkdt) #2

こんにちわ。

コマンドIDをキーとして、ログAとログBが1対1で紐づいている、ということでしょうか。
ということであれば、Logstashの中でOutputのフィルタリングにelasticsearchを指定しているかと思いますが、
ここで、 doc_as_upsertという設定ができるので、これをtrueにしてみてください。
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-doc_as_upsert

こうすることで、IDをキーに1つのドキュメントに対して、それぞれAとBのログをいれると、「コマンドID、コマンド名、コマンド応答」のようなドキュメントとなります。
AのあとにBが入っても、先にBがあって後からAでも、IDさえあればマージされたように見えるはずです。

以下こちらで試したところを貼っておきます。

テストデータ

2つのCSVを用意する。それぞれ、a.csv, b.csvとする。

  • a.csv
ID,NAME
1,コマンド名A
2,コマンド名B
3,コマンドー
  • b.csv
ID,VALUE
1,成功
2,失敗
3,成功

Logstashの設定

ポイントはOutputのところでUpsertを指定してところです。
document_idがなければinsert、あればupdateとなります。
既存の項目値はそのまま上書されてしまいますが、今回のようにNAMEとVALUEとかぶらないフィールドであればマージされたような状態となり期待した結果となるように思います。

input {
  file {
      path => "/tmp/a.csv"
      start_position => "beginning"
      tags => ["type_a"]
  }

  file {
      path => "/tmp/b.csv"
      start_position => "beginning"
      tags => ["type_b"]
  }
}

filter {

   if "type_a" in [tags] {

       csv {
            columns => ["id","name"]
            separator => ","
            skip_header => true
       }
   } else {

       csv {
            columns => ["id","value"]
            separator => ","
            skip_header => true
       }
   }


   
}

output {
    elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "forum0404"
        document_id => "%{id}"
        doc_as_upsert => true
        action => "update"
    }
}

結果

{
        "_index" : "forum0404",
        "_type" : "_doc",
        "_id" : "3",
        "_score" : 1.0,
        "_source" : {
          "@version" : "1",
          "tags" : [
            "type_b"
          ],
          "@timestamp" : "2019-04-04T16:48:41.302Z",
          "host" : "bd72215c114b",
          "id" : "3",
          "name" : "コマンドー",
          "message" : "3,成功\r",
          "path" : "/tmp/b.csv",
          "value" : "成功"
        }
      }

確認した環境

  • Elasticserch: 7.0.0-rc2 (on Docker)
  • Logstash: 7.0.0-rc2 (on Docker)
  • Kibana: 7.0.0-rc2 (on Docker)

参考情報


#3

早急なご回答、ありがとうございました。
ご教授いただいた通りにやったらうまくいきました。大変助かりました。

一つ興味でお聞きしたいのですが、Referenceマニュアルのdoc_as_upsertの記載を見ると
updateモードを有効にする、みたいなことが書いてあるのでdoc_as_upsertだけ追記すればいい
のかな、と思い、action=>の行をはじめ省いてやってみたのですが、うまくできませんでした。
このあたりのマニュアルからは読めないところをどのように習得していっていますか?

よろしくお願いいたします。


(tsgkdt) #4

すみません、action => "update" のところについて言及しなかったのは、言葉足らずでした。

さて、公式マニュアルだけじゃよく分からん!というときどうするか、です。
私なら、という私見になりますがご容赦ください。

確実なのは・・・

  1. Elastic社のトレーニングに行き質問をたくさんしてくる。(ちょうど5、6月にトレーニングあります)
  2. サブスクリプションをつけて、サポートの人に聞く

直近のトレーニング予定はこちら(日本語で研修受けられます)
https://training.elastic.co/instructor-led-training/ElasticsearchEngineerI
なお、回し者ではありません。

他には・・・私が思いつく習得方法です。

  1. Elastic社のイベントにいき Ask me Anythingブースで質問しまくる(本当に何でも聞いて良いらしい)
  2. Elasticsearchの勉強会にいき、懇親会で人を見つけて聞いてみる(どうやって情報収集していますか? こういうことで困ってるんですよ、とか)
  3. DiscussやGitHubのissueで類似の事例がないかどうかを”検索”する(なければ、スレッドをたてて実際に聞いてみる)
  4. 自分で仮説を立てて試行錯誤してみる
  5. OSSの強みを活かして、ソースを読む

コミュニティに参加したり頼ったりすることで、習得が早まると思いますです。


#5

どうもありがとうございます。
やはり公式マニュアルを読むだけでなく、有識者からノウハウを伝授頂かないとですね。
参考になります。


#6

お世話になっております。
ご教授いただいた内容から、以下を新たに実現させたいと思っています。

・コマンドIDが同じでかつ、ログ内のタイムスタンプの「時」が1だけずれている際に、何か別の同一のIDにして紐づけたい

上記を要望する背景ですが tsgkdt様の例でいうと、a.csvとb.csv内のIDはログの中で使いまわされており、データを一意には紐づけられないことがわかりました。また、二つのログは要求ログ、応答ログの関係にあり、ログ内のタイムスタンプが最大数十秒ずれており、タイムスタンプのみをIDにして括り付けることもできないことがわかっています。
そこで、
①IDが同じである
②タイムスタンプがずれていてもせいぜい「時」が1ずれているだけ
というのを条件にして括り付ければよい、という条件を設定しました。
が、「すでにDBに登録されている値とこれから入れようとしている値を比較して差が1以下であれば・・・」のようなif文をlogstash? のconfigに入れ込むことは可能なのでしょうか?
※logstashのconfigはあくまでElasticsearchに入れる際の条件を書いているので、そのようなことは
できなそうだな・・・・と思っているのですが。

よろしくお願いいたします。


(tsgkdt) #7

予めどちらかのデータがインデックス済みで、検索可能状態になっている、という制限はありますが、
こんなアプローチも考えられそうです。

前提

  1. 2つのインデックスは、共通のIDというフィールドを持っており、値は共通である
  2. IDとタイムスタンプが±1秒の範囲内という条件で、一意に対となるデータがみつかる

filterの中でElasticsearchにクエリを投げ、その結果を取得してデータを加工することができます。

https://www.elastic.co/guide/en/logstash/current/plugins-filters-elasticsearch.html

考えられるアプローチ

後からデータを入れる方が、先に入っているデータを”検索して”、その結果をもとにインデックスを登録するようにする。

IDが同じで、timestampで前後1秒の時間のズレ、というものがありますが、これをクエリで表現して結合先の対象データを取得できれば良いと考えます。

Logstashのfilter部分をこのように書いて、template.jsonを使ってクエリを渡すようにします。


filter {
  elasticsearch {
  
    hosts => ["xxx.xxx.xxx.xxx:9200"]
    index => "testtesttest"
    query_template => "template.json"
    # メタ情報
    docinfo_fields => {
      "_id" => "document_id"
      "_index" => "document_index"
    }
    #取得した検索結果から、値を取り出してマッピングする
    fields => {
      "code" => "code"
    }
  }
}

template.jsonでは、IDが同じ、前後1秒のズレを考慮したクエリを書きます。


{
    "size": 1,
    "sort": [
        {
            "@timestamp": "desc"
        }
    ],
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "@timestamp": {
                            "gte": "%{[target_date]}||-1s",
                            "lt": "%{[target_date]}||+1s",
                            "time_zone": "+00:00"
                        }
                    }
                },
                {
                    "term": {
                        "_id": {
                            "value": "%{[target_id]}"
                        }
                    }
                }
            ]
        }
    },
    "_source": [
        "code"
    ]
}

rangeクエリのところを見ていただくと、%{[フィールド名]} があり、ここにLogstashで置換される部分です。
後ろに||+1s||-1sと書くことで、前後1秒を求めています。

もし置換文字列でなければ、このようなイメージです。

 "range": {
     "@timestamp": {
         "gte": "2019-04-16 21:00:10||-1s",
          "lt": "2019-04-16 21:00:10||+1s",
          "time_zone": "+00:00"
     }
}

このアプローチですと、Elasticsearchで検索できることが前提なので、既にデータが入っていて検索可能となっていなければなりません。
また、場合によっては多量のクエリが発行されることもあるため、必ずしも良いやり方とは言えないでしょう。

時間をずれないようにするとか、もう少し手前の処理で2つのファイルを統合するような処理を別に検討するとか考えても良いように思います。