Применение csv фильтра после grok

Здравствуйте!
Есть необходимость парсить csv фильтром то, что не распарсил grok.
Я сделал конфиг:

filter {

  if "ise" in [tags] and ([message] !~ "NetworkDeviceName=ASA1-VPN" and [message] !~ "NetworkDeviceName=ASA2-VPN") {
    drop { }
  }

  if "ise" in [tags] {
    grok {
      patterns_dir => ["/etc/logstash/paterns"]
      match => { "message" => "<181>%{WORD:month}\s*%{NUMBER:date} %{TIME:time} %{DATA:Server} %{DATA:CISE_logging_category} %{NUMBER:trash3} %{NUMBER:trash4} %{NUMBER:trash5} %{DATE_12:Logged_at_start} %{DATA:trash6} %{NUMBER:seq_num} %{NUMBER:cisco_message_code} %{WORD:facility} Radius-Accounting: RADIUS Accounting start request, ConfigVersionId=%{NUMBER:config_version_id}, Device IP Address=%{IP:device_ip}, RequestLatency=%{NUMBER:request_latency}, NetworkDeviceName=%{DATA:device_name_start}, User-Name=%{DATA:vpn_user_start}, %{GREEDYDATA:message2}" }
      match => { "message" => "<181>%{WORD:month}\s*%{NUMBER:date} %{TIME:time} %{DATA:Server} %{DATA:CISE_logging_category} %{NUMBER:trash3} %{NUMBER:trash4} %{NUMBER:trash5} %{DATE_12:Logged_at_stat} %{DATA:trash6} %{NUMBER:seq_num} %{NUMBER:cisco_message_code} %{WORD:facility} Radius-Accounting: RADIUS Accounting stop request, ConfigVersionId=%{NUMBER:config_version_id}, Device IP Address=%{IP:device_ip}, RequestLatency=%{NUMBER:request_latency}, NetworkDeviceName=%{DATA:device_name_stop}, User-Name=%{DATA:vpn_user_stop}, %{GREEDYDATA:message2}" }
      remove_field => [ "message" ]
    }

    mutate {
      gsub => ["message2", " ", ""]
    }
    csv {
      separator => "," 
      autodetect_column_names => "true"
    }
  }
}

Но почему то не работает, возможно ли сделать парсинг с помощью csv после grok?
Спасибо!

Какое поле вы пытаетесь парсить с помощью фильтра csv?

1 Like

message2

CVS работает с message по умолчанию.

1 Like

То есть нужно вот так:

csv {
  source => "message2"
  separator => "," 
  autodetect_column_names => "true"
}

Да, заработало, спасибо!

Игорь, подскажите пожалуйста, что не так. Мне нужно из @timestamp вычесть другую отметку времени для этого написал код:

  if "ise" in [tags] and [Acct_Session_Time] {
    date {
      match => [ "Acct_Session_Time", "UNIX" ]
      target => "Duration_temp"
    }

    ruby {
      code => "
        event.set('Duration', event.get('Duration_temp').time.strftime('%H:%M:%S'))
        event['Session_start'] = (event('@timestamp') - event('Duration_temp'))
        "
    }

Фильтр date преобразует число в дату после 1970 года и записывает её в поле "Duration_temp", это преобразование работает.

Фильтр ruby:
Первый код вырезает из поля Duration_temp часы, минуты и секунды, это тоже работает.
Второй код вычитает из @timestamp, Duration_temp по сути должно появиться поле Session_start, но не появляется - не работает.

Подскажите, как мне получить результат вычитания @timestamp из Duration_temp?

Спасибо!

У event('@timestamp') скобки не те, или .get() не хватает.

И так и так пробовал, результат ноль..

    event.set('Session_start', event.get['@timestamp'] - event.get['Duration_temp'])
    event['Session_start'] = (event['@timestamp'] - event['Duration_temp'])

Не могу воспроизвести проблему. Пытался вот так:

test.conf:

input {
  stdin {

  }
}

filter {
  csv {
    separator => ","
    autodetect_column_names => "true"
  }

  date {
    match => [ "Acct_Session_Time", "UNIX" ]
    target => "Duration_temp"
  }

  ruby {
    code => "
      event.set('Duration', event.get('Duration_temp').time.strftime('%H:%M:%S'))
      event.set('Session_start', (event.get('@timestamp') - event.get('Duration_temp')))
    "
  }

  date {
    match => [ "Session_start", "UNIX" ]
    target => "Session_start_date"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

test.log:

Acct_Session_Time
2000
3000

Запускаю:

 bin/logstash -f config/test.conf < test.log

Получаю:

{
              "Duration" => "00:33:20",
            "@timestamp" => 2020-01-28T21:31:03.981Z,
              "@version" => "1",
                  "host" => "grasshopper",
               "message" => "2000",
         "Session_start" => 1580245063.981,
     "Acct_Session_Time" => "2000",
    "Session_start_date" => 2020-01-28T20:57:43.981Z,
         "Duration_temp" => 1970-01-01T00:33:20.000Z
}
{
              "Duration" => "00:50:00",
            "@timestamp" => 2020-01-28T21:31:03.982Z,
              "@version" => "1",
                  "host" => "grasshopper",
               "message" => "3000",
         "Session_start" => 1580244063.982,
     "Acct_Session_Time" => "3000",
    "Session_start_date" => 2020-01-28T20:41:03.982Z,
         "Duration_temp" => 1970-01-01T00:50:00.000Z
}

Все, вроде, как надо.

1 Like

Игорь спасибо!
Как всегда, Вы на высоте!)

Единственное что, у меня почему то еще минус 3 часа вычитает)
Походу с часовым поясом, что-то..

@timestamp                   Jan 29, 2020 @ 20:34:37.197
Acct_Session_Time        5637
Duration                         01:33:57
Duration_temp             1970-01-01T01:33:57.000Z
Session_start                 1580313640.197
Session_start_date       2020-01-29T16:00:40.197Z

Как вы этот вывод получили?

Добавил Ваш конфиг в свой и просто скопировал из кибаны.

Из какой части кибаны? Если из Discover, то там поля с типом date показываются в вашей зоне. Остильные поля, скорее всего текстовые, поэтому они в той зоне, в которой вы их обрабатывали. Все что оканчивается на Z - в UTC, если временной зоны в конце нет - то это ваше местное время.