Elasticsearch filter plugin does not work as expected

I'm trying to calculate a job's duration using the elasticsearch filter plugin.
The problem is that I cannot get job.duration when I input all of the JSON events at once.

logstash conf:

input {
  # Read events from standard input and decode them with the json codec.
  stdin {
    codec => "json"
  }
}

filter {
  # Parse [job][log_date] (pattern yyyy/MM/dd HH:mm:ss.SSS) and write the
  # parsed timestamp back into the same field.
  date {
    match => ["[job][log_date]" , "yyyy/MM/dd HH:mm:ss.SSS"]
    target => "[job][log_date]"
  }

  # Events with message_id JC00000002 (paired with message "end jobnet" in the
  # sample input): look up the matching start event and compute the duration.
  if [job][message_id] == "JC00000002" {
    # Query the tmp-* indices for documents tagged "startjobnet" that share
    # this event's jobnet_id, sorted by job.log_date descending, and copy the
    # hit's [job][log_date] into [started] on the current event.
    # NOTE(review): this only finds start events that are already indexed and
    # searchable in Elasticsearch when the query runs -- confirm against the
    # plugin documentation.
    elasticsearch {
      hosts => ["srv03:9200"]
      query => 'tags:startjobnet AND job.jobnet_id:"%{[job][jobnet_id]}"'
      index => "tmp-*"
      enable_sort => true
      sort => "job.log_date:desc"
      fields => { "[job][log_date]" => "started" }
    }

    # Convert the looked-up value (expected to be ISO8601) into a timestamp.
    date {
      match => [ "[started]", "ISO8601"]
      target => "[started]"
    }

    # Only compute a duration when the lookup actually populated [started].
    if [started] {
      # [job][duration] = end log_date minus start log_date.
      # NOTE(review): presumably a value in seconds -- verify the unit.
      ruby {
        code => "event.set('[job][duration]', (event.get('[job][log_date]') - event.get('started')) )"
      }
    }
  }

  # Events with message_id JC00000001 (paired with message "start jobnet" in
  # the sample input) are tagged so the lookup query above can find them.
  if [job][message_id] == "JC00000001" {
    mutate { add_tag => [ "startjobnet" ] }
  }
}

output {
  # Send every event to Elasticsearch on srv03:9200, into a date-stamped
  # index named tmp-<yyyy.MM.dd>.
  elasticsearch {
    hosts => ["srv03:9200"]
    index => "tmp-%{+yyyy.MM.dd}"
  }
}

json input:

{ "job": { "log_date": "2019/03/27 17:06:03.108", "inner_jobnet_main_id": 922556, "inner_jobnet_id": 922556, "run_type": 0, "public_flag": 0, "jobnet_id": "hoge", "job_id": "", "message_id": "JC00000001", "message": "start jobnet", "jobnet_name": "hoge-mon", "job_name": "", "user_name": "user1", "update_date": "2019/03/12 14:15:14", "return_code": "" } },
{ "job": { "log_date": "2019/03/27 17:06:04.008", "inner_jobnet_main_id": 922556, "inner_jobnet_id": 922556, "run_type": 0, "public_flag": 0, "jobnet_id": "hoge", "job_id": "", "message_id": "JC00000002", "message": "end jobnet", "jobnet_name": "hoge-mon", "job_name": "", "user_name": "user1", "update_date": "2019/03/12 14:15:14", "return_code": "" } }
{ "job": { "log_date": "2019/03/27 17:10:02.216", "inner_jobnet_main_id": 922556, "inner_jobnet_id": 922556, "run_type": 0, "public_flag": 0, "jobnet_id": "hoge", "job_id": "", "message_id": "JC00000001", "message": "start jobnet", "jobnet_name": "hoge-mon", "job_name": "", "user_name": "user1", "update_date": "2019/03/12 14:15:14", "return_code": "" } },
{ "job": { "log_date": "2019/03/27 17:10:03.901", "inner_jobnet_main_id": 922556, "inner_jobnet_id": 922556, "run_type": 0, "public_flag": 0, "jobnet_id": "hoge", "job_id": "", "message_id": "JC00000002", "message": "end jobnet", "jobnet_name": "hoge-mon", "job_name": "", "user_name": "user1", "update_date": "2019/03/12 14:15:14", "return_code": "" } }
{ "job": { "log_date": "2019/03/27 17:14:02.216", "inner_jobnet_main_id": 922556, "inner_jobnet_id": 922556, "run_type": 0, "public_flag": 0, "jobnet_id": "hoge", "job_id": "", "message_id": "JC00000001", "message": "start jobnet", "jobnet_name": "hoge-mon", "job_name": "", "user_name": "user1", "update_date": "2019/03/12 14:15:14", "return_code": "" } },
{ "job": { "log_date": "2019/03/27 17:16:03.901", "inner_jobnet_main_id": 922556, "inner_jobnet_id": 922556, "run_type": 0, "public_flag": 0, "jobnet_id": "hoge", "job_id": "", "message_id": "JC00000002", "message": "end jobnet", "jobnet_name": "hoge-mon", "job_name": "", "user_name": "user1", "update_date": "2019/03/12 14:15:14", "return_code": "" } }
{ "job": { "log_date": "2019/03/27 17:26:03.108", "inner_jobnet_main_id": 922556, "inner_jobnet_id": 922556, "run_type": 0, "public_flag": 0, "jobnet_id": "hoge", "job_id": "", "message_id": "JC00000001", "message": "start jobnet", "jobnet_name": "hoge-mon", "job_name": "", "user_name": "user1", "update_date": "2019/03/12 14:15:14", "return_code": "" } },
{ "job": { "log_date": "2019/03/27 17:26:04.008", "inner_jobnet_main_id": 922556, "inner_jobnet_id": 922556, "run_type": 0, "public_flag": 0, "jobnet_id": "hoge", "job_id": "", "message_id": "JC00000002", "message": "end jobnet", "jobnet_name": "hoge-mon", "job_name": "", "user_name": "user1", "update_date": "2019/03/12 14:15:14", "return_code": "" } }

But I can get job.duration by inputting the JSON events one by one manually.
elk_ok

It seems that Logstash passes events to the output stage only after all of the events have been completely processed in the filter stage.
Is there any way to use the elasticsearch filter plugin to query for events that occur at almost the same time?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.