Парсинг логов в Logstash с помощью grok

Всем привет, пытаюсь в grok debugger распарсить лог, и впринципе все получается, но...
Есть лог:
"11/01/2018 11:41:36","767","<8F753F80AA7E5221FDF8B1086CD4FC58@domain.ru>","info@domain.ru","ma@domain.ru","",**"Аудиокниги ""Модель для сборки"" - Самая полная и качественная коллекция. 09_09_2018 02_10 201694"**,"68870","KAS_STATUS_SPAM","","","01.11.2018 11:36:25","01.11.2018 11:41:36","SmtpAntispam","AntispamScanner","Block","Email","01.11.2018 11:28:00","","Shikari rule","","External"

Каждый параметр выделен в кавычки и разделен запятой.
1 проблема: то что выделено это тема письма и она уникальная, столкнулся с проблемой, например в теме есть ковычки, как мне их игнорировать и забирать значение до запятой?
2 проблема: некоторые параметры пустые, это просто двойные кавычки, например ,"",
как это обойти и что бы значение было просто пустым?
Спасибо.

A ** вокруг **"Аудиокниги ""Модель для сборки"" - Самая полная и качественная коллекция. 09_09_2018 02_10 201694"** это у вас в логе, или это вы тут добавили?

Это тема письма, она всегда разная.
Лог выглядит так:
"11/01/2018 11:41:36","767","8F753F80AA7E5221FDF8B1086CD4FC58@domain.ru","info@domain.ru","ma@domain.ru","","Аудиокниги ""Модель для сборки"" - Самая полная и качественная коллекция. 09_09_2018 02_10 201694","68870","KAS_STATUS_SPAM","","","01.11.2018 11:36:25","01.11.2018 11:41:36","SmtpAntispam","AntispamScanner","Block","Email","01.11.2018 11:28:00","","Shikari rule","","External"

Создал кастомный паттерн %{RUS_MSG:MessageId}
Но он парсит только до второй кавычки:
{
"MessageId": "Аудиокниги "
}

То есть это нормальный CSV файл? Если это так, то вам принципиально использовать только grok? Если нет, то может лучше воспользоваться CSV фильтром и потом обрабатывать индивидуальные поля через grok?

Это csv файл.
Нет, не принципиально, если есть способ распарсить лог, то без разницы чем это делать)
Кстати, нужно еще учесть, что в поле темы сообщения, тоже возможны запятые.

С такой конфигурацией:

input { stdin { } }

filter {
  csv {
  }
}

output {
  stdout { codec => rubydebug }
}

если заменить и на , в теме сообщения то получается:

{
       "message" => "\"11/01/2018 11:41:36\",\"767\",\"<8F753F80AA7E5221FDF8B1086CD4FC58@domain.ru>\",\"info@domain.ru\",\"ma@domain.ru\",\"\",\"Аудиокниги \"\"Модель для сборки\"\" - Самая полная, качественная коллекция. 09_09_2018 02_10 201694\",\"68870\",\"KAS_STATUS_SPAM\",\"\",\"\",\"01.11.2018 11:36:25\",\"01.11.2018 11:41:36\",\"SmtpAntispam\",\"AntispamScanner\",\"Block\",\"Email\",\"01.11.2018 11:28:00\",\"\",\"Shikari rule\",\"\",\"External\"",
          "host" => "bumblebee",
       "column8" => "68870",
      "column19" => "",
      "column15" => "AntispamScanner",
       "column7" => "Аудиокниги \"Модель для сборки\" - Самая полная, качественная коллекция. 09_09_2018 02_10 201694",
      "column21" => "",
      "@version" => "1",
      "column18" => "01.11.2018 11:28:00",
      "column14" => "SmtpAntispam",
      "column16" => "Block",
       "column6" => "",
      "column12" => "01.11.2018 11:36:25",
      "column20" => "Shikari rule",
       "column2" => "767",
       "column3" => "<8F753F80AA7E5221FDF8B1086CD4FC58@domain.ru>",
      "column10" => "",
       "column5" => "ma@domain.ru",
       "column9" => "KAS_STATUS_SPAM",
      "column11" => "",
       "column4" => "info@domain.ru",
      "column13" => "01.11.2018 11:41:36",
       "column1" => "11/01/2018 11:41:36",
      "column22" => "External",
      "column17" => "Email",
    "@timestamp" => 2018-11-02T13:57:47.154Z
}
1 Like

А почему input stdin?
Данные передаю через Filebeat.

Потому что мне так проще тестировать :slight_smile:

То есть input beats?

Input и output может быть как угодно.

Ок, спасибо!

Можно еще вопрос?

Я уже использую winlogbeat, и мне нужно еще использовать filebeat, как мне его завернуть в elasticsearch через logstash?
Вот мой конфиг, но в elasticsearch вижу только индекс winlog beat:

input {
  beats {
    port => 5044
    tags => ["winlogbeat"]
  }
  beats {
    port => 5045
    tags => ["filebeat"]
  }
  tcp {
    port => 5000
    type => syslog
  }
  udp {
    port => 5000
    type => syslog
  }
}
output {
   if "winlogbeat" in [tags] {
      elasticsearch {
      hosts => "IP:9200"
      index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
   }
 }
}

output {
   if "filebeat" in [tags] {
      elasticsearch {
      hosts => "IP:9200"
      index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
   }
 }
}

output {
   if [type] == "syslog" {
      elasticsearch {
      hosts => "IP:9200"
   }
 }
}

A как filebeat настроен?

Передача логов в логсташ

На какой порт?

5045

Вы уверены, что filebeat настроен правильно и пытается что-нибудь послать в logstash? В его логах что-нибудь есть?

Уверен на 100%, так как если выводить логи в эластиксерч логи видно.

Составил конфиг, правильный ли он?

input {
  beats {
    port => 5045
    tags => ["filebeat"]
  }
}

#Filebeat
filter {
  if "filebeat" in [tags] {
    csv {
      convert => {
      "column1" => "UTC time"
      "column2" => "Duration"
      "column3" => "Message Id"
      "column4" => "From"
      "column5" => "To"
      "column6" => "Cc"
      "column7" => "Subject"
      "column8" => "Message Size"
      "column9" => "Verdict"
      "column10" => "Verdict Name"
      "column11" => "File name"
      "column12" => "Submit time"
      "column13" => "Delivery time"
      "column14" => "Interceptor"
      "column15" => "Handler"
      "column16" => "Action"
      "column17" => "Object type"
      "column18" => "Base issue date"
      "column19" => "Path"
      "column20" => "Detect Method"
      "column21" => "Path"
      "column22" => "Vsapi Scan Type"
      "column23" => "Sender Type"
      }
    }
  }
}

output {
   if "filebeat" in [tags] {
      elasticsearch {
      hosts => "IP5:9200"
      index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
   }
 }
}