GeoIP enrichment stops Elasticsearch from receiving data from Packetbeat

Hey there,

I'm currently running Packetbeat on a Linux box, and it sends data directly to an Elasticsearch cluster.

Everything runs perfectly fine, except when I try to add GeoIP info following this documentation: as soon as I update the config file, I stop receiving data.

Events continue to be emitted, but nothing appears in the [Packetbeat] Overview ECS dashboard anymore. The fail rate climbs to 3/s and open handles climb to 10/s.

The weird thing is that when I test the config with sudo packetbeat test config -c packetbeat.yml -e, it says the config is OK. And when I launch it as a service, it runs fine, but I still don't receive any data.

As soon as I take away the pipeline in the config file and restart the service, everything runs fine again and I receive all the data.

Do you have any idea what's happening and how I could fix this?

In case you need it, here is the config file:

packetbeat.interfaces.device: any

packetbeat.flows:
  timeout: 30s
  period: 10s

packetbeat.protocols:
- type: icmp
  enabled: true

- type: amqp
  ports: [5672]

- type: cassandra
  ports: [9042]

- type: dhcpv4
  ports: [67, 68]

- type: dns
  ports: [53]

- type: http
  ports: [80, 8080, 8000, 5000, 8002]

- type: memcache
  ports: [11211]

- type: mysql
  ports: [3306,3307]

- type: pgsql
  ports: [5432]

- type: redis
  ports: [6379]

- type: thrift
  ports: [9090]

- type: mongodb
  ports: [27017]

- type: nfs
  ports: [2049]

- type: tls
  ports:
    - 443   # HTTPS
    - 993   # IMAPS
    - 995   # POP3S
    - 5223  # XMPP over SSL
    - 8443
    - 8883  # Secure MQTT
    - 9243  # Elasticsearch

name: "box_name"
setup.template.pattern: "packetbeat*"
setup.template.overwrite: false
setup.template.settings:
  index.number_of_shards: 2
  index.number_of_replicas: 1
  index.routing.allocation.require.temp: "hot"
setup.ilm.enabled: true
setup.ilm.rollover_alias: "packetbeat"
setup.ilm.pattern: "000001"
setup.ilm.policy_name: "hotwarm_policy"

tags: ["my_tags"]

setup.kibana:
  host: "kibana_ip:5601"
  username: "${KIB_USER}"
  password: "${KIB_PWD}"

output.elasticsearch:
  hosts: ["elasticsearch_node1_ip:9200","elasticsearch_node2_ip:9200"]
  username: "${ES_USER}"
  password: "${ES_PWD}"
  pipeline: geoip-info    

processors:
  - add_host_metadata:
      netinfo.enabled: true
      geo:
        name: box_name
        location: xxxxx
        continent_name: xxxxx
        country_iso_code: xxxxx
        region_name: xxxxx
        region_iso_code: xxxxx
        city_name: xxxxx
  - add_locale: ~
  - add_cloud_metadata: ~
  - add_fields:
      when.network.source.ip: private
      fields:
        source.geo.location:
          lat: xxxxx
          lon: xxxxx
        source.geo.continent_name: xxxxx
        source.geo.region_iso_code: xxxxx
        source.geo.country_iso_code: xxxxx
        source.geo.region_name: xxxxx
        source.geo.name: box_name
      target: ''
  - add_fields:
      when.network.destination.ip: private
      fields:
        destination.geo.location:
          lat: xxxxx
          lon: xxxxx
        destination.geo.continent_name: xxxxx
        destination.geo.region_iso_code: xxxxx
        destination.geo.country_iso_code: xxxxx
        destination.geo.region_name: xxxxx
        destination.geo.name: xxxxx
      target: ''

monitoring.enabled: true

And here is the pipeline on the Elasticsearch cluster:

"geoip-info" : {
    "description" : "Add geoip info",
    "processors" : [
      {
        "geoip" : {
          "field" : "client.ip",
          "target_field" : "client.geo",
          "ignore_missing" : true
        }
      },
      {
        "geoip" : {
          "field" : "source.ip",
          "target_field" : "source.geo",
          "ignore_missing" : true
        }
      },
      {
        "geoip" : {
          "field" : "destination.ip",
          "target_field" : "destination.geo",
          "ignore_missing" : true
        }
      },
      {
        "geoip" : {
          "field" : "server.ip",
          "target_field" : "server.geo",
          "ignore_missing" : true
        }
      },
      {
        "geoip" : {
          "field" : "host.ip",
          "target_field" : "host.geo",
          "ignore_missing" : true
        }
      }
    ]
  }

Thanks in advance for your help!

Cheers,

jsu

Are there any errors in the Elasticsearch server log right after you start up Packetbeat (with the pipeline config setting in place) on either of the two Elasticsearch nodes?
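You can also run a sample event through the pipeline directly with the ingest simulate API, which shows any processor failure without involving Packetbeat at all. A sketch (the document fields and IP values below are made up for illustration):

```console
POST _ingest/pipeline/geoip-info/_simulate
{
  "docs": [
    {
      "_source": {
        "source": { "ip": "8.8.8.8" },
        "host": { "ip": ["10.0.0.5", "fe80::1"] }
      }
    }
  ]
}
```

If a processor throws, the simulate response includes the error for that document instead of the enriched result.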

Shaunak

Hi Shaunak, thanks for your time!

So, actually there is something, yes:

[2020-01-16T09:37:11,869][DEBUG][o.e.a.b.T.BulkRequestModifier] [elasticsearch_node1] failed to execute pipeline [_none] for document [packetbeat/_doc/null]
org.elasticsearch.ingest.IngestProcessorException: java.lang.IllegalArgumentException: field [host.ip] of type [java.util.ArrayList] cannot be cast to [java.lang.String]
        at org.elasticsearch.ingest.CompoundProcessor.newCompoundProcessorException(CompoundProcessor.java:215) ~[elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:147) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.Processor.execute(Processor.java:53) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:137) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:156) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.Processor.execute(Processor.java:51) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:137) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:156) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.Processor.execute(Processor.java:51) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:137) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:156) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.Processor.execute(Processor.java:51) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:137) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:156) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.Processor.execute(Processor.java:51) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:137) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:123) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.Pipeline.execute(Pipeline.java:100) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.IngestService.innerExecute(IngestService.java:504) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.IngestService.executePipelines(IngestService.java:409) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.IngestService.access$000(IngestService.java:75) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.IngestService$3.doRun(IngestService.java:383) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:773) [elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.5.1.jar:7.5.1]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:830) [?:?]
Caused by: java.lang.IllegalArgumentException: field [host.ip] of type [java.util.ArrayList] cannot be cast to [java.lang.String]
        at org.elasticsearch.ingest.IngestDocument.cast(IngestDocument.java:551) ~[elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.IngestDocument.getFieldValue(IngestDocument.java:117) ~[elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.IngestDocument.getFieldValue(IngestDocument.java:132) ~[elasticsearch-7.5.1.jar:7.5.1]
        at org.elasticsearch.ingest.geoip.GeoIpProcessor.execute(GeoIpProcessor.java:106) ~[?:?]
        at org.elasticsearch.ingest.Processor.execute(Processor.java:50) ~[elasticsearch-7.5.1.jar:7.5.1]
        ... 24 more

So it seems that the pipeline is indeed the problem, but why ?

Hey @jsu,

A host can have multiple IP addresses. However, the geoip processor can currently only handle a single IP address at a time.
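Concretely, in a Packetbeat event host.ip is an array (illustrative values below), which is why the geoip processor's cast to a single string fails:

```json
{
  "host": {
    "ip": ["10.0.0.5", "fe80::1"]
  }
}
```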

The simplest fix is to avoid doing geoip on host.ip for now.

If you still want to do geoip on your host IPs (e.g. your cloud provider assigns public IPs to it), you can wait for 7.6, which will include this fix https://github.com/elastic/elasticsearch/pull/49573 (original issue https://github.com/elastic/elasticsearch/issues/46193).

Or you can extract the public IPs into separate fields that each contain a single IP, and do GeoIP on those instead.
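For the simplest fix, that just means re-creating the pipeline with the host.ip processor dropped, e.g. (sketch based on the pipeline posted above):

```console
PUT _ingest/pipeline/geoip-info
{
  "description": "Add geoip info (without host.ip)",
  "processors": [
    { "geoip": { "field": "client.ip", "target_field": "client.geo", "ignore_missing": true } },
    { "geoip": { "field": "source.ip", "target_field": "source.geo", "ignore_missing": true } },
    { "geoip": { "field": "destination.ip", "target_field": "destination.geo", "ignore_missing": true } },
    { "geoip": { "field": "server.ip", "target_field": "server.geo", "ignore_missing": true } }
  ]
}
```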

Hey @webmat,

Thanks for your answer! As I don't absolutely need this GeoIP enrichment (I'm mostly testing with it so far), I went with your simplest fix and rebuilt the pipeline without touching host.ip, and it works like a charm. Thanks again!

Cheers,

jsu
