Increment a custom field ID on a certain event using a Logstash conf file

Problem: I want to increment trap_id on every process.php request, but in my case trap_id is always 0. I don't know why. Please help me.

My log file is in CSV format. I am trying to fetch the logs of a single user (i.e. IP) over time and group the requests made between a login and the following logout. My log file looks like this:

10.128.2.1,29/Nov/2017:06:58:55,GET /home.php HTTP/1.1,200
10.128.2.1,29/Nov/2017:06:58:55,GET /login.php HTTP/1.1,200
10.128.2.1,29/Nov/2017:06:59:02,POST /process.php HTTP/1.1,302
10.128.2.1,29/Nov/2017:06:59:03,GET /about.php HTTP/1.1,200
10.128.2.1,29/Nov/2017:07:05:53,GET /logout.php HTTP/1.1,302

I want to add two more fields in the log.conf file:

  1. seq_id - to indicate whether a page was requested while logged in
  2. trap_id - category_id

NOTE: whenever process.php is requested, seq_id = 1 and trap_id should be incremented, and the trap_id for subsequent requests is the same ID as the one given to process.php. Whenever logout.php is requested, seq_id = 0.

I expect the field values below when I visualize them in Kibana.

			seq_id		trap_id
/home.php     - 	0		0
/login.php    - 	0		0

/process.php  - 	1		1
/about.php    - 	1		1
/logout.php   - 	1		1	

/home.php     - 	0		0
/home.php     - 	0		0
/login.php    - 	0		0

/process.php  - 	1		2
/about.php    - 	1		2
/logout.php   - 	1		2

I have also added my log.conf file:

input {
    file {
        path => "path_to_log/log.csv"
        start_position => "beginning"
    }
}
filter {
    csv {
        separator => ","
        skip_header => "true"
        columns => ["IP","DateTime","URL","Status"]
    }
    grok {
        match => {
            "URL" => ["%{WORD:method} %{DATA:request} HTTP/%{NUMBER:httpversion}"]
        }
    }
    grok {
        match => {
            "DateTime" => ["%{DATA:Date}\:%{TIME:Time}"]
        }
    }
    grok {
        match => {
            "Date" => ["%{MONTHDAY:day}/%{MONTH:month}/%{YEAR:year}"]
        }
    }
    grok {
        match => {
            "Time" => ["%{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second}"]
        }
    }
    ruby {
        init => '@trap_id = 0'
        code => 'event.set("seq_id", 0)'
    }
    if [request] == "/process.php" {
        mutate { add_field => ["label", "1"] }
        ruby {
            code => '
                @trap_id += 1
                event.set("seq_id", 1)
                event.set("trap_id", @trap_id)
            '
        }
        mutate { convert => { "trap_id" => "integer" } }
    } else if [request] == "/logout.php" {
        mutate { add_field => ["label", "2"] }
        ruby {
            code => '
                event.set("trap_id", @trap_id)
                event.set("seq_id", 0)
            '
        }
        mutate { convert => { "trap_id" => "integer" } }
    } else {
        mutate { add_field => ["label", "0"] }
        ruby {
            code => '
                if event.get("seq_id").to_i == 1
                    event.set("trap_id", @trap_id)
                else
                    @trap_id = 0
                    event.set("trap_id", @trap_id)
                end
            '
        }
        mutate { convert => { "trap_id" => "integer" } }               
    }
}   
output {
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "logdb2"
        user => "elastic"
        password => "my_elastic_pass"
    }
}

Those variables have instance scope, so each instance of a ruby filter refers to a different variable. If you want to share a variable across different instances of a ruby filter, you should make it a class variable: @@trap_id.
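
A minimal sketch of that change, applied to the ruby filters above (only the variable name changes):

    ruby {
        init => '@@trap_id = 0'   # class variable, shared by all ruby filter instances
        code => 'event.set("seq_id", 0)'
    }
    ...
    ruby {
        code => '
            @@trap_id += 1        # increments the shared counter
            event.set("seq_id", 1)
            event.set("trap_id", @@trap_id)
        '
    }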

Also, you need the order of events to be preserved, so you will need to set pipeline.workers to 1 and pipeline.ordered to true.
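
For example, in logstash.yml (or per pipeline in pipelines.yml):

    pipeline.workers: 1
    pipeline.ordered: true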

Thanks for the help. After changing @trap_id to @@trap_id, the field does increment, but this was not my expected result. Maybe my logic was not correct.

My log file doesn't contain a contiguous run of logs from the same IP address; requests from different IPs are interleaved. For example, the first two log lines may be from IP 10.128.2.1 and the next two from IP 10.131.0.1. For the dataset below, I want trap_id assigned as follows:

				trap_id

10.128.2.1 /home.php     - 	0
10.128.2.1 /login.php    - 	0

10.128.2.1 /process.php  - 	1
10.128.2.1 /about.php    - 	1
10.128.2.1 /logout.php   - 	1	

10.131.2.1 /home.php     - 	0
10.131.2.1 /login.php    - 	0
10.128.2.1 /home.php     - 	0
10.128.2.1 /home.php     - 	0
10.128.2.1 /login.php    - 	0

10.131.2.1 /home.php     - 	0
10.131.2.1 /login.php    - 	0
10.131.2.1 /process.php  - 	2
10.131.2.1 /about.php    - 	2
10.128.2.1 /process.php  - 	3
10.131.2.1 /logout.php   - 	2
10.128.2.1 /about.php    - 	3
10.128.2.1 /logout.php   - 	3

In this data sample, the last 8 lines are a mixture of two IP addresses. Even though the lines from one IP are not contiguous, I want a separate trap_id for each unique login session of each IP.

For example, take two IP addresses, 10.128.2.1 and 10.131.2.1.
Say that on Nov 10, 2022, both clients logged in and out:
10.128.2.1 logged in twice, while
10.131.2.1 logged in once on the same day.

Then the trap_id values will be:
First login session of 10.128.2.1 - trap_id = 1
First login session of 10.131.2.1 - trap_id = 2
Second login session of 10.128.2.1 - trap_id = 3

Any suggestion would be helpful to me. Thanks in advance.

For the data you show, the following works:

    grok { match => { "message" => "%{IPV4:ip} %{URIPATH:uri}" } }
    aggregate {
        task_id => "%{ip}"            # one map per client IP
        code => '
            @trap ||= 0               # filter-wide counter, shared across all IPs
            uri = event.get("uri")
            if uri == "/login.php"
                trap = 0              # the session has no trap id until process.php
            elsif uri == "/process.php"
                @trap += 1            # allocate the next session id
                map["trap"] = @trap   # remember it for this IP
                trap = map["trap"]
            elsif uri == "/logout.php"
                trap = map["trap"]    # tag the logout, then close the session
                map["trap"] = 0
            else
                trap = map["trap"] ? map["trap"] : 0
            end
            event.set("trap", trap)
        '
    }

Thank you very much. You made my day. Your code works fine and matches my expected results. One more request: I want to find out whether a request was triggered by a human or by a script/tool like Postman. Is there a way in Logstash to find this?

If there is some pattern in the data that shows whether it is a human, you can probably get Logstash to recognize it, but I have no idea what that pattern would be.
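
For example, if the distinguishing pattern turned out to be the User-Agent string (Postman typically sends a PostmanRuntime/... agent, while browsers send Mozilla/...), a sketch along these lines could tag each event. The agent field name here is an assumption; it depends on what your grok pattern captures:

    # assumes the raw User-Agent string is already in a field named "agent"
    if [agent] =~ /PostmanRuntime|curl|python-requests/ {
        mutate { add_field => { "client_type" => "script" } }
    } else {
        mutate { add_field => { "client_type" => "browser" } }
    }

Logstash also ships a useragent filter that parses the string into browser and OS fields. Keep in mind that a script can set any User-Agent it likes, so this is only a heuristic.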

I will check for the pattern. Thanks for the timely response.

Is there a way to match patterns during aggregate? My log file is like below:

54.38.144.149 - - [06/Nov/2022:07:06:00 +0530] "GET /user/login HTTP/1.1" 200 2709 "-" "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36"
54.38.144.149 - - [06/Nov/2022:07:06:03 +0530] "POST /user/login HTTP/1.1" 200 2843 "-" "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36"
157.49.196.254 - - [06/Nov/2022:07:07:17 +0530] "GET /staff_profile/faculty/bootstrap/css/bootstrap.min.css HTTP/1.1" 200 155845 "-" "Mozilla/5.0 (Linux; Android 10; Redmi 8) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Mobile Safari/537.36"

I want to extract the URI of the request and aggregate based on it. Is there a way to use a regex in the if condition of the code below?

aggregate {
        task_id => "%{IP}"
        code => '
            @trap ||= 0
            request = event.get("request")
            if request == "/process.php"
                @trap += 1
                map["trap"] = @trap
                trap = map["trap"]
            elsif request == "/logout.php"
                trap = map["trap"]
                map["trap"] = 0
            else
                trap = map["trap"] ? map["trap"] : 0
            end
            event.set("trap", trap)
        '
    }

Yes, you can use =~ instead of ==.
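
A minimal example of the condition, using a Ruby regex literal rather than a quoted string (in Ruby, String =~ String raises a TypeError):

    request = event.get("request").to_s
    # no ^ anchor: request paths such as "/user/login" start with "/"
    if request =~ /login/
        # a login request: start a new session here
    end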

Thanks. I tried with the two aggregate variants below.

aggregate {
        task_id => "%{clientip}"
        code => '
            @trap ||= 0
            request = event.get("request")
            if request =~ "login"
                @trap += 1
                map["trap"] = @trap
                trap = map["trap"]
            elsif request =~ "logout"
                trap = map["trap"]
                map["trap"] = 0
            else
                trap = map["trap"] ? map["trap"] : 0
            end
            event.set("trap", trap)
        '
    }
aggregate {
        task_id => "%{clientip}"
        code => '
            @trap ||= 0
            request = event.get("request")
            if request =~ /^login.*/
                @trap += 1
                map["trap"] = @trap
                trap = map["trap"]
            elsif request =~ /^logout.*/
                trap = map["trap"]
                map["trap"] = 0
            else
                trap = map["trap"] ? map["trap"] : 0
            end
            event.set("trap", trap)
        '
    }

Neither of the above techniques works.
Could you please rewrite the aggregate code with a regex pattern?

@Badger Thanks for your help. I found the solution to my problem. I'm posting it in case anyone finds it helpful.

The code below in the Logstash conf groups events for every unique login session until logout:

aggregate {
        task_id => "%{clientip}"
        code => '
            @trap ||= 0
            request = event.get("request")
            if request.to_s.include? "login"
                @trap += 1
                map["trap"] = @trap
                trap = map["trap"]
            elsif request.to_s.include? "logout"
                trap = map["trap"]
                map["trap"] = 0
            else
                trap = map["trap"] ? map["trap"] : 0
            end
            event.set("trap", trap)
        '
    }
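
One usage note: as mentioned earlier in the thread, this depends on events arriving in order, and the aggregate filter documentation likewise recommends running with a single worker, so keep pipeline.workers set to 1.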
