Ingest pipeline: split MESSAGE field into multiple fields

Hello everybody,

I am new to Elasticsearch and Kibana and wanted to ask how to split a message into multiple fields.
I tried creating an ingest pipeline with a split processor on the MESSAGE field and "," as separator, but it doesn't work as I expected.

We have syslog-ng receiving firewall logs and sending them to Elasticsearch. All relevant data is in the MESSAGE field:

{"zone_src":"SRC","zone_dst":"DST","reason":"rule","rule_id":XXXXX,"rule_description":"RULE-Decription","action":"ACCEPT","@timestamp":"2022-07-04T22:11:30.501591+0200","timestamp":1656965490,"timestamp_usec":501591,"iface_in":"ethXX.XXXX","iface_out":"ethXX.XXX","ip_src":"XXX.XXX.XXX.XXX","ip_dst":"XXX.XXX.XXX.XXX","protocol":17,"port_src":XXXXX,"port_dst":XX,"mark":17825792,"tos":0,"host_id":164119,"host_name":"some_hostname","logtype":"forward"}

How can I create an ingest pipeline that splits the message into separate fields with their values, e.g.:

zone_src: SRC
zone_dst: DST
reason: rule
rule_id: XXXXX
.....

Thank you in advance,

Best regards,
M

Hi @moberreiter, welcome to the community.

The best way is to show us the whole raw document and your ingest pipeline. Please format your code going forward with the </> button.

It is unclear whether what you posted is the MESSAGE field or the whole document.

If your MESSAGE field is actually a JSON document, then use the JSON processor.

If it's key-value pairs, then use the KV processor.

Show us your whole raw document and your whole ingest pipeline and we'll be able to help.
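
To make the JSON-versus-KV distinction concrete, here is a small Python sketch (plain Python, not Elasticsearch code) that checks whether a string parses as a JSON object. The sample is a shortened version of the MESSAGE value from the question, and `looks_like_json_object` is a hypothetical helper name. Parsing as JSON keeps numbers as numbers, whereas a key-value split would leave every value as a string.

```python
import json

# Shortened sample of the MESSAGE value from the question (illustrative only).
message = '{"zone_src":"SRC","zone_dst":"DST","reason":"rule","rule_id":533462,"port_dst":53}'


def looks_like_json_object(text):
    """Return the parsed dict if text is a JSON object, else None."""
    try:
        parsed = json.loads(text)
    except ValueError:
        return None
    return parsed if isinstance(parsed, dict) else None


fields = looks_like_json_object(message)
if fields is None:
    print("Not a JSON object: a key-value style parser may fit better")
else:
    for key, value in fields.items():
        print(f"{key}: {value!r}")
```

If the helper returns a dict, the JSON route is probably the simpler one; if it returns None, the input is more likely key-value text.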

Hello Stephen,

Here is the complete document:

{
  "_index": "xxxx-fw-000001",
  "_type": "_doc",
  "_id": "mVtmyoEBGm8vzSyzjInY",
  "_version": 1,
  "_score": 1,
  "_ignored": [
    "MESSAGE.keyword"
  ],
  "_source": {
    "SOURCE": "s_network_fw",
    "PROGRAM": "xxxxx",
    "PRIORITY": "notice",
    "MESSAGE": "{\"zone_src\":\"SRC\",\"zone_dst\":\"DST\",\"reason\":\"rule\",\"rule_id\":533462,\"rule_description\":\"RULE_DSC\",\"action\":\"ACCEPT\",\"@timestamp\":\"2022-07-04T20:08:46.278529+0200\",\"timestamp\":1656958126,\"timestamp_usec\":278529,\"iface_in\":\"ethXX.XXXX\",\"iface_out\":\"ethXX.XXX\",\"ip_src\":\"XXX.XXX.XXX.XXX\",\"ip_dst\":\"XXX.XXX.XXX.XXX\",\"protocol\":17,\"port_src\":49375,\"port_dst\":53,\"mark\":17825792,\"tos\":0,\"host_id\":164119,\"host_name\":\"some_hostname\",\"logtype\":\"forward\"}",
    "ISODATE": "2022-07-04T20:08:47+02:00",
    "HOST_FROM": "XXX.XXX.XXX.XXX",
    "HOST": "XXX.XXX.XXX.XXX",
    "FACILITY": "user",
    "@timestamp": "2022-07-04T20:08:47+02:00"
  },
  "fields": {
    "FACILITY": [
      "user"
    ],
    "PRIORITY.keyword": [
      "notice"
    ],
    "PROGRAM.keyword": [
      "XX-Firewall"
    ],
    "HOST": [
      "XXX.XXX.XXX.XXX"
    ],
    "PRIORITY": [
      "notice"
    ],
    "HOST_FROM.keyword": [
      "XXX.XXX.XXX.XXX"
    ],
    "ISODATE": [
      "2022-07-04T18:08:47.000Z"
    ],
    "MESSAGE": [
      "{\"zone_src\":\"SRC\",\"zone_dst\":\"DST\",\"reason\":\"rule\",\"rule_id\":533462,\"rule_description\":\"rule_description\",\"action\":\"ACCEPT\",\"@timestamp\":\"2022-07-04T20:08:46.278529+0200\",\"timestamp\":1656958126,\"timestamp_usec\":278529,\"iface_in\":\"ethXX.XXXX\",\"iface_out\":\"ethXX.XXX\",\"ip_src\":\"XXX.XXX.XXX.XXX\",\"ip_dst\":\"XXX.XXX.XXX.XXX\",\"protocol\":17,\"port_src\":49375,\"port_dst\":53,\"mark\":17825792,\"tos\":0,\"host_id\":164119,\"host_name\":\"some_hostname\",\"logtype\":\"forward\"}"
    ],
    "@timestamp": [
      "2022-07-04T18:08:47.000Z"
    ],
    "HOST_FROM": [
      "XXX.XXX.XXX.XXX"
    ],
    "SOURCE": [
      "s_network_fw"
    ],
    "FACILITY.keyword": [
      "user"
    ],
    "HOST.keyword": [
      "XXX.XXX.XXX.XXX"
    ],
    "SOURCE.keyword": [
      "s_network_fw"
    ],
    "PROGRAM": [
      "XX-Firewall"
    ]
  },
  "ignored_field_values": {
    "MESSAGE.keyword": [
      "{\"zone_src\":\"SRC\",\"zone_dst\":\"DST\",\"reason\":\"rule\",\"rule_id\":533462,\"rule_description\":\"rule_description\",\"action\":\"ACCEPT\",\"@timestamp\":\"2022-07-04T20:08:46.278529+0200\",\"timestamp\":1656958126,\"timestamp_usec\":278529,\"iface_in\":\"ethXX.XXXX\",\"iface_out\":\"ethXX.XXX\",\"ip_src\":\"XXX.XXX.XXX.XXX\",\"ip_dst\":\"XXX.XXX.XXX.XXX\",\"protocol\":17,\"port_src\":49375,\"port_dst\":53,\"mark\":17825792,\"tos\":0,\"host_id\":164119,\"host_name\":\"some_hostname\",\"logtype\":\"forward\"}"
    ]
  }
}

and thank you for your quick response!!

Best regards,
M

Forgot the ingest pipeline:

PUT _ingest/pipeline/xx-fw
{
  "processors": [
    {
      "split": {
        "field": "MESSAGE",
        "separator": ","
      }
    }
  ]
}
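
A note on why this pipeline does not produce separate fields: the split processor turns one string into an array of substrings and leaves that array in the same field. The Python sketch below is only a rough stand-in for that behavior, using a shortened sample of the MESSAGE value.

```python
# Shortened sample of the MESSAGE value (illustrative only).
message = '{"zone_src":"SRC","zone_dst":"DST","rule_id":533462}'

# Rough stand-in for a split processor with separator ",":
# one string goes in, one list of substrings comes out.
parts = message.split(",")

print(type(parts).__name__)
for part in parts:
    print(part)
```

Each element still contains the quotes, the colon, and the braces, so no `zone_src` or `rule_id` field is ever created.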

It also looks like you're using the default mapping; you should create your own through an index template.

Not sure why you have MESSAGE.keyword in _ignored.
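
One possible explanation, offered as an assumption rather than something confirmed in this thread: with default dynamic mapping, a string field gets a keyword sub-field with `ignore_above: 256`, and values longer than that are skipped for the sub-field and reported under `_ignored`. The sketch below only measures the MESSAGE value from the document above against that limit; `IGNORE_ABOVE_DEFAULT` is a name made up for this example.

```python
# Assumed limit of the keyword sub-field created by default dynamic mapping.
IGNORE_ABOVE_DEFAULT = 256

# MESSAGE value copied from the raw document posted above.
message = (
    '{"zone_src":"SRC","zone_dst":"DST","reason":"rule","rule_id":533462,'
    '"rule_description":"RULE_DSC","action":"ACCEPT",'
    '"@timestamp":"2022-07-04T20:08:46.278529+0200","timestamp":1656958126,'
    '"timestamp_usec":278529,"iface_in":"ethXX.XXXX","iface_out":"ethXX.XXX",'
    '"ip_src":"XXX.XXX.XXX.XXX","ip_dst":"XXX.XXX.XXX.XXX","protocol":17,'
    '"port_src":49375,"port_dst":53,"mark":17825792,"tos":0,"host_id":164119,'
    '"host_name":"some_hostname","logtype":"forward"}'
)

too_long = len(message) > IGNORE_ABOVE_DEFAULT
print(f"length={len(message)}, exceeds ignore_above={too_long}")
```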

So here is a mapping (you should complete it with all the fields from the message).
I used the KV processor and cleaned up some stuff.
There is one issue: your message has its own @timestamp, so you either put the message fields under a target_field or you exclude the timestamp with "exclude_keys" : ["@timestamp"]. Your choice. Or you could add more logic: save to a target field, replace the timestamp, then copy everything to the root. I like the target_field approach :)
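
The two ways around the @timestamp clash can be pictured with plain Python dicts. This is only an illustration of the idea, not what the processor does internally; the document is shortened and the `details` name is borrowed from the target_field used in the pipeline in this post.

```python
import json

# Shortened document: an outer @timestamp plus a MESSAGE that carries its own.
doc = {
    "@timestamp": "2022-07-04T20:08:47+02:00",
    "MESSAGE": '{"zone_src":"SRC","@timestamp":"2022-07-04T20:08:46.278529+0200"}',
}
parsed = json.loads(doc["MESSAGE"])

# Keys that exist both at the root and inside the message.
collisions = set(doc) & set(parsed)
print("colliding keys:", collisions)

# Option 1 (target_field idea): nest the parsed fields, so nothing collides.
nested = {**doc, "details": parsed}

# Option 2 (exclude_keys idea): copy to the root but skip the colliding key.
excluded = {**doc, **{k: v for k, v in parsed.items() if k not in collisions}}

print(nested["details"]["@timestamp"])
print(excluded["@timestamp"])
```

Nesting keeps both timestamps, while excluding drops the firewall's own timestamp, which is the more precise of the two in this sample.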

# Mapping
DELETE discuss-test-syslog

PUT discuss-test-syslog/
{
  "mappings": {
    "properties": {
      "SOURCE": {
        "type": "keyword"
      },
      "PROGRAM": {
        "type": "keyword"
      },
      "PRIORITY": {
        "type": "keyword"
      },
      "ISODATE": {
        "type": "date"
      },
      "HOST_FROM": {
        "type": "ip"
      },
      "HOST": {
        "type": "ip"
      },
      "FACILITY": {
        "type": "keyword"
      },
      "@timestamp": {
        "type": "date"
      },
      "MESSAGE": {
        "type": "keyword"
      }
    }
  }
}

GET discuss-test-syslog/_search


# Ingest Pipeline

DELETE _ingest/pipeline/discuss-test-syslog

PUT _ingest/pipeline/discuss-test-syslog
{
  "processors": [
    {
      "kv": {
        "field": "MESSAGE",
        "field_split": ",",
        "value_split": ":",
        "trim_key": "{\\\"",
        "trim_value": "\\\"}",
        "target_field": "details"
      }
    }
  ]
}
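
To see what these KV settings do to the MESSAGE string, here is a rough Python imitation. It is a sketch under simplifying assumptions, not the Elasticsearch implementation: `kv_parse` is a hypothetical helper, trimming is approximated with `str.strip`, and chunks without a value separator are simply skipped.

```python
def kv_parse(text, field_split=",", value_split=":", trim_key="", trim_value=""):
    """Rough imitation of a key-value split; not the Elasticsearch code."""
    result = {}
    for pair in text.split(field_split):
        # Split on the first value separator only, so colons inside
        # the value (for example in a timestamp) are preserved.
        key, sep, value = pair.partition(value_split)
        if not sep:
            continue  # simplification: skip chunks without a separator
        result[key.strip(trim_key)] = value.strip(trim_value)
    return result


# Shortened sample of the MESSAGE value (illustrative only).
message = (
    '{"zone_src":"SRC","rule_id":533462,'
    '"@timestamp":"2022-07-04T20:08:46.278529+0200","logtype":"forward"}'
)
details = kv_parse(message, trim_key='{"', trim_value='"}')
print(details)

# Hypothetical value containing a comma, to show where this approach is fragile.
tricky = kv_parse(
    '{"rule_description":"allow dns, ntp","action":"ACCEPT"}',
    trim_key='{"',
    trim_value='"}',
)
print(tricky)
```

Two things stand out in this imitation: every value comes back as a string (including `rule_id`), and a comma inside a value cuts that value short. If rule descriptions can contain commas, parsing the message as JSON is likely the more robust choice.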
  
  
# Post with the pipeline
  
POST discuss-test-syslog/_doc?pipeline=discuss-test-syslog
{
    "SOURCE": "s_network_fw",
    "PROGRAM": "xxxxx",
    "PRIORITY": "notice",
    "MESSAGE": "{\"zone_src\":\"SRC\",\"zone_dst\":\"DST\",\"reason\":\"rule\",\"rule_id\":533462,\"rule_description\":\"RULE_DSC\",\"action\":\"ACCEPT\",\"@timestamp\":\"2022-07-04T20:08:46.278529+0200\",\"timestamp\":1656958126,\"timestamp_usec\":278529,\"iface_in\":\"ethXX.XXXX\",\"iface_out\":\"ethXX.XXX\",\"ip_src\":\"XXX.XXX.XXX.XXX\",\"ip_dst\":\"XXX.XXX.XXX.XXX\",\"protocol\":17,\"port_src\":49375,\"port_dst\":53,\"mark\":17825792,\"tos\":0,\"host_id\":164119,\"host_name\":\"some_hostname\",\"logtype\":\"forward\"}",
    "ISODATE": "2022-07-04T20:08:47+02:00",
    "HOST_FROM": "192.168.1.1",
    "HOST": "192.168.2.1",
    "FACILITY": "user",
    "@timestamp": "2022-07-04T20:08:47+02:00"
}

GET discuss-test-syslog/_search

An easy way to quickly iterate is the simulate API, which takes the pipeline definition and the test documents in one request.

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "kv": {
          "field": "MESSAGE",
          "field_split": ",",
          "value_split": ":",
          "trim_key": "{\\\"",
          "trim_value": "\\\"}",
          "exclude_keys" : ["@timestamp"]
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "SOURCE": "s_network_fw",
        "PROGRAM": "xxxxx",
        "PRIORITY": "notice",
        "MESSAGE": "{\"zone_src\":\"SRC\",\"zone_dst\":\"DST\",\"reason\":\"rule\",\"rule_id\":533462,\"rule_description\":\"RULE_DSC\",\"action\":\"ACCEPT\",\"@timestamp\":\"2022-07-04T20:08:46.278529+0200\",\"timestamp\":1656958126,\"timestamp_usec\":278529,\"iface_in\":\"ethXX.XXXX\",\"iface_out\":\"ethXX.XXX\",\"ip_src\":\"XXX.XXX.XXX.XXX\",\"ip_dst\":\"XXX.XXX.XXX.XXX\",\"protocol\":17,\"port_src\":49375,\"port_dst\":53,\"mark\":17825792,\"tos\":0,\"host_id\":164119,\"host_name\":\"some_hostname\",\"logtype\":\"forward\"}",
        "ISODATE": "2022-07-04T20:08:47+02:00",
        "HOST_FROM": "192.168.1.1",
        "HOST": "192.168.2.1",
        "FACILITY": "user",
        "@timestamp": "2022-07-04T20:08:47+02:00"
      }
    }
  ]
}
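
Since the MESSAGE value in this thread is itself valid JSON, the JSON processor mentioned earlier would be another candidate to try in the same kind of simulate request. The Python sketch below only builds such a request body and prints it for pasting into the console. `simulate_body` is a hypothetical helper, the `details` target field follows the KV pipeline above, the sample document is shortened, and none of this has been run against a cluster.

```python
import json


def simulate_body(processors, sources):
    """Build a request body for POST _ingest/pipeline/_simulate."""
    return {
        "pipeline": {"processors": processors},
        "docs": [{"_source": source} for source in sources],
    }


# JSON processor reading MESSAGE and writing the parsed object under "details".
json_processor = {"json": {"field": "MESSAGE", "target_field": "details"}}

# Shortened sample document (illustrative only).
sample = {
    "MESSAGE": '{"zone_src":"SRC","rule_id":533462,"logtype":"forward"}',
    "@timestamp": "2022-07-04T20:08:47+02:00",
}

body = simulate_body([json_processor], [sample])
print("POST _ingest/pipeline/_simulate")
print(json.dumps(body, indent=2))
```

Swapping `json_processor` for a `kv` definition lets the same helper produce both variants for a side-by-side comparison.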

Hello Stephen,

works like a charm, you made my day!
Thank you very much for your help!!

BR,
M
