Logstash join (aggregate?) two aggregated maps (postfix)


I have two Postfix servers. Call one 'local' and the other 'dmz'. I collect logs from each and aggregate them, using the queue ID Postfix assigns to each mail as the task_id. All works fine so far: I get one aggregated log entry for the whole processing of a mail on 'local', and one on 'dmz' too.

But I'd also like to see the path a mail takes in one joint/aggregated log entry. This is where it gets tricky. I have the grok patterns and the fields/values for this. When one server forwards the mail to the other (ingress or egress is irrelevant, the mechanism is the same), the receiving server sends back, at the end, the queue ID it assigned to the mail.

So in reality something like this:
local's log:

AAAAAA1: processing starts
AAAAAA1: some other relevant log info
AAAAAA1: blabla..
AAAAAA1: successfully sent ( queued as BBBBBB2 )

and dmz's log:

BBBBBB2: processing starts
BBBBBB2: blablbalba

I'm already aggregating the individual servers' logs based on the queue ID (AAAAAA1 and BBBBBB2), but as you can see, at the end of local's log I also receive the other server's queue ID.

  • Can I somehow aggregate on that too?
  • How would I make sure to join/aggregate only the final products of each individual aggregation? I'm tagging the finished aggregates as 'aggregated' (via timeout_tags), but the problem is that once an event is tagged like that, it has already finished processing.
  • If this is possible at all, I'd be overwriting necessary fields from one server with the other's. Can I somehow make sure that, when I do the final aggregation, all fields from one input are renamed? Like
    smtp_status_code => dmz_smtp_status_code


(Bonus question: if all this is possible and viable, sometimes I'd need even further nesting. When a bounce happens, this is how it goes: local sends successfully, dmz notices the bounce, and as a result dmz sends a mail to local to inform it. The 2-link chain becomes 3.)

Not an easy problem. I do not think an aggregate filter using the queue id as task_id can do it. You may be able to do it in a ruby filter, or an aggregate filter with a constant task_id, which is basically a ruby filter with some framework (timeouts) that may be useful.

The first message processed contains the queue id of the second message processed.

One approach would be to store the aggregated data for AAAAAA1 in the filter (maybe in a hash of hashes with BBBBBB2 as the key). Then when BBBBBB2 is processed, check if you have a hash entry for it. If so, join and remove.
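A minimal sketch of that idea in plain Ruby, outside Logstash. Everything named here is an assumption made for illustration: the `$pending` store, the `on_local_done` / `on_dmz_done` helpers, the `queue_id` and `queued_as` field names, and the `local_` / `dmz_` prefixes. Inside Logstash the same logic would have to live in a ruby filter or an aggregate filter's code block, and it only makes sense if all events pass through a single worker so they share the same store.

```ruby
# Hypothetical in-memory store: next-hop queue ID => finished 'local' aggregate.
$pending = {}

# Prefix every key so fields from the two servers cannot overwrite each other.
def prefix_keys(fields, prefix)
  fields.each_with_object({}) { |(key, value), out| out["#{prefix}#{key}"] = value }
end

# Called when the aggregate for a 'local' queue ID is complete.
# 'queued_as' is an assumed field holding the ID parsed from "queued as BBBBBB2".
def on_local_done(local)
  $pending[local["queued_as"]] = local
  nil
end

# Called when the aggregate for a 'dmz' queue ID is complete.
# Returns the joined record, or nil if no matching 'local' aggregate is stored.
def on_dmz_done(dmz)
  local = $pending.delete(dmz["queue_id"])
  return nil if local.nil?
  prefix_keys(local, "local_").merge(prefix_keys(dmz, "dmz_"))
end

on_local_done({ "queue_id" => "AAAAAA1", "queued_as" => "BBBBBB2", "smtp_status_code" => "250" })
joined = on_dmz_done({ "queue_id" => "BBBBBB2", "smtp_status_code" => "250" })
puts joined.inspect
```

The sketch assumes the 'local' aggregate is finished before the 'dmz' one. If the order can be reversed, a 'dmz' aggregate with no stored partner would have to be parked as well rather than dropped.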

Another would be to use a staging index. Store a document for every queue id. Then scan it periodically to see if you can match pairs (or triplets), join them, delete them, and write the result to its final resting place. This is probably more robust.
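The scan step could look roughly like the sketch below. It is plain Ruby with a Hash standing in for the staging index, so it shows only the matching logic and no Elasticsearch calls. The `queue_id` and `queued_as` field names and the `follow_chain` / `scan_staging` helpers are hypothetical, and the sketch assumes each staged document is keyed by its own queue id and points to the next hop through `queued_as`.

```ruby
# Follow 'queued_as' links from a starting document and return the chain of documents.
def follow_chain(staging, start_id)
  chain = []
  seen = {}
  id = start_id
  while id && staging.key?(id) && !seen[id]
    seen[id] = true            # guard against accidental loops
    chain << staging[id]
    id = staging[id]["queued_as"]
  end
  chain
end

# One scan pass: join every chain of two or more documents, delete its members
# from staging, and return the joined records destined for the final index.
def scan_staging(staging)
  # A chain starts at a document that no other staged document points to.
  targets = staging.values.map { |doc| doc["queued_as"] }.compact
  heads = staging.keys - targets
  joined = []
  heads.each do |head_id|
    chain = follow_chain(staging, head_id)
    next if chain.length < 2   # partner not staged yet; leave it for a later pass
    joined << { "path" => chain.map { |doc| doc["queue_id"] }, "hops" => chain }
    chain.each { |doc| staging.delete(doc["queue_id"]) }
  end
  joined
end

staging = {
  "AAAAAA1" => { "queue_id" => "AAAAAA1", "server" => "local", "queued_as" => "BBBBBB2" },
  "BBBBBB2" => { "queue_id" => "BBBBBB2", "server" => "dmz" },
  "CCCCCC3" => { "queue_id" => "CCCCCC3", "server" => "local", "queued_as" => "DDDDDD4" }
}
scan_staging(staging).each { |record| puts record["path"].join(" -> ") }
```

Because the chain is followed link by link, a pair and a triplet are handled by the same code. The sketch joins a chain as soon as it has two members, so a real scan would probably want to wait some settle time first; otherwise a third hop that arrives late finds its partners already deleted.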

Note that writes to Elasticsearch are not immediate. If you process AAAAAA1 and then quickly try and do a lookup when processing BBBBBB2 there are several reasons why AAAAAA1 may not have been indexed (pipeline batching, output delays, index refresh delays and probably more).

Yeah.. I hoped it was just my limited experience with Logstash and that I was missing a quick (and possibly dirty) solution for this. So I reckon your 2nd approach is more robust, but it still comes with its own caveats.

The only approach I could come up with was to keep processing the logs from both servers as before and output them to both Elasticsearch and a file. Then re-process the output file, throw away anything that's not already tagged as aggregated, add a new field like intersection_id, aggregate on that, and output to Elasticsearch.
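To make the re-processing step concrete, here is a rough sketch in plain Ruby of how intersection_id could be derived and used for grouping. It rests on several assumptions that are not stated in the thread: the intermediate file holds one JSON event per line, a forwarding event carries the next hop's queue id in a `queued_as` field, and every event has its own `queue_id`. The helper names are made up. In Logstash itself, the grouping would be done by a second aggregate filter with intersection_id as its task_id.

```ruby
require "json"

# Keep only events already tagged 'aggregated' and give each an intersection_id.
# Assumption: an event that forwarded the mail carries 'queued_as' (the next
# hop's queue ID); otherwise the event's own 'queue_id' is used.
def add_intersection_id(event)
  return nil unless Array(event["tags"]).include?("aggregated")
  event.merge("intersection_id" => event["queued_as"] || event["queue_id"])
end

# Parse the intermediate file's lines, drop untagged events, and group the rest.
def group_by_intersection(lines)
  lines.map { |line| add_intersection_id(JSON.parse(line)) }
       .compact
       .group_by { |event| event["intersection_id"] }
end

lines = [
  '{"queue_id":"AAAAAA1","queued_as":"BBBBBB2","server":"local","tags":["aggregated"]}',
  '{"queue_id":"AAAAAA1","message":"processing starts","server":"local"}',
  '{"queue_id":"BBBBBB2","server":"dmz","tags":["aggregated"]}'
]
group_by_intersection(lines).each do |id, events|
  puts "#{id}: #{events.map { |e| e['server'] }.join(' + ')}"
end
```

This rule only links two hops. In a three-link chain the middle event would take its intersection_id from its own `queued_as` and no longer group with the first hop, so the bounce case would need a different key.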

So this way, theoretically, Elasticsearch would have all individual logs, the individual logs' aggregates, and also the aggregates' aggregate. (This is probably why your 1st approach wouldn't be optimal for my use case anyway. I'd like to keep ALL logs, since for some deep, sophisticated debugging it could come in handy to have every stage of the logs, including all lines separately.)

The only problem I can see with this is somewhat similar to your 2nd approach. I'd somehow have to make sure this intermediate log file gets processed only after both servers' pipelines have written their own aggregates to it. In my case this most likely isn't a problem in practice, but it's a difficult theoretical question: in a huge enterprise setup some logs could take a minute or two to arrive. I'm thinking raising the aggregation's timeout to something like 5-10 minutes should do it? Same with your 2nd approach?

Probably. Unless you are willing to persist the unaggregated data forever, there is always a chance, however tiny, that the BBBBBB2 logs arrive after you have deleted the unaggregated AAAAAA1 logs from the staging area. The alternative is to make the persistent storage the staging area: persist the unaggregated data and go back and scan it from time to time to look for data that can be aggregated.
