Combined grok pattern for customized logs

I am looking for some help and guidance with parsing customized logs in one file. I have httpd access logs in two formats, and I need to prepare the Logstash config to filter the data.
I tried combining two different patterns in one grok, and also separate grok blocks under the filter, but neither of them is filtering the data.

Is there any way I can use two grok patterns to filter the customized httpd access logs? Please guide me. Thanks in advance.

Hello,

You need to share a sample of your logs and also the grok configuration you are using.

Please use the preformatted text option, the </> button, while sharing configurations and sample messages.

Thanks, mate, for the response.

Samples of the logs are below.
One transaction:

10.92.10.11 - - [12/Jul/2023:08:00:02 +0800] - "GET /isalive HTTP/1.1" 200 15bytes "-" "ZK3tAiGP-VpKu4Cf5HWLGgxxxxxx" [-] 0ms

Another log:

10.10.110.10 - - [12/Jul/2023:08:00:37 +0800] appxxx2-ig "GET /am/oauth2/.well-known/openid-configuration HTTP/1.1" 200 3897bytes "10.10.110.10" "ZK3tJSGP-VpKu4Cf5xxxxxxxxxx" [-] 37ms

Another log:

10.52.11.10 - - [12/Jul/2023:08:09:05 +0800]  "POST /am/json/sessions/?_action=getSessionInfo HTTP/1.1" 200 6440bytes "10.52.5.16" "ZK3vIR_wxxxx3QRmANzxxxxxxxx" [-] 17ms

These are the types of logs available in the access logs which need to be parsed and filtered.

I tried multiple grok patterns; one is below:

%{IP:clientIP} \- \- \[%{HTTPDATE:timestamp}\] %{DATA:client_name} "%{WORD:httpMethod} %{URIPATHPARAM:httpPath} HTTP/%{NUMBER:httpVersion}" %{NUMBER:httpStatusCode} %{NUMBER:responseSize}bytes "%{IP:httpReferer}" "%{DATA:httpUserAgent}" \[%{DATA:requestId}\] %{NUMBER:responseTime}ms

Another one I tried is below:

%{IP:client_ip} - - [%{HTTPDATE:timestamp}] - "%{WORD:client_method} %{URIPATHPARAM:request_path} %{DATA:http_version}" %{NUMBER:status_code} %{NUMBER:response_size}bytes "%{DATA:extra}" "%{DATA:user_agent}" [-] %{NUMBER:response_time.ms}

But both of them are failing.
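
Is something like the below the right way to put both patterns in one grok? (A rough sketch of what I tried; PATTERN_ONE and PATTERN_TWO stand for the two patterns above.)

grok {
    # try both patterns against the same field; grok stops at the first one that matches
    match => { "message" => [
        "PATTERN_ONE",
        "PATTERN_TWO"
    ] }
    break_on_match => true
}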

This works in grokconstructor for all three cases:

%{IPORHOST:[source][address]} (?:-|%{HTTPDUSER:[access][user][identity]}) (?:-|%{HTTPDUSER:[user][name]}) \[%{HTTPDATE:timestamp}\] (%{HTTPDUSER:[user][name]})? \"(?:%{WORD:[http][request][method]} %{NOTSPACE:[url][original]}(?: HTTP/%{NUMBER:[http][version]})?|%{DATA})\" (?:-|%{INT:[http][response][status_code]:int}) (?:-|%{INT:[http][response][body][bytes]:int}bytes) \"(?:-|%{IPORHOST:[destination][address]})\" \"%{DATA:session}\" \[%{DATA:something}\] (%{INT:time_taken:int})ms

Please review the field names.

Thanks for the response.

I tried to use the grok pattern to match all 3 types of logs in my pipeline filter, but it's giving a failure as below:

error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [host] of type [text] in document with id 'Tis0aIkBqDh-xxxxxxx'. Preview of field's value: '{name=x01xxxxxxxxxx7a.vsi.uat.abc.com}'", "caused_by"=>{"type"=>"illegal_state_exception", "reason"=>"Can't get text on a START_OBJECT at 1:327"}}}}

Also, below is my Logstash filter in the pipeline.

if [application] in [ "CXXX-ENT-X-S" ] {
    grok {
        match => { "message" => '%{IPORHOST:[source][address]} (?:-|%{HTTPDUSER:[access][user][identity]}) (?:-|%{HTTPDUSER:[user][name]}) \[%{HTTPDATE:timestamp}\] (%{HTTPDUSER:[user][name]})? \"(?:%{WORD:[http][request][method]} %{NOTSPACE:[url][original]}(?: HTTP/%{NUMBER:[http][version]})?|%{DATA})\" (?:-|%{INT:[http][response][status_code]:int}) (?:-|%{INT:[http][response][body][bytes]:int}bytes) \"(?:-|%{IPORHOST:[destination][address]})\" \"%{DATA:session}\" \[%{DATA:something}\] (%{INT:time_taken:int})ms' }
    }
}

The Kibana data view is not OK. Try with a new index name, or delete and create a new data view.

I have changed the data view in Kibana itself, but it's giving me the same errors as below:

[2023-07-18T18:11:07,350][WARN ][logstash.outputs.elasticsearch][csec][failed] Could not index event to Elasticsearch. status: 400, action: ["index", {:_id=>nil, :_index=>"csec_logstash_fail_events-2023.07", :routing=>nil}, {"tags"=>["_jsonparsefailure"], "input"=>{"type"=>"filestream"}, "message"=>"10.1.1.10 - - [12/Jul/2023:08:03:54 +0800] - \"GET /isalive HTTP/1.1\" 200 15bytes \"-\" \"ZK3t6h_wNPwl3QRmAxxxxxxx\" [-] 0ms", "ecs"=>{"version"=>"8.0.0"}, "host"=>{"name"=>"x01xxxxx7a.vsi.uat.abc.com"}, "metadata"=>{"timezone"=>"Asia/APAC", "application"=>"CSEC-ENT-HD-ACC"}, "@timestamp"=>2023-07-18T10:11:00.143Z, "@version"=>"1", "topic"=>"testtopic", "agent"=>{"ephemeral_id"=>"fad60b7a-286f-4e4c-b2da-d9c75fb5077c", "id"=>"bb93e0a2-e822-4033-aa8a-d905cdxxxxxx", "name"=>"x01xxxxxx7a.vsi.uat.abc.com", "version"=>"8.5.3", "type"=>"filebeat"}, "log"=>{"file"=>{"path"=>"/tmp/csec/am-logs/access_log1.log"}, "offset"=>13416}}], response: {"index"=>{"_index"=>"csec_logstash_fail_events-2023.07", "_id"=>"H5l7aIkBDQg7IgHxxxxx", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [host] of type [text] in document with id 'H5l7aIkBDQg7IgHlbzPM'. Preview of field's value: '{name=x01xxxxx7a.vsi.uat.abc.com}'", "caused_by"=>{"type"=>"illegal_state_exception", "reason"=>"Can't get text on a START_OBJECT at 1:238"}}}}

Grok is OK:

input {
 generator { 
        message => [ '10.1.1.10 - - [12/Jul/2023:08:03:54 +0800] - "GET /isalive HTTP/1.1" 200 15bytes "-" "ZK3t6h_wNPwl3QRmAxxxxxxx" [-] 0ms' ]
        count => 1
 }
} 
filter {

    grok { 
       match => { "message" => '%{IPORHOST:[source][address]} (?:-|%{HTTPDUSER:[access][user][identity]}) (?:-|%{HTTPDUSER:[user][name]}) \[%{HTTPDATE:timestamp}\] (%{HTTPDUSER:[user][name]})? \"(?:%{WORD:[http][request][method]} %{NOTSPACE:[url][original]}(?: HTTP/%{NUMBER:[http][version]})?|%{DATA})\" (?:-|%{INT:[http][response][status_code]:int}) (?:-|%{INT:[http][response][body][bytes]:int}bytes) \"(?:-|%{IPORHOST:[destination][address]})\" \"%{DATA:session}\" \[%{DATA:something}\] (%{INT:time_taken:int})ms' }
       id => "grok"  
    }

}

output {
    stdout {  }
}

You have these errors:

  • "tags"=>["_jsonparsefailure"] - Do you have a json conversion?
  • "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [host] of type [text] in document with id 'H5l7aIkBDQg7IgHlbzPM'. Preview of field's value: '{name=x01xxxxx7a.vsi.uat.abc.com}'", "caused_by"=>{"type"=>"illegal_state_exception", "reason"=>"Can't get text on a START_OBJECT at 1:238"}}}}```
    You have an issue with the host field, either you are inserting host as JSON in text field either you are trying to do something with JSON transformation.

Result:

{
     "something" => "-",
     "timestamp" => "12/Jul/2023:08:03:54 +0800",
       "message" => "10.1.1.10 - - [12/Jul/2023:08:03:54 +0800] - \"GET /isalive HTTP/1.1\" 200 15bytes \"-\" \"ZK3t6h_wNPwl3QRmAxxxxxxx\" [-] 0ms",
       "session" => "ZK3t6h_wNPwl3QRmAxxxxxxx",
          "user" => {
        "name" => "-"
    },
           "url" => {
        "original" => "/isalive"
    },
          "http" => {
         "request" => {
            "method" => "GET"
        },
        "response" => {
            "status_code" => 200,
                   "body" => {
                "bytes" => 15
            }
        },
         "version" => "1.1"
    },
    "time_taken" => 0,
      "@version" => "1",
    "@timestamp" => 2023-07-18T12:08:42.630534700Z,
        "source" => {
        "address" => "10.1.1.10"
    }
}

And rename the something field according to the field names of your access_log format.
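
If the index expects host as plain text, one option (just a sketch, not tested against your data) is to flatten the host object into a string before the output:

ruby {
    # if the event carries host as an object ({"name" => "..."}), keep only the name string
    code => '
        host = event.get("host")
        event.set("host", host["name"]) if host.is_a?(Hash)
    '
}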

Ahh, understood. I found that my index already has a host field, and it is mapped as type text.

            "host": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256

Here the type is different for these particular log samples.
Is there any other way to update the type of the host field in the indices, or is there a better option?

This thread expands upon the problem.

You cannot change the type of a field in an existing index without re-indexing. And all the events need to be consistent about what type a field is.
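
For illustration only (the index names here are made up), re-indexing means creating a new index with the mapping you want and then copying the documents across with the Reindex API:

PUT access-logs-new
{
  "mappings": {
    "properties": {
      "host": {
        "properties": {
          "name": { "type": "keyword" }
        }
      }
    }
  }
}

POST _reindex
{
  "source": { "index": "access-logs-old" },
  "dest": { "index": "access-logs-new" }
}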


Thank you, Badger and Rios, for your responses.
Yeah, that makes sense to me now. I have changed the indices for another test of my pipelines; during the test, it's giving me a parsing error (code 46) for some transactions. Below are two sample errors from my Logstash logs:

[2023-07-19T16:35:15,488][WARN ][logstash.filters.json    ][csec][312699f9f6dc8aebf11f8827008e30d3c3f562469142d2f278d4db7bd72177cb] Error parsing json {:source=>"message", :raw=>"10.10.20.10 - - [12/Jul/2023:08:00:34 +0800] - \"GET /isalive HTTP/1.1\" 200 15bytes \"-\" \"ZK3tIngceATC133RawX14xxxxx\" [-] 0ms", :exception=>#<LogStash::Json::ParserError: Unexpected character ('.' (code 46)): Expected space separating root-level values
 at [Source: (byte[])"10.10.20.10 - - [12/Jul/2023:08:00:34 +0800] - "GET /isalive HTTP/1.1" 200 15bytes "-" "ZK3tIngceATC133RawX14wAAAJQ" [-] 0ms"; line: 1, column: 7]>}


[2023-07-19T16:35:15,488][WARN ][logstash.filters.json    ][csec][312699f9f6dc8aebf11f8827008e30d3c3f562469142d2f278d4db7bd72177cb] Error parsing json {:source=>"message", :raw=>"10.52.245.67 - - [12/Jul/2023:08:00:37 +0800] appbau2a-ig \"GET /am/oauth2/.well-known/openid-configuration HTTP/1.1\" 200 3897bytes \"10.10.10.10\" \"ZK3tJSGP-VpKu4Cf5HWxxxxxxx\" [-] 37ms", :exception=>#<LogStash::Json::ParserError: Unexpected character ('.' (code 46)): Expected space separating root-level values
 at [Source: (byte[])"10.10.20.20 - - [12/Jul/2023:08:00:37 +0800] appbau2a-ig "GET /am/oauth2/.well-known/openid-configuration HTTP/1.1" 200 3897bytes "10.12.11.10" "ZK3tJSGP-VpKu4Cf5HWxxxxxxX" [-] 37ms"; line: 1, column: 7]>}

It also seems that I have two log formats, .json and .log. Wherever the format is JSON it gets parsed correctly, but for the .log format it's not parsing all the fields from my 3 types of log samples.
Is there any better way to filter the data in my pipeline? Thanks in advance.

@Rios, could you help me with the grok pattern? It seems all messages are coming through as one-liners only. I am testing my .log pipelines for the httpd access logs.

 "message" => "{\"@timestamp\":\"2023-07-21T10:34:01.270Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"_doc\",\"version\":\"8.5.3\"},\"ecs\":{\"version\":\"8.0.0\"},\"host\":{\"name\":\"x01stsxxxx7a.vsi.uat.abc.com\"},\"agent\":{\"version\":\"8.5.3\",\"ephemeral_id\":\"76c0d20b-8087-4585-b625-70044a945678z4\",\"id\":\"ef7c7f39-97c9-4945-a331-16999999\",\"name\":\"x01stsxxxx7a.vsi.uat.abc.com\",\"type\":\"filebeat\"},\"log\":{\"offset\":2846928,\"file\":{\"path\":\"/tmp/csec/ds/ldap-access.audit1.json\"}},\"message\":\"{\\\"eventName\\\":\\\"DJ-LDAP\\\",\\\"client\\\":{\\\"ip\\\":\\\"10.52.64.166\\\",\\\"port\\\":43120},\\\"server\\\":{\\\"ip\\\":\\\"10.52.64.177\\\",\\\"port\\\":1636},\\\"request\\\":{\\\"protocol\\\":\\\"LDAPS\\\",\\\"operation\\\":\\\"SEARCH\\\",\\\"connId\\\":10,\\\"msgId\\\":41781,\\\"dn\\\":\\\"\\\",\\\"scope\\\":\\\"base\\\",\\\"filter\\\":\\\"(objectClass=*)\\\",\\\"attrs\\\":[\\\"1.1\\\"]},\\\"transactionId\\\":\\\"9109e171-b875-4f61-ab88-5984e8d952f6-5504997\\\",\\\"response\\\":{\\\"status\\\":\\\"SUCCESSFUL\\\",\\\"statusCode\\\":\\\"0\\\",\\\"elapsedTime\\\":0,\\\"elapsedTimeUnits\\\":\\\"MILLISECONDS\\\",\\\"nentries\\\":1},\\\"userId\\\":\\\"uid=am-identity-bind-account,ou=admins,ou=identities,dc=cybersecure,dc=abc,dc=com\\\",\\\"timestamp\\\":\\\"2023-07-12T05:52:26.168Z\\\",\\\"_id\\\":\\\"9109e171-b875-4f61-ab88-5984e8d952f6-5504999\\\"}\",\"metadata\":{\"application\":\"CSEC-DS-LDP-ACC\",\"timezone\":\"Asia/Asia\"},\"topic\":\"testtopic2006\",\"input\":{\"type\":\"filestream\"}}",

It seems it's not parsing all the fields separately.

Grok is OK; however, somewhere in your .conf the message and other fields are being nested into the message field. Review your .conf; if it's not clear, go step by step.

The message field is inside the message field:

{
  "eventName": "DJ-LDAP",
  "client": {
    "ip": "10.52.64.166",
    "port": 43120
  },
  "server": {
    "ip": "10.52.64.177",
    "port": 1636
  },
  "request": {
    "protocol": "LDAPS",
    "operation": "SEARCH",
    "connId": 10,
    "msgId": 41781,
    "dn": "",
    "scope": "base",
    "filter": "(objectClass=*)",
    "attrs": [
      "1.1"
    ]
  },
  "transactionId": "9109e171-b875-4f61-ab88-5984e8d952f6-5504997",
  "response": {
    "status": "SUCCESSFUL",
    "statusCode": "0",
    "elapsedTime": 0,
    "elapsedTimeUnits": "MILLISECONDS",
    "nentries": 1
  },
  "userId": "uid=am-identity-bind-account,ou=admins,ou=identities,dc=cybersecure,dc=abc,dc=com",
  "timestamp": "2023-07-12T05:52:26.168Z",
  "_id": "9109e171-b875-4f61-ab88-5984e8d952f6-5504999"
}
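
One way to handle that (a sketch only, assuming the whole Filebeat event arrives as a JSON string in message) is to decode in two passes and only grok the lines that are not JSON:

filter {
    # first pass: decode the Filebeat envelope; the inner message replaces the outer one
    json { source => "message" }

    # second pass: the inner message is either JSON (the DJ-LDAP audit) or a plain access-log line
    if [message] =~ /^\{/ {
        json { source => "message" }
    } else {
        grok {
            match => { "message" => '%{IPORHOST:[source][address]} (?:-|%{HTTPDUSER:[access][user][identity]}) (?:-|%{HTTPDUSER:[user][name]}) \[%{HTTPDATE:timestamp}\] (%{HTTPDUSER:[user][name]})? \"(?:%{WORD:[http][request][method]} %{NOTSPACE:[url][original]}(?: HTTP/%{NUMBER:[http][version]})?|%{DATA})\" (?:-|%{INT:[http][response][status_code]:int}) (?:-|%{INT:[http][response][body][bytes]:int}bytes) \"(?:-|%{IPORHOST:[destination][address]})\" \"%{DATA:session}\" \[%{DATA:something}\] (%{INT:time_taken:int})ms' }
        }
    }
}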

Here is my .conf file attached:


(screenshot of the Logstash config)


The problem is that I am seeing all my parsed data landing in one block under the message field.

{
       "message" => "{\"@timestamp\":\"2023-07-24T03:21:27.404Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"_doc\",\"version\":\"8.5.3\"},\"host\":{\"name\":\"x01stxxxxxxx7a.vsi.uat.abc.com\"},\"agent\":{\"version\":\"8.5.3\",\"ephemeral_id\":\"b09569a6-f161-4b7c-8ffb-2d57f7xxxxx5\",\"id\":\"e5f672fc-b02d-4725-bf1f-22e20697436e\",\"name\":\"x01stxxxxxxx7a.vsi.uat.abc.com\",\"type\":\"filebeat\"},\"message\":\"{\\\"eventName\\\":\\\"DJ-LDAP\\\",\\\"client\\\":{\\\"ip\\\":\\\"10.52.64.17\\\",\\\"port\\\":38834},\\\"server\\\":{\\\"ip\\\":\\\"10.52.64.177\\\",\\\"port\\\":1636},\\\"request\\\":{\\\"protocol\\\":\\\"LDAPS\\\",\\\"operation\\\":\\\"SEARCH\\\",\\\"connId\\\":17324,\\\"msgId\\\":18237,\\\"dn\\\":\\\"\\\",\\\"scope\\\":\\\"base\\\",\\\"filter\\\":\\\"(objectClass=*)\\\",\\\"attrs\\\":[\\\"1.1\\\"]},\\\"transactionId\\\":\\\"9109e171-b875-4f61-ab88-5984e8d952f6-5499843\\\",\\\"response\\\":{\\\"status\\\":\\\"SUCCESSFUL\\\",\\\"statusCode\\\":\\\"0\\\",\\\"elapsedTime\\\":0,\\\"elapsedTimeUnits\\\":\\\"MILLISECONDS\\\",\\\"nentries\\\":1},\\\"userId\\\":\\\"uid=am-identity-bind-account,ou=admins,ou=identities,dc=cybersecure,dc=abc,dc=com\\\",\\\"timestamp\\\":\\\"2023-07-12T05:47:43.274Z\\\",\\\"_id\\\":\\\"9109e171-b875-4f61-ab88-5984e8d952f6-5499847\\\"}\",\"log\":{\"offset\":1885305,\"file\":{\"path\":\"/tmp/csec/ds/ldap-access.audit1.json\"}},\"input\":{\"type\":\"filestream\"},\"metadata\":{\"application\":\"CSEC-DS-LDP-ACC\",\"timezone\":\"Asia/Asia\"},\"topic\":\"testtopic2006\",\"ecs\":{\"version\":\"8.0.0\"}}",
    "@timestamp" => 2023-07-24T03:21:27.971946364Z,
      "@version" => "1"

I think the image of my Logstash config was not uploaded clearly, so I am sharing the filter which I am using:

filter {
    ## Another application filtering
    if [application] in [ "CSEC-ENT-HD-ACC" ] {
        ruby {
            code => 'host = event.get("host")
                     if host.is_a? Hash
                         event.set("host", host.to_s)
                     end'
        }
        grok {
            match => { "message" => '%{IPORHOST:[source][address]} (?:-|%{HTTPDUSER:[access][user][identity]}) (?:-|%{HTTPDUSER:[user][name]}) \[%{HTTPDATE:timestamp}\] (%{HTTPDUSER:[user][name]})? \"(?:%{WORD:[http][request][method]} %{NOTSPACE:[url][original]}(?: HTTP/%{NUMBER:[http][version]})?|%{DATA})\" (?:-|%{INT:[http][response][status_code]:int}) (?:-|%{INT:[http][response][body][bytes]:int}bytes) \"(?:-|%{IPORHOST:[destination][address]})\" \"%{DATA:session}\" \[%{DATA:extra}\] (%{INT:time_taken:int})ms' }
        }
    }
}
output {
    stdout { codec => rubydebug }
    if "_jsonparsefailure" in [tags] {
        file {
            path => "/logs/mozu/logstash/events/csec_failed_events-%{+YYYY-MM-dd}.json"
        }
        elasticsearch {
            user => "logstash_output"
            password => "${LOGSTASH_OUTPUT_PWD}"
            id => "failed"
            hosts => [ "https://x01sxxxx1a.vsi.uat.abc.com:9210", "https://x01sxxxx2a.vsi.uat.abc.com:9210", "https://x01sxxxxx3a.vsi.uat.abc.com:9210" ]
            index => "csec_failed_events-%{+YYYY.MM}"
        }
    }
}

Hello @Badger, @Rios, do you have any suggestions for me?

Thanks in advance.

In one place you say there are two types of logs, in another you suggest there are three. Whichever it is, please provide examples of each type. Then we can try to reproduce the issue.

Also, what is the actual error you are trying to fix? It seems you understand the mapping exception, and also the json filter error parsing code 46, so what is your current question?

Thank you for the response. I am just a newbie in this ELK world.
Basically, I am creating one pipeline where I am trying to capture different types of logs, i.e. httpd, OS, and app logs, which come in different formats, say .json and .log.
In the pipeline, the JSON logs are parsed using codec => json, and for the httpd logs parsing happens with grok, but all the parsed data is going into the message field only.

I am not sure if it is a good idea to use one pipeline for the different logs under one application, or whether there is a better approach that would be useful.

I think I fixed it for one type of log, but my grok pattern is failing on the log formats below, since I have custom logs in my Beats:

10.52.245.67 - - [12/Jul/2023:08:08:52 +0800] uibau1a "GET /login/assets/fonts/OpenSans/OpenSans-SemiBold.woff HTTP/1.1" 200 18696bytes "10.168.224.18, 10.52.80.51" "ZK3vFHgceATC133RawX2NgAAAJE" [-] 0ms

10.52.245.67 - - [12/Jul/2023:08:08:59 +0800] app2a-bau "POST /employee HTTP/1.1" 200 215bytes "10.168.224.18, 10.52.80.51" "ZK3vGyGP-VpKu4Cf5HWLUwAAAMI" [-] 708ms

Here is my logstash.conf which is giving me a grokparsefailure for these logs:

filter {

    ## cleaning up the data ##
    json { source => "[@metadata][body]" remove_field => [ "message" ] }
    ruby {
        code => '
            if event
                event_hash = event.to_hash
                if event_hash.kind_of?(Hash) and event_hash != {}
                    event_hash.each { |k, v|
                        if v == "-"
                            event.set(k, nil)
                        end
                    }
                end
            end'
    }
    ruby { code => 'event.set("@timestamp_logstash", event.get("@timestamp"))' }
    json { source => "message" }

    ## CSEC filtering the data ##
    grok { match => { "message" => "%{IPORHOST:[source][address]} (?:-|%{HTTPDUSER:[access][user][identity]}) (?:-|%{HTTPDUSER:[user][name]}) \[%{HTTPDATE:eventtime}\] (%{HTTPDUSER:[user][name]})? \"(?:%{WORD:[http][request][method]} %{NOTSPACE:[url][original]}(?: HTTP/%{NUMBER:[http][version]})?|%{DATA})\" (?:-|%{INT:[http][response][status_code]:int}) (?:-|%{INT:[http][response][body][bytes]:int}bytes) \"(?:-|%{IPORHOST:[destination][address]})\" \"%{DATA:session}\" \[%{DATA:extra}\] (%{INT:time_taken:int})ms" } }

    ## date conversion ##
    date {
        match => [ "timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", "dd/MMM/yyyy:HH:mm:ss ZZZZ", "yyyy-MM-dd HH:mm:ss", "ISO8601" ]
        timezone => "Asia/Singapore"
        #target => "parsed_times"
    }
}
output
{
        stdout { codec => rubydebug }
}

Please suggest if I am missing any format, or whether I can use other grok patterns for these logs.
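
One difference I can see in these two lines is that the quoted client field holds two comma-separated addresses ("10.168.224.18, 10.52.80.51"), which the (?:-|%{IPORHOST:[destination][address]}) part of the pattern above cannot match. Would relaxing that position to %{DATA} be an acceptable fix? A rough sketch of the variant I have in mind:

grok {
    # same pattern as before, but the quoted client field now accepts any text,
    # including a comma-separated list of addresses
    match => { "message" => '%{IPORHOST:[source][address]} (?:-|%{HTTPDUSER:[access][user][identity]}) (?:-|%{HTTPDUSER:[user][name]}) \[%{HTTPDATE:eventtime}\] (%{HTTPDUSER:[user][name]})? \"(?:%{WORD:[http][request][method]} %{NOTSPACE:[url][original]}(?: HTTP/%{NUMBER:[http][version]})?|%{DATA})\" (?:-|%{INT:[http][response][status_code]:int}) (?:-|%{INT:[http][response][body][bytes]:int}bytes) \"(?:-|%{DATA:[destination][address]})\" \"%{DATA:session}\" \[%{DATA:extra}\] (%{INT:time_taken:int})ms' }
}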