Logstash split string issues when parsing text

Hi,

My messages are in the format below:

TIME: 2019-10-19T11:48:40.896687Z

USER@HOST: TEST[TEST] @ [172.30.1.71] ID: 1130779

QUERY_TIME: 1.611727 LOCK_TIME: 0.000079 ROWS_SENT: 5 ROWS_EXAMINED: 13056

SET TIMESTAMP=1571485720;
SELECT SQL_CALC_FOUND_ROWS TEST.ID FROM TEST WHERE 1=1 AND (((TEST.POST_TITLE LIKE '%TEST%') OR (WP_EUBT_POSTS.TEST LIKE '%TEST%') OR (WP_EUBT_POSTS.POST_CONTENT LIKE '%TEST%'))) DESC LIMIT 0, 18;

I want to extract the data that comes after the "SET TIMESTAMP=1571485720;" part.

The messages above are dynamic: the values change and additional values can appear anywhere. The only thing that does not change is SET TIMESTAMP and what follows it.

I am able to extract most of the parameters, except the query parameters.

My filter is

mutate {
uppercase => [ "message" ]
}

grok {
    match => { "message" => ["^# USER@HOST: %{USER:mysql_user}(?:\[[^\]]+\])?\s+@\s+%{HOSTNAME:mysql_client_host}?\s+\[%{IP:mysql_client_ip}?\]"]}
}
grok {
    match => { "message" => ["^# QUERY_TIME: %{NUMBER:query_duration_s:float}\s+LOCK_TIME: %{NUMBER:lock_wait_s:float} ROWS_SENT: %{NUMBER:query_sent_rows:int} \s*ROWS_EXAMINED: %{NUMBER:query_examined_rows:int}"]}
}
grok {
    match => { "message" => ["^SET TIMESTAMP=%{NUMBER:timestamp};"]}
}

mutate {
    copy => { "message" => "message_val" }
}

mutate {
    gsub => [ "message_val", "\n"," " ]
}

date {
    match => [ "timestamp", "UNIX" ]
    target => "@timestamp"
}
# Capture the tablename
grok {
  match => {
    "message" => [
      "FROM %{NOTSPACE:query_table}.*",
      "^UPDATE %{NOTSPACE:query_table}.*",
      "^INSERT INTO %{NOTSPACE:query_table}.*",
      "^DELETE FROM %{NOTSPACE:query_table}.*"
    ]
  }
}

mutate {
    add_field => { "testfield" => "SET TIMESTAMP=%{timestamp};" }
}

split {
    field => "message_valnew"
    terminator => "SET TIMESTAMP=%{timestamp};"
    add_field => { "finalquery" => "%{message_valnew[1]}" }
}

With the split above, I am not able to split off the query parameters. Any help is highly appreciated.

Hi,

I can give you an idea of how to solve it.
My tests were done with Logstash version 7.3.

Let me first add some explanations:
I) First, a look at your example message:
I was not sure whether your message comes in as a single multiline event, like this:

TIME: 2019-10-19T11:48:40.896687Z
USER@HOST: TEST[TEST] @ [172.30.1.71] ID: 1130779
QUERY_TIME: 1.611727 LOCK_TIME: 0.000079 ROWS_SENT: 5 ROWS_EXAMINED: 13056
SET TIMESTAMP=1571485720;
SELECT SQL_CALC_FOUND_ROWS TEST.ID FROM TEST WHERE 1=1 AND (((TEST.POST_TITLE LIKE '%TEST%') OR (WP_EUBT_POSTS.TEST LIKE '%TEST%') OR (WP_EUBT_POSTS.POST_CONTENT LIKE '%TEST%'))) DESC LIMIT 0, 18;

or whether every line comes in as a separate event, like this:

TIME: 2019-10-19T11:48:40.896687Z
USER@HOST: TEST[TEST] @ [172.30.1.71] ID: 1130779
QUERY_TIME: 1.611727 LOCK_TIME: 0.000079 ROWS_SENT: 5 ROWS_EXAMINED: 13056
SET TIMESTAMP=1571485720;
SELECT SQL_CALC_FOUND_ROWS TEST.ID FROM TEST WHERE 1=1 AND (((TEST.POST_TITLE LIKE '%TEST%') OR (WP_EUBT_POSTS.TEST LIKE '%TEST%') OR (WP_EUBT_POSTS.POST_CONTENT LIKE '%TEST%'))) DESC LIMIT 0, 18;

So for both cases, just use the multiline codec, which is available on every input plugin:
https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html

input {
  stdin {
    codec => multiline {
      pattern => "^TIME: "
      negate => true
      what => "previous"
    }
  }
}
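
If the log is read from a file rather than from stdin, the same codec can be attached to the file input. This is only a sketch: the path is a placeholder I made up, and auto_flush_interval is my addition so that the last entry is emitted even when no further "TIME:" line arrives to close it. Note that the codec sees the raw line before any filter runs, so the pattern has to match the line as it is written in the file.

```
input {
  file {
    # Hypothetical path, adjust to where your log actually lives
    path => "/var/log/mysql/mysql-slow.log"
    codec => multiline {
      # Every line that does NOT start with "TIME: " is appended to the previous event
      pattern => "^TIME: "
      negate => true
      what => "previous"
      # Emit a pending multiline event after 2 seconds without new lines
      auto_flush_interval => 2
    }
  }
}
```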

###############

II) Your grok patterns, as they are at the moment, are applied to every single incoming event. Because of that, I modified the pipeline to handle both cases, no matter whether the data comes in as one event or line by line.
The only case that is not covered at the moment is lines from different entries arriving interleaved. That would be more complex; please reach out and clarify if you need it.

###############

III) I made mysql_client_host optional in the grok pattern, because otherwise there is a _grokparsefailure when no host name is present, as in your example.

###############

OK, so now to your question:
IV) You wanted to extract everything after "SET TIMESTAMP ...". This is possible, but not with the split filter the way you tried it.
Some notes on that:
a) https://www.elastic.co/guide/en/logstash/current/plugins-filters-split.html#plugins-filters-split-field
Your split configuration was wrong: the field setting names the source field. If you want the results in another field, use the target setting.
The split filter does nothing more than split one event into multiple events, one per piece of the source field, using the terminator as the separator.
b) https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html#plugins-filters-mutate-split
I also thought about using the mutate split variant, but as I understand it, that does not fit your use case either. It only splits one field into an array within the same event, based on a separator.
c) I also did some tests with the kv filter, but that does not work either, because it expects a fairly fixed key/value layout in the message.
https://www.elastic.co/guide/en/logstash/current/plugins-filters-kv.html
d) --> So the solution is to use the mutate filter with gsub and remove the first part with a regex.
https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html#plugins-filters-mutate-gsub
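
A minimal sketch of variant d) on its own, assuming the whole entry arrives as one multiline event. The field name query_only is made up for illustration. The (?m) prefix lets "." match line breaks, so in this sketch the newlines do not have to be replaced first.

```
filter {
  mutate {
    # Work on a copy so the original message stays untouched
    copy => { "message" => "query_only" }
  }
  # Separate mutate block: inside a single mutate, copy is applied after gsub
  mutate {
    # Remove everything up to and including "SET TIMESTAMP=<digits>;"
    gsub => [ "query_only", "(?m)\A.*SET TIMESTAMP=\d+;\s*", "" ]
  }
}
```

If the event does not contain SET TIMESTAMP at all, the gsub simply matches nothing and query_only keeps the full message.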

###############

V) I was still not sure what you mean by extracting the query parameters.
I understood that you want to isolate the part after WHERE, that is, the part of the SQL statement that shows what exactly was queried.
One way to do that is to use gsub to replace the connecting keywords such as AND and OR (and possibly others) with a standard separator, for example |, and then split on that separator into an array field.
So here is the converted example:

filter {
  mutate {
    uppercase => [ "message" ]
  }

  if [message] =~ "^\s*USER@HOST:" {
    grok {
      match => { "message" => ["^USER@HOST: %{USER:mysql_user}(?:\[[^\]]+\])?\s+@\s+(%{HOSTNAME:mysql_client_host}\s+)?\[%{IP:mysql_client_ip}?\]"]}
      add_tag => [ "grokuser" ]
    }
  } else if [message] =~ "^\s*QUERY_TIME:" {
    grok {
      match => { "message" => ["^QUERY_TIME: %{NUMBER:query_duration_s:float}\s+LOCK_TIME: %{NUMBER:lock_wait_s:float} ROWS_SENT: %{NUMBER:query_sent_rows:int} \s*ROWS_EXAMINED: %{NUMBER:query_examined_rows:int}"]}
      add_tag => [ "grokquerytime" ]
    }
  } else if [message] =~ "^\s*SET TIMESTAMP" {
    grok {
      match => { "message" => ["^SET TIMESTAMP=%{NUMBER:timestamp};"]}
      add_tag => [ "groktimestamp" ]
    }
  } else if [message] =~ "FROM|^UPDATE |^INSERT INTO |^DELETE FROM " {
    grok {
      match => {
        "message" => [
          "FROM %{NOTSPACE:query_table}.*",
          "^UPDATE %{NOTSPACE:query_table}.*",
          "^INSERT INTO %{NOTSPACE:query_table}.*",
          "^DELETE FROM %{NOTSPACE:query_table}.*",
          "UPDATE %{NOTSPACE:query_table}.*",
          "INSERT INTO %{NOTSPACE:query_table}.*",
          "DELETE FROM %{NOTSPACE:query_table}.*"
        ]
      }
      add_tag => [ "grokquerytable" ]
    }
  }

  if [message] =~ "^TIME:" and "SET TIMESTAMP" in [message] {
    grok {
      match => { "message" => ["(?m)^TIME: %{TIMESTAMP_ISO8601}.+USER@HOST: %{USER:mysql_user}(?:\[[^\]]+\])?\s+@\s+(%{HOSTNAME:mysql_client_host}\s+)?\[%{IP:mysql_client_ip}?\].+QUERY_TIME: %{NUMBER:query_duration_s:float}\s+LOCK_TIME: %{NUMBER:lock_wait_s:float} ROWS_SENT: %{NUMBER:query_sent_rows:int} \s*ROWS_EXAMINED: %{NUMBER:query_examined_rows:int}.+SET TIMESTAMP=%{NUMBER:timestamp};"]}
      add_tag => [ "groktime" ]
    }
  }

  mutate {
    copy => { "message" => "message_val" }
  }
  mutate {
    gsub => [ "message_val", "\n"," ", "message_val", "\r"," " ]
  }
  date {
    match => [ "timestamp", "UNIX" ]
    target => "@timestamp"
  }
  # Helper field carried over from the original config (not used below)
  mutate {
      add_field => { "testfield" => "SET TIMESTAMP=%{timestamp};" }
  }

  mutate {
    gsub => [ "message_val","^.*SET TIMESTAMP=\d+;\s","" ]
  }

  mutate {
    copy => { "message_val" => "message_query" }
  }
  mutate {
    gsub => [ "message_query","^.*WHERE\s","" ]
  }
  mutate {
    copy => { "message_query" => "message_parameter" }
  }
  mutate {
    gsub => [
      "message_parameter", " AND ", "|AND:",
      "message_parameter", " OR ", "|OR :",
      "message_parameter", " DESC", "|DESC"
    ]
  }


  mutate {
    split => ["message_parameter","|" ]
  }

  mutate {
    strip => [
      "message_val"
    ]
  }

}
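
To check which fields the filter actually produces, a stdout output with the rubydebug codec can be appended while testing. This is just a debugging aid I am adding here, not part of the pipeline above. With it, message_val should show the statement after SET TIMESTAMP and message_parameter the array of WHERE parts.

```
output {
  # Print every event with all its fields to the console
  stdout { codec => rubydebug }
}
```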

If you have questions, just reach out again.
