Hi
I am trying to reindex data from a daily index into a daily archive index, while dropping or combining some fields.
The easiest way to do this seems to be Logstash with an elasticsearch
input and output, and some filters in between.
Logstash processes the pipeline successfully, but the document counts don't match:
Source: GET test-2017-11-10/_count
gives { "count": 70213, "_shards": { "total": 5, "successful": 5, "failed": 0 }}
Archive, run 1: GET archive.test-2017-11-10/_count
gives { "count": 69713, "_shards": { "total": 5, "successful": 5, "failed": 0 }}
Archive, run 2: GET archive.test-2017-11-10/_count
gives { "count": 70171, "_shards": { "total": 5, "successful": 5, "failed": 0 }}
Archive, run 3: GET archive.test-2017-11-10/_count
gives { "count": 70213, "_shards": { "total": 5, "successful": 5, "failed": 0 }}
-- Only now is the count correct!
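One thing worth ruling out before anything else: `_count` only sees documents in segments that have been refreshed, so right after a bulk load the count can lag behind what was actually indexed. A quick sketch (assuming the single-node cluster on 127.0.0.1:9200 from the log below) that forces a refresh before counting:

```shell
# Force a refresh so all indexed documents become visible to search/_count
curl -XPOST 'http://127.0.0.1:9200/archive.test-2017-11-10/_refresh'

# With the index refreshed, the count should be stable across repeated calls
curl -XGET 'http://127.0.0.1:9200/archive.test-2017-11-10/_count'
```

If the count is still short after an explicit refresh, the missing documents were genuinely not indexed (or were overwritten by duplicate `document_id`s) rather than just not yet visible.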
Does anyone know what the issue could be?
Many thanks!
Oliver
ENV
> ./logstash/bin/logstash -V
logstash 6.0.0
> ./elasticsearch/bin/elasticsearch -V
Version: 2.3.3, Build: 218bdf1/2016-05-17T15:40:04Z, JVM: 1.8.0_45
(Note: LS 6.0.0 might not be ideal for ES 2.3.3; I also tried LS 2.3.4, without success.)
Output
No obvious errors
$> ./logstash/bin/logstash -f conf/reorg.conf -l logs --path.data data/
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:108: warning: already initialized constant DEFAULT_MAX_POOL_SIZE
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:110: warning: already initialized constant DEFAULT_req_TIMEOUT
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:111: warning: already initialized constant DEFAULT_SOCKET_TIMEOUT
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:112: warning: already initialized constant DEFAULT_CONNECT_TIMEOUT
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:113: warning: already initialized constant DEFAULT_MAX_REDIRECTS
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:114: warning: already initialized constant DEFAULT_EXPECT_CONTINUE
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:115: warning: already initialized constant DEFAULT_STALE_CHECK
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:590: warning: already initialized constant ISO_8859_1
/opt/elasticsearch/logstash/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:641: warning: already initialized constant KEY_EXTRACTION_REGEXP
Sending Logstash's logs to logs which is now configured via log4j2.properties
[2017-12-04T15:20:54,347][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/opt/elasticsearch/logstash/logstash/modules/fb_apache/configuration"}
[2017-12-04T15:20:54,350][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/opt/elasticsearch/logstash/logstash/modules/netflow/configuration"}
[2017-12-04T15:20:54,534][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2017-12-04T15:20:54,777][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2017-12-04T15:20:57,926][WARN ][logstash.outputs.elasticsearch] You are using a deprecated config setting "document_type" set in elasticsearch. Deprecated settings will continue to work, but are scheduled for removal from logstash in the future. Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature If you have any questions about this, please visit the #logstash channel on freenode irc. {:name=>"document_type", :plugin=><LogStash::Outputs::ElasticSearch index=>"archive.%{[@metadata][_index]}", document_type=>"%{[@metadata][_type]}", document_id=>"%{[@metadata][_id]}", manage_template=>"false", id=>"747fc5d5bfae9c6f5887a430a33a737c685501d2f579f6b4c49f547a43dbba30">}
[2017-12-04T15:20:58,387][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://127.0.0.1:9200/]}}
[2017-12-04T15:20:58,392][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://127.0.0.1:9200/, :path=>"/"}
[2017-12-04T15:20:58,544][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://127.0.0.1:9200/"}
[2017-12-04T15:20:58,605][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//127.0.0.1"]}
[2017-12-04T15:20:58,633][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500, :thread=>"#<Thread:0x68464447@/opt/elasticsearch/logstash/logstash/logstash-core/lib/logstash/pipeline.rb:290 run>"}
[2017-12-04T15:20:58,713][INFO ][logstash.pipeline ] Pipeline started {"pipeline.id"=>"main"}
[2017-12-04T15:20:58,733][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
[2017-12-04T15:21:36,991][INFO ][logstash.pipeline ] Pipeline terminated {"pipeline.id"=>"main"}
LS Config (for v6.0):
input {
  elasticsearch {
    index   => "test-2017-11-11"
    docinfo => true
  }
}
filter {
  # Set @timestamp from front_req
  date {
    match => ["front_req", "ISO8601"]
  }
  if [front_req] and [front_res] and [back_req] and [back_res] {
    # Parse the four timestamps in place
    date {
      match  => ["front_req", "ISO8601"]
      target => "front_req"
    }
    date {
      match  => ["front_res", "ISO8601"]
      target => "front_res"
    }
    date {
      match  => ["back_req", "ISO8601"]
      target => "back_req"
    }
    date {
      match  => ["back_res", "ISO8601"]
      target => "back_res"
    }
    # Derive backend and frontend durations in milliseconds
    ruby {
      code => "
        back  = (event.get('back_res') - event.get('back_req')) * 1000
        total = (event.get('front_res') - event.get('front_req')) * 1000
        front = total - back
        event.set('back_time_ms', back)
        event.set('front_time_ms', front)
      "
    }
    mutate {
      remove_field => [
        "back_reply_body", "consumer_request_body",
        "back_reply_header", "consumer_request_header",
        "error",
        "back_req", "back_res", "front_req", "front_res"
      ]
    }
  }
}
output {
  elasticsearch {
    index           => "archive.%{[@metadata][_index]}"
    document_type   => "%{[@metadata][_type]}"
    document_id     => "%{[@metadata][_id]}"
    manage_template => false
  }
}
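For reference, the arithmetic in the ruby filter relies on timestamp subtraction returning seconds as a Float, so multiplying by 1000 yields milliseconds. In plain Ruby, with hypothetical sample timestamps (not from the actual data):

```ruby
require 'time'

# Hypothetical request/response timestamps for one event
front_req = Time.parse('2017-11-10T12:00:00.000Z')
back_req  = Time.parse('2017-11-10T12:00:00.050Z')
back_res  = Time.parse('2017-11-10T12:00:00.250Z')
front_res = Time.parse('2017-11-10T12:00:00.300Z')

back  = (back_res - back_req) * 1000    # time spent in the backend (~200 ms)
total = (front_res - front_req) * 1000  # end-to-end time (~300 ms)
front = total - back                    # remainder spent in the front end (~100 ms)
```

In the actual filter the values come from `event.get` after the date filters have parsed the fields, but the arithmetic is the same.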