Aggregateフィルタについて。重複してしまいます。

logstash のAggregateフィルタを使って、下記のように、同じIDを目印に、fieldの項目を配列にして一行にしたいと思っています。
inputはjdbc input pluginを使い、mysqlからデータを取ってきています。

|id_field|field|
|100 |AAA |
|100 |BBB |
|100 |CCC |
|200 |DDD |
|200 |EEE |

|id_field|fields |
|100 |AAA,BBB,CCC|
|200 |DDD,EEE |

しかし、以下のように一部まとまらない行があるという状態になっています。

|id_field|fields |
|100 |AAA,BBB|
|100 |CCC |
|200 |DDD,EEE|

filter{
    aggregate {
            task_id => "%{id_field}"
            code => "
            map['id_field'] = event.get('id_field')
            map['fields'] ||= []
            map['fields'] << event.get('field')
            event.cancel()
            "
            push_previous_map_as_event => true
            timeout => 3
    }
}

task_id => "%{id_field}"で指定している限りそうなるはずだと思っているのですが。。
足りない情報などあればお教え下さい。

既に確認されているかもしれませんが、こちらで指摘されているような
ページングやソート順といった内容はクリアされてますでしょうか?

aggregate filterは、シングルスレッドでしか正常動作しないので、
filter処理のworker数を1に設定する必要があります。
設定されているでしょうか?

設定されていない場合、
logstsh.ymlでpipeline.workersを1に設定して起動してみてください。
https://www.elastic.co/guide/en/logstash/current/logstash-settings-file.html

また、サービス起動ではなく、コマンドライン起動している場合は、
-w 1 を追加しても同じ設定が可能です。
https://www.elastic.co/guide/en/logstash/current/running-logstash-command-line.html

注意点としては、上記は全てのconfがシングルスレッドで処理されてしまいます。
動作させるconf毎にworker数を設定する場合は、
マルチパイプラインで起動して、pipeline.yml側でpipeline毎に
worker数を設定する必要があります。
https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

ご回答ありがとうございます。こちら解決しました。
2つ原因があり、

1つめは、jsonフィルタを中で使っていたのですが、一部jsonではないデータがありそのフィルタをスキップしていたこと
2つめはご提示いただいたworker数を1にすることで解決しました。

ありがとうございました。

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.