How to use Logstash filter condition to copy docs

Hi All,

We are trying to copy docs from one index to another using Logstash. Our index (es_item) has a doc count of nearly 194,556,883 with a primary size of 567.7 GB. The docs indexed into es_item have different item types, such as standard_item & non_standard_item.

  1. Our need is to copy only standard_item docs to the new index (es_item1). Is it possible to use ITEM_TYPE in a filter condition so that only standard_item docs are copied to the new index?
  2. We have a field "ITEM_TYPE": STANDARD_ITEM in the es_item index.

Please let us know your suggestions.

Below is the Logstash configuration file:

input {
  # We read from the "old" index
  elasticsearch {
    hosts => ["10.8.41.121:9200"]
    user => "esadmin"
    password => "XXXX"
    index => "es_item"
    size => 500
    scroll => "5m"
    docinfo => true
  }
}

filter {
  mutate {
    remove_field => [ "@timestamp", "@version" ]
  }
}

output {
  # We write to the "new" index
  elasticsearch {
    host => "10.8.41.121:9200"
    protocol => "http"
    user => "esadmin"
    password => "XXXX"
    index => "es_item1"
    index_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
  }

  # We print dots to see it in action
  stdout {
    codec => "dots"
  }
}

Thanks,
Ganeshbabu R

Hi All,

Please provide some feedback on matching an ITEM_TYPE condition in the Logstash filter using grok.

filter {
  grok {
    patterns_dir => "./patterns"
    match => { "ITEM_TYPE" => "%{SGI}" }
  }
}

I tried using grok inside the Logstash filter to match ITEM_TYPE = SGI, but I get the following response when I start the Logstash run:

[esadmin@tparhebfmi005 logstash-1.5.5]$ bin/logstash -f logstash.conf
The error reported is: pattern %{SGI} not defined

Please let us know any suggestions.
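For reference, that error means grok could not find a pattern named SGI in any file under patterns_dir; `%{SGI}` can only be used after a patterns file defines it. A minimal sketch, assuming ITEM_TYPE literally contains the token "SGI" (the file name `custom_patterns` is arbitrary; grok reads every file in the directory):

```shell
# Hypothetical fix: create a patterns file defining SGI before using %{SGI}.
mkdir -p ./patterns
cat > ./patterns/custom_patterns <<'EOF'
SGI \bSGI\b
EOF
cat ./patterns/custom_patterns
# prints: SGI \bSGI\b
```

Each line of a patterns file is a pattern name followed by its regular expression.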

Thanks,
Ganeshbabu R

Our need is to copy only standard_item docs to the new index (es_item1). Is it possible to use ITEM_TYPE in a filter condition so that only standard_item docs are copied to the new index?

To only send events whose item_type field matches "std_item" to the elasticsearch output:

output {
  if [item_type] == "std_item" {
    elasticsearch {
      ..
    }
  }
}
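Filled in with the field name and output options that appear earlier in this thread, that would look roughly like the sketch below. The value "STANDARD_ITEM" is taken from the original post's description of the field; note that the comparison is case-sensitive and must match the stored value exactly:

```conf
output {
  # Only events whose ITEM_TYPE field is exactly "STANDARD_ITEM" reach
  # the elasticsearch output; all other events are silently dropped.
  if [ITEM_TYPE] == "STANDARD_ITEM" {
    elasticsearch {
      host => "10.8.41.121:9200"
      protocol => "http"
      user => "esadmin"
      password => "XXXX"
      index => "es_item1"
      document_id => "%{[@metadata][_id]}"
    }
  }
}
```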

Hi @magnusbaeck

I tried adding the condition to the elasticsearch output as below:

output {
  if [ITEM_TYPE] == "SGI" {
    elasticsearch {
      host => "10.8.44.124:9201"
      protocol => "http"
      user => "esadmin"
      password => "XXXX"
      index => "es_item2"
      index_type => "%{[@metadata][_type]}"
      document_id => "%{[ITEM_ID]}"
    }

    # We print dots to see it in action
    stdout {
      codec => "dots"
    }
  }
}

I can see that Logstash is running, but the docs are not being copied to the new index.

Please let us know your suggestions

Thanks,
Ganeshbabu R

First, make sure that Logstash has events to process. Disable the ES outputs and verify that events flow nicely. If they do, look in the logs of both Logstash and ES.

Hi @magnusbaeck

I am very new to Logstash. Can you tell us how to check event processing in Logstash? If I disable the ES output in the Logstash config file, how and where will the copied docs be indexed?

Please let us know your suggestions.

Thanks,
Ganeshbabu R

If you don't see any events in ES this can basically be caused by three things:

  • Logstash isn't processing any events.
  • Logstash isn't able to send the documents.
  • You're looking in the wrong place. For example, the events could have a @timestamp value that doesn't match your expectations so the time filter in Kibana is wrong.

Before we jump to conclusions we need to understand what the problem is. We do that by isolating things. So, disable the elasticsearch output so that the stdout output is the only output and run Logstash. Are you getting dots printed to stdout?
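While testing, it can also help to print whole events instead of dots, so you can see the exact field names and values — for example, whether ITEM_TYPE really contains the string your conditional compares against. A sketch of such a debug output:

```conf
output {
  # rubydebug prints each event as a readable hash, making it
  # easy to inspect the exact value of the ITEM_TYPE field.
  stdout {
    codec => rubydebug
  }
}
```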

Thanks for your response @magnusbaeck

As you said, I disabled the elasticsearch output so that stdout is the only output in the Logstash config file. Below is the response of the Logstash run after disabling the elasticsearch output:

input {
  # We read from the "old" index
  elasticsearch {
    hosts => ["10.8.44.124:9201"]
    user => "esadmin"
    password => "dev01"
    index => "es_item"
    size => 500
    scroll => "5m"
    docinfo => true
  }
}

filter {
}

output {
  # We print dots to see it in action
  stdout {
    codec => "dots"
  }
}

Yes, I am getting dots printed to stdout. Below is the output:
[esadmin@tparhebfmi003 logstash-1.5.5]$ bin/logstash -f logstash.conf
Logstash startup completed
.................................................................................................................................................................................................................................................................................................................................................

Regards,
Ganeshbabu R

Okay, so that seems to work then. So what's in the Logstash and ES logs after you reenable the ES output?

After I reenabled the ES output in the Logstash config file:
input {
  # We read from the "old" index
  elasticsearch {
    hosts => ["10.8.44.122:9200"]
    user => "esadmin"
    password => "dev01"
    index => "es_item2"
    size => 500
    scroll => "5m"
    docinfo => true
  }
}

filter {
}

output {
  if [ITEM_TYPE] == "SGI" {
    # We write to the "new" index
    elasticsearch {
      host => "10.8.44.122:9200"
      protocol => "http"
      user => "esadmin"
      password => "dev01"
      index => "es_item1"
      document_type => "%{[@metadata][_type]}"
      document_id => "%{[@metadata][_id]}"
    }

    # We print dots to see it in action
    stdout {
      codec => "dots"
    }
  }
}

Below is the response of the Logstash run; it got stuck somewhere:
[esadmin@tparhebfmi003 logstash-1.5.5]$ bin/logstash -f logstash.conf
Logstash startup completed
.............................

Below are the ES client logs:

[2016-02-26 03:59:18,631][DEBUG][action.search.type ] [tparhebfmi003_PERF_CLIENT] All shards failed for phase: [query]
org.elasticsearch.transport.RemoteTransportException: [tparhebfmi005_PERF_DATA][inet[/10.8.44.124:9260]][indices:data/read/search[phase/query]]
Caused by: org.elasticsearch.search.SearchParseException: [ogrds_item2][11]: from[-1],size[-1],sort[,<custom:"CRT_DTTM": org.elasticsearch.index.fielddata.fieldcomparator.LongValuesComparatorSource@7cfd8342>!]: Parse Failure [Failed to parse source [{"sort":[{"_score":{"order":"desc"}},{"CRT_DTTM":{"order":"desc"}}],"query":{"filtered":{"query":{"bool":{"should":[]}},"filter":{"bool":{"must":[{"bool":{"should":[{"nested":{"query":{"match":{"EXTRN_CODE_CONTAINS":"4023900543008"}},"path":"XCD"}}]}},{"bool":{"should":[{"nested":{"query":{"filtered":{"filter":{"terms":{"XCD.PROC_GRP_ID":["REWE - PETZ REPEZ (DE)"]}}}},"path":"XCD"}}]}},{"bool":{"should":[{"query":{"filtered":{"filter":{"terms":{"ITEM_SPECIFICITY_REF_ID":[184]}}}}}]}}]}}}},"from":0,"fields":["MOD_DSCR","ITEM_CODE","ITEM_DSCR","IS_SHARED_IND","HAS_IMAGE_IND","DIST.RGN_NM","HAS_HIST_IND","ITEM_TYPE","CRT_DTTM","ITEM_MISUSED_GTIN_FLG"],"size":2000}]]
at org.elasticsearch.search.SearchService.parseSource(SearchService.java:747)
at org.elasticsearch.search.SearchService.createContext(SearchService.java:572)
at org.elasticsearch.search.SearchService.createAndPutContext(SearchService.java:544)
at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:306)
at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryTransportHandler.messageReceived(SearchServiceTransportAction.java:776)
at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryTransportHandler.messageReceived(SearchServiceTransportAction.java:767)
at org.elasticsearch.shield.transport.ShieldServerTransportService$ProfileSecuredRequestHandler.messageReceived(ShieldServerTransportService.java:176)
at org.elasticsearch.transport.netty.MessageChannelHandler$RequestHandler.doRun(MessageChannelHandler.java:279)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:36)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NumberFormatException: For input string: "REWE - PETZ REPEZ (DE)"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:589)
at java.lang.Long.parseLong(Long.java:631)
at org.elasticsearch.index.mapper.core.NumberFieldMapper.parseLongValue(NumberFieldMapper.java:354)
at org.elasticsearch.index.mapper.core.LongFieldMapper.indexedValueForSearch(LongFieldMapper.java:171)
at org.elasticsearch.index.mapper.core.AbstractFieldMapper.termFilter(AbstractFieldMapper.java:469)

Regards,
Ganeshbabu R