Filebeat -> logstash -> rabbitmq -> logstash -> elasticsearch

Hello All,

I have a configuration described in the subject line and also here.

Logs are shipped by filebeat to a logstash instance, which then outputs them to rabbitmq.
All is well with this part; I can intercept the messages in rabbitmq.
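
For reference, the first stage is roughly the following (port, user, exchange and routing key here are illustrative placeholders, not the exact values from my setup):

input {
        beats {
                port => 5044
        }
}

output {
        rabbitmq {
                host => "xxx.xxx.xxx.xxx"
                vhost => "/"
                exchange => "logs"
                exchange_type => "direct"
                key => "received.logs"
                user => "shipper"
                password => "xxxxxx"
                persistent => true
        }
}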

The content, however, gets lost in the second part, and I have been unable to figure out where the messages disappear.

There is a rabbitmq input that processes the queue the filebeat messages are stored in, together with other kinds of messages; they then pass through a reasonably simple path to end up in elasticsearch.

I can see the other messages from the same queue being stored in elasticsearch, but not the ones that originally came from the beats input.

I had this in place and working on an older version of logstash/ES; the behaviour is observed on the latest version of everything (7.3.1).

What could be causing this?

Thank you all in advance,

What does the configuration of the second logstash look like?

inputs:

input {
        rabbitmq {
                host => "xxx.xxx.xxx.xxx"
                vhost => "/"
                queue => "received.logs"
                tags => "received.logs"
                passive => true
                user => "processor"
                password => "xxxxxx"
        }
        rabbitmq {
                host => "xxx.xxx.xxx.xxx"
                vhost => "/"
                queue => "received.logs.dev"
                tags => [ "received.logs.dev", "dev" ]
                passive => true
                user => "processor"
                password => "xxxxxx"
        }
        rabbitmq {
                host => "xxx.xxx.xxx.xxx"
                vhost => "/"
                queue => "shoveled.logs"
                tags => "shoveled.logs"
                passive => true
                user => "processor"
                password => "xxxxxx"
        }

}

filters:

filter {

  if "10.xx." in [host] or "10.xx" in [IpAddress] {
    mutate {
      add_tag => [ "us", "live" ]
    }
  } else if "10.xa." in [host] or "10.xa" in [IpAddress] {
    mutate {
      add_tag => [ "eu", "live" ]
    }
  } else if "10.xb." in [host] {
    mutate {
      add_tag => [ "xxx", "live" ]
    }
  } else if "10.xc." in [host] or "10.xc" in [IpAddress] {
    mutate {
      add_tag => [ "eu", "dev" ]
    }
  } else if "10.xd." in [host] or "10.xd" in [IpAddress] {
    mutate {
      add_tag => [ "eu", "uat" ]
    }
  }

}

output:

output {
        elasticsearch {
                hosts => ["xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"]
        }
}

The messages in question are all in the queue called received.logs, and only the ones from filebeat appear to be discarded silently.

There is nothing conditional about the output, and your filters never drop messages, so it looks like the input has to be the issue. Could it be a mismatch on the vhost?

The messages originating from beats are mixed with messages from syslog in the same queue.

This is the received.logs queue, and the ones from syslog do end up in elasticsearch while the ones from beats do not. Again, they are all in the same queue and picked up by the same input.

I can see the messages in RabbitMQ, here is an example of each.

A syslog one, this gets into ES.

{
  "facility": 2,
  "type": "syslog",
  "host": "xxx.xxx.xxx.xxx",
  "severity_label": "Informational",
  "severity": 6,
  "message": "connect from AAAAA[xxx.xxx.xxx.xxx]",
  "pid": "4980",
  "priority": 22,
  "@timestamp": "2019-09-02T14:12:15.331Z",
  "logsource": "localhost",
  "timestamp8601": "2019-09-02T14:12:15.331818+00:00",
  "program": "postfix/smtpd",
  "facility_label": "mail",
  "@version": "1",
  "timestamp": "2019-09-02T14:12:15.331818+00:00"
}

A beats one that does not end up in ES.

{
  "@timestamp": "2019-09-02T13:10:39.931Z",
  "tags": [
    "beats_input_codec_plain_applied"
  ],
  "host": {
    "name": "ip-xx-xx-xx-xx"
  },
  "log": {
    "file": {
      "path": "/opt/xxxxxxxx/logs/xxxxxx.log"
    },
    "offset": 31891946
  },
  "message": "2019-09-02 13:10:39.773|DEBUG|hz.ip-xxxxxxx.priority-generic-operation.thread-0||[]:5701 [dev] [3.9.2] Received heartbeat from Member []: - 16cd2b6b-4872-4661-a5c3-52bba18f43f1 (now: 2019-09-02 13:10:39.773, timestamp: 2019-09-02 13:10:39.771)|com.hazelcast.internal.cluster.impl.ClusterHeartbeatManager",
  "@version": "1",
  "input": {
    "type": "log"
  },
  "ecs": {
    "version": "1.0.1"
  },
  "agent": {
    "hostname": "ip-xx-xx-xx-xx",
    "id": "c61120bc-f353-4159-95a3-59145bf7d7fd",
    "type": "filebeat",
    "version": "7.3.1",
    "ephemeral_id": "d51a5206-3bfa-4d8c-be2d-0c18f13adad4"
  }
}

These two were taken from the RabbitMQ queue directly.

Thanks a lot for your replies so far.

Getting there slowly.
So this is the relevant error in logstash when running it in debug mode.

[WARN ] 2019-09-02 15:49:39.833 [[main]>worker1] elasticsearch - Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"logstash", :_type=>"_doc", :routing=>nil}, #<LogStash::Event:0x3454ec8c>], :response=>{"index"=>{"_index"=>"logstash-2019.08.28-000001", "_type"=>"_doc", "_id"=>"hCKq8mwB43T24gvi3aB0", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [host] of type [text] in document with id 'hCKq8mwB43T24gvi3aB0'. Preview of field's value: '{name=ip-xx-xx-xx-xx}'", "caused_by"=>{"type"=>"illegal_state_exception", "reason"=>"Can't get text on a START_OBJECT at 1:492"}}}}}

You cannot index both types of messages into the same elasticsearch index; the structures are incompatible. In one, [host] is a string

"host": "xxx.xxx.xxx.xxx",

and in the other it is an object

"host": {
    "name": "ip-xx-xx-xx-xx"
 },

Whichever one gets indexed first determines the mapping for [host], so all the messages of the other format will get mapping exceptions.

You could use mutate+rename on [host] when ![host][name], so that both formats end up with the same structure.
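
A minimal sketch of that, assuming the syslog events are the ones where [host] is a plain string:

filter {
  if ![host][name] {
    mutate {
      # [host] is a plain string here (the syslog case); move it under
      # [host][name] so it matches the beats-style object
      rename => { "host" => "[host][name]" }
    }
  }
}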


I can see it now: the mappings clash. I will have to send these to a separate index.
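
Something along these lines in the output should do it (the filebeat index name is just what I plan to use, not anything already in place):

output {
        if [agent][type] == "filebeat" {
                elasticsearch {
                        hosts => ["xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"]
                        index => "filebeat-logs-%{+YYYY.MM.dd}"
                }
        } else {
                elasticsearch {
                        hosts => ["xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"]
                }
        }
}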

Thanks a lot, this all makes sense now.
