Logstash: backfilling two types of logs, problem!

Hello, peopleeeee !

Sorry to disturb you with what is probably an easy question for you veterans... BUT... I'm stuck, and I mean it.
Here's the situation: I need to backfill a few days' worth of logs from /var/log/network.log.

sample log :

VERSION 1 that needs to be backfilled:

2016-02-01T10:44:13-05:00 chrgft.ca date=2016-02-01 time=10:44:13 devname=FG-200D-MASTER devid=FG200D3915877554 logid=0000000013 type=traffic subtype=forward level=notice vd=root srcip=10.24.136.141 srcport=58626 srcintf="port1" dstip=174.252.90.36 dstport=443 dstintf="wan1" poluuid=9499a3ae-87e3-53e5-05b9-1e6e2db9c5c3 sessionid=39393540 proto=6 action=close user="BCA11380" group="SocialMedia" policyid=63 dstcountry="United States" srccountry="Reserved" trandisp=snat transip=10.24.214.5 transport=58626 service="HTTPS" appid=15832 app="Facebook" appcat="Social.Media" apprisk=medium applist="APP-SocialApp" appact=detected duration=115 sentbyte=12948 rcvdbyte=3186 sentpkt=21 rcvdpkt=20 utmaction=allow countapp=1

VERSION 2 that needs to be backfilled:

itime=1448930548 date=2015-11-30 time=19:42:28 devid=FG200D3912801116 logid=0001000014 type=traffic subtype=local level=notice vd=root srcip=172.116.14.22 srcport=51680 srcintf="wan2" dstip=172.16.15.255 dstport=137 dstintf="root" sessionid=632299376 status=deny policyid=0 dstcountry="Reserved" srccountry="Reserved" trandisp=noop service=137/udp proto=17 app=137/udp duration=0 sentbyte=0 rcvdbyte=0

So, for a start, let's focus on the first version... I tried different solutions found on the web, playing with the date filter, trying to make my own timestamp, etc., but unfortunately it seems I'm too stupid to figure it out alone... so guys, I NEED your help.

See, the problem is HERE:
I NEED to take the timestamp from the "message" and use it as my OWN timestamp, not the current day on which it gets indexed into ES...

"message" => "2016-01-27T14:13:25-05:00 10.24.214.33 date=2016-01-27 time=14:13:21 devname=CTR device_id=FGT60B34507623279 log_id=0021000002 subtype=allowed type=traffic pri=notice status=accept vd=root dir_disp=org tran_disp=noop src=10.214.181.15 srcname=10.214.181.15 src_port=51042 dst=10.214.224.16 dstname=10.214.224.16 dst_country=Reserved src_country=Reserved dst_port=8080 tran_ip=0.0.0.0 tran_port=0 tran_sip=0.0.0.0 tran_sport=0 service=CHRG-8080 proto=6 app_type=N/A duration=278 rule=1 policyid=1 identidx=0 sent=3048 rcvd=2456 shaper_drop_sent=0 shaper_drop_rcvd=0 perip_drop=0 shaper_sent_name=N/A shaper_rcvd_name=N/A perip_name=N/A sent_pkt=14 rcvd_pkt=13 vpn=N/A vpn_tunnel=N/A src_int=internal dst_int=wan1 SN=75670534 app=N/A app_cat=N/A user=N/A group=N/A carrier_ep=N/A profilegroup=N/A subapp=N/A subappcat=N/A",
            "@version" => "1",
*********************          "@timestamp" => "2016-02-01T15:22:52.187Z", ********************

And here's the actual config file:

input {
  tcp {
    port => 3333
    type => "syslog_backfill"
  }
}

filter {

  kv {
    source => "message"
    field_split => " "
    value_split => "="
    trimkey => [ "<[0-9][0-9][0-9]>" ]
  }

  date {
    locale => "en"
    match => ["message", "%{TIMESTAMP_ISO8601:logtimestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}"]
  }

  geoip {
    source => "dstip"
    database => "/opt/logstash/GeoLiteCity.dat"
  }

}

Unfortunately... it's not working.

Please, help?

You do need the date filter to parse the timestamp in a string field into the @timestamp field, but you're using the date filter like a grok filter. You need to first use a grok filter to extract the timestamp, the second field (whatever it is) and the rest of the message. Then use a date filter to parse the extracted timestamp and a kv filter to parse the key/value pairs. Something like this:

grok {
  match => [
    "message",
    "%{TIMESTAMP_ISO8601:logtimestamp} %{WORD:level} %{GREEDYDATA:kv}"
  ]
  remove_field => ["message"]
}
kv {
  source => "message"
  field_split => " "
  value_split => "="
}
date {
  match => ["logtimestamp", "ISO8601"]
  locale => "en"
  remove_field => ["logtimestamp"]
}

Notes:

  • In the grok filter I've replaced LOGLEVEL with WORD because the second column in your first example isn't matched by LOGLEVEL.
  • I've added a removal of the message field if grok is successful because there's no point in keeping that field around. Same thing with logtimestamp.

input {
  tcp {
    port => 3333
    type => "syslog_backfill"
  }
}

filter {
  if [type] == "syslog_backfill" {

    grok {
      match => [
        "message",
        "%{TIMESTAMP_ISO8601:logtimestamp} %{WORD:level} %{GREEDYDATA:kv}"
      ]
      remove_field => ["message"]
    }

    kv {
      source => "message"
      field_split => " "
      value_split => "="
    }

    date {
      match => ["logtimestamp", "ISO8601"]
      locale => "en"
      remove_field => ["logtimestamp"]
    }

    geoip {
      source => "dstip"
      database => "/opt/logstash/GeoLiteCity.dat"
    }

  }

Unfortunately it gives me an error on configtest, even though all my braces look OK?

service logstash configtest
Error: Expected one of #, => at line 44, column 17 (byte 653) after filter{
if [type] == "syslog_backfill" {

grok {
match => [
"message",
"%{TIMESTAMP_ISO8601:logtimestamp} %{WORD:level} %{GREEDYDATA:kv}"
]
remove_field => ["message"]
}

kv {
source => "message"
field_split => " "
value_split => "="
}

date {
match => ["logtimestamp", "ISO8601"]
locale => "en"
remove_field => ["logtimestamp"]
}

geoip{
source =>"dstip"
database =>"/opt/logstash/GeoLiteCity.dat"
}

}

output {
elasticsearch

Since you're not showing the whole file it's hard to help. Logstash is usually pretty good at pinpointing the line where the problem is, so I'd look closely in the vicinity of line 44. To narrow down problems like this, comment out blocks until the problem disappears.

I use a split config style: 10-whateverinput+filter.conf and 50-output.conf.

So yep, the whole file is shown.
In any case, I've managed to get it to pass configtest by removing the if [type] == ... conditional.

Unfortunately, the result is still the same: wrong timestamp.

input {
  tcp {
    port => 3333
    type => "syslog_backfill"
  }
}

filter {

  grok {
    match => [
      "message",
      "%{TIMESTAMP_ISO8601:logtimestamp} %{WORD:level} %{GREEDYDATA:kv}"
    ]
    remove_field => ["message"]
  }

  kv {
    source => "message"
    field_split => " "
    value_split => "="
  }

  date {
    match => ["logtimestamp", "ISO8601"]
    locale => "en"
    remove_field => ["logtimestamp"]
  }

  geoip {
    source => "dstip"
    database => "/opt/logstash/GeoLiteCity.dat"
  }

}

I'm happy it's you who's trying to help me; almost every topic I've checked has been resolved by you.

Thank you for your time and patience.

I think the source specification for the kv filter should read source => "kv".

Tried it. Unfortunately, not working...

I must be missing something obvious as f*ck...

What do you get from Logstash then? Output from stdout { codec => rubydebug } would be helpful.

tailf logstash.stdout
"message" => "2016-01-27T14:17:47-05:00 10.24.224.33 date=2016-01-27 time=14:17:46 devname=FTG100A-Invite device_id=FG100A3907513434 log_id=0021000002 subtype=allowed type=traffic pri=notice status=accept vd=root dir_disp=org tran_disp=snat src=192.168.1.65 srcname=192.168.1.65 src_port=49947 dst=135.19.0.18 dstname=135.19.0.18 dst_country=Canada src_country=Reserved dst",
"@version" => "1",
"@timestamp" => "2016-02-01T18:06:45.144Z",
"host" => "127.0.0.1",
"port" => 44394,
"type" => "syslog_backfill",
"tags" => [
[0] "_grokparsefailure"
]
}

I assume this is the output you are looking for?
That is, after using the suggestion to use source => "kv" instead of "message".

The presence of the _grokparsefailure tag indicates that the grok expression is bad. I now realize that WORD wasn't a good choice for the token after the timestamp. You could e.g. use NOTSPACE instead.
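
Roughly like this, as a sketch only (logsource is just a placeholder name for whatever that second column turns out to be, e.g. a hostname or IP):

grok {
  match => [
    "message",
    "%{TIMESTAMP_ISO8601:logtimestamp} %{NOTSPACE:logsource} %{GREEDYDATA:kv}"
  ]
  remove_field => ["message"]
}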

Following your last reply, I've succeeded in using this:

input {
  tcp {
    port => 3333
    type => "syslog_backfill"
  }
}

filter {

  grok {
    match => [
      "message",
      "%{TIMESTAMP_ISO8601:logtimestamp} %{GREEDYDATA:kv}"
    ]
    remove_field => ["message"]
  }

  kv {
    source => "kv"
    field_split => " "
    value_split => "="
  }

  date {
    match => ["logtimestamp", "ISO8601"]
    locale => "en"
    remove_field => ["logtimestamp"]
  }

  geoip {
    source => "dstip"
    database => "/opt/logstash/GeoLiteCity.dat"
  }

}

Apparently, the "level" wasn't present nor needed.
Lastly, how would I proceed for that second part? (Around... 3 years of old logs to load in...)

itime=1448930548 date=2015-11-30 time=19:42:28 devid=FG200D3912801116 logid=0001000014 type=traffic subtype=local level=notice vd=root srcip=172.116.14.22 srcport=51680 srcintf="wan2" dstip=172.16.15.255 dstport=137 dstintf="root" sessionid=632299376 status=deny policyid=0 dstcountry="Reserved" srccountry="Reserved" trandisp=noop service=137/udp proto=17 app=137/udp duration=0 sentbyte=0 rcvdbyte=0

The itime is irrelevant, everything else is needed. (The message might differ too, but it will always start with itime.)

Looks like the first kind of log, except it's only key/value pairs. The itime field looks like something the date filter could parse with the UNIX pattern.

The first kind of log is solved, thanks to you.

I'm not too familiar with the whole parsing process. I've read a lot and followed most of your replies on other topics; unfortunately, I haven't succeeded in getting something working yet.

You are basically saying that I should be able to use the same filter I used for the first kind?

(I won't lie. I learn mostly by using other people's code and adapting it to my needs.)

Not exactly the same set of filters, but they're quite similar. You don't need the grok filter since the whole message is a series of key/value pairs, the date filter needs to look differently, and the field the kv filter should parse will have a different name.
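
Something along these lines, as a rough sketch (assuming the raw line ends up in the message field, as with your other input; the UNIX pattern is what parses the epoch seconds in itime):

kv {
  source => "message"
  field_split => " "
  value_split => "="
}
date {
  match => ["itime", "UNIX"]
  locale => "en"
  remove_field => ["itime"]
}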

I'll give it another go today.

Quick question "again":
If I've stopped my Netcat for whatever reason and I happen to restart it, will it duplicate my log entries, OR is there a way for it to know where it stopped?

Thank you

What Netcat are you talking about?

OK... I'm back.

The configuration is as follows:

10-network_log.conf
matches logs of this type:

2016-02-01T10:44:13-05:00 chrgft.ca date=2016-02-01 time=10:44:13 devname=FG-200D-MASTER devid=FG200D3915877554 logid=0000000013 type=traffic subtype=forward level=notice vd=root srcip=10.24.136.141 srcport=58626 srcintf="port1" dstip=174.252.90.36 dstport=443 dstintf="wan1" poluuid=9499a3ae-87e3-53e5-05b9-1e6e2db9c5c3 sessionid=39393540 proto=6 action=close user="BCA11380" group="SocialMedia" policyid=63 dstcountry="United States" srccountry="Reserved" trandisp=snat transip=10.24.214.5 transport=58626 service="HTTPS" appid=15832 app="Facebook" appcat="Social.Media" apprisk=medium applist="APP-SocialApp" appact=detected duration=115 sentbyte=12948 rcvdbyte=3186 sentpkt=21 rcvdpkt=20 utmaction=allow countapp=1

code :
input {
  file {
    path => ["/var/log/network.log"]
    start_position => "beginning"
    type => "syslog"
  }
}

filter {
  grok {
    match => [
      "message",
      "%{TIMESTAMP_ISO8601:logtimestamp} %{GREEDYDATA:kv}"
    ]
    remove_field => ["message"]
  }

  kv {
    source => "kv"
    field_split => " "
    value_split => "="
  }

  date {
    match => ["logtimestamp", "ISO8601"]
    locale => "en"
    remove_field => ["logtimestamp"]
  }

  geoip {
    source => "dstip"
    database => "/opt/logstash/GeoLiteCity.dat"
  }
}

It works as intended, BUT everything is a string... which leaves me little to no freedom in aggregations.
In an ideal world, I would need field conversions like:

mutate {
convert => ["srcip" , "IP adress format"]
convert => ["dstip" , "IP adress format"]
convert => ["sentbyte" , "number format"]
convert => ["rcvdbyte" , "number format"]
convert => ["sentpkt" , "number format"]
convert => ["rcvdpkt" , "number format"]
}

Unfortunately... I didn't succeed in doing it. And from what I've come to understand, even if I do succeed, I'll be forced to trash the data received so far because it won't be usable anymore?

Now, on to the second format of log (the backfill one).

It matches this kind of log:

itime=1448930548 date=2015-11-30 time=19:42:28 devid=FG200D3912801116 logid=0001000014 type=traffic subtype=local level=notice vd=root srcip=172.116.14.22 srcport=51680 srcintf="wan2" dstip=172.16.15.255 dstport=137 dstintf="root" sessionid=632299376 status=deny policyid=0 dstcountry="Reserved" srccountry="Reserved" trandisp=noop service=137/udp proto=17 app=137/udp duration=0 sentbyte=0 rcvdbyte=0

code :
11-fgt_backfill.conf

input {
  file {
    path => ["/var/log/fortigate/*.log"]
    start_position => "beginning"
    type => "fgt-backfill"
  }
}

filter {
  grok {
    match => [
      "message",
      "%{NUMBER:epoch-unixms} %{GREEDYDATA:kv}"
    ]
    remove_field => ["message"]
  }

  kv {
    source => "kv"
    field_split => " "
    value_split => "="
  }

  mutate {
    convert => ["srcip", "integer"]
    convert => ["dstip", "integer"]
    convert => ["sentbyte", "integer"]
    convert => ["rcvdbyte", "integer"]
    convert => ["sentpkt", "integer"]
    convert => ["rcvdpkt", "integer"]
  }

  date {
    match => ["epoch-unixms", "UNIX_MS"]
    locale => "en"
    remove_field => ["epoch_unixms"]
  }

  geoip {
    source => "dstip"
    database => "/opt/logstash/GeoLiteCity.dat"
  }
}

Finally, the output file:

50-output.conf

code :

output {
  if [type] == "fgt-backfill" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "fgt-backfill-%{+YYYY.MM.dd}"
    }
    stdout { codec => rubydebug }
  }
  else {
    elasticsearch {
      hosts => ["localhost:9200"]
    }
  }
}

Only the second part of the output file works. By that I mean that no fgt-backfill-* index is created, nor is my stdout debug file populated with actual traffic or errors, and only my network.log is actually populating the logstash-* index.

Now... once again, I'm lost, and asking for guidance, please?

For obvious reasons I would like 11-fgt_backfill.conf to output to a new index. (That way, if I mess things up, I'll be able to search the web for a way to delete only the wrong index and keep my actual logs intact.)

Thank you. I hope I'm clear enough in what I ask... :\

Unfortunately... I didn't succeed in doing it.

  • The mutate filter's convert option will convert the field in the Logstash event to the desired type, but in the end it's the mapping in Elasticsearch that decides whether aggregations work as expected. Mappings can't be changed after the fact, so if e.g. rcvdpkt was a string at some point it will continue to be a string in that index. If you use daily indexes, tomorrow's index should have the correct mapping, assuming the conversion is done correctly. You can use an index template to "prime" indexes with particular data types for each field (see the sketch after this list).
  • I assume your configuration doesn't actually say "number format".
  • There is no IP address type here. Logstash events are JSON documents and there is no IP address data type there. That data type lives solely on the ES side.
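
For the index template part, one way to wire it up is through the elasticsearch output itself. This is only a sketch; the template path and name are made up, and the JSON file is where you'd declare sentbyte, rcvdbyte, etc. as integers and the IP fields as the ip type:

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "fgt-backfill-%{+YYYY.MM.dd}"
    # hypothetical file containing the mappings for fgt-backfill-* indexes
    template => "/etc/logstash/templates/fgt-backfill.json"
    template_name => "fgt-backfill"
    template_overwrite => true
  }
}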

And from what I've come to understand, even if I do succeed, I'll be forced to trash the data received so far because it won't be usable anymore?

You don't have to trash it, but you will have to reindex (i.e. copy the data into a new index where the mappings are correct from the start).
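
If you want to do that copy with Logstash itself, a minimal sketch using the elasticsearch input plugin could look like the following. The index names here are made up, so adjust them to your own, and make sure the new index gets its mapping from a template before you start:

input {
  # read the existing documents back out of the old index
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-2016.01.27"
  }
}
output {
  # write them into a new index whose mapping is correct from the start
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-reindexed-2016.01.27"
  }
}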

Only the second part of the output file works. By that I mean that no fgt-backfill-* index is created, nor is my stdout debug file populated with actual traffic or errors, and only my network.log is actually populating the logstash-* index.

The configuration looks correct. What does an event actually look like? Move the stdout output outside the conditional so it always fires.
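
In other words, something like this for 50-output.conf (same outputs as yours, just with stdout moved out of the conditional so it fires for every event):

output {
  stdout { codec => rubydebug }
  if [type] == "fgt-backfill" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "fgt-backfill-%{+YYYY.MM.dd}"
    }
  }
  else {
    elasticsearch {
      hosts => ["localhost:9200"]
    }
  }
}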

The part about the mutate filter's convert option I understood already.
Still, I don't know how to proceed to create a new index (yeah... I know... sorry for my lameness).

Nope, my configuration doesn't say "number format"; it's actually "integer" right now.
But maybe you can point me in the right direction.
The reason why I need those fields converted is to be able to "field edit" them in the settings to, let's say, BYTES,
and to be able to do a scripted field like doc.rcvdbyte + sentbyte as I saw on another blog.

Right now, because they are strings, I cannot do that; it gives me an error saying it expects them to be numeric.

(Sorry for my English.)
I'll give the stdout a go (but it will generate a LOT of logs since my network.log is already working correctly...)

brb

It's a no go. Simply nothing from /var/log/fortigate/*.log is coming out...