Access fields from input plugin (cloudwatch_logs_importer)

Hi,

I am using the cloudwatch_logs_importer plugin to read and grok logs from cloudwatch.
It all works fine, but I am facing issues to access a field which is created by the input module itself:

[cloudwatch_logs][log_stream]
test_app.1.xydqwtwpkrgo3gvjppxxh0iar

I want to extract app.1 from this field, but I am unable to access it.
Simply adding a new field based on it already does not work:

mutate {
     add_field => { "node" => "%{[cloudwatch_logs][log_stream]}" }
}

And results in a new field:

node %{[cloudwatch_logs][log_stream]}

But does not contain the actual contents.

What am I missing here?

I am using a simple pipeline:

input {
    cloudwatch_logs_importer {
        log_groups => [ "/group/prod/envx" ]
        .........
     }   
} 
filter {
                if "SalesCount" in [message] {
                grok {
                        match => {
                        "message" => "\*SalesCount\* +(%{NUMBER:SalesCount:int});?\*SalesCount\*"
                        }

  mutate {
       add_field => { "node" => "%{[cloudwatch_logs][log_stream]}" }
  }
}

output {
                elasticsearch {
                     template_name => "app-log-test"
                     hosts => ['https://elastic:9200']
                     ......
                }
}

What am I missing?

Can you use ruby debug and show cloudwatch_logs JSON structure?
If this is the value: [cloudwatch_logs][log_stream] : "test_app.1.xydqwtwpkrgo3gvjppxxh0iar" you should use gsub of grok to extact a part of value. Since there is node %{[cloudwatch_logs][log_stream]}, it means it's empty from some reason.

@Rios
I have added

stdout { codec => rubydebug }

in the output section.
Below you will find the JSON data it produces, for readability I have removed the timestamps

{
    "hostname" => "testserver",
    "requestEndTime" => "2023-06-22 07:46:46,095",
    "computationalTime" => 107.0,
    "@version" => "1",
    "[cloudwatch_logs][log_stream]" => "test_app.1.xydqwtwpkrgo3gvjppxxh0iar",
    "requestTotalTime" => 106.0,
    "[cloudwatch_logs][event_id]" => "37630723598827586088574315395387495174745513581372309504",
    "tags" => [
         [0] "TestTag"
    ],
    "requestStartTime" => "22/Jun/2023:09:46:45 +0200",
    "[cloudwatch_logs][ingestion_time]" => 1687420011217,
    "[cloudwatch_logs][log_group]" => "/group/prod/envx",
    "lbip" => "192.168.0.123",
    "@timestamp" => 2023-06-22T07:46:45.000Z
 }
{
    "hostname" => "testserver",
    "requestEndTime" => "2023-06-22 07:46:54,297",
    "computationalTime" => 83.0,
    "@version" => "1",
    "[cloudwatch_logs][log_stream]" => "test_app.1.xydqwtwpkrgo3gvjppxxh0iar",
    "requestTotalTime" => 82.0,
    "[cloudwatch_logs][event_id]" => "37630723781738298206922486407856083795682725195603902464",
    "tags" => [
         [0] "TestTagTwo"
    ],
    "requestStartTime" => "22/Jun/2023:09:46:54 +0200",
    "[cloudwatch_logs][ingestion_time]" => 1687420015843,
    "[cloudwatch_logs][log_group]" => "/group/prod/envx",
    "lbip" => "192.168.0.123",
    "@timestamp" => 2023-06-22T07:46:54.000Z
 }

I see basically there is a value present in "[cloudwatch_logs][log_stream]", which I also see in Elasticsearch.

This is not a nested field.
Can you try with:

mutate {
       add_field => { "node" => "%{"[cloudwatch_logs][log_stream]"}" }
  }

I added:

mutate {
        add_field => { 
          "node" => "%{"[cloudwatch_logs][log_stream]"}"
        }
}

In the filter section, but logstash keeps crashing:

Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"{\", \"}\" at line 57, column 35 (byte 3278) after filter

Line 57 is this part:

"node" => "%{"[cloudwatch_logs][log_stream]"}"

@Rios
If I add:

  mutate {
                  add_field => {
                    "node" => "%{"[cloudwatch_logs][log_stream]"}"
                  }
                }

Logstash crashed with:

Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"{\", \"}\" at line 57, column 35 (byte 3278) after filter

if I add:

  mutate {
                  add_field => {
                    "node" => "%{[cloudwatch_logs][log_stream]}"
                  }
                }

I see in elasticsearch json document:

    "node": "%{[cloudwatch_logs][log_stream]}",

Which version of LS are using?

Normally you should have a nested field:

   "cloudwatch_logs" => {
        "log_stream" => "test_app.1.xydqwtwpkrgo3gvjppxxh0iar"
    },

Not the [cloudwatch_logs][log_stream] field starting with brackets : "[cloudwatch_logs][log_stream]" => "test_app.1.xydqwtwpkrgo3gvjppxxh0iar"

I cannot reproduce this since couldn't generate your named field, as mutate add_field or ruby code make the nested by default. Even the documentation

This is pretty weird and I'm not sure you can solve this in Logstash.

The main issue is that this third-party plugin seems to be creating the fields wrongly, looking at the code this seems to be how the fields are created:

        val processLog : (String, FilteredLogEvent)->Unit = { logGroup, it ->
            consumer.accept(mutableMapOf(
                "@timestamp" to Timestamp(it.timestamp()),
                "[cloudwatch_logs][ingestion_time]" to it.ingestionTime(),
                "[cloudwatch_logs][log_group]" to logGroup,
                "[cloudwatch_logs][log_stream]" to it.logStreamName(),
                "[cloudwatch_logs][event_id]" to it.eventId(),
                "message" to it.message()
            ))
        }

The fields are not nested under a object named cloudwatch_logs, they are just fields with square brackets in the name.

But when they arrive in the logstash pipeline, every logstash filter will try to treat those fields as nested fields, so the mutate filters will not work.

I wasn't able to replicate this issue because I could not create fields with square brackets in the name.

Since your output is Elasticsearch, my suggestion would be to try to solve this using a rename processor on a ingest pipeline, in Elasticsearch nested fields are referenced with a dot, not with square brackets like in logstash.

2 Likes

Using logstash 8.8.1

@leandrojmp

Thanks for looking at the code of the third party plugin.
Since I am using a corporate shared elasticsearch environment I am not able to use a rename processor.

The best thing I can do is creating an issue on github repo of the developer of this custom plugin and point to this post as an reference.

Thanks for the support

@Rios
Many thanks for your support, I will create an issue in the github repo of this custom plugin.

Not sure if I got what you mean, but if you can create an ingest pipeline for this case you do not to rename, you can just try to create the node field with the Ingest pipeline since it is not working with Logstash.

This can be done using the set processor.

Renaming the field was just a suggestion.

Any reference or example how that can be done in Logstash?

It is not in Logstash, it is in Elasticsearch using an ingest pipeline.

You would need to create an ingest pipeline in Elasticsearch with a set processor that would copy the value of the field [cloudwatch_logs][log_stream] to the field node.

Something like this:

PUT _ingest/pipeline/cloudwatch-set-node-field
{
  "description": "sets the value of node field",
  "processors": [
    {
      "set": {
        "field": "node",
        "value": "{{{[cloudwatch_logs][log_stream]}}}",
        "ignore_empty_value": true,
        "ignore_failure": true
      }
    }
  ]
}

The in your Logstash output you need to tell it ro use this ingest pipeline when sending data to Elasticsearch

Just add this line to the output configuration:

pipeline => "cloudwatch-set-node-field"

I'm not sure if this will work, but in Elasticsearch the nested fields are referenced by dots, not square brackest, so it may work.

1 Like

I wasn't able to replicate this issue because I could not create fields with square brackets in the name.

Me too, I had lost several hours to figure out that can't create such kind of field.

Badger do you have any idea how to solve this?

When I investigated this morning I couldn't find a way to create a field with square brackets in the name, but this afternoon I was reading through Event.java and thought of a way I hadn't tried. (Event.from_json called by the codec, which bypasses the FieldReference stuff done when you go through Event.SetField)

input { generator { count => 1 lines => [ '{ "[foo][bar]": 1 }' ] codec => json } }
output { stdout { codec => rubydebug { metadata => false } } }

results in

"[foo][bar]" => 1,
2 Likes

Yes,

That could be a solution, but then I am doing processing at two levels, one in logstash, where it should take place and also something in elasticsearch.

Obviously I want to do more than only getting data in the field node.
I want to process the contents of [cloudwatch_logs][log_stream] and strip of some parts of the data.
This is an activity that should take place in logstash.

So I still see two options:

  1. We find a trick how to deal with it in logstash

  2. The custom plugin developer changes his code to be more compliant so it becomes a nested field or a normal field from within the plugin itself.

I am also reading the response of @Badger but for me its not exactly clear what he is suggesting other then a way how to pupulate "[foo][bar]".

I couldn't get value for that field even Badger made the solution how to set the field with .

Yeah, but the main issue is that I do not think that this can be solved directly in Logstash, at least not without an auxiliary pipeline.

There are basically two ways to manipulate data from fields in Logstash one is using the mutate filter and the other is using the ruby filter with some ruby code, the problem is none of those filters can access a field named [top-level][nested].

When you reference a field in logstash in that way it interpretes as being a top-level field with a nested field.

For logstash [top-level][nested] is { "top-level": { "nested": "value" } }, which is not the case, so none of the filters used to manipulate data can get this data.

It is an edge case that needs to be solved in the source, which is the third-party plugin.

The trick here is to use an extra pipeline, this pipeline would just have the cloudwatch_logs_importer input and it would output the data to file using the file output.

Then your main pipeline would read from this file and before parsing the message it would remove the [ and ] characters.

Thanks to the pipeline @Badger shared I was able to create a field with square brackets and simulate this.

I used this pipeline to simulate the creation of a field with square brackets in the name and output it to a file:

#
input { 
    generator { 
        count => 1 
        lines => [ '{ "[cloudwatch][log_stream]": "logStreamName" }' ] 
        codec => json 
    }
}
#
output {
    file {
        path => "/opt/data/discuss.log"
    }
}
#

This is the output line in the file:

{"@timestamp":"2023-06-23T11:53:00.155903042Z","host":"lab","@version":"1","[cloudwatch][log_stream]":"logStreamName","sequence":0}

Now I can read this file and replace the [ and ] characters with this pipeline:

#
input { 

    file {
        path => "/opt/data/discuss.log"
        sincedb_path => "/dev/null"
        start_position => "beginning"
    }
}
#
filter {

    mutate {
        gsub => ["message", "[\]\[]", "_"]
    }
    json {
        source => "message"
        remove_field => ["message"]
    }
}
#
output {
    stdout {}
}
#

And this is the final output:

{
                  "@timestamp" => 2023-06-23T11:53:00.155903042Z,
                    "sequence" => 0,
                    "@version" => "1",
                        "host" => "lab",
    "_cloudwatch__log_stream_" => "logStreamName",
                        "path" => "/opt/dados/stone/discuss.log"
}

The field named [cloudwatch][log_stream] was renamed to _cloudwatch__log_stream_ and it is a normal field now that can be used on mutate or ruby filters.

Unless the developer of the plugin changes how the plugin creates those fields, you will need this workaround to work with it.

1 Like

That is all I was suggesting :smiley: I was not addressing your actual problem.