Logstash JMS plugin extremely slow

Hello,
I'm using the Logstash JMS plugin to connect to an EMS server and fetch data from a queue.
I then send the output to my Elasticsearch cluster (2 nodes).
The issue is that even though I have a lot of consumers (300 threads per Logstash server, across 3 servers), queue consumption is very slow.
I tried replacing the elasticsearch output with a file output, and consumption became very fast: I managed to consume all the data using only one Logstash node.
Any ideas on how I can troubleshoot this issue?

This suggests that Elasticsearch is your bottleneck and not Logstash.

What are the specs of your Elasticsearch nodes (CPU, memory, and disk type: SSD or HDD)?

Do you have an explicit mapping, or are you using dynamic mapping? The Elasticsearch documentation on tuning for indexing speed has some tips.

Also, did you change the value of pipeline.batch.size for your Logstash pipeline? This can also affect performance, but in this case it seems that the main bottleneck is Elasticsearch.

Hello @leandrojmp, and thanks for the quick answer.
My Elasticsearch nodes have 16 GB of memory, 8 cores, and a mounted shared HDD.
Here is my config file:

input {
    jms {
        include_headers => false
        include_properties => false
        include_body => true
        use_jms_timestamp => false
        interval => 5
        destination => "bwpm.event"
        threads => "300"
        yaml_file => "/monitoring/logstash/logstash-8.12.2/config/ems/jms.yml"
        yaml_section => "jms"
    }
}

filter {
    xml {
        source => "message"
        remove_namespaces => true
        store_xml => false
        target => "pasred_xml"
        xpath => [
            "/LogMessage/LogID/text()", "LogID",
            "/LogMessage/CorrelationLogID/text()", "CorrelationLogID",
            "/LogMessage/JobID/text()", "JobID",
            "/LogMessage/DomainName/text()", "DomainName",
            "/LogMessage/MachineName/text()", "MachineName",
            "/LogMessage/EngineName/text()", "EngineName",
            "/LogMessage/Deployment/text()", "Deployment",
            "/LogMessage/ProcessName/text()", "ProcessName",
            "/LogMessage/JobStart/text()", "JobStart",
            "/LogMessage/JobEnd/text()", "JobEnd",
            "/LogMessage/Status/text()", "Status",
            "/LogMessage/Events/Event[1]/Attributes/Attribute[not (contains(Name,'ransaction'))]/Value/text()", "MainInput",
            "/LogMessage/Events/Event[1]/Attributes/Attribute[contains(Name,'ransaction')]/Value/text()", "TransactionID",
            "/LogMessage/Events/Event/Attributes/Attribute/Value/text()", "Attributes",
            "/LogMessage/Events/Event[1]/Payload/text()", "Request_Payload",
            "/LogMessage/Events/Event[2]/Payload/text()", "Response_Payload",
            "/LogMessage/Events/Event[1]/Attributes/Attribute[contains(Name,'USER')]/Value/text()", "userName",
            "/LogMessage/Events/Event[1]/Attributes/Attribute[contains(Name,'Adresse')]/Value/text()", "IPaddr",
            "/LogMessage/Events/Event/EventMsg/text()", "EventMsg",
            "/LogMessage/Events/Event/EventMsgCode/text()", "EventMsgCode",
            "/LogMessage/Events/Event[ActivityName = 'LogError']/EventMsgCode/text()", "FailureMsgCode",
            "/LogMessage/Events/Event/Stacktrace/text()", "StackTrace",
            "/LogMessage/Track/Transition/text()", "Transitions",
            "/LogMessage/Track/*", "CompleteTrack",
            "sum (/LogMessage/Track/Transition/@ElapsedTime)", "ElapsedTime(ms)"
        ]
    }

    mutate {
        remove_field => ["message"]
        remove_field => ["pasred_xml"]
        gsub => [
            "Request_Payload", "&lt;", "<",
            "Request_Payload", "&gt;", ">",
            "Response_Payload", "&lt;", "<",
            "Response_Payload", "&gt;", ">",
            "StackTrace", "&lt;", "<",
            "StackTrace", "&gt;", ">",
            "Status", "1", "OK",
            "Status", "2", "KO",
            "Status", "3", "KO"
        ]
        convert => {
            "ElapsedTime(ms)" => "integer"
        }
    }
}

output {
    elasticsearch {
        hosts => ["https://xxxx:9200","https://xxxx:9200"]
        index => "pm-elk-%{+YYYY-MM-dd_hh:mm}"
        user => "elastic"
        password => "HbbrE-UaR3oQFcht9LjM"
        ilm_rollover_alias => "pm-elk"
        ssl => true
        cacert => "/monitoring/logstash/logstash-8.12.2/config/certs/http_ca.crt"
        validate_after_inactivity => 0
    }
}

Also, for some reason, the index setting is not being taken into account; the default index is being created.
Note that I have a very large amount of data to handle (when written to a file, I had 6 GB of data per minute during busy hours).

Refreshing this topic

Hello, please avoid bumping posts when less than 24 hours have passed.

The mounted shared HDD may be your issue. HDDs are bad for indexing performance, and I'm not sure you will be able to improve this without changing to SSDs.

You didn't answer my question about pipeline.batch.size, so I'm assuming you didn't change this setting.

Try increasing pipeline.batch.size in your logstash.yml or pipelines.yml, depending on where you configure your pipeline.

The default value is 125. Increase it and see if anything changes; for example, test with 250, then 500.

You may need to restart Logstash for the change to take effect.

Also, did you check the documentation about tuning Elasticsearch for indexing speed? There are some hints there that you can try to see if they improve things.

Hello,
Thanks a lot for the recommendations, and sorry for the spam.
I am trying to follow the recommendations to optimize.
I'll also check the indexing part; maybe there's an issue there as well.

A few more tips:

  • Check the LS API statistics: http://localhost:9600/_node/stats/pipelines?pretty
  • How many GB of RAM do you have on LS? The XML plugin uses more resources than typical plugins like grok.
  • Have you checked the LS logs? Is there any trace?
  • If you are using the persistent queue, switch to the memory queue.
  • Have you tried enabling compression_level?
  • Have you tried sending data to only 1 node? Test the connection to each node individually. Data shouldn't be sent to master nodes.
  • Check whether pool_max and pool_max_per_route can help speed up the network connection. Also double-check whether validate_after_inactivity => 0 is a good setting; 100 ms may be better.
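
To make the output-side tips above concrete, here is a minimal sketch of an elasticsearch output with those options set. The values are only illustrative starting points to test, not recommendations, and the hosts are the same placeholders used in the posted config. Authentication and TLS settings are left out; keep your own.

```
output {
    elasticsearch {
        # Placeholder hosts: list data nodes only, not dedicated master nodes.
        hosts => ["https://xxxx:9200", "https://xxxx:9200"]
        # Gzip level for request bodies: 0 disables compression, 9 compresses the most.
        compression_level => 3
        # Upper bounds on open connections, in total and per Elasticsearch host (illustrative values).
        pool_max => 1000
        pool_max_per_route => 100
        # Milliseconds a connection may sit idle before it is checked for staleness.
        # 0 means every connection is checked before each use.
        validate_after_inactivity => 100
    }
}
```

Change one setting at a time and compare the numbers from the stats API before and after, otherwise it is hard to tell which change helped. Note that compression trades CPU on the Logstash side for less network traffic, so it may not help if the network is not the limiting factor.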

On the ES side:

  • You have an ES cluster with two nodes; maybe you have an internal ES issue such as split brain.
  • Have you checked the ES logs? Anything there?
  • Do you have any issues with the cluster or shards? Check the troubleshooting documentation.
  • Have you tried sending data to a single node with ILM disabled and no replicas?
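
For the last test, a minimal sketch of what the output could look like, assuming a throwaway index name (pm-elk-test is hypothetical) and the same placeholder host as in the posted config. As far as I know, when ilm_rollover_alias is set the plugin writes to the rollover alias and the index setting is ignored, which may explain why the custom index name in the posted config was not used.

```
output {
    elasticsearch {
        # Placeholder: a single node, so each node can be tested on its own.
        hosts => ["https://xxxx:9200"]
        # Turn ILM off for the test so the index setting below is actually used.
        ilm_enabled => false
        # Hypothetical daily index name. Index names cannot contain ":",
        # so a pattern like hh:mm would not be valid here.
        index => "pm-elk-test-%{+YYYY.MM.dd}"
    }
}
```

The replica count is not set in the Logstash output. It is an index setting (number_of_replicas) that has to be configured on the Elasticsearch side, for example in the index template.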

Hello,
Thanks all for the very helpful tips. I managed to figure this out: the issue was actually the disk on the Elasticsearch nodes. I was using a NAS over a very poor network connection, so data transfer was taking too long. I switched to local disks and now everything is running smoothly.
