Copy contents of one field to same field of another row if certain condition matches

I need to copy the contents of the "Service" field from No. 15256 into the :path field of No. 15257 when the "Stream_Identifier" of lines 15256 and 15257 match.
Kindly suggest how to get this done in the logstash conf file, or any other way to achieve the same.
Please refer to the image/snippet attached for the above query.

Also, given below is my logstash conf file for reference.

input {
    file {
        path => "/home/student/test-csv/test-http2-1.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    csv {
        separator => ","
        skip_header => "true"
        columns => ["Time","No.","Source","Destination","Protocol","Length","HTTP_Header_Status","Stream_Identifier","method",":path","String value","Key","Info"]
    }
    date {
        match => [ "Time", "yyyy-MM-dd HH:mm:ss.SSSSSS" ]
        target => "@timestamp"
        timezone => "UTC"
    }

    mutate {
        copy => { ":path" => "Service_Request" }
    }
 
    mutate {
        split => { ":path" => "/" }
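        # splitting "/nsmsf-sms/v2/ue-contexts/..." on "/" yields
        # ["", "nsmsf-sms", "v2", ...], so [:path][1] is the first path segment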
        add_field => { "Service" => "%{[:path][1]}" }
        remove_field => ["message","host","path","@version"]
    }
 
    aggregate {
        task_id => "%{Stream_Identifier}"
        code => '
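            # carry Service forward within a stream: remember it when the
            # event has one, otherwise restore it from the aggregate map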
            p = event.get("Service")
            if p
                map["Service"] = p
            elsif map["Service"]
                event.set("Service", map["Service"])
            end
        '
    }
    mutate {
        remove_field => [":path"]
    }

}

output {

    elasticsearch {
        hosts => "http://localhost:9200"
        index => "demo-http-csv-split-2"
    }

    stdout { codec => rubydebug }

}

You need to make adding the "Service" field conditional on the field [:path][1] existing. Take the add_field out of the mutate+split filter.

if [:path][1] { mutate { add_field => { "Service" => "%{[:path][1]}" } } }
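For context, the guard matters because a sprintf reference to a field that does not exist is kept as literal text, so without it, rows with no :path would be indexed with

    "Service" => "%{[:path][1]}"

as the field value.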

Hi Badger - Thanks for your response. Sharing the updated logstash conf file as per your suggestion.

input {
    file {
        path => "/home/student/test-csv/test-http2-1.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    csv {
        separator => ","
        skip_header => "true"
        columns => ["Time","No.","Source","Destination","Protocol","Length","HTTP_Header_Status","Stream_Identifier","method",":path","String value","Key","Info"]
    }
    date {
        match => [ "Time", "yyyy-MM-dd HH:mm:ss.SSSSSS" ]
        target => "@timestamp"
        timezone => "UTC"
    }

    mutate {
        copy => { ":path" => "Service_Request" }
    }

    mutate {
        split => { ":path" => "/" }
    }

    if [:path][1] { mutate { add_field => { "Service" => "%{[:path][1]}" } } }

    mutate {
        remove_field => ["message","host","path","@version"]
    }

    aggregate {
        task_id => "%{Stream_Identifier}"
        code => '
            p = event.get("%{Service}")
            if p
                map["Service"] = p
            elsif map["Service"]
                event.set("Service", map["Service"])
            end
        '
    }
    mutate {
        remove_field => [":path"]
    }

}

output {

    elasticsearch {
            hosts => "http://localhost:9200"
            index => "demo-http-csv-split"
            }

    file {
            path => "/tmp/my_output_text_file"
            codec => rubydebug
            }
    stdout {
            codec => rubydebug
            }

}

However, the Service field value from 15256 is still not getting populated in 15257. A snippet is given below for reference. Is any modification required in the code or the aggregation logic?

Does your configuration comply with the limits on pipeline.workers and pipeline.ordered that are documented for the aggregate filter?

Snippet of the pipeline.ordered and pipeline.workers settings in my logstash.yml file:
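For reference, the aggregate filter documentation calls for a single worker so that all events of a task pass through the same filter instance, in order; in logstash.yml that typically looks like the following (illustrative values, not necessarily the poster's actual file):

    pipeline.workers: 1
    pipeline.ordered: auto   # "auto" preserves event order when workers == 1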

[Service_Request] is set (mutate+copy) to the value of [:path]. Since [:path] is empty for that row, [Service_Request] will be empty too.

Yes, but the requirement is to copy the contents of Service_Request from the row which has an identical Stream_Identifier.

i.e. to copy the Service_Request contents from row No. 15256 into row No. 15257, as their Stream_Identifier matches.

Then change your aggregate filter to process [Service_Request] rather than [Service], and move the code that extracts [Service] from [Service_Request] to execute after the aggregate.

I have carried out the suggested changes but still no joy. I have shared the updated conf file below. Let me know if it is OK or if any changes are required.

input {
    file {
        path => "/home/student/test-csv/test-http2-1.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    csv {
        separator => ","
        skip_header => "true"
        columns => ["Time","No.","Source","Destination","Protocol","Length","HTTP_Header_Status","Stream_Identifier","method",":path","String value","Key","Info"]
    }
    date {
        match => [ "Time", "yyyy-MM-dd HH:mm:ss.SSSSSS" ]
        target => "@timestamp"
        timezone => "UTC"
    }

    mutate {
        copy => { ":path" => "Service_Request" }
    }

    mutate {
        split => { ":path" => "/" }
    }

    mutate {
        remove_field => ["message","host","path","@version"]
    }

    aggregate {
        task_id => "%{Stream_Identifier}"
        code => '
            p = event.get("%{Service_Request}")
            if p
                map["Service_Request"] = p
            elsif map["Service_Request"]
                event.set("Service_Request", map["Service_Request"])
            end
        '
    }

    if [:path][1] { mutate { add_field => { "Service" => "%{[:path][1]}" } } }

    mutate {
        remove_field => [":path"]
    }

}

output {

    elasticsearch {
        hosts => "http://localhost:9200"
        index => "pcap-csv"
    }

    file {
        path => "/tmp/my_output_text_file"
        codec => rubydebug
    }

    stdout {
        codec => rubydebug
    }

}

Remove all the remove_field options, so that you keep all the fields, then post the two events from "/tmp/my_output_text_file" that you want to aggregate (redacted as necessary, but keep [:path], [Service_Request], and [Service]).

Please find below the two sample events from /tmp/my_output_text_file.

For the highlighted Stream_Identifier, I want the Service_Request from the No. => 274931 entry to be copied into the Service_Request field of the No. => 274932 entry.

Adding the text output of the two events from the snippet:

{
"message" => ""2022-02-07 19:50:41.590957","274931","10.242.14.148","10.242.22.77","HTTP2","106","","1296227","","/nsmsf-sms/v2/ue-contexts/imsi-525058131708832","","","[TCP Spurious Retransmission] , HEADERS[1296227]"\r",
"Protocol" => "HTTP2",
"Length" => "106",
"host" => "es7",
"path" => "/home/student/test-csv/test-http2-1.csv",
"Stream_Identifier" => "1296227",
":path" => [
[0] "",
[1] "nsmsf-sms",
[2] "v2",
[3] "ue-contexts",
[4] "imsi-525058131708832"
],
"Service_Request" => "/nsmsf-sms/v2/ue-contexts/imsi-525058131708832",
"Info" => "[TCP Spurious Retransmission] , HEADERS[1296227]",
"Service" => "nsmsf-sms",
"Destination" => "10.242.22.77",
"@version" => "1",
"@timestamp" => 2022-02-07T19:50:41.590Z,
"method" => "",
"Key" => "",
"Time" => "2022-02-07 19:50:41.590957",
"String value" => "",
"Source" => "10.242.14.148",
"No." => "274931",
"HTTP_Header_Status" => ""
}
{
"message" => ""2022-02-07 19:50:41.628290","274932","10.242.22.77","10.242.14.148","HTTP2","71","204","1296227","","","","","[TCP ACKed unseen segment] , HEADERS[1296227]: 204 No Content"\r",
"Protocol" => "HTTP2",
"Length" => "71",
"host" => "es7",
"path" => "/home/student/test-csv/test-http2-1.csv",
"Stream_Identifier" => "1296227",
":path" => ,
"Service_Request" => "",
"Info" => "[TCP ACKed unseen segment] , HEADERS[1296227]: 204 No Content",
"Destination" => "10.242.14.148",
"@version" => "1",
"@timestamp" => 2022-02-07T19:50:41.628Z,
"method" => "",
"Key" => "",
"Time" => "2022-02-07 19:50:41.628290",
"String value" => "",
"Source" => "10.242.22.77",
"No." => "274932",
"HTTP_Header_Status" => "204"
}

You do not want a sprintf reference there. Try

    csv {
        skip_header => true
        columns => ["Time","No.","Source","Destination","Protocol","Length","HTTP_Header_Status","Stream_Identifier","method",":path","String value","Key","Info"]
    }
    date {
        match => [ "Time", "yyyy-MM-dd HH:mm:ss.SSSSSS" ]
        target => "@timestamp"
        timezone => "UTC"
    }
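    # copy the raw path only when the CSV column is non-empty, so the
    # aggregate below can tell "no value yet" apart from "empty value"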
    if [:path] != "" { mutate { copy => { ":path" => "Service_Request" } } }
    aggregate {
        task_id => "%{Stream_Identifier}"
        code => '
            p = event.get("Service_Request")
            if p
                map["Service_Request"] = p
            elsif map["Service_Request"]
                event.set("Service_Request", map["Service_Request"])
            end
        '
    }
    if [Service_Request] {
        mutate { copy => { "Service_Request" => "[@metadata][path]" } }
        mutate { split => { "[@metadata][path]" => "/" } }
        if [@metadata][path][1] { mutate { add_field => { "Service" => "%{[@metadata][path][1]}" } } }
    }
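A side note on the [@metadata][path] choice above: fields under [@metadata] are never written by outputs, so the scratch array used to extract [Service] stays out of Elasticsearch without needing a remove_field.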

I think the code below is working on a per-row basis only.

Ideally, the output of the code below should have the Service_Request field in the No. 15257 entry populated from No. 15256, but that does not seem to happen, so the Service_Request field is still empty in No. 15257.

aggregate {
    task_id => "%{Stream_Identifier}"
    code => '
        p = event.get("Service_Request")
        if p
            map["Service_Request"] = p
        elsif map["Service_Request"]
            event.set("Service_Request", map["Service_Request"])
        end
    '
}

Sample output after applying the above suggestion to the logstash conf file:

{
"path" => "/home/student/test-csv/test-http2-1.csv",
"Service_Request" => [
[0] "",
[1] "nsmsf-sms",
[2] "v2",
[3] "ue-contexts",
[4] "imsi-525058131708832"
],
"Length" => "106",
"HTTP_Header_Status" => "",
"message" => ""2022-02-07 19:50:41.590957","274931","10.242.14.148","10.242.22.77","HTTP2","106","","1296227","","/nsmsf-sms/v2/ue-contexts/imsi-525058131708832","","","[TCP Spurious Retransmission] , HEADERS[1296227]"\r",
"Info" => "[TCP Spurious Retransmission] , HEADERS[1296227]",
"Destination" => "10.242.22.77",
"host" => "es7",
"Stream_Identifier" => "1296227",
"No." => "274931",
":path" => [
[0] "",
[1] "nsmsf-sms",
[2] "v2",
[3] "ue-contexts",
[4] "imsi-525058131708832"
],
"method" => "",
"@timestamp" => 2022-02-07T19:50:41.590Z,
"Source" => "10.242.14.148",
"String value" => "",
"Time" => "2022-02-07 19:50:41.590957",
"Service" => "nsmsf-sms",
"Protocol" => "HTTP2",
"Key" => "",
"@version" => "1"
}

{
"path" => "/home/student/test-csv/test-http2-1.csv",
"Service_Request" => , ===> No contents in this field
"Length" => "71",
"HTTP_Header_Status" => "204",
"message" => ""2022-02-07 19:50:41.628290","274932","10.242.22.77","10.242.14.148","HTTP2","71","204","1296227","","","","","[TCP ACKed unseen segment] , HEADERS[1296227]: 204 No Content"\r",
"Info" => "[TCP ACKed unseen segment] , HEADERS[1296227]: 204 No Content",
"Destination" => "10.242.14.148",
"host" => "es7",
"Stream_Identifier" => "1296227",
"No." => "274932",
":path" => , ====> No contents in this field
"method" => "",
"@timestamp" => 2022-02-07T19:50:41.628Z,
"Source" => "10.242.22.77",
"String value" => "",
"Time" => "2022-02-07 19:50:41.628290",
"Protocol" => "HTTP2",
"Key" => "",
"@version" => "1"
}

Hi - any insights or suggestions?
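One avenue worth checking, offered as a sketch rather than a confirmed fix: if the second row carries an empty string in [Service_Request] (which the output above suggests, and which would happen if the conditional copy guard was not in the config that actually ran), then in Ruby "" is truthy, so "if p" takes the first branch and overwrites the aggregate map with the empty value instead of restoring from it. A guarded variant of the aggregate code:

    aggregate {
        task_id => "%{Stream_Identifier}"
        code => '
            p = event.get("Service_Request")
            # "" is truthy in Ruby, so treat an empty string like a
            # missing field instead of letting it overwrite the map
            if p && p != ""
                map["Service_Request"] = p
            elsif map["Service_Request"]
                event.set("Service_Request", map["Service_Request"])
            end
        '
    }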
