Copy the content of a field from another record if a certain condition matches

Hi

I need to copy the contents of the :path field from No. 15256 into the :path field of No. 15257 if the Stream_Identifier of both rows (15256 and 15257) matches.

Kindly suggest how to get this done in the logstash conf file, or any other way to achieve the same.

Please refer to the attached image/snippet for the above query. Also attached is a snippet of my logstash conf file for reference.

You could use an aggregate filter for that. Perhaps something like

aggregate {
    task_id => "%{Stream_Identifier}"
    code => '
        p = event.get(":path")
        if p
            # first event for this Stream_Identifier that has a [:path]: remember it
            map["path"] = p
        elsif map["path"]
            # later event without a [:path]: copy the remembered value onto it
            event.set(":path", map["path"])
        end
    '
}

Make sure you read the documentation about the restriction on pipeline workers when using this filter (it requires a single worker), and also ensure that pipeline.ordered evaluates to true.
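On Logstash 7.x that would mean something like the following in logstash.yml (pipeline.ordered defaults to auto, which evaluates to true when there is a single worker):

pipeline.workers: 1
pipeline.ordered: auto   # auto evaluates to true when pipeline.workers is 1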

Thanks Badger for your time and prompt reply. I incorporated the suggestion in the conf file (snippet given below); however, the desired result is not seen.

How do I write the syntax to compare the Stream_Identifier values and, if they are equal, copy the contents of Service from row 15256 into the Service field in row 15257...

Also, in this example the rows whose fields are compared and copied are adjacent. There might be a case where they aren't adjacent; how do I factor in that possibility as well?

The desired result is for the contents of the Service field in No. 15256 to be copied into the Service field in No. 15257, as the Stream_Identifier is the same for both rows.

I am very new to the ELK world and might be missing some trivial thing here. Please suggest

You said you wanted to copy [:path] from one event to another based on the value of [Stream_Identifier]. In that case the task_id should be "%{Stream_Identifier}", not a copy of [:path].
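The %{} makes it a sprintf reference, which is resolved against each event, so all events that share a Stream_Identifier value share one map:

task_id => "Stream_Identifier"      # a literal string, the same for every event
task_id => "%{Stream_Identifier}"   # resolved per event to that field's value

Since the map is keyed by the task_id, the two events do not have to be adjacent; the map persists across intervening events until the filter's timeout fires.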

Thanks Badger for your response.

Yeah, there was a change in requirement, hence the change in my snippet. But from the above snippet, if I want to copy the contents of the "Service" field from No. 15256 to No. 15257 when there is a matching Stream_Identifier, is the following aggregation code OK, or do you suggest any change/correction?

    aggregate {
        task_id => "%{Stream_Identifier}"
        code => '
            p = event.get("Service")
            if p
                map["Service"] = p
            elsif map["Service"]
                event.set("Service", map["Service"])
            end
        '
        map_action => "update"
    }

That looks reasonable. I have not tested it.
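One thing to double-check: with map_action => "update" the code block only runs when a map already exists for that task_id, so the first event for a given Stream_Identifier will never create one. Leaving the option out falls back to the default, which creates the map when needed:

map_action => "create_or_update"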

Hi Badger

I have tested the above aggregation code; however, the content of the Service field is not getting copied from No. 15256 to No. 15257.

Pasting the filter section of my logstash conf file for reference

filter {
    csv {
        separator => ","
        skip_header => "true"
        columns => ["Time","No.","Source","Destination","Protocol","Length","HTTP_Header_Status","Stream_Identifier","method",":path","String value","Key","Info"]
    }
    date {
        match => [ "Time", "yyyy-MM-dd HH:mm:ss.SSSSSS" ]
        target => "@timestamp"
        timezone => "UTC"
    }
    mutate {
        copy => { ":path" => "Service_Request" }
    }
    aggregate {
        task_id => "%{Stream_Identifier}"
        code => '
            p = event.get("Service")
            if p
                map["Service"] = p
            elsif map["Service"]
                event.set("Service", map["Service"])
            end
        '
        map_action => "update"
    }
    mutate {
        split => { ":path" => "/" }
        add_field => { "Service" => "%{[:path][1]}" }
        remove_field => ["message","host","path","@version"]
    }
    mutate {
        remove_field => [":path"]
    }
}

Please do let me know if anything is amiss

The mutate that creates the [Service] field has to come before the aggregate that references it.
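That is, something along these lines, with the aggregate moved after the mutate:

mutate {
    split => { ":path" => "/" }
    add_field => { "Service" => "%{[:path][1]}" }
}
aggregate {
    # [Service] exists by the time this filter runs
    task_id => "%{Stream_Identifier}"
    ...
}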

Hi Badger

I have made the suggested changes; however, the Service field is still not getting populated as desired.

input {
    file {
        path => "/home/student/test-csv/test-http2-1.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    csv {
        separator => ","
        skip_header => "true"
        columns => ["Time","No.","Source","Destination","Protocol","Length","HTTP_Header_Status","Stream_Identifier","method",":path","String value","Key","Info"]
    }
    date {
        match => [ "Time", "yyyy-MM-dd HH:mm:ss.SSSSSS" ]
        target => "@timestamp"
        timezone => "UTC"
    }
    mutate {
        copy => { ":path" => "Service_Request" }
    }
    mutate {
        split => { ":path" => "/" }
        add_field => { "Service" => "%{[:path][1]}" }
        remove_field => ["message","host","path","@version"]
    }
    aggregate {
        task_id => "%{Stream_Identifier}"
        code => '
            p = event.get("Service")
            if p
                map["Service"] = p
            elsif map["Service"]
                event.set("Service", map["Service"])
            end
        '
    }
    mutate {
        remove_field => [":path"]
    }
}

output {
    elasticsearch {
        hosts => "http://localhost:9200"
        index => "demo-http-csv-split-2"
    }
    stdout { codec => rubydebug }
}

What does this produce for the two events you are trying to aggregate?

Hi Badger

Below is the o/p of stdout { codec => rubydebug }

root@es7:/var/log/logstash# cat debugfile.out | more
Using bundled JDK: /usr/share/logstash/jdk
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[INFO ] 2022-03-05 14:51:14.134 [main] runner - Starting Logstash {"logstash.version"=>"7.17.0", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 11.0.13+8 on 11.0.13+8 +indy +jit [linux-x86_64]"}
[INFO ] 2022-03-05 14:51:14.148 [main] runner - JVM bootstrap flags: [-Xms1g, -Xmx1g, -XX:+UseConcMarkSweepGC, -XX:CMSInitiatingOccupancyFraction=75, -XX:+UseCMSInitiatingOccupancyOnly, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djruby.compile.invokedynamic=true, -Djruby.jit.threshold=0, -Djruby.regexp.interruptible=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true]
[WARN ] 2022-03-05 14:51:14.602 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
[INFO ] 2022-03-05 14:51:16.236 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[INFO ] 2022-03-05 14:51:17.959 [Converge PipelineAction::Create] Reflections - Reflections took 108 ms to scan 1 urls, producing 119 keys and 417 values
[WARN ] 2022-03-05 14:51:18.763 [Converge PipelineAction::Create] plain - Relying on default value of pipeline.ecs_compatibility, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[WARN ] 2022-03-05 14:51:18.835 [Converge PipelineAction::Create] file - Relying on default value of pipeline.ecs_compatibility, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[WARN ] 2022-03-05 14:51:18.943 [Converge PipelineAction::Create] csv - Relying on default value of pipeline.ecs_compatibility, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[WARN ] 2022-03-05 14:51:19.167 [Converge PipelineAction::Create] plain - Relying on default value of pipeline.ecs_compatibility, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[WARN ] 2022-03-05 14:51:19.232 [Converge PipelineAction::Create] Elasticsearch - Relying on default value of pipeline.ecs_compatibility, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[INFO ] 2022-03-05 14:51:19.556 [[main]-pipeline-manager] Elasticsearch - New Elasticsearch output {:class=>"LogStash::Outputs::Elasticsearch", :hosts=>["http://localhost:9200"]}
[INFO ] 2022-03-05 14:51:20.114 [[main]-pipeline-manager] Elasticsearch - Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[WARN ] 2022-03-05 14:51:20.558 [[main]-pipeline-manager] Elasticsearch - Restored connection to ES instance {:url=>"http://localhost:9200/"}
[INFO ] 2022-03-05 14:51:20.606 [[main]-pipeline-manager] Elasticsearch - Elasticsearch version determined (7.17.0) {:es_version=>7}
[WARN ] 2022-03-05 14:51:20.607 [[main]-pipeline-manager] Elasticsearch - Detected a 6.x and above cluster: the type event field won't be used to determine the document _type {:es_version=>7}
[INFO ] 2022-03-05 14:51:20.724 [[main]-pipeline-manager] Elasticsearch - Config is not compliant with data streams. data_stream => auto resolved to false
[INFO ] 2022-03-05 14:51:20.925 [Ruby-0-Thread-10: :1] Elasticsearch - Using a default mapping template {:es_version=>7, :ecs_compatibility=>:disabled}
[INFO ] 2022-03-05 14:51:21.046 [[main]-pipeline-manager] javapipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>125, "pipeline.sources"=>["/etc/logstash/conf.d/csv-read-3.conf"], :thread=>"#<Thread:0x46ef62f6 run>"}
[INFO ] 2022-03-05 14:51:22.398 [[main]-pipeline-manager] javapipeline - Pipeline Java execution initialization time {"seconds"=>1.35}
[INFO ] 2022-03-05 14:51:22.477 [[main]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"main"}
[INFO ] 2022-03-05 14:51:22.553 [[main]<file] observingtail - START, creating Discoverer, Watch with file and sincedb collections
[INFO ] 2022-03-05 14:51:22.556 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[WARN ] 2022-03-05 14:51:22.951 [[main]<file] plain - Relying on default value of pipeline.ecs_compatibility, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.

No, that is the logstash log. I was asking for the stdout output, which will show the structure of the events that you are trying to aggregate.
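When logstash runs in the foreground, the rubydebug codec prints each event to the console as a hash of fields and values, something like this (the values here are just illustrative):

{
    "Stream_Identifier" => "15256",
    ":path" => "/nudm-uecm/...",
    ...
}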

Hi Badger,

I am not sure where to fetch those logs from.

I generally see the logs on the console when i run the below command

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/csv-read-3.conf

I have invoked the command with verbose/debug level logging as well. However, if these are not the ones you are expecting, please guide me on where to locate them so that I can fetch those too.

Sample data output is given below (in case it helps):

{
    "@timestamp" => 2022-02-07T19:40:00.488Z,
    "No." => "48789",
    "method" => "",
    "HTTP_Header_Status" => "200",
    "Stream_Identifier" => "246505,246505",
    "Length" => "222",
    "String value" => "AUTHENTICATION_SUCCESS,4F8B776A1ADD57CB2612E550FA503F622B8DCF484D0498613DA8E6B08837AC20",
    "Time" => "2022-02-07 19:40:00.488173",
    "Destination" => "10.242.14.143",
    "Source" => "10.242.21.25",
    "Info" => "[TCP Spurious Retransmission] , HEADERS[246505]: 200 OK, DATA[246505], JavaScript Object Notation (application/json)",
    "Protocol" => "HTTP2/JSON",
    "Service" => "%{[:path][1]}",
    "Service_Request" => "",
    "Key" => "authResult,kseaf"
}
{
    "@timestamp" => 2022-02-07T19:40:00.538Z,
    "No." => "48790",
    "method" => "",
    "HTTP_Header_Status" => "",
    "Stream_Identifier" => "1399597,1399597",
    "Length" => "458",
    "String value" => "4742c3ce-87f7-42d9-8d5e-75ac7d6e43e4,imeisv-3506355824625301,HOMOGENEOUS_SUPPORT,http://10.242.14.150:8080/nudm-uecm/v1.0.2/imsi-525055000090149/registrations/amf-3gpp-access/notification,525,05,010069,NR",
    "Time" => "2022-02-07 19:40:00.538474",
    "Destination" => "10.242.21.20",
    "Source" => "10.242.14.141",
    "Info" => "HEADERS[1399597], DATA[1399597], JavaScript Object Notation",
    "Protocol" => "HTTP2/JSON",
    "Service" => "nudm-uecm",
    "Service_Request" => "/nudm-uecm/v1/imsi-525055000090149/registrations/amf-3gpp-access",
    "Key" => "amfInstanceId,pei,imsVoPs,deregCallbackUri,mcc,mnc,plmnId,amfId,guami,ratType"
}
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.