Filebeat stops sending data to Logstash and hence Elasticsearch after a few hundred thousand documents, works again on restarting Filebeat container

I am new to the Elastic Stack and am using the vpcflow fileset of the Filebeat AWS module to fetch data from S3 through SQS. It works fine for a few hundred thousand documents, but after that I don't see any new documents in the Elasticsearch index. On restarting the Filebeat container, data starts moving again, then stops again after 200,000 to 300,000 documents.

My flow is: S3 --> SQS --> Filebeat --> Logstash --> Elasticsearch Index

Things look fine to me: the Filebeat and Logstash Docker logs have no errors or warnings and look normal.

Filebeat docker logs look like:

2021-07-29T12:34:34.451Z INFO [monitoring] log/log.go:144 Non-zero metrics in the last 30s {monitoring: {metrics: {beat:{cgroup:{cpuacct:{total:{ns:18501726}}},cpu:{system:{ticks:8920},total:{ticks:73170,time:{ms:21},value:73170},user:{ticks:64250,time:{ms:21}}},handles:{limit:{hard:1048576,soft:1048576},open:10},info:{ephemeral_id:20bbba13-a550-4736-9100-5d5e9bc8d253,uptime:{ms:18210111}},memstats:{gc_next:22657664,memory_alloc:11578696,memory_total:8351052936,rss:79257600},runtime:{goroutines:24}},filebeat:{harvester:{open_files:0,running:0}},libbeat:{config:{module:{running:1},scans:3},output:{events:{active:0}},pipeline:{clients:6,events:{active:2069}}},registrar:{states:{current:0}},system:{load:{1:0.08,15:0.02,5:0.06,norm:{1:0.04,15:0.01,5:0.03}}}}}}
2021-07-29T12:35:04.450Z INFO [monitoring] log/log.go:144 Non-zero metrics in the last 30s {monitoring: {metrics: {beat:{cgroup:{cpuacct:{total:{ns:9612964}}},cpu:{system:{ticks:8930,time:{ms:5}},total:{ticks:73180,time:{ms:8},value:73180},user:{ticks:64250,time:{ms:3}}},handles:{limit:{hard:1048576,soft:1048576},open:10},info:{ephemeral_id:20bbba13-a550-4736-9100-5d5e9bc8d253,uptime:{ms:18240110}},memstats:{gc_next:22657664,memory_alloc:12050688,memory_total:8351524928,rss:79257600},runtime:{goroutines:24}},filebeat:{harvester:{open_files:0,running:0}},libbeat:{config:{module:{running:1},scans:3},output:{events:{active:0}},pipeline:{clients:6,events:{active:2069}}},registrar:{states:{current:0}},system:{load:{1:0.05,15:0.01,5:0.05,norm:{1:0.025,15:0.005,5:0.025}}}}}}

Logstash docker logs look like:

[2021-07-29T07:29:45,822][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.12.1", "jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc OpenJDK 64-Bit Server VM 11.0.10+9 on 11.0.10+9 +indy +jit [linux-x86_64]"}
[2021-07-29T07:29:47,985][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2021-07-29T07:29:50,446][INFO ][org.reflections.Reflections] Reflections took 52 ms to scan 1 urls, producing 23 keys and 47 values 
[2021-07-29T07:29:50,913][WARN ][deprecation.logstash.inputs.beats] Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[2021-07-29T07:29:52,470][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>1000, "pipeline.batch.delay"=>1000, "pipeline.max_inflight"=>2000, "pipeline.sources"=>["/usr/share/logstash/pipeline/logstash.conf"], :thread=>"#<Thread:0x1b9b01f9 run>"}
[2021-07-29T07:29:54,337][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>1.86}
[2021-07-29T07:29:54,399][INFO ][logstash.inputs.beats    ][main] Starting input listener {:address=>"0.0.0.0:7044"}
[2021-07-29T07:29:54,434][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2021-07-29T07:29:54,552][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2021-07-29T07:29:54,662][INFO ][org.logstash.beats.Server][main][1dfc980bac0254f76e50245873af080551f4b8ff5bc18e9eef4f724a86e99061] Starting server on port: 7044

Can anyone help me understand the issue, or tell me whether I am missing anything in my pipeline? Do I need to change any settings in logstash.yml (workers, batch size, etc.) or anything on the Filebeat side?

Please point me to the relevant post if this scenario has already been answered and I missed it in my search before posting.

Note: I am using Elastic Stack version 7.12.1 for Filebeat, Logstash, and Elasticsearch, all of them running in Docker containers.

Would you mind sharing your config file?

Sure @mtojek, here are my config files. All 'tochange_*' values are variables that we update. Data starts flowing but unfortunately stops after some minutes or hours, once the volume reaches, say, millions of documents.

filebeat.yml

filebeat.config:
  modules:
    path: /usr/share/filebeat/modules.d/*.yml
    reload.enabled: true


processors:
- drop_event:
    when:
      equals:
        aws.vpcflow.log_status: NODATA
- drop_event:
    when:
      equals:
        aws.vpcflow.log_status: SKIPDATA
- convert:
    fields:
    - {from: aws.vpcflow.tcp_flags, to: aws.vpcflow.tcp_flags, type: integer}
    - {from: aws.vpcflow.start, to: aws.vpcflow.start, type: long}
    - {from: aws.vpcflow.end, to: aws.vpcflow.end, type: long}
    - {from: aws.vpcflow.version, to: aws.vpcflow.version, type: integer}
    - {from: aws.vpcflow.account_id, to: aws.vpcflow.account_id, type: long}
    - {from: aws.vpcflow.pkt_srcaddr, to: aws.vpcflow.pkt_srcaddr, type: ip}
    - {from: aws.vpcflow.pkt_dstaddr, to: aws.vpcflow.pkt_dstaddr, type: ip}
    - {from: source.address, to: source.address, type: ip}
    - {from: source.ip, to: source.ip, type: ip}
    - {from: destination.address, to: destination.address, type: ip}
    - {from: destination.ip, to: destination.ip, type: ip}
    - {from: network.iana_number, to: network.iana_number, type: integer}
    ignore_missing: true
    fail_on_error: false
- drop_event:
    when:
      equals:
        aws.vpcflow.action: REJECT
- drop_event:
    when:
      range:
        aws.vpcflow.tcp_flags.lt: 16
- copy_fields:
    fields:
    - from: network.iana_number
      to: source.iana_number
    - from: network.transport
      to: source.TransportProtocol
    fail_on_error: false
    ignore_missing: true
- drop_fields:
    fields: [fileset, log, aws.s3, source.address, destination.address, event, agent,
      tags, network, cloud, input, message]
    ignore_missing: false
- rename:
    fields:
    - from: aws.vpcflow
      to: aws_raw
    - from: source
      to: aws_destination
    - from: destination
      to: aws_source
    - from: aws_source.ip
      to: aws_source.SourceIp
    - from: aws_source.port
      to: aws_source.SourcePort
    - from: aws_destination.ip
      to: aws_destination.DestinationIp
    - from: aws_destination.port
      to: aws_destination.DestinationPort
    ignore_missing: false
    fail_on_error: true
- drop_fields:
    fields: [aws]
    ignore_missing: false

output.logstash:
  hosts: 127.0.0.1:7044
#output.console:
#  pretty: true

aws.yml from modules.d

- module: aws
  vpcflow:
    enabled: true
    var.queue_url: tochange_sqsurl
    #var.shared_credential_file: /etc/filebeat/aws_credentials
    #var.credential_profile_name: fb-aws
    var.access_key_id: tochange_accesskeyid
    var.secret_access_key: tochange_secretaccesskey
    #var.session_token: session_token
    #var.visibility_timeout: 300s
    #var.api_timeout: 120s
    #var.endpoint: amazonaws.com
    #var.role_arn: arn:aws:iam::123456789012:role/test-mb

logstash.conf


input {
  beats {
    port => 7044
  }
}

filter {
  if ![aws_source] { drop { } }
  if ![aws_destination] { drop { } }
  if ![aws_raw] { drop { } }
  mutate { remove_field => [ "tags", "service" ] }
  uuid { target => "[aws_common][flow_id]" }
                
  mutate {  copy => { "[aws_destination][DestinationIp]" => "[aws_destination][HOSTNAME]" } }
  dns {
    nameserver => {
      address => ["tochange_dns"]
    }
    reverse => [ "[aws_destination][HOSTNAME]" ]
    action => "replace"
    hit_cache_size => 1000
    hit_cache_ttl => 300
  }
  mutate {
    add_field => { "[aws_common][cid]" => "tochange_cid" }
    add_field => { "[aws_common][dmn]" => "tochange_dmn" }
    add_field => { "[aws_common][sid]" => "tochange_sid" }
  }

  translate {
    field => "[aws_destination][DestinationPort]"
    destination => "[aws_destination][Protocol]"
    dictionary_path =>  "/usr/share/data/lookup_files/port_to_protocol.json"
  }
  translate {
    field => "[aws_destination][DestinationPort]"
    destination => "[aws_destination][ProtocolType]"
    dictionary_path => "/usr/share/data/lookup_files/port_to_protocol_type.json"
  }
           
}

output {
  # stdout {
  #   codec => rubydebug  
  # }
 http {
   url => "tochange_svrlogipport" 
   http_method => "post"
   headers => {
     "Authorization" => "Bearer tochange_jwt_token"
   }
 }  
}

logstash.yml

xpack.monitoring.enabled: false
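
For reference, the worker and batch settings asked about earlier are ordinary logstash.yml keys. The fragment below is only a sketch: the three pipeline values simply mirror what the "Starting pipeline" log line above reports and are not a tuning recommendation, and the `log.level` line is an optional addition (an assumption about what might help here) for getting more detail out of Logstash while reproducing the stall.

```yaml
# logstash.yml (sketch, values mirror the startup log above; adjust as needed)
xpack.monitoring.enabled: false

# Number of worker threads running the filter and output stages
pipeline.workers: 2
# Maximum number of events a worker collects before running filters and outputs
pipeline.batch.size: 1000
# Milliseconds to wait for a batch to fill before dispatching an undersized one
pipeline.batch.delay: 1000

# Optional: more verbose Logstash logging while investigating
log.level: debug
```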

Did you try recording the network traffic between all parties to confirm whether the problem is with Beats rather than with Logstash?

You can also enable debug logging in Beats and check if there are any errors.

I suspect the issue may be that Logstash cannot process incoming documents at that rate, rather than a problem in Beats, which is why I shared details for both. I was not sure how to confirm this, as the logs don't seem to show anything suspicious.

  1. Record network traffic between all parties: not recorded yet. How should I go about such a recording? With Packetbeat alongside Filebeat and Logstash?
  2. Enable debug logging in Beats: do you mean adding logging.level: debug to filebeat.yml?
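
On the second point, a minimal sketch of what debug logging could look like in filebeat.yml is shown below. `logging.level` and `logging.selectors` are standard Filebeat logging options. The `"*"` selector is just a catch-all starting point and is likely to be noisy, so it may be worth narrowing once the relevant component is known.

```yaml
# filebeat.yml (sketch): raise log verbosity while reproducing the stall
logging.level: debug
# "*" enables debug output for every component; narrow this list later
logging.selectors: ["*"]
```

With this in place, the Filebeat container logs should include debug-level messages in addition to the 30-second monitoring lines shown earlier.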
