Multiline grok

Hello,

I have a problem. This is how my log looks like :

dn: cn=groupe1,cn=Groups,cn=8770 administration,o=nmc
groupname: groupe1
displayname: groupe1
objectclass: top
objectclass: groupOfUniqueNames
objectclass: NmcSecurityGroup
description: Administrateurs de la taxation
uniquemember: uid=toto,cn=Administrators,cn=8770 administration,o=nmc
uniquemember: uid=toto1,cn=Administrators,cn=8770 administration,o=nmc
uniquemember: uid=toto2,cn=Administrators,cn=8770 administration,o=nmc
uniquemember: uid=toto3,cn=Administrators,cn=8770 administration,o=nmc
uniquemember: uid=toto4,cn=Administrators,cn=8770 administration,o=nmc
uniquemember: uid=toto5,cn=Administrators,cn=8770 administration,o=nmc
uniquemember: uid=toto6,cn=Administrators,cn=8770 administration,o=nmc
cn: groupe1

dn: cn=groupe2,cn=Groups,cn=8770 administration,o=nmc
groupname: groupe2
displayname: groupe2
objectclass: top
objectclass: groupOfUniqueNames
objectclass: NmcSecurityGroup
description: Gestionnaires de l'annuaire
uniquemember: uid=toto7,cn=Administrators,cn=8770 administration,o=nmc
uniquemember: uid=toto8,cn=Administrators,cn=8770 administration,o=nmc
uniquemember: uid=toto9,cn=Administrators,cn=8770 administration,o=nmc
uniquemember: uid=toto10,cn=Administrators,cn=8770 administration,o=nmc
cn: groupe2

I want indexes like that with 3 fields :

Groupename ; Members; description

For example, these log should return two documents.

Groupename : groupe1 ; Members : toto,toto1;toto2,toto3,toto4,toto5,toto6 ; description : Administrateurs de la taxation

Groupename : groupe 2 ; Members : toto7,toto8,toto9,toto10 ; description : Gestionnaires de l'annuaire d'entreprise

I imagine that I have to use multiline event to make logstash understand that one block of log describes information about one group so that one block of log is one event. The problem is that the number of members for each group is variable and can't be catch with a grok filter. How can I catch each member and add them into the list "Members"?

I would do it using something like

filter {
    if [message] =~ "^dn:" {
        dissect { mapping => { "message" => "dn: cn=%{groupname},%{}" } }
        ruby { code => '@@groupname = event.get("groupname")' }
        aggregate {
            task_id => "%{groupname}"
            code => "map['namelist'] = []"
            map_action => "create"
        }
        drop {}
    } else if [message] =~ "^uniquemember:" {
        dissect { mapping => { "message" => "uniquemember: uid=%{membername},%{}" } }
        ruby { code => 'event.set("groupname", @@groupname)' }
        aggregate {
            task_id => "%{groupname}"
            code => "map['namelist'] << event.get('membername')"
            map_action => "update"
        }
        drop {}
    } else if [message] =~ "^cn:" {
        ruby { code => 'event.set("groupname", @@groupname)' }
        aggregate {
            task_id => "%{groupname}"
            code => "event.set('namelist', map['namelist'])"
            map_action => "update"
            end_of_task => true
            timeout => 120
        }
    } else {
        drop {}
    }
}

Note that you must use a single pipeline worker thread when using aggregate.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.