AMQP Metadata from RabbitMQ input plugin

Hello Everyone,

We are ingesting AMQP messages from RabbitMQ. These arrive at RabbitMQ via SMTP using the rabbitmq-email plugin, so the AMQP headers contain some SMTP information we need.

By default it seems that only the body of the email is ingested from RabbitMQ.

We require the email subject and destination address from the AMQP metadata. We know they are there, as we can see them when consuming messages with a Python script.

Attempting to get this working has led me to the following logstash.conf file.
Note: the routing-key in the AMQP metadata is derived from the SMTP destination ("To") address of the email.

input {
  rabbitmq {
    codec => "plain"
    tags => ["test","RabbitMQ","smtp"]
    host => "rabbitmq"
    port => 5672
    queue => "queue_t"
    durable => true
    passive => true
    user => "consumer_t"
    password => "BOGUS_PASSWORD"
    metadata_enabled => "extended"
    add_field => {
      "topic" => "%{[@metadata][rabbitmq_properties][routing-key]}"
      "subject" => "%{[@metadata][rabbitmq_headers][Subject]}"
    }
  }
}

filter {
  json {
      add_field => {
        "to_address" => "%{[@metadata][rabbitmq_properties][routing-key]}"
        "subject" => "%{[@metadata][rabbitmq_headers][Subject]}"
      }
  }
}

output {
  opensearch {
    hosts => ["opensearch:9200"]
    ssl => true
    ssl_certificate_verification => false
    user => "osearch-user"
    password => "BOGUS_PASSWORD"
    index => "test-rabbitmq-logstash-pipeline"
  }
  stdout { codec => rubydebug }
}

But it seems I may have missed something, or made a mistake, as I cannot get the required email information.

Anyone able to see what might be wrong?

Cheers,
Eddie.

OpenSearch/OpenDistro are AWS run products and differ from the original Elasticsearch and Kibana products that Elastic builds and maintains. You may need to contact them directly for further assistance.

(This is an automated response from your friendly Elastic bot. Please report this post if you have any suggestions or concerns :elasticheart: )

I suggest you add { metadata => true } as a codec option on your stdout output, to verify that Logstash is tagging the event with the metadata.
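For reference, a minimal sketch of that suggestion, applied to the stdout output from the config above:

```
output {
  stdout { codec => rubydebug { metadata => true } }
}
```

The rubydebug codec hides [@metadata] by default; metadata => true makes it print, which is what lets you confirm the rabbitmq_properties and rabbitmq_headers fields are actually present on the event.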

Yes, it does show "routing-key" and "Subject" in the RabbitMQ metadata.

{
       "@timestamp" => 2024-02-09T08:58:06.541860511Z,
        "@metadata" => {
        "rabbitmq_properties" => {
             "consumer-tag" => "amq.ctag-_36BYl7dfFbFo5s8kAOZEQ",
             "content-type" => "text/plain",
            "delivery-mode" => 2,
               "message-id" => "9054ce4cf059eadb9e4c809f3926d798",
              "routing-key" => "smtp_test@example.com",
                "timestamp" => 1707469086,
                 "exchange" => "smtp_exchange"
        },
           "rabbitmq_payload" => "Hello. Timestamp is 2024-02-09T16:58:10.721259 for email send.",
           "rabbitmq_headers" => {
            "Subject" => "SMTP MQ Test",
               "From" => "random_sender@invalid_domain"
        }
    },
       "email_dest" => "smtp_test@example.com",
         "@version" => "1",
          "message" => "Hello. Timestamp is 2024-02-09T16:58:10.721259 for email send.",
            "event" => {
        "original" => "Hello. Timestamp is 2024-02-09T16:58:10.721259 for email send."
    },
    "email_subject" => "SMTP MQ Test",
             "tags" => [
        [0] "test",
        [1] "RabbitMQ",
        [2] "smtp"
    ]
}

And it also shows the email_dest and email_subject fields added, as required, after correcting the logstash.conf file.

input {
  rabbitmq {
    codec => "plain"
    tags => ["test","RabbitMQ","smtp"]
    host => "rabbitmq"
    port => 5672
    queue => "queue_t"
    durable => true
    passive => true
    user => "consumer_t"
    password => "BOGUS_PASSWORD"
    metadata_enabled => "extended"
  }
}
filter {
  mutate {
    add_field => { "email_dest" => "%{[@metadata][rabbitmq_properties][routing-key]}" }
    add_field => { "email_subject" => "%{[@metadata][rabbitmq_headers][Subject]}" }
  }
}
output {
  opensearch {
    hosts => ["opensearch:9200"]
    ssl => true
    ssl_certificate_verification => false
    user => "osearch-user"
    password => "BOGUS_PASSWORD"
    index => "test-rabbitmq-logstash-pipeline"
  }
  stdout { codec => rubydebug { metadata => true } }
}

Thanks very much for your help.

I would argue that this is a bug. Processing the common options add_field / add_tag / remove_field / remove_tag is called decoration. The input decorates the event before it adds the RabbitMQ metadata, and I think that is the wrong way around.
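The ordering described can be sketched as (pseudocode illustrating the behaviour described above, not the actual plugin source):

```
# simplified per-delivery flow inside the rabbitmq input
event = codec.decode(payload)
decorate(event)                 # add_field / add_tag applied here
event.set("[@metadata][rabbitmq_properties]", properties)   # metadata added after decoration
output_queue << event           # so add_field found no metadata to interpolate
```

That is why the add_field sprintf references in the input resolved to nothing, while the same references work in a filter, which runs after the event leaves the input fully populated.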

As you have discovered, the workaround is to create the email fields using a mutate filter.
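One caveat worth guarding against with the mutate workaround (an assumption on my part, not something reported in this thread): if a message arrives without a Subject header, the sprintf reference cannot resolve and Logstash writes the literal %{...} string into the field. A conditional avoids that:

```
filter {
  if [@metadata][rabbitmq_headers][Subject] {
    mutate {
      add_field => { "email_subject" => "%{[@metadata][rabbitmq_headers][Subject]}" }
    }
  }
}
```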

Thanks for clarifying.