Hello there, my company is highly interested in monitoring the AMQP protocol (version 0.9.1, this is relevant) used by RabbitMQ. I like challenges, so I intend to implement it myself, unless someone is already working on it, in which case I would be glad to help.
Hey @Keylor42, that sounds great. I'm not aware of anyone working on AMQP, but it would be interesting to us. We'd be glad to help you along the way, just let us know what you need (testing, reviews, guidance, etc.)
Hello there, I began to work on the protocol implementation but something bothers me.
Studying the code, I understood (correct me if I'm wrong) that a transaction is based on a request and a response message flowing in the two directions, and once you have them both you publish the transaction.
But AMQP is a partly asynchronous protocol: when a client publishes a message, the server doesn't send a confirmation, and when a client consumes one or several messages at once, it doesn't send a confirmation either. I observed this while sending and receiving messages locally and outputting the raw messages with Packetbeat. This is quite different from all the protocols already implemented, which really work with a request/response model.
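To illustrate the difference, the pattern I see in the existing protocols looks roughly like this (a simplified sketch with made-up names, not the actual Packetbeat code):

package main

import "fmt"

// Simplified stand-ins for what a request/response protocol keeps per stream.
type message struct {
    isRequest bool
    content   string
}

type transaction struct {
    request  *message
    response *message
    status   string
}

// correlate mimics the usual flow: keep the request around, and only when the
// matching response shows up, build and publish a complete transaction.
func correlate(pending *message, msg *message, publish func(transaction)) *message {
    if msg.isRequest {
        return msg // hold on to the request and wait for the response
    }
    if pending != nil {
        publish(transaction{request: pending, response: msg, status: "OK"})
    }
    return nil
}

func main() {
    publish := func(t transaction) { fmt.Println(t.request.content, "->", t.response.content) }
    pending := correlate(nil, &message{isRequest: true, content: "request"}, publish)
    correlate(pending, &message{isRequest: false, content: "response"}, publish)
    // With a basic.publish there is simply no response side, so this pairing
    // would never complete.
}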
My goal is to monitor messages being published or consumed. So I managed to publish the content of messages, queue names and other information in Kibana by getting both sides of the stream in my parser. The prototype of my function thus looks like this:
func (amqp *Amqp) amqpMessageParser(s *AmqpStream, s_rev *AmqpStream, tcptuple *common.TcpTuple) (ok bool, complete bool)
and I publish a transaction while I'm still inside the function, when I finish parsing a body frame, by fetching information from the two message structures. It works for now, but I feel bad about this because it seems like I don't use the code as it was intended (for example: the status is always OK because I can't expect an error code) and it could cause me some trouble in the end. So I wanted to know what you think about this.
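To give a better idea of what I mean, the shape of it is roughly this (a heavily simplified sketch; the struct fields and helper names are only illustrative, the real code is in the repo linked below):

package main

import "fmt"

// Rough stand-ins for the per-direction parsing state (not the real structs).
type AmqpStream struct {
    queue        string // e.g. filled by a basic.consume seen on this side
    body         []byte // e.g. filled by a content body frame on this side
    bodyComplete bool
}

// publishFromBothSides: once a body frame is fully parsed on one side, build
// the event right away from the information spread over the two directions,
// instead of correlating a request with a response that AMQP will never send.
func publishFromBothSides(s, sRev *AmqpStream, publish func(map[string]interface{})) {
    if !s.bodyComplete {
        return // keep buffering until the body frame is finished
    }
    event := map[string]interface{}{
        "body":   string(s.body),
        "status": "OK", // always OK, there is no error code to expect
    }
    if sRev.queue != "" {
        event["queue"] = sRev.queue // the queue name was seen on the other direction
    }
    publish(event)
}

func main() {
    deliverSide := &AmqpStream{body: []byte("hello world"), bodyComplete: true}
    consumeSide := &AmqpStream{queue: "orders"}
    publishFromBothSides(deliverSide, consumeSide, func(e map[string]interface{}) { fmt.Println(e) })
}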
By the way my repo is here : https://github.com/Keylor42/packetbeat/tree/protoamqp/protos
(I had to change several headers to make the project compile)
Right, all protocols that we support so far are mainly request-response oriented (with the small exception that Thrift-RPC has an async call type).
I think the way you did it is just fine: if we only ever expect one message, there's no point in storing it for correlation like the other protocols do, so the code can be a bit simpler.
Right, some fields won't make sense, like the status or responsetime, but what's probably very useful is that they all get centralized in Elasticsearch so you can use searches to follow the messages as they pass through the queues.
I've looked a bit over the code, looks like great work! I look forward to having this in Packetbeat.
Ok, cool, thanks for your answer! Right now I'm selecting the relevant exported fields for the protocol. My problem is that some of them (like the exchange type) are present in the case of publishing but absent when a client consumes the message, and vice versa. Is it important that all the exported fields of a protocol are the same and are all filled?
Sorry, I don't know much about AMQP. Does this mean that depending on the message type, there can be a different set of fields indexed? That's totally fine and happens in other protocols as well.
That's right. For example, when a publisher sends a message to an exchange, the exchange type is specified: it can be direct, fanout, topic, etc. But the consumer that pulls the message from the queue doesn't have that information and doesn't really need it. So for the Publish method there would indeed be an exchange type field, but not for the Consume method. Conversely, the Consume method knows the name of the queue it pulls from, but the Publish method doesn't. So if it's OK to have different exported fields depending on the method, that's great.
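Concretely, I was thinking of filling the event along these lines (a simplified sketch; the field names and the method-to-field mapping are not final):

package main

import "fmt"

// Fields that may or may not be known, depending on the AMQP method.
type amqpInfo struct {
    method       string
    exchange     string
    exchangeType string // only known on the publishing side
    queue        string // only known on the consuming side
}

// buildEvent only sets the fields that make sense for the method at hand, so a
// publish event carries the exchange information and a deliver event carries
// the queue name, without forcing empty values into the other one.
func buildEvent(info amqpInfo) map[string]interface{} {
    event := map[string]interface{}{"method": info.method}
    switch info.method {
    case "basic.publish":
        event["exchange"] = info.exchange
        if info.exchangeType != "" {
            event["exchange_type"] = info.exchangeType
        }
    case "basic.deliver":
        event["queue"] = info.queue
    }
    return event
}

func main() {
    fmt.Println(buildEvent(amqpInfo{method: "basic.publish", exchange: "logs", exchangeType: "fanout"}))
    fmt.Println(buildEvent(amqpInfo{method: "basic.deliver", queue: "task_queue"}))
}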
I don't really know what I should do when I receive FIN or GapInStream. Check if there is a message and a possible transaction? If GapInStream is received, does it mean that there is necessarily a gap of nbytes in the data received?
Yes, GapInStream means that data was lost at sniffing time, and it gives you the opportunity to still try to make some sense of the data you received so far. Not all protocols do something with it; it's especially useful for cases where the responses tend to be very large, which I think is not the case for AMQP. So I think you can just leave the method empty. Similarly, ReceivedFin is useful for protocols that don't have a good way of splitting between messages, so I suspect you don't need it either.
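To make it concrete, something along these lines should be enough (the types and signatures below are only rough stand-ins, so double-check them against the plugin interface in protos):

package main

// Rough stand-ins for the real Packetbeat types; the signatures below are only
// approximate, so check them against the interface in your tree.
type TcpTuple struct{}
type ProtocolData interface{}

type Amqp struct{}

// GapInStream: nbytes of data were lost at sniffing time. AMQP responses are
// small, so there is little worth salvaging; telling the framework to drop the
// stream data is a reasonable default (adjust if you find a case worth saving).
func (amqp *Amqp) GapInStream(tcptuple *TcpTuple, dir uint8, nbytes int, private ProtocolData) (ProtocolData, bool) {
    return private, true
}

// ReceivedFin: one side closed its half of the connection. AMQP frames carry
// their own length, so there is nothing buffered that needs flushing here.
func (amqp *Amqp) ReceivedFin(tcptuple *TcpTuple, dir uint8, private ProtocolData) ProtocolData {
    return private
}

func main() {}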
By the way, I'm nearly finished. Regarding my last message, I finally kept the original prototype of the message parser function, and I separated the AMQP methods into synchronous and asynchronous ones (like publish, deliver, ack, etc.). I made a new branch to commit my work on top of the beta3 version of Packetbeat: https://github.com/Keylor42/packetbeat/tree/amqp/protos/amqp.
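The split is basically a lookup on the method name, something like this (a simplified sketch; the actual list in the branch is longer):

package main

import "fmt"

// Methods that never get a reply from the peer: the transaction can be
// published as soon as the method (and its content, if any) is fully parsed.
// This list is only an illustration, the one in the branch is more complete.
var asynchronousMethods = map[string]bool{
    "basic.publish": true,
    "basic.deliver": true,
    "basic.ack":     true,
}

// isAsynchronous decides whether to publish right away or to store the message
// and wait for the matching *-ok method, like the request/response protocols do.
func isAsynchronous(method string) bool {
    return asynchronousMethods[method]
}

func main() {
    fmt.Println(isAsynchronous("basic.publish")) // true: publish immediately
    fmt.Println(isAsynchronous("queue.declare")) // false: wait for queue.declare-ok
}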
This is great news! I recommend opening a Pull Request early; it's always better to discuss on actual code, and we can also help add any missing pieces after the PR is opened.
Ok, thanks for the answer! As for the pull request, expect it maybe next week.
EDIT: whoops, spoke too fast. My company will first test it on our main server, so it will rather be next weekS.
In the meantime, we wrote a developer guide about how to add support for a new protocol in Packetbeat. Here is the link: https://www.elastic.co/guide/en/beats/packetbeat/current/_developer_guide_adding_a_new_protocol.html.
If you need help or if you have any other questions, don't hesitate to contact us!
Just to keep you informed: I'm waiting for the contributor agreement signature from my hierarchy before opening the pull request. In the meantime, the plugin has been tested on our servers and it's working. The pull request could happen this week if things go smoothly.
That's really awesome. Looking forward to the PR.