Filebeat with the postgresql module, custom log_line_prefix

Hello,

I'm struggling to set up Filebeat with the postgresql module in order to ship PostgreSQL logs to Elasticsearch.

The logs show up in Kibana, but they are not parsed and carry the following error message:

Provided Grok expressions do not match field value

I'm struggling to understand how I can tell Filebeat what the proper log pattern is.
In PostgreSQL, I have set the following:

log_line_prefix = '%t [%p]: db=%d,user=%u,app=%a,client=%h '	
					# special values:
					#   %a = application name
					#   %u = user name
					#   %d = database name
					#   %r = remote host and port
					#   %h = remote host
					#   %p = process ID
					#   %t = timestamp without milliseconds
					#   %m = timestamp with milliseconds
					#   %n = timestamp with milliseconds (as a Unix epoch)
					#   %i = command tag
					#   %e = SQL state
					#   %c = session ID
					#   %l = session line number
					#   %s = session start timestamp
					#   %v = virtual transaction ID
					#   %x = transaction ID (0 if none)
					#   %q = stop here in non-session processes
					#   %% = '%'
					# e.g. '<%u%%%d> '
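
With that prefix, every log line starts like the following (shortened here from the message field of the event below):

2019-10-21 14:00:26 CEST [3176]: db=postgres,user=patroni,app=Patroni,client=127.0.0.1 LOG:  statement: ...

Here is a full example document as seen in Kibana: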
{
  "_index": "filebeat-7.4.0-2019.10.21-000001",
  "_type": "_doc",
  "_id": "HrIw7m0Bye1teIScpU9u",
  "_version": 1,
  "_score": null,
  "_source": {
    "agent": {
      "hostname": "ms-vg-stats-db-01-prod.mnw.no",
      "id": "4569fe1c-798f-4eee-9a3f-ffe29f6de791",
      "type": "filebeat",
      "ephemeral_id": "1795d61c-2a2d-4e73-a565-7aee5574c09c",
      "version": "7.4.0"
    },
    "log": {
      "file": {
        "path": "/var/log/postgresql/postgresql-Mon.log"
      },
      "offset": 53929347
    },
    "message": "2019-10-21 14:00:26 CEST [3176]: db=postgres,user=patroni,app=Patroni,client=127.0.0.1 LOG:  statement: WITH replication_info AS (SELECT usename, application_name, client_addr, state, sync_state, sync_priority FROM pg_catalog.pg_stat_replication) SELECT pg_catalog.to_char(pg_catalog.pg_postmaster_start_time(), 'YYYY-MM-DD HH24:MI:SS.MS TZ'), CASE WHEN pg_catalog.pg_is_in_recovery() THEN 0 ELSE ('x' || pg_catalog.substr(pg_catalog.pg_walfile_name(pg_catalog.pg_current_wal_lsn()), 1, 8))::bit(32)::int END, CASE WHEN pg_catalog.pg_is_in_recovery() THEN 0 ELSE pg_catalog.pg_wal_lsn_diff(pg_catalog.pg_current_wal_lsn(), '0/0')::bigint END, pg_catalog.pg_wal_lsn_diff(COALESCE(pg_catalog.pg_last_wal_receive_lsn(), pg_catalog.pg_last_wal_replay_lsn()), '0/0')::bigint, pg_catalog.pg_wal_lsn_diff(pg_catalog.pg_last_wal_replay_lsn(), '0/0')::bigint, pg_catalog.to_char(pg_catalog.pg_last_xact_replay_timestamp(), 'YYYY-MM-DD HH24:MI:SS.MS TZ'), pg_catalog.pg_is_in_recovery() AND pg_catalog.pg_is_wal_replay_paused(), (SELECT pg_catalog.array_to_json(pg_catalog.array_agg(pg_catalog.row_to_json(ri))) FROM replication_info ri)",
    "fileset": {
      "name": "log"
    },
    "error": {
      "message": "Provided Grok expressions do not match field value: [2019-10-21 14:00:26 CEST [3176]: db=postgres,user=patroni,app=Patroni,client=127.0.0.1 LOG:  statement: WITH replication_info AS (SELECT usename, application_name, client_addr, state, sync_state, sync_priority FROM pg_catalog.pg_stat_replication) SELECT pg_catalog.to_char(pg_catalog.pg_postmaster_start_time(), 'YYYY-MM-DD HH24:MI:SS.MS TZ'), CASE WHEN pg_catalog.pg_is_in_recovery() THEN 0 ELSE ('x' || pg_catalog.substr(pg_catalog.pg_walfile_name(pg_catalog.pg_current_wal_lsn()), 1, 8))::bit(32)::int END, CASE WHEN pg_catalog.pg_is_in_recovery() THEN 0 ELSE pg_catalog.pg_wal_lsn_diff(pg_catalog.pg_current_wal_lsn(), '0/0')::bigint END, pg_catalog.pg_wal_lsn_diff(COALESCE(pg_catalog.pg_last_wal_receive_lsn(), pg_catalog.pg_last_wal_replay_lsn()), '0/0')::bigint, pg_catalog.pg_wal_lsn_diff(pg_catalog.pg_last_wal_replay_lsn(), '0/0')::bigint, pg_catalog.to_char(pg_catalog.pg_last_xact_replay_timestamp(), 'YYYY-MM-DD HH24:MI:SS.MS TZ'), pg_catalog.pg_is_in_recovery() AND pg_catalog.pg_is_wal_replay_paused(), (SELECT pg_catalog.array_to_json(pg_catalog.array_agg(pg_catalog.row_to_json(ri))) FROM replication_info ri)]"
    },
    "input": {
      "type": "log"
    },
    "@timestamp": "2019-10-21T12:00:27.522Z",
    "ecs": {
      "version": "1.1.0"
    },
    "service": {
      "type": "postgresql"
    },
    "host": {
      "hostname": "ms-vg-stats-db-01-prod.mnw.no",
      "os": {
        "kernel": "4.18.0-80.11.2.el8_0.x86_64",
        "codename": "Core",
        "name": "CentOS Linux",
        "family": "redhat",
        "version": "8 (Core)",
        "platform": "centos"
      },
      "containerized": false,
      "name": "ms-vg-stats-db-01-prod.mnw.no",
      "id": "875bf5a456aa4f9dab3b7f184af3e63b",
      "architecture": "x86_64"
    },
    "event": {
      "module": "postgresql",
      "dataset": "postgresql.log"
    }
  },
  "fields": {
    "suricata.eve.timestamp": [
      "2019-10-21T12:00:27.522Z"
    ],
    "@timestamp": [
      "2019-10-21T12:00:27.522Z"
    ]
  },
  "sort": [
    1571659227522
  ]
}

And the log pipeline Filebeat has added is as follows:
GET /_ingest/pipeline/filebeat-7.4.0-postgresql-log-pipeline

{
  "filebeat-7.4.0-postgresql-log-pipeline": {
    "description": "Pipeline for parsing PostgreSQL logs.",
    "processors": [
      {
        "grok": {
          "field": "message",
          "ignore_missing": true,
          "patterns": [
            "^%{DATETIME:postgresql.log.timestamp} \\[%{NUMBER:process.pid:long}(-%{BASE16FLOAT:postgresql.log.core_id:long})?\\] ((\\[%{USERNAME:user.name}\\]@\\[%{POSTGRESQL_DB_NAME:postgresql.log.database}\\]|%{USERNAME:user.name}@%{POSTGRESQL_DB_NAME:postgresql.log.database}) )?%{WORD:log.level}:  (?:%{NUMBER:postgresql.log.error.code:long}|%{SPACE})(duration: %{NUMBER:temp.duration:float} ms  statement: %{GREEDYDATA:postgresql.log.query}|: %{GREEDYDATA:message}|%{GREEDYDATA:message})"
          ],
          "pattern_definitions": {
            "DATETIME": "[-0-9]+ %{TIME} %{WORD:event.timezone}",
            "GREEDYDATA": "(.|\n|\t)*",
            "POSTGRESQL_DB_NAME": "[a-zA-Z0-9_]+[a-zA-Z0-9_\\$]*"
          }
        }
      },
      {
        "date": {
          "field": "postgresql.log.timestamp",
          "target_field": "@timestamp",
          "formats": [
            "yyyy-MM-dd HH:mm:ss.SSS zz",
            "yyyy-MM-dd HH:mm:ss zz"
          ]
        }
      },
      {
        "script": {
          "source": "ctx.event.duration = Math.round(ctx.temp.duration * params.scale)",
          "params": {
            "scale": 1000000.0
          },
          "if": "ctx.temp?.duration != null",
          "lang": "painless"
        }
      },
      {
        "remove": {
          "field": "temp.duration",
          "ignore_missing": true
        }
      }
    ],
    "on_failure": [
      {
        "set": {
          "field": "error.message",
          "value": "{{ _ingest.on_failure_message }}"
        }
      }
    ]
  }
}

Not an expert in grok, but it indeed does not look compatible with what PostgreSQL is outputting: after the [%{NUMBER:process.pid:long}] part, the default pattern expects a space and then an optional user@database (or [user]@[database]) segment before the log level, while my log_line_prefix puts a colon right after the PID and then db=...,user=...,app=...,client=... instead.
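
One way to confirm this (an illustrative check, using a shortened version of the logged statement) is to run the installed pipeline against such a line with the simulate API:

POST /_ingest/pipeline/filebeat-7.4.0-postgresql-log-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "2019-10-21 14:00:26 CEST [3176]: db=postgres,user=patroni,app=Patroni,client=127.0.0.1 LOG:  statement: SELECT 1"
      }
    }
  ]
}

The simulated document should come back with error.message set to the same "Provided Grok expressions do not match field value" text, because the pipeline's on_failure handler copies the failure message into that field.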

Hi @mashuma,

You can find the log examples that the team seems to be using for tests here:

Hope that helps.

Hi @pmercado,

Thank you very much for your reply.

From what I gather, it looks like I need to create my own pipeline for this to work with a custom log_line_prefix.

I was hoping the Filebeat postgresql module would be able to receive the pattern and work with it. Are there any guides to follow? I've failed to find anything about this in the Filebeat docs.

Solved by changing the pipeline pattern:

PUT /_ingest/pipeline/filebeat-7.4.0-postgresql-log-pipeline
{
    "description": "Pipeline for parsing PostgreSQL logs.",
    "processors": [
      {
        "grok": {
          "ignore_missing": true,
          "patterns": [
            "^%{DATETIME:postgresql.log.timestamp} \\[%{NUMBER:process.pid:long}\\]: db=%{POSTGRESQL_DB_NAME:postgresql.log.database},user=%{USERNAME:user.name},app=%{DATA:postgresql.log.app},client=%{IP:postgresql.log.client} %{WORD:log.level}:  (?:%{NUMBER:postgresql.log.error.code:long}|%{SPACE})(duration: %{NUMBER:temp.duration:float} ms  statement: %{GREEDYDATA:postgresql.log.query}|: %{GREEDYDATA:message}|%{GREEDYDATA:message})"
          ],
          "pattern_definitions": {
            "DATETIME": "[-0-9]+ %{TIME} %{WORD:event.timezone}",
            "GREEDYDATA": "(.|\n|\t)*",
            "POSTGRESQL_DB_NAME": "[a-zA-Z0-9_]+[a-zA-Z0-9_\\$]*"
          },
          "field": "message"
        }
      },
      {
        "date": {
          "target_field": "@timestamp",
          "formats": [
            "yyyy-MM-dd HH:mm:ss.SSS zz",
            "yyyy-MM-dd HH:mm:ss zz"
          ],
          "field": "postgresql.log.timestamp"
        }
      },
      {
        "script": {
          "lang": "painless",
          "source": "ctx.event.duration = Math.round(ctx.temp.duration * params.scale)",
          "params": {
            "scale": 1000000.0
          },
          "if": "ctx.temp?.duration != null"
        }
      },
      {
        "remove": {
          "field": "temp.duration",
          "ignore_missing": true
        }
      }
    ],
    "on_failure": [
      {
        "set": {
          "field": "error.message",
          "value": "{{ _ingest.on_failure_message }}"
        }
      }
    ]
  }
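
The change can be verified the same way with the simulate API (again an illustrative check with a shortened line):

POST /_ingest/pipeline/filebeat-7.4.0-postgresql-log-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "2019-10-21 14:00:26 CEST [3176]: db=postgres,user=patroni,app=Patroni,client=127.0.0.1 LOG:  statement: SELECT 1"
      }
    }
  ]
}

With the updated pattern, the simulated document should come back without error.message, and with postgresql.log.database, user.name, postgresql.log.app, postgresql.log.client and log.level populated.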
