Handling events after using split

Hi,

I'm trying to figure out how to use the split filter in Logstash correctly. I've read forum posts and the docs, but I can't get to the bottom of this.

I'm using the Azure Event Hubs input plugin to ingest a stream of logs. The events arrive in batches (an array) like this:

"message": {
    "records": [
        { "time": "2020-06-05T08:19:03.6240267Z", "tenantId": "", ... , "Timestamp":"2020-06-05T08:16:14.8727964Z" },
        { "time": "2020-06-05T08:19:04.6240267Z", "tenantId": "", ... , "Timestamp":"2020-06-05T08:16:14.8727964Z"},
        ...
    ]
}

My current filter seems to work:

filter {
    json {
        source => "message"
    }
    split {
        field => "records"
        remove_field => [ "message" ]
    }
}

It correctly splits the "records" array that was parsed out of "message".
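
After the split, each event seems to hold a single record under "records" instead of the whole array, roughly like this (a sketch based on the sample above):

"records": { "time": "2020-06-05T08:19:03.6240267Z", "tenantId": "", ... , "Timestamp":"2020-06-05T08:16:14.8727964Z" }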

Q1: How can I work with these separate records after they are split? I see the docs list some common options on the split filter itself. Is there a more advanced way to handle the separate records than these common options, e.g. if I want to run other filter plugins on the records after they are split?

Q2: I've tried to replace the @timestamp field of each record with the actual log timestamp, without luck. I see that the update option is not available on split. After the split, the actual log timestamp appears as a field called "records.properties.Timestamp", i.e. [records][properties][Timestamp]. So this works:

filter {
    json {
        source => "message"
    }
    split {
        field => "records"
        remove_field => [ "message" ]
        add_field => { "newTimestampField" => "%{[records][properties][Timestamp]}" }
    }
}

But how can I replace @timestamp with [records][properties][Timestamp]?

Thanks a million :slight_smile:

Use a date filter after the split.
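
For example, a minimal sketch, assuming the record timestamp is in ISO8601 format and ends up in [records][properties][Timestamp] as in your second filter:

date {
    match => ["[records][properties][Timestamp]", "ISO8601"]
    target => "@timestamp"
}

Since the date filter is placed after the split, it runs once for each split event and overwrites that event's @timestamp.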

Thanks for replying!

I tested a bit more and I got it to work.

It seems like every filter placed after the split runs once per record element, so field references like [records][category] now point at a single record.

For the record, I ended up with this, which works for me:

######### Input #########

# Azure Event Hubs
input {
    azure_event_hubs {
        event_hub_connections => ["removed"]
        threads => 3
        decorate_events => true
        consumer_group => "removed"
        storage_connection => ""
        storage_container => "removed"
        type => "azure_event_hub"
    }
}

######### Filter #########

filter {
    json {
        source => "message"
    }

    split {
        field => "records"
        remove_field => [ "message" ]
    }
    
    # Preserve the original ingest time before the date filter below overwrites @timestamp
    mutate {
        add_field => { "ingest.time" => "%{@timestamp}" }
    }
    
    # Replace @timestamp with the record's own timestamp
    date {
        match => ["[records][properties][Timestamp]", "ISO8601"]
        target => "@timestamp"
    }
    
    # Tag each event based on the record's category
    if [records][category] == "cat1" {
        mutate {
            add_tag => [ "cat1" ]
        }
    } else if [records][category] == "cat2" {
        mutate {
            add_tag => [ "cat2" ]
        }
    } else if [records][category] == "cat3" {
        mutate {
            add_tag => [ "cat3" ]
        }
    }
}

######### Output #########
output {
    elasticsearch {
        hosts => ["host:9200"]
        index => "index"
        manage_template => false
    }
}