Join two log files in Logstash conf

I have an activity log like:

2020-06-02 03:30:50,712 USERID="(not logged in)" IP="103.226.146.203" SESSION="" HOST="h2t57-hrm.gh.pri" HOMEDIR="8.57" SITE="ps" METHOD="GET" HTTP_USER_AGENT="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36" MENU="" COMPONENT="" PAGE="ABC"

And a page log like:

2020-04-07 12:52:30,PAGE="ABC",DESCRIPTION="Dependent Detail",USAGE="HGB",PPI_LEVEL="1"

These log files are parsed using Logstash grok and can be seen in Kibana.

What I need is for the page log metadata to be joined with the activity log when PAGE has the same value. Please guide; it should be like this:

I should be able to see activity + page metadata in one line in Kibana when the PAGE value is the same, so the join should be on the PAGE value.

Use an aggregate filter.
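A minimal sketch of what that could look like for the two log formats above (the field names `PAGE`, `HTTPAGENT`, and `PAGEDESCRIPTION` are assumed to come from your grok filters, and the timeout value is illustrative):

```
aggregate {
    task_id => "%{PAGE}"
    code => "
        # copy fields from whichever event arrives into the shared map
        map['PAGEDESCRIPTION'] ||= event.get('PAGEDESCRIPTION')
        map['HTTPAGENT'] ||= event.get('HTTPAGENT')
    "
    push_map_as_event_on_timeout => true
    timeout => 300
}
```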

Hi Badger

I tried:

aggregate {
    task_id => "%{PAGE}"
    code => "
        map['PAGE'] ||= event.get('PAGE')
        map['HTTPAGENT'] ||= event.get('HTTPAGENT')
        map['PAGEDESCRIPTION'] ||= event.get('PAGEDESCRIPTION')
    "
    push_previous_map_as_event => true
    timeout => 300
}

But the logs are still coming through separately in Kibana.

They show on separate lines; can they be shown in one line? Am I doing anything wrong in the aggregate?

You will still have the separate events (if you do not want them you can call event.cancel in the code option of the aggregate filter) but should now have additional events with the combination of fields.
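For example, if you want only the combined event, the `code` option could drop the original events as they arrive (a sketch; whether you cancel both log types or only one depends on which originals you want to keep):

```
code => "
    map['PAGEDESCRIPTION'] ||= event.get('PAGEDESCRIPTION')
    map['HTTPAGENT'] ||= event.get('HTTPAGENT')
    event.cancel   # drop the original event; only the flushed map survives
"
```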

Having multiple events is fine, but I don't get all the fields in a single line for one event.

The code snippet in Logstash is:
aggregate {
    task_id => "%{PAGE}"
    code => "
        map['PAGE'] ||= event.get('PAGE')
        map['HTTPAGENT'] ||= event.get('HTTPAGENT')
        map['PAGEDESCRIPTION'] ||= event.get('PAGEDESCRIPTION')
    "
    map_action => "create_or_update"
}

And the Kibana dashboard looks like below, where PAGEDESCRIPTION is empty for the activity event that has HTTPAGENT (3rd line). Is there any mistake in the code snippet, or should I add something?

If the 3rd line arrives at 21:41:26, and the 2nd at 21:42:56 then there should be an event that combines them at 21:47:56, when the 300 second timeout expires (assuming no more events arrive for that value of [PAGE]).

The combined event is not coming. I have the aggregate in the filter as:

aggregate {
    task_id => "%{PAGE}"
    code => "
        map['PAGEDESCRIPTION'] ||= event.get('PAGEDESCRIPTION')
        map['HTTPAGENT'] ||= event.get('HTTPAGENT')
    "
    map_action => "create_or_update"
    timeout => 36
}

My output is Elasticsearch. Should I mention anything like codec => or so? I have not kept anything like a codec in the output.

And I have two if clauses in filter {}, like:

filter {
    if [log_type] == "activity" { ... }
    if [log_type] == "pages" { ... }
    aggregate { ... }
}

Should I keep the aggregate inside either of the if clauses, or outside both if clauses but inside filter? Please guide on the placement of the aggregate and the codec part, and whether my code snippet for the aggregate is correct. After the timeout of 36 seconds I don't get any combined event on a common PAGE field value.

You have not set either push_map_as_event_on_timeout or push_previous_map_as_event, so the map contents will never get flushed as an event.

aggregate {
    task_id => "%{PAGE}"
    code => "
        map['PAGEDESCRIPTION'] ||= event.get('PAGEDESCRIPTION')
        map['HTTPAGENT'] ||= event.get('HTTPAGENT')
    "
    map_action => "create_or_update"
    timeout => 10
    push_map_as_event_on_timeout => true
}

I added push_map_as_event_on_timeout; still no change, and I don't see the joined event in Kibana.

My complete Logstash conf file; please suggest where I am going wrong, as there is no change after adding push_map_as_event_on_timeout:

input {
    beats {
        port => "5043"
    }
}

filter {
    if [log_type] == "fullactivelogs" {

        grok {
            match => { "message" => "%{TIMESTAMP_ISO8601:Activity-Time}%{SPACE}USERID=%{QUOTEDSTRING:USERID}%{SPACE}IP=\"%{IP:IP}\"%{SPACE}SESSION=%{QUOTEDSTRING}%{SPACE}HOST=%{QUOTEDSTRING:HOST}%{SPACE}HOMEDIR=%{QUOTEDSTRING:HOMEDIR}%{SPACE}SITE=%{QUOTEDSTRING:SITE}%{SPACE}METHOD=%{QUOTEDSTRING:METHOD}%{SPACE}HTTP_USER_AGENT=%{QUOTEDSTRING:HTTPAGENT}%{SPACE}MENU=%{QUOTEDSTRING:MENU}%{SPACE}COMPONENT=%{QUOTEDSTRING:COMPONENT}%{SPACE}PAGE=%{QUOTEDSTRING:PAGE}%{SPACE}KEYS=%{QUOTEDSTRING:KEYS}%{SPACE}ACTION=%{QUOTEDSTRING:ACTION}%{SPACE}URL=%{QUOTEDSTRING:URL}%{SPACE}DEVICETYPE=%{QUOTEDSTRING:DEVICETYPE}%{SPACE}RENDERMODE=%{QUOTEDSTRING:RENDERMODE}%{SPACE}PHYSICALHOST=%{QUOTEDSTRING:PHYSICALHOST}%{SPACE}WEBAPPDOMAIN=%{QUOTEDSTRING:WEBAPPDOMAIN}%{SPACE}ELAPSEDSECONDS=\"%{NUMBER:ELAPSEDTIMEINSECONDS:float}\"" }
        }

        geoip {
            source => "IP"
            fields => ["longitude","latitude"]
            target => "client.geo.location"
        }

        mutate { gsub => [ "Activity-Time", ",.*", "" ] }

    }

    if [log_type] == "pages" {

        grok {
            match => { "message" => "%{TIMESTAMP_ISO8601:Activity-Time},PAGE=%{QUOTEDSTRING:PAGE},DESCRIPTION=%{QUOTEDSTRING:PAGEDESCRIPTION},USAGE=%{QUOTEDSTRING:PAGEUSAGE},PPI_LEVEL=%{QUOTEDSTRING:PPI_LEVEL}" }
        }

        mutate { gsub => [ "Activity-Time", ",.*", "" ] }

    }

    aggregate {
        task_id => "%{PAGE}"
        code => "
            map['PAGEDESCRIPTION'] ||= event.get('PAGEDESCRIPTION')
            map['HTTPAGENT'] ||= event.get('HTTPAGENT')
        "
        map_action => "create_or_update"
        timeout => 10
        push_map_as_event_on_timeout => true
    }
}

output {
    elasticsearch {
        hosts => ["10.0.62.184:9200"]
        index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
        document_type => "%{[@metadata][type]}"
        template_name => "testtemplate"
    }
}
Please confirm whether the aggregate is correct.

Are the placement and syntax correct? Am I missing anything?

@Badger Can you please help on the above topic? We are very near to the solution, I believe. Please let me know if you need any further details. I have set pipeline.workers: 1 in pipelines.yml as well as logstash.yml, but the combined event is still not getting generated after the timeout.

I have run out of suggestions.

@Badger, is the placement of the aggregate correct?

And I should see the combined event after the timeout, right?

What I do is manually enter a line into one log file and then into the other log file. I see both events appearing in Kibana, but the aggregated one is not coming. Can you specifically check whether the placement of the aggregate is correct and whether it is OK syntax-wise? Just one last round and then I am on my own :slight_smile:

The placement and syntax look OK to me.

@Badger, a generic question: can I place multiple aggregate plugins inside filter {}?

And in the above case, is ||= fine, or should I keep =? Is there anything I need to do on the Elasticsearch template side, or any settings in any of the yml files?

How can I check or debug the logs or events to see where it is failing, or whether it is going inside the aggregate at all?

You can have multiple aggregate filters in the filter section. In some use cases you are required to (see example 1 in the documentation).

||= looks OK. That will only do an assignment if the left hand side is nil, which should work for your use case.
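The behaviour of `||=` can be checked in plain Ruby (the `aggregate` filter's `code` option runs Ruby, and `map` behaves like a hash here):

```ruby
# ||= assigns only when the left-hand side is nil (or false),
# so the first value written for a key wins and later ones are ignored.
map = {}

map['PAGE'] ||= 'ABC'   # map['PAGE'] is nil, so 'ABC' is assigned
map['PAGE'] ||= 'XYZ'   # already set, so 'XYZ' is ignored

puts map['PAGE']        # prints "ABC"
```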

If you are running on the command line you can call puts to print something in the code option. Alternatively

File.open("/tmp/foo.txt", "w") {}

and verify the file gets created.
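The same check can be run as standalone Ruby to see what it does (the `/tmp/foo.txt` path matches the snippet above; any writable path works):

```ruby
# Create a marker file, as you would from inside the aggregate filter's
# code option, to prove that the code block is actually being executed.
File.open("/tmp/foo.txt", "w") {}

puts File.exist?("/tmp/foo.txt")  # prints "true" once the code has run
```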

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.