How to use nested JSON fields to search for content and drop it

Hi all,

Been stuck on this since yesterday, and now I'm at a loss.

Basically: using the Azure Event Hubs input plugin, I'm ingesting Azure logs from various sources, one of them being Azure SQL Audit logs, which are all in JSON format.
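
For reference, the input side is the stock azure_event_hubs plugin, roughly this shape (connection string redacted, most options omitted):

    input {
      azure_event_hubs {
        # Connection string redacted; consumer group / storage options omitted
        event_hub_connections => ["Endpoint=sb://<redacted>;SharedAccessKeyName=<redacted>;SharedAccessKey=<redacted>;EntityPath=<redacted>"]
        type => "azure_event_hub"
        tags => ["azure-event-hub", "prod-a-azure", "prod"]
      }
    }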

In my filters, I parse the message field as JSON, split on the resulting records field, and then remove the unparsed message field.

Now, there are some events I don't care about, and I want to drop them based on the values of the records.properties.action_id and records.category fields.

I've tried multiple different things, but none of them work.

My latest attempt looks something like this, which makes the Logstash parser explode:

  if [type] == "azure_event_hub" {
    json {
        source => "message"
    }
    split {
        field => ["records"]
    }
    mutate {
        remove_field => [ "message" ]
    }
    if %{[records][category]} == "SQLSecurityAuditEvents" and %{[records][properties][action_id]} !~ "DBAF" and %{[records][properties][action_id]} !~ "DBAS" {
      drop { }
    }
  }

Here is a sanitised document from Elasticsearch. How should I go about doing what I need to do?

Thanks in advance!

{
  "_index": "devops-diagsettings-2021.07.12",
  "_type": "_doc",
  "_id": "EG3GnHoBPvXLUEB8vkm0",
  "_version": 1,
  "_score": null,
  "_source": {
    "@timestamp": "2021-07-12T22:11:47.560Z",
    "type": "azure_event_hub",
    "tags": [
      "azure-event-hub",
      "prod-a-azure",
      "prod"
    ],
    "records": {
      "originalEventTimestamp": "2021-07-12T22:10:37.5830011Z",
      "ResourceGroup": "<redacted>",
      "SubscriptionId": "<subid>",
      "category": "SQLSecurityAuditEvents",
      "operationName": "AuditEvent",
      "resourceId": "/SUBSCRIPTIONS/<redacted>/RESOURCEGROUPS/<redacted>/PROVIDERS/MICROSOFT.SQL/SERVERS/<redacted>/DATABASES/MASTER",
      "LogicalServerName": "<sqlservername>",
      "properties": {
        "sequence_number": 1,
        "securable_class_type": "DATABASE",
        "permission_bitmask": "00000000000000000000000000000000",
        "data_sensitivity_information": "",
        "database_name": "<redacted>",
        "client_tls_version": 0,
        "session_context": "",
        "object_name": "<redacted>",
        "connection_id": "25F8F4D8-E17D-4F7C-885C-7973EC0304E9",
        "server_instance_name": "<redacted>",
        "succeeded": "true",
        "is_server_level_audit": "true",
        "user_defined_event_id": 0,
        "target_server_principal_id": 0,
        "server_principal_id": 0,
        "additional_information": "<batch_information><transaction_info>begin transaction</transaction_info></batch_information>",
        "user_defined_information": "",
        "audit_schema_version": 1,
        "class_type_description": "DATABASE",
        "response_rows": 0,
        "session_id": 710,
        "host_name": "<redacted>",
        "sequence_group_id": "18054C2A-C110-4581-9E5E-2BD88F4D6AB8",
        "is_column_permission": "false",
        "affected_rows": 0,
        "action_id": "TRBC",
        "transaction_id": 9911978212,
        "session_server_principal_name": "<redacted>",
        "target_database_principal_name": "",
        "server_principal_name": "<redacted>",
        "target_server_principal_sid": "",
        "target_server_principal_name": "",
        "object_id": 15,
        "duration_milliseconds": 0,
        "class_type": "DB",
        "database_principal_id": 7,
        "event_id": "C93A3EC8-5048-441F-970F-39F15EE29FBE",
        "target_database_principal_id": 0,
        "event_time": "2021-07-12T22:10:36.611Z",
        "server_principal_sid": "01060000000100640000000000000000ec17c3056c3eae489eb40392a128c97a",
        "client_ip": "<redacted>",
        "database_principal_name": "<redacted>",
        "statement": "",
        "schema_name": "",
        "application_name": ".Net SqlClient Data Provider",
        "action_name": "TRANSACTION BEGIN COMPLETED"
      },
      "time": "2021-07-12T22:10:37.5959728Z"
    },
    "@version": "1"
  },
  "fields": {
    "@timestamp": [
      "2021-07-12T22:11:47.560Z"
    ],
    "records.time": [
      "2021-07-12T22:10:37.595Z"
    ],
    "records.originalEventTimestamp": [
      "2021-07-12T22:10:37.583Z"
    ],
    "records.properties.event_time": [
      "2021-07-12T22:10:36.611Z"
    ]
  },
  "highlight": {
    "records.category": [
      "@kibana-highlighted-field@SQLSecurityAuditEvents@/kibana-highlighted-field@"
    ]
  },
  "sort": [
    1626127907560,
    1626127837583
  ]
}

Try

if [records][category] == "SQLSecurityAuditEvents" and [records][properties][action_id] !~ "DBAF" and [records][properties][action_id] !~ "DBAS" {

sprintf references will not work in this context. Just use the field name.
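
In other words, %{...} sprintf references belong inside option values (add_field, add_tag, and so on), while conditionals take bare field references. A quick illustration using your fields (the audit_action field and transaction_event tag here are just made-up examples):

    # sprintf syntax goes inside option values...
    mutate {
      add_field => { "audit_action" => "%{[records][properties][action_id]}" }
    }
    # ...while conditionals take bare field references
    if [records][properties][action_id] == "TRBC" {
      mutate { add_tag => ["transaction_event"] }
    }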

I did try that earlier, and it still makes the parser explode with the following error:

[2021-07-13T17:33:08,630][ERROR][logstash.agent           ] Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Expected one of #, \", ', } at line 96, column 13 (byte 2581) after filter {\n  if [@metadata][input-http] {\n    date {\n        match => [ \"date\", \"UNIX\" ]\n        remove_field => [ \"date\" ]\n
  }\n    mutate {\n        remove_field => [\"headers\",\"host\"]\n        rename => [\"log\", \"log-aks\"]\n    }\n  }\n  if \"appslogs\" in [tags]{\n    json {\n      source => \"message\"\n    }\n  }\n  if [AppIndex] {\n    mutate {\n      add_tag => \"%{AppIndex}\"\n    }\n  }  \n  if \"iis\" in [tags] and \"filebeat\" in [tags]{\n    dissect {\n      mapping
=> {\n        \"message\" => \"%{log_date} %{log_time} %{iisSite} %{server} %{site} %{method} %{requestUrl} %{querystring} %{port} %{username} %{clienthost} %{httpversion} %{userAgent} %{cookie} %{referer} %{hostUrl} %{response} %{subresponse} %{scstatus} %{sentBytes} %{receivedBytes} %{timetaken}\"\n      }\n      convert_datatype => {\n        \"response\" => \"int\"\n        \"subresponse\" => \"int\"\n        \"port\" => \"int\"\n        \"scstatus\" => \"int\"\n        \"sentBytes\" => \"int\"\n        \"receivedBytes\" => \"int\"\n
  \"timetaken\" => \"int\"\n      }\n    }\n    mutate {\n        add_field => {\n          \"log_timestamp\" => \"%{log_date} %{log_time}\"\n        }\n        rename => [\"host\", \"hostiis\"]\n    }\n    mutate {\n      remove_field => [ \"[message]\", \"[log_date]\", \"[log_time]\", \"[beat][hostname]\", \"[beat][name]\" ]\n    }\n    date {\n        match =>
[ \"log_timestamp\", \"YYYY-MM-dd HH:mm:ss\" ]\n        timezone => \"Etc/UTC\"\n    }\n  }\n  if [type] == \"azure_event_hub\" {\n\n    if [message] =~ \"foglight\" {\n      drop { }\n    }\n\n    json {\n        source => \"message\"\n    }\n    split {\n        field => [\"records\"]\n    }\n    mutate {\n        remove_field => [ \"message\" ]\n    }\n    if [records][category] == \"SQLSecurityAuditEvents\" and [records][properties][action_id] !~ \"DBAF\" and [records][properties][action_id] !~ \"DBAS\" {\n      drop {", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2577:in `map'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:151:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:22:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/reload.rb:43:in `block in execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:96:in `block in exclusive'", "org/jruby/ext/thread/Mutex.java:165:in `synchronize'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:96:in `exclusive'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/reload.rb:39:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:334:in `block in converge_state'"]}

What comes after this?

if [records][category] == "SQLSecurityAuditEvents" and [records][properties][action_id] !~ "DBAF" and [records][properties][action_id] !~ "DBAS" {
  drop {

Nothing.
This is the final filter in the list. After that the filter block is closed, and we configure the output section.
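
So the tail of the config looks like this (output contents omitted here):

          drop { }
        }       # closes the drop conditional
      }         # closes the azure_event_hub block
    }           # closes filter

    output {
      # our output plugins go here (omitted)
    }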

If your configuration file ends after the opening curly brace then it is obviously invalid. If it does not end there then you need to say what comes after it.

I think you simply miscopied my original text.

This is the full if statement on that filter:

if [type] == "azure_event_hub" {
    json {
        source => "message"
    }
    split {
        field => ["records"]
    }
    mutate {
        remove_field => [ "message" ]
    }
    if %{[records][category]} == "SQLSecurityAuditEvents" and %{[records][properties][action_id]} !~ "DBAF" and %{[records][properties][action_id]} !~ "DBAS" {
      drop { }
    }
  }

What is the configuration that produces this error message? It is not one that contains

    if %{[records][category]} == "SQLSecurityAuditEvents" and %{[records][properties][action_id]} !~ "DBAF" and %{[records][properties][action_id]} !~ "DBAS" {

As soon as I insert either of the if blocks below inside the parent if [type] == "azure_event_hub" block, the error quoted above is returned:

    if %{[records][category]} == "SQLSecurityAuditEvents" and %{[records][properties][action_id]} !~ "DBAF" and %{[records][properties][action_id]} !~ "DBAS" {
      drop { }
    }

OR

    if [records][category] == "SQLSecurityAuditEvents" and [records][properties][action_id] !~ "DBAF" and [records][properties][action_id] !~ "DBAS" {
      drop { }
    }

No idea why you deleted your message, because that was exactly it.
VS Code probably did something sketchy, and the "character" between the two curly braces in the drop wasn't correct, or wasn't being interpreted correctly.

I have another drop statement that works fine in another filter: I copy-pasted it from there, edited my config, and now it works fine using the second version:

    if [records][category] == "SQLSecurityAuditEvents" and [records][properties][action_id] !~ "DBAF" and [records][properties][action_id] !~ "DBAS" {
      drop { }
    }
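
As an aside, since action_id is always one of these exact four-letter codes, I suppose the same drop could also be written with list membership instead of the regex operators (untested on my end):

    if [records][category] == "SQLSecurityAuditEvents" and [records][properties][action_id] not in ["DBAF", "DBAS"] {
      drop { }
    }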

That is a very, very strange error message. Note that it is not

"Expected one of '\\\"', any character, '\"' at line 76

or

Expected one of [ \t\r\n], "#", [A-Za-z0-9_-], '"', "'", [A-Za-z_], "-", [0-9], "[", "{" at line 70

I cannot think of any instance in the configuration where whitespace is not allowed.

Honestly, I have no idea why this happened. Usually, when you type a { in VS Code, it automatically expands it into { }, with a whitespace between the two braces. I've never had a problem with that since I started using VS Code, but something weird must have happened this time.

Now it works just fine.

Thanks for your help, greatly appreciate it! :slight_smile:
