You can write your own module for this and submit it as a PR. Something like the following may help.

Note that when you write your own Beats module, you mostly use Elasticsearch ingest processors instead of Beat processors. When the Beat is run for the first time, the ingest pipeline ($BEAT_HOME/modules/${MODULE_NAME}/${STREAM_TYPE}/ingest/pipeline.yml) is loaded into Elasticsearch.
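For the wiring, a module is essentially one directory per fileset containing an input config template, the ingest pipeline, and a manifest.yml that ties them together. Roughly, modeled on how existing Filebeat filesets are laid out (the module name postgresql and fileset name csvlog here are just placeholders):

modules/postgresql/csvlog/
  config/csvlog.yml     # input configuration template (log paths, etc.)
  ingest/pipeline.yml   # the ingest pipeline shown below
  manifest.yml          # ties variables, input and pipeline together

A minimal manifest.yml could look like this (the default path is an assumption, adjust to your setup):

module_version: "1.0"

var:
  - name: paths
    default:
      - /var/log/postgresql/*.csv

ingest_pipeline: ingest/pipeline.yml
input: config/csvlog.yml

The pipeline itself (ingest/pipeline.yml) could then look like this: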
description: Pipeline for parsing PostgreSQL CSV logs.
processors:
  # Record when the event was ingested.
  - set:
      field: event.ingested
      value: '{{_ingest.timestamp}}'
  # Split the csvlog line into its columns (column order per the PostgreSQL csvlog format).
  - csv:
      field: message
      separator: ","
      target_fields: ["postgresql.log.timestamp", "user.name", "postgresql.log.database", "process.pid", "temp.connection_from", "temp.session_id", "temp.session_line_num", "temp.command_tag", "temp.session_start_time", "temp.virtual_transaction_id", "temp.transaction_id", "log.level", "postgresql.log.error.code", "log.message", "temp.detail", "temp.hint", "temp.internal_query", "temp.internal_query_pos", "temp.context", "postgresql.log.query", "temp.query_pos", "temp.location", "temp.application_name"]
      ignore_missing: true
      trim: true
  # Use the log's own timestamp as @timestamp.
  - date:
      field: postgresql.log.timestamp
      target_field: '@timestamp'
      formats:
        - yyyy-MM-dd HH:mm:ss.SSS zz
        - yyyy-MM-dd HH:mm:ss zz
  # SQLSTATE codes are numeric strings; make them comparable as integers.
  - convert:
      field: postgresql.log.error.code
      type: integer
      ignore_missing: true
  # Convert a duration in milliseconds to event.duration (nanoseconds).
  - script:
      lang: painless
      source: ctx.event.duration = Math.round(ctx.temp.duration * params.scale)
      params:
        scale: 1000000
      if: ctx.temp?.duration != null
  - remove:
      field: temp.duration
      ignore_missing: true
  # ECS categorization fields.
  - set:
      field: event.kind
      value: event
  - append:
      field: event.category
      value:
        - database
  - append:
      field: event.type
      value:
        - info
  # SQLSTATE classes 02 and above indicate warnings/errors. Note the threshold is
  # written as 2000, not 02000: a leading zero is an octal literal in Painless.
  - append:
      field: event.type
      value:
        - error
      if: "ctx?.postgresql?.log?.error?.code != null && ctx.postgresql.log.error.code >= 2000"
  - append:
      field: related.user
      value: "{{user.name}}"
      if: "ctx?.user?.name != null"
on_failure:
  - set:
      field: error.message
      value: '{{ _ingest.on_failure_message }}'
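Once the module files are in place, you can (re)load the pipeline without restarting the Beat:

filebeat setup --pipelines --modules postgresql

and then iterate on it with the Simulate Pipeline API. The pipeline id and the sample log line below are made up for illustration; list the id Filebeat actually registered with GET _ingest/pipeline/filebeat-*:

POST _ingest/pipeline/filebeat-8.13.0-postgresql-csvlog-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "2024-05-01 12:34:56.789 UTC,\"postgres\",\"mydb\",4242,\"127.0.0.1:50514\",\"663a1b2c.1092\",1,\"SELECT\",\"2024-05-01 12:30:00 UTC\",\"3/47\",\"0\",\"LOG\",\"00000\",\"statement: SELECT 1\",,,,,,,,,\"psql\""
      }
    }
  ]
}

The response shows the document exactly as the pipeline would index it, which makes it easy to verify each processor before wiring the module into Filebeat.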