Reindex with Logstash (Elasticsearch --> Filter --> Elasticsearch) loses data on the way

Hi

I am trying to reindex data from a daily index to a daily archive index, while dropping or combining some fields.
The easiest way to do this seems to be Logstash with an elasticsearch input and output and some filters in between.

→ LS processes the pipeline successfully, but the document count doesn't match.

Source: GET test-2017-11-10/_count gives { "count": 70213, "_shards": { "total": 5, "successful": 5, "failed": 0 }}

Archive, run 1: GET archive.test-2017-11-10/_count gives { "count": 69713, "_shards": { "total": 5, "successful": 5, "failed": 0 }}
Archive, run 2: GET archive.test-2017-11-10/_count gives { "count": 70171, "_shards": { "total": 5, "successful": 5, "failed": 0 }}
Archive, run 3: GET archive.test-2017-11-10/_count gives { "count": 70213, "_shards": { "total": 5, "successful": 5, "failed": 0 }} -- Only now the count is correct!
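
To put numbers on the gap, here is a small Ruby sketch that works out how many documents are missing after each run. It assumes the _count response bodies are at hand as JSON strings; missing_docs is a made-up helper name, not part of any Elasticsearch client.

```ruby
require 'json'

# Hypothetical helper: documents present in the source index but not in the
# archive index, computed from two _count response bodies.
def missing_docs(source_body, archive_body)
  JSON.parse(source_body).fetch('count') - JSON.parse(archive_body).fetch('count')
end

# Response bodies copied from the _count calls above.
source = '{ "count": 70213, "_shards": { "total": 5, "successful": 5, "failed": 0 }}'
runs = [
  '{ "count": 69713, "_shards": { "total": 5, "successful": 5, "failed": 0 }}',
  '{ "count": 70171, "_shards": { "total": 5, "successful": 5, "failed": 0 }}',
  '{ "count": 70213, "_shards": { "total": 5, "successful": 5, "failed": 0 }}'
]

runs.each_with_index do |body, i|
  puts "attempt #{i + 1}: #{missing_docs(source, body)} documents missing"
end
```

That gives 500, 42 and 0 missing documents for the three attempts, so the loss is not constant between runs.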

Does anyone know what the issue could be?

Many thanks!
Oliver

ENV

> ./logstash/bin/logstash -V
logstash 6.0.0
> ./elasticsearch/bin/elasticsearch -V
Version: 2.3.3, Build: 218bdf1/2016-05-17T15:40:04Z, JVM: 1.8.0_45

(Note: LS 6.0.0 might not be ideal for ES 2.3.3 – I also tried LS 2.3.4, without success.)

Output

No obvious errors

$> ./logstash/bin/logstash -f conf/reorg.conf -l logs --path.data data/
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:108: warning: already initialized constant DEFAULT_MAX_POOL_SIZE
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:110: warning: already initialized constant DEFAULT_REQUEST_TIMEOUT
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:111: warning: already initialized constant DEFAULT_SOCKET_TIMEOUT
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:112: warning: already initialized constant DEFAULT_CONNECT_TIMEOUT
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:113: warning: already initialized constant DEFAULT_MAX_REDIRECTS
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:114: warning: already initialized constant DEFAULT_EXPECT_CONTINUE
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:115: warning: already initialized constant DEFAULT_STALE_CHECK
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:590: warning: already initialized constant ISO_8859_1
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:641: warning: already initialized constant KEY_EXTRACTION_REGEXP
Sending Logstash's logs to logs which is now configured via log4j2.properties
[2017-12-04T15:20:54,347][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/opt/elasticsearch/logstash/logstash/modules/fb_apache/configuration"}
[2017-12-04T15:20:54,350][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/opt/elasticsearch/logstash/logstash/modules/netflow/configuration"}
[2017-12-04T15:20:54,534][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2017-12-04T15:20:54,777][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-12-04T15:20:57,926][WARN ][logstash.outputs.elasticsearch] You are using a deprecated config setting "document_type" set in elasticsearch. Deprecated settings will continue to work, but are scheduled for removal from logstash in the future. Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature If you have any questions about this, please visit the #logstash channel on freenode irc. {:name=>"document_type", :plugin=><LogStash::Outputs::ElasticSearch index=>"archive.%{[@metadata][_index]}", document_type=>"%{[@metadata][_type]}", document_id=>"%{[@metadata][_id]}", manage_template=>"false", id=>"747fc5d5bfae9c6f5887a430a33a737c685501d2f579f6b4c49f547a43dbba30">}
[2017-12-04T15:20:58,387][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://127.0.0.1:9200/]}}
[2017-12-04T15:20:58,392][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://127.0.0.1:9200/, :path=>"/"}
[2017-12-04T15:20:58,544][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://127.0.0.1:9200/"}
[2017-12-04T15:20:58,605][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//127.0.0.1"]}
[2017-12-04T15:20:58,633][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500, :thread=>"#<Thread:0x68464447@/opt/elasticsearch/logstash/logstash/logstash-core/lib/logstash/pipeline.rb:290 run>"}
[2017-12-04T15:20:58,713][INFO ][logstash.pipeline        ] Pipeline started {"pipeline.id"=>"main"}
[2017-12-04T15:20:58,733][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
[2017-12-04T15:21:36,991][INFO ][logstash.pipeline        ] Pipeline terminated {"pipeline.id"=>"main"}

LS Config (for v6.0):

input {
  elasticsearch {
    index => "test-2017-11-11"
    docinfo => true
  }
}

filter {
  date {
    match => ["front_req", ISO8601 ]
  }
  if [front_req] and [front_res] and [back_req] and [back_res] {
    date {
      match => ["front_req", ISO8601 ]
      target => "front_req"
    }
    date {
      match => ["front_res", ISO8601 ]
      target => "front_res"
    }
    date {
      match => ["back_req", ISO8601 ]
      target => "back_req"
    }
    date {
      match => ["back_res", ISO8601 ]
      target => "back_res"
    }
    ruby {
      code => "
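        # Timestamp differences are in seconds; convert to milliseconds.
        back = (event.get('back_res') - event.get('back_req')) * 1000
        total = (event.get('front_res') - event.get('front_req')) * 1000
        # Time spent in the front end is the total minus the back-end share.
        front = total - back
        event.set 'back_time_ms', back
        event.set 'front_time_ms', front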
      "
    }
    mutate {
      remove_field => [
        "back_reply_body", "consumer_request_body",
        "back_reply_header", "consumer_request_header",
        "error",
        "back_req", "back_res", "front_req", "front_res"
      ]
    }
  }
}

output {
  elasticsearch {
    index => "archive.%{[@metadata][_index]}"
    document_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
    manage_template => false
  }
}
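
For anyone who wants to check the arithmetic in the ruby filter outside of Logstash, here is a standalone sketch of the same calculation using plain Ruby Time objects. The timestamps are made-up sample values and timings_ms is a hypothetical helper; the filter itself works on LogStash::Timestamp values, which I assume behave the same way under subtraction (difference in seconds).

```ruby
require 'time'

# Hypothetical helper mirroring the ruby filter: all inputs are Time objects,
# the result holds the back-end and front-end durations in milliseconds.
def timings_ms(front_req, front_res, back_req, back_res)
  back  = (back_res - back_req) * 1000    # Time - Time yields seconds as a Float
  total = (front_res - front_req) * 1000
  { 'back_time_ms' => back, 'front_time_ms' => total - back }
end

# Made-up sample: the whole request takes 500 ms, of which 250 ms is back end.
result = timings_ms(
  Time.iso8601('2017-11-10T12:00:00.000Z'),
  Time.iso8601('2017-11-10T12:00:00.500Z'),
  Time.iso8601('2017-11-10T12:00:00.125Z'),
  Time.iso8601('2017-11-10T12:00:00.375Z')
)
puts result
```

Note that the filter only runs this when all four fields exist; if one of the date filters fails to parse its field, the value stays a string and the subtraction will raise inside the ruby filter.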

Okay, found the problem: the Elasticsearch instance had run into an out-of-memory error (OOM). Unfortunately, a 2.x node does not terminate on OOM but keeps running in an undefined state, hence the unpredictable behaviour.
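
In hindsight, a look at the JVM heap before starting the reindex would probably have pointed at this sooner. A rough Ruby sketch, assuming the body of GET _nodes/stats/jvm is available as a string: nodes_low_on_heap, the 90 percent threshold and the sample response are all made up for illustration, and only the heap_used_percent field is taken from the real response.

```ruby
require 'json'

# Hypothetical helper: names of the nodes whose JVM heap usage is at or above
# the given threshold, based on a _nodes/stats/jvm response body.
def nodes_low_on_heap(stats_body, threshold_percent = 90)
  JSON.parse(stats_body).fetch('nodes', {}).values
      .select { |node| node.dig('jvm', 'mem', 'heap_used_percent').to_i >= threshold_percent }
      .map { |node| node['name'] }
end

# Made-up, heavily trimmed sample response with two nodes.
sample = '{"nodes":{' \
         '"abc":{"name":"node-1","jvm":{"mem":{"heap_used_percent":99}}},' \
         '"def":{"name":"node-2","jvm":{"mem":{"heap_used_percent":41}}}}}'

p nodes_low_on_heap(sample)
```

A high value here is only a hint. The reliable sign is a java.lang.OutOfMemoryError in the Elasticsearch log, after which the node should be restarted before reindexing again.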
