Handling log messages wrapped in metadata

Hi,

I have the following data pipeline working:

S3 (log archives) -- EC2 instance -- Kinesis -- Logstash -- Elasticsearch

Logs are archived on S3; the EC2 instance fetches them and pushes them to Kinesis, with a separate stream for each of the three log types we have. On the other end, Logstash pulls the data from Kinesis and loads it into Elasticsearch.
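
For reference, the Logstash input side looks roughly like this (I'm using the logstash-input-kinesis plugin; the stream name and region below are placeholders):

input {
  kinesis {
    kinesis_stream_name => "app-logs-stream"   # placeholder; one stream per log type
    region => "us-east-1"                      # placeholder
    codec => json
  }
}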

The problem I'm facing is that my logs don't contain a client identifier, so when working with multiple archives I can't easily tell apart the logs from the various sources.

One solution I'm considering is to wrap each log message being PUT to Kinesis in metadata that I have available at the EC2 stage.

For example, a record would look like:

{
  "client_id": "12347-127831",
  "customer_name": "xyz",
  "raw_message": "May 23 21:18:24 localhost rsyncd[29384]: building file list\nMay 23 21:18:24 localhost rsyncd[29383]: building file list\nMay 23 21:18:24 localhost rsyncd[29383]: sent 77 bytes received 30 bytes total size 0\nMay 23 21:18:24 localhost rsyncd[29384]: sent 77 bytes received 30 bytes total size 0\nMay 23 21:19:01 localhost systemd: Created slice user-0.slice.\nMay 23 21:19:01 localhost systemd: Starting Session 75 of user root.\nMay 23 21:19:01 localhost systemd: Started Session 75 of user root.\n"
}

How would I go about handling this data efficiently on the Logstash end?

If I use the json codec on the input and then grok the "raw_message" field, Logstash treats the multiple log lines above as a single event and parses them as one.
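
Concretely, with the json codec on the input the record's fields arrive as top-level fields on the event, so my filter is essentially just a grok (pattern simplified here) that only ever matches the first line of raw_message:

filter {
  grok {
    match => { "raw_message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{GREEDYDATA:syslog_message}" }
  }
}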

Is there a way to have a first pass where the metadata is stripped and the relevant tags are added, and then have the subsequent filters treat each line as an individual event?
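
To make that concrete, here's the kind of two-pass filter I have in mind -- an untested sketch, assuming the split filter copies the metadata fields onto each resulting event:

filter {
  # pass 1: carry the metadata along as a tag on the event
  mutate {
    add_tag => [ "client_%{client_id}" ]
  }
  # split the multi-line raw_message into one event per line
  split {
    field => "raw_message"
    terminator => "\n"   # the default
  }
  # pass 2: each event now holds a single syslog line
  grok {
    match => { "raw_message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_host} %{GREEDYDATA:syslog_message}" }
  }
}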

I would like to avoid repeating the metadata on every line, as that would increase my overhead in both data transfer and processing.

Any help would be appreciated! Thanks!

I still haven't been able to figure out how to handle the multiple lines. For now, the workaround I'm using is to send single lines (or very small batches) in raw_message instead of one large multi-line block.

For example:

[
  {
    "client_id": "12347-127831",
    "customer_name": "xyz",
    "raw_message": "May 23 21:18:24 localhost rsyncd[29384]: building file list\nMay 23 21:18:24 localhost rsyncd[29383]: building file list\n"
  },
  {
    "client_id": "12347-127831",
    "customer_name": "xyz",
    "raw_message": "May 23 21:19:01 localhost systemd: Created slice user-0.slice"
  }
]

filter.conf:

filter {
  grok {
    match => { "raw_message" => "%{SYSLOGTIMESTAMP:syslog_timestamp}..." }
  }
}

This works, but it seems very inefficient given that the majority of my workload is batch processing.

Any pointers?