Filebeat to parse json array

Hi,

We are currently using filebeats to send logs to our Graylog. So far it has worked out fine for as as the logs were single line.

Now we want to add logs from another folder but the logs are are multiline json array.

{
  "name": "SecureSphere_Audit__None_Enabled_MS_Audit_ALL_22.02.2021_0357_23.02.2021_0357_SOC_Training_SecureSphere_GW_0.0000000001",
  "messageAgg": [{
    "timeSlot": "2021-02-22T23:58:21Z",
    "hits": "1",
    "base": {
      "keysCrc": "2740124025295631225",
      "serverGroup": "MS_DAM_Monitoring",
      "service": "MSSQL",
      "application": "Default MsSql Application",
      "eventSourceType": "Agent",
      "userType": "Valid",
      "dbUser": "nt authoritysystem",
      "sqlSourceGroup": "Default MsSql group",
      "isUserAuthenticed": true,
      "sourceIp": "0.0.0.0",
      "sourceApp": "sqlagent - tsql jobstep (job 0x551fc248dbf37c48ae2d71f7a043d902 : step 1)",
      "host": "sqlnode",
      "serviceType": "MsSql",
      "destinationIp": "10.100.166.147",
      "eventType": "LOGIN",
      "operation": "Login",
      "database": "master",
      "gatewayName": "SOC_Training_SecureSphere_GW"
    },
    "responseSizeBucket": "Size0",
    "affectedRowsBucket": "Size0",
    "responseTimeBucket": "Time0to1",
    "policy": "_None_Enabled_MS_Audit_ALL",
    "policyId": "-7259370068985404010"
  }, {
    "timeSlot": "2021-02-22T23:58:21Z",
    "hits": "1",
    "base": {
      "keysCrc": "9026797400263262140",
      "serverGroup": "MS_DAM_Monitoring",
      "service": "MSSQL",
      "application": "Default MsSql Application",
      "eventSourceType": "Agent",
      "userType": "Valid",
      "dbUser": "nt authoritysystem",
      "sqlSourceGroup": "Default MsSql group",
      "isUserAuthenticed": true,
      "sourceIp": "0.0.0.0",
      "sourceApp": "sqlagent - tsql jobstep (job 0x551fc248dbf37c48ae2d71f7a043d902 : step 1)",
      "host": "sqlnode",
      "serviceType": "MsSql",
      "destinationIp": "10.100.166.147",
      "eventType": "QUERY",
      "database": "master",
      "queryCrc": "-4658695140972376793",
      "gatewayName": "SOC_Training_SecureSphere_GW"
    },
    "responseSizeBucket": "Size0",
    "affectedRowsBucket": "Size0",
    "responseTimeBucket": "Time0to1",
    "normalisedQuery": "set quoted_identifier off",
    "policy": "_None_Enabled_MS_Audit_ALL",
    "policyId": "-7259370068985404010"
  }]
}

The above only contains two entires for the array but it has thousands.

my filebeat.yml is configured as follows:

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /opt/soc/Security/host-date-*

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /opt/soc/_staging/dar/**/*.json
  multiline.pattern: '^{'
  multiline.negate: true
  multiline.match: after
  processors:
  - decode_json_fields:
      fields: ["message"]
      target: "json"
      process_array: true
      max_depth: 3

How do I get filebeat to send the json array as a new entry to our logstash output on the filebeat

Hi @vantoryc, welcome to the Elastic community forums!

So just to make sure I understand your requirement, for the example you posted, you want Filebeat to generate two events, one for each of the elements of the messageAgg array?

If so, this is currently not possible with Beats processors. They can only emit up to one event for every event they take as input.

You might want to look into sending the entire JSON document, i.e. the parent document that includes the messageAgg field, to Logstash and then using the split filter in Logstash.

Shaunak

1 Like

@shaunak

Thanks for the response, I was suspecting that was it after I spent some more time on the forum,

welp, back to square 1.

Appreciate it, thank you so much!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.