Logstash reporting error sending to Elasticsearch

Here is my config:

I haven't spent any time working on the filtering aspect yet. Attached is a Kibana GUI view.

##########
# LOGHTTP Input Parameters
##########
input {
  http_poller {
    urls => {
      DATANODE01_cluster => {
        id => "DATANODE01_cluster_health"
        method => get
        url => "http://DATANODE01:9200/_cluster/health"
        headers => {
          Accept => "application/json"
        }
      }
      DATANODE01_nodes => {
        id => "DATANODE01_nodes"
        method => get
        url => "http://DATANODE01:9200/_cat/nodes?v&s=name&h=name,ram.current,ram.percent,heap.percent,file_desc.current,request_cache.miss_count,merges.current_docs,merges.current_size,segments.memory"
        headers => {
          Accept => "application/json"
        }
      }
      DATANODE01_allocation => {
        id => "DATANODE01_allocation"
        method => get
        url => "http://DATANODE01:9200/_cat/allocation?v&s=node&h=node,shards,disk.total,disk.used,disk.percent"
        headers => {
          Accept => "application/json"
        }
      }
      DATANODE01_thread_pool => {
        id => "DATANODE01_thread_pool"
        method => get
        url => "http://DATANODE01:9200/_cat/thread_pool?v&s=rejected&h=node_name,name,size,active,queue,rejected"
        headers => {
          Accept => "application/json"
        }
      }
    }
    request_timeout => 60
    # Supports "cron", "every", "at" and "in" schedules by rufus scheduler
    schedule => { cron => "* * * * * UTC"}
    codec => "json"
    # A hash of request metadata info (timing, response headers, etc.) will be sent here
    # metadata_target => "http_poller_metadata"
    add_field => [ "pipeline", "LOGHTTP" ]
    add_field => [ "pipeline_protocol", "tcp" ]
    add_field => [ "pipeline_port", "9200" ]
  }
}
##################################################
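The elasticsearch output side of the pipeline isn't shown above. For context, a minimal output block of the kind this setup implies might look roughly like the following; the hostnames and index pattern are placeholders, not the actual config:

output {
  elasticsearch {
    hosts => ["DATANODE01:9200", "DATANODE02:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
}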

I mean, at this point, this has gone past comical.

I have upgraded every data node from 16GB to 40GB of RAM. The heap in jvm.options is set to 30GB.

I have set the refresh_interval to 10 seconds.

I have increased the indexing buffer from 10% to 50%.
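For reference, these two changes are typically applied along the following lines; the host, index pattern, and values are illustrative rather than the exact commands used here. The refresh interval is a dynamic index setting, while the indexing buffer is a static node setting in elasticsearch.yml:

curl -XPUT "http://DATANODE01:9200/logstash-*/_settings" -H 'Content-Type: application/json' -d'
{ "index": { "refresh_interval": "10s" } }'

# elasticsearch.yml on each data node (requires a restart)
indices.memory.index_buffer_size: 50%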

I have added 2 more data nodes (started with 4).

I have turned off all replicas and configured 6 shards (one for each data node).
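A sketch of how shard and replica counts are usually pinned for new daily indices, assuming the default logstash-* naming; the template name and order value here are hypothetical, and using a name other than "logstash" avoids overwriting the template that the Logstash output manages by default:

curl -XPUT "http://DATANODE01:9200/_template/logstash_shards" -H 'Content-Type: application/json' -d'
{
  "template": "logstash-*",
  "order": 1,
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0
  }
}'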

I have turned on rebalance as well as enabled cluster routing allocation.

I have also updated all servers to 5.2.

I literally do not know what else to do but seriously need help!

I just caught this in a log, from one of my data nodes. I formatted it a little so it was readable:

[2017-02-03T18:18:23,469][DEBUG][o.e.a.b.TransportShardBulkAction] [DATANODE-01] [logstash-2017.02.03][2] failed to execute bulk item

 (index) index {[logstash-2017.02.03][logs][AVoGRYPtnBzDCCuyf359]
 source[{"pipeline_protocol":"udp"
  "appliance":"########"
  "syslog_severity_code":6
  "syslog_body":"<Root><EventID>90050</EventID><Priority>4</Priority><Message>Session - End</Message><Category>AUDIT</Category><Realm>SecureAuth2</Realm><Appliance>########</Appliance><Company>########</Company><Version>9.0.1.74</Version><HostName>x.x.x.x</HostName></Root>\n"
  "city":"########"
  "syslog_facility":"LOCAL2"
  "syslog_facility_code":18
  "device_type":"Secure Auth Server"
  "message":"<13>Feb  3 18:18:23.457708 x.x.x.x <150>Feb 03 18:18:23 <Root><EventID>90050</EventID><Priority>4</Priority><Message>Session - End</Message><Category>AUDIT</Category><Realm>########</Realm><Appliance>########</Appliance><Company>########</Company><Version>9.0.1.74</Version><HostName>x.x.x.x</HostName></Root>"
  "event_category":"AUDIT"
  "priority":"4"
  "version":"9.0.1.74"
  "syslog_severity":"INFORMATIONAL"
  "pipeline":"LOGSECURITY"
  "device_ip":"x.x.x.x"
  "pipeline_ingress":"########"
  "@timestamp":"2017-02-03T23:18:23.458Z"
  "event_id":"90050"
  "message_info":"Session - End"
  "@version":"1"
  "host":"x.x.x.x"
  "pipeline_port":"35514"
  "realm":"########"
  "syslog_pri":"150"}]}

org.elasticsearch.index.mapper.MapperParsingException: failed to parse [version]
        at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:298) ~[elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.index.mapper.DocumentParser.parseObjectOrField(DocumentParser.java:438) ~[elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.index.mapper.DocumentParser.parseDynamicValue(DocumentParser.java:789) ~[elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.index.mapper.DocumentParser.parseValue(DocumentParser.java:571) ~[elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.index.mapper.DocumentParser.innerParseObject(DocumentParser.java:384) ~[elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.index.mapper.DocumentParser.parseObjectOrNested(DocumentParser.java:361) ~[elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.index.mapper.DocumentParser.internalParseDocument(DocumentParser.java:93) ~[elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:66) ~[elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:275) ~[elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.index.shard.IndexShard.prepareIndex(IndexShard.java:533) ~[elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.index.shard.IndexShard.prepareIndexOnPrimary(IndexShard.java:510) ~[elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.action.index.TransportIndexAction.prepareIndexOperationOnPrimary(TransportIndexAction.java:196) ~[elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary(TransportIndexAction.java:201) ~[elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.shardIndexOperation(TransportShardBulkAction.java:348) [elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.index(TransportShardBulkAction.java:155) [elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.handleItem(TransportShardBulkAction.java:134) [elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.onPrimaryShard(TransportShardBulkAction.java:120) [elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.onPrimaryShard(TransportShardBulkAction.java:73) [elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.action.support.replication.TransportWriteAction.shardOperationOnPrimary(TransportWriteAction.java:76) [elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.action.support.replication.TransportWriteAction.shardOperationOnPrimary(TransportWriteAction.java:49) [elasticsearch-5.2.0.jar:5.2.0]
        at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryShardReference.perform(TransportReplicationAction.java:914) [elasticsearch-5.2.0.jar:5.2.0]
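For what it's worth, "failed to parse [version]" generally means the incoming value ("9.0.1.74" here) could not be coerced into whatever type [version] was first mapped as in that day's index. One common workaround, sketched below with a hypothetical target field name, is to rename the field in Logstash before it reaches Elasticsearch; the alternative is to map [version] explicitly as a keyword in the index template:

filter {
  mutate {
    # "appliance_version" is a hypothetical name; anything that does not
    # collide with an already-mapped field of a different type will do
    rename => { "version" => "appliance_version" }
  }
}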

OK, regarding this error, I don't have much to say for the moment. But regarding the bulk issue, would you mind adding this configuration? Please adjust as the need arises:

thread_pool.bulk.queue_size: 1000

Please keep me updated.

How do I configure this?

This URL doesn't say where: https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html

Still do not know where to configure this. Anyone able to help me out?

In the elasticsearch.yml.
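For anyone else looking for it: this is a static node-level setting, so it goes into elasticsearch.yml on the nodes it should apply to and only takes effect after a restart, e.g.:

# elasticsearch.yml (Elasticsearch 5.x setting name)
thread_pool.bulk.queue_size: 1000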

For every node? Every data node?

Applied only to the data nodes.

Restarted all 6 nodes.

Will update in a little while.

FYI, prior to the change, I had over 84,000 es_rejected messages in the last 60 minutes.

@jstar

I am pleased to report ZERO rejected messages.

Now, my question is, why is changing that setting considered such a bad idea? I have seen people get quite a bit of flak regarding that setting. With ALL of the changes I made (upgrades, additional servers, etc.), why was that the fix? I had 6 data nodes receiving logs from 30 Logstash shippers. Where was the breakdown?

@kopacko.
That was because of the size of the bulk queue. If you look at the screenshot that you posted, you can see that the queue capacity is 50. But once you increased it, the queue became larger and it no longer reports any rejections.
Imagine that you are sending 100 items per second to a queue that can only hold 50. Eventually 50 of them will be lost because they couldn't get into the queue in time. Even though you had many servers with plenty of memory, all of their queue sizes were still the default of 50. That is why there were a lot of rejections.

@jstar
Yea that all makes sense, even to me...the non-DB guy.

What I don't get is, why is changing the default of 50 so "hidden"?

From my perspective, I am not doing anything amazingly complex. I have about 500 network devices, about 1000 servers, and 30-ish locations. By no means is my environment overly large. I don't even think it qualifies as medium sized.

I say all that to say this: I have posted tons of info and tried many, many changes (all of which I agree were probably needed), and in the end, the fix was a single configuration change.

This was a huge learning opportunity and I enjoy teachable moments. I just wish that, for idiot-level people like me, this type of change was better documented, or that it was more obvious what exactly to change and where.

I imagine that even in environments smaller than mine, just a few servers running Winlogbeat would exceed that default of 50. Maybe not. Maybe they have found the right posts to read, or have even followed this thread. Not sure.

In the end, I am incredibly thankful for all the help received. I have been struggling for months to get this from "proof of concept" to a "production" system that all of our IT can take advantage of, but I have yet to feel it is there. So, as of today, I checked and there are still no ES rejected logs. I feel better about the state of my design and deployment and hope to be able to move forward without any more major issues.

For all that have helped, I sincerely say thank you.

The reason we discourage it is covered in this thread: "Any idea what these errors mean version 2.4.2"

Yeah, I get that. You can push data in faster than it will accept it.

My issue is, these are TCP connections from Logstash. Even setting TCP aside, with a queue of 50, that would almost require one data node for every Logstash shipper. I am more than happy to try out any other ideas you might have. But each Logstash shipper is configured with all 6 data nodes. That means round robin, right?

Is there something else I could configure on the Logstash side to "slow" it down? Or maybe increase the number of logs per push to ES?
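On the Logstash side, the knobs that usually govern this are the pipeline batch settings in logstash.yml and the hosts list in the elasticsearch output, which Logstash load-balances across. The values below are purely illustrative:

# logstash.yml
pipeline.workers: 4       # defaults to the number of CPU cores
pipeline.batch.size: 500  # events collected per worker before flushing, i.e. roughly the bulk request size
pipeline.batch.delay: 50  # milliseconds to wait for a full batch before flushing anyway

# elasticsearch output: requests are distributed across all listed hosts
output {
  elasticsearch {
    hosts => ["DATANODE01:9200", "DATANODE02:9200", "DATANODE03:9200"]
  }
}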

Also, I want to scale the config back from 1000 to some other number.

Is there an API query I can use to look at the current size of the bulk queue?
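The cat thread pool API can show this per node; something along these lines, keeping only the columns of interest:

curl -XGET "http://DATANODE01:9200/_cat/thread_pool/bulk?v&h=node_name,name,active,queue,queue_size,rejected"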

Mark,
I bumped my data node servers up from 4 CPUs to 8 CPUs.

If I understand it, the bulk queue is 50 per CPU. So with 8, that would be 400, correct?
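As far as I know, queue_size is a per-node setting that does not multiply by CPU count; only the number of threads in the pool scales with processors. Rather than guessing, the effective per-node thread pool configuration, including queue_size, can be read back from the nodes info API:

curl -XGET "http://DATANODE01:9200/_nodes/thread_pool?pretty"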

And NOW... I am randomly seeing "Courier Fetch" errors.

I can't seem to reproduce them; one pops up as an orange-ish banner across the top and is gone before I can take a screenshot.

While doing a query, I get this:

Shard Failures
The following shard failures occurred:

Index: logstash-2017.02.08 Shard: 0 Reason: {"type":"transport_serialization_exception","reason":"Failed to deserialize response of type [org.elasticsearch.search.fetch.FetchSearchResult]","caused_by":{"type":"illegal_state_exception","reason":"unexpected byte [0x69]"}}

Quite literally, all I did was shut down all the data nodes and increase the CPUs from 4 to 8.

Now what do I need to do?
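Purely as a suggestion: a transport_serialization_exception with an "unexpected byte" during fetch can indicate that the nodes are no longer speaking the same wire format, for example a mixed-version cluster or a node that did not come back cleanly after the restart. A quick sanity check is to confirm every node reports the same version:

curl -XGET "http://DATANODE01:9200/_cat/nodes?v&h=name,version"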
