Logstash Json Split Data is NOT loading correctly and gives Http Status 409 at logstash log file

Hi,
We have a table that contains JSON data as text/string. Logstash has below configuration that uses JDBC, JSON, SPLIT, HTTP and ELASTICSEARCH plugins.

Logstash error contains Http Status 409 and Data is NOT loading properly.

On the first iteration, data is correct. Later as the scheduler runs, data is getting incorrect (not even matching the table record count).

Could you please help me.

Logstash Configuration File

    input {
      beats {
        port => 5044
      }

      jdbc {
        jdbc_driver_library => "/etc/logstash/mysql/mysql-connector-java-5.1.47.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/db?useSSL=false" 
        jdbc_user => dbuser
        jdbc_password => "dbpassword"
        statement => "SELECT * FROM db_query_results dbmodified_on >= :sql_last_value"
        add_field => { "index_type" => "db-alert" }
        schedule => "* * * * *"
        clean_run => true
      }

    }

    filter {
        if [index_type] == "db-alert" {
          if [fetcted_row_count] == "0" or [fetched_row_count] == 0 {
             mutate {
                update => [ "json_data", "[{}]" ]
             }
          }

          json {
              source => "json_data"
              target => "query"
          }

          split {
              field => "query"
              add_field => { "split_id" => "%{id}"  }
          }
        }

        mutate {
            rename => ["type", "alert_type" ]
        }
    }

    output {
        if [index_type] == "db-alert" {
            http {
                url => "http://elk-prod.com:9200/%{index_type}-index/_delete_by_query"
                http_method => "post"
                content_type => "application/json"
                headers => { "Authorization" => "Basic ZWxhc3RpYzpjaGFuZ2VtZQ==" }
                format => "message"
                message => '{ "query": { "match": { "id": "%{id}" } }}'
            }
            
            elasticsearch {
                hosts => ["http://elk-prod.com:9200"]
                user => "elastic"
                index => "%{index_type}-index"
                password => "changeme"
                document_type => "%{index_type}"
            }
        } else {
            elasticsearch {
                hosts => ["http://elk-prod.com:9200"]
                user => "elastic"
                index => "%{index_type}-index"
                password => "changeme"
                document_id => "%{id}"
                document_type => "%{index_type}"
            }
        }

      stdout {
        codec => rubydebug
      }

    }

Error at Logstash Logs

[2020-03-30T10:36:58,414][ERROR][logstash.outputs.http    ][main] [HTTP Output Failure] Encountered non-2xx HTTP code 409 {:response_code=>409, :url=>"http://gms-elk-prod.ppvt.manh.oraclevcn.com:9200/gms-db-alert-index/_delete_by_query", :event=>#<LogStash::Event:0xe8fae94>}
[2020-03-30T10:36:58,423][ERROR][logstash.outputs.http    ][main] [HTTP Output Failure] Encountered non-2xx HTTP code 409 {:response_code=>409, :url=>"http://gms-elk-prod.ppvt.manh.oraclevcn.com:9200/gms-db-alert-index/_delete_by_query", :event=>#<LogStash::Event:0x5eb371ce>}

Do the elasticsearch logs give any indication of why it returned a 409?

Elasticsearch Logs while loading the data

[2020-03-30T10:36:52,010][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]
[2020-03-30T10:36:52,073][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]
[2020-03-30T10:36:52,127][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]
[2020-03-30T10:36:52,196][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]
[2020-03-30T10:36:52,265][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]
[2020-03-30T10:36:52,324][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]
[2020-03-30T10:36:52,613][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]
[2020-03-30T10:36:52,672][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]
[2020-03-30T10:36:52,728][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]
[2020-03-30T10:36:56,115][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]
[2020-03-30T10:36:56,174][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]
[2020-03-30T10:36:56,280][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]
[2020-03-30T10:36:56,361][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]
[2020-03-30T10:36:56,439][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]
[2020-03-30T10:36:56,560][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]
[2020-03-30T10:37:00,244][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]
[2020-03-30T10:37:00,341][INFO ][o.e.c.m.MetaDataMappingService] [elk-prod..com] [db-alert-index/YQVYzRqzQhODEjCmOZkNCg] update_mapping [db-alert]

But I do see some other exceptions after some time (not at the same of time 409 error)

[2020-03-30T11:13:33,623][DEBUG][o.e.a.s.TransportSearchAction] [elk-prod.com] All shards failed for phase: [query]
[2020-03-30T11:13:33,623][WARN ][r.suppressed             ] [elk-prod.com] path: /db-alert-index/_delete_by_query, params: {index=db-alert-index}
org.elasticsearch.action.search.SearchPhaseExecutionException: all shards failed
	at org.elasticsearch.action.search.AbstractSearchAsyncAction.onPhaseFailure(AbstractSearchAsyncAction.java:545) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:306) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.action.search.AbstractSearchAsyncAction.onPhaseDone(AbstractSearchAsyncAction.java:574) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.action.search.AbstractSearchAsyncAction.onShardFailure(AbstractSearchAsyncAction.java:386) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.action.search.AbstractSearchAsyncAction.access$200(AbstractSearchAsyncAction.java:66) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.action.search.AbstractSearchAsyncAction$1.onFailure(AbstractSearchAsyncAction.java:242) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.action.search.SearchExecutionStatsCollector.onFailure(SearchExecutionStatsCollector.java:73) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:59) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.action.search.SearchTransportService$ConnectionCountingHandler.handleException(SearchTransportService.java:423) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleException(TransportService.java:1118) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.transport.InboundHandler.lambda$handleException$2(InboundHandler.java:244) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.common.util.concurrent.EsExecutors$DirectExecutorService.execute(EsExecutors.java:225) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.transport.InboundHandler.handleException(InboundHandler.java:242) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.transport.InboundHandler.handlerResponseError(InboundHandler.java:234) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.transport.InboundHandler.messageReceived(InboundHandler.java:137) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.transport.InboundHandler.inboundMessage(InboundHandler.java:103) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.transport.TcpTransport.inboundMessage(TcpTransport.java:667) [elasticsearch-7.6.0.jar:7.6.0]
	at org.elasticsearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:62) [transport-netty4-client-7.6.0.jar:7.6.0]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326) [netty-codec-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300) [netty-codec-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:241) [netty-handler-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1478) [netty-handler-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1227) [netty-handler-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1274) [netty-handler-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:503) [netty-codec-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:442) [netty-codec-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:281) [netty-codec-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:600) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:554) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) [netty-common-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.43.Final.jar:4.1.43.Final]
	at java.lang.Thread.run(Thread.java:830) [?:?]

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.