KV field_split prevents logstash ingesting data


(Sjaak) #1

Hi,

I found something strange going on with the field_split option. I'm trying to parse fortigate logfiles.

Default log:

status=deny policyid=0 dst_country="Reserved" src_country="Reserved" service=1947/udp proto=17 duration=61871 sent=0 rcvd=0 msg="iprope_in_check() check failed, drop"

Comma-separated log:

EDIT: For some reason I cannot paste code containing commas. The log is the same as the first one, only with the spaces replaced by commas (,).

I figured that with the comma-separated log it should be easy to split everything up using field_split => ","

But when I add the field_split option to my config, logstash stops ingesting this data. Logstash is running fine and other logs still get ingested, so it's not a config error that prevents logstash from running; something else is going on. When I turn off comma-separated values on the fortigate, logstash ingests just fine, but obviously everything is put in the message field, so that doesn't help me much.

input {
  udp {
    type => "fortigate"
    port => 9994
  }
}

filter {
  if [type] == "fortigate" {
    kv {
      source => "message"
      field_split => ","
    }
  }
}

(Nachiket) #2

Hi sjaak,

Did you try using the value_split option and setting it to "="?

N


(Sjaak) #3

Same problem, but the other way round.

With default logs (no commas) and value_split => "=", no logs come in. With comma-separated logs enabled, logs do come in, but the fields are obviously not split up.

filter {
  if [type] == "fortigate" {
    kv {
      source => "message"
      value_split => "="
    }
  }
}

(Sjaak) #4

Strange.

        kv {
        source => "message"
        field_split => "=" 
        value_split => " "

Leads to some values becoming a field and what should be a field becomes a value.

    "Forti-hostname": "device_id",
    "other": "pri",
    "255.255.255.255": "dst_port",
    "17500/udp": "proto",
    "type": "fortigate",
    "FGT": "log_id",
    "10.0.0.208": "src_port",

I've tried moving value_split above/below field_split and swapping " " and "=" between them, but I can only get inverted results.


(Nachiket) #5

Hi,

Your log files contain both commas and equals signs.
Your key-value pairs are separated by commas, so field_split should be a comma. Within each pair, the key and value are separated by an equals sign, so value_split should be "=".

This is mentioned in the documentation for the kv filter.

Regards
N


(Sjaak) #6

The weird thing is that this does not seem to work.

filter {
  if [type] == "fortigate" {
    kv {
      source => "message"
      field_split => ","
      value_split => "="
    }
  }
}

If I turn on comma-separated logs, the above config results in nothing being sent to elastic, even though logstash is running fine. When I turn off comma-separated logs (spaces instead of commas), the same config, without restarting logstash, does send data to elastic, but everything is in the message field, so the fields are not getting split.

The best I can do is the earlier example where the field name becomes the field value. So I'm not sure what is going on if the config above is correct.


(Magnus Bäck) #7

I can't reproduce what you're reporting.

$ cat test.config 
input { stdin { } }
output { stdout { codec => rubydebug } }
filter {
  kv {
    field_split => ","
    value_split => "="
  }
}
$ echo 'status=deny,policyid=0,dst_country="Reserved",src_country="Reserved",service=1947/udp proto=17,duration=61871,sent=0,rcvd=0,msg="iprope_in_check() check failed, drop"' | /opt/logstash/bin/logstash -f test.config
Settings: Default pipeline workers: 8
Pipeline main started
{
        "message" => "status=deny,policyid=0,dst_country=\"Reserved\",src_country=\"Reserved\",service=1947/udp proto=17,duration=61871,sent=0,rcvd=0,msg=\"iprope_in_check() check failed, drop\"",
       "@version" => "1",
     "@timestamp" => "2017-09-14T05:34:43.561Z",
           "host" => "lnxolofon",
         "status" => "deny",
       "policyid" => "0",
    "dst_country" => "Reserved",
    "src_country" => "Reserved",
        "service" => "1947/udp proto=17",
       "duration" => "61871",
           "sent" => "0",
           "rcvd" => "0",
            "msg" => "iprope_in_check() check failed, drop"
}
Pipeline main has been shutdown
stopping pipeline {:id=>"main"}

(Sjaak) #8

How did you do that?

ELK-test:/usr/share/logstash# sudo bin/logstash --path.settings /etc/logstash -f /etc/logstash/conf.d/test.conf
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
{
    "dst_country" => "Reserved",
            "msg" => "iprope_in_check() check failed, drop",
           "rcvd" => "0",
        "message" => "status=deny,policyid=0,dst_country=\"Reserved\",src_country=\"Reserved\",service=1947/udp proto=17,duration=61871,sent=0,rcvd=0,msg=\"iprope_in_check() check failed, drop\"",
           "type" => "fortigate",
           "sent" => "0",
       "duration" => "61871",
           "path" => "/home/test/Desktop/test/test.txt",
     "@timestamp" => 2017-09-14T05:53:01.126Z,
       "policyid" => "0",
    "src_country" => "Reserved",
        "service" => "1947/udp proto=17",
       "@version" => "1",
           "host" => "ELK-test",
         "status" => "deny"
}

Is what I get with

input {
  file {
    type => "fortigate"
    path => "/home/test/Desktop/test/test.txt"
    sincedb_path => "/dev/null"
    start_position => "beginning"
  }
}


filter {
  if [type] == "fortigate" {
    kv {
      source => "message"
      field_split => ","
      value_split => "="
    }
  }
}


output {
  stdout {
    codec => rubydebug
  }
}

(Magnus Bäck) #9

How did you do that?

What do you mean? You seem to be getting the same results as I am, and it looks fine.


(Sjaak) #10

Sorry, I didn't look properly.

However, running that config with the actual fortigate and logstash results in nothing getting into elastic. As soon as I turn off comma-separated logs, data does come in, but it doesn't get handled by the KV filter.

So what works with stdout doesn't work when shipping to elastic. Logstash itself is running and does not produce errors.


(Sjaak) #11

I've done some further testing, but logstash keeps refusing to send data to elastic when stdout shows the data is parsed correctly. When data is NOT parsed correctly, logstash IS sending data into elastic.

Example:
As mentioned before, the fortigate can output logs in normal format or comma-separated format. Apart from that, the logs look exactly the same.

Comma separated:

<132>date=2017-09-19,time=07:03:51,devname=Fortigatetest,device_id=FGT60C11111111,log_id=0038000007,type=traffic,subtype=other,pri=warning,vd=root,src=10.0.0.173,src_port=51452,src_int="wan1",dst=10.0.0.255,dst_port=1947,dst_int="root",SN=11111,status=deny,policyid=0,dst_country="Reserved",src_country="Reserved",service=1947/udp,proto=17,duration=342889,sent=0,rcvd=0,msg="iprope_in_check() check failed, drop"

Normal:

<132>date=2017-09-19 time=07:06:43 devname=Fortigatetest device_id=FGT60C11111111 log_id=0038000007 type=traffic subtype=other pri=warning vd=root src=10.0.0.165 src_port=138 src_int="wan1" dst=10.0.0.255 dst_port=138 dst_int="root" SN=11111 status=deny policyid=0 dst_country="Reserved" src_country="Reserved" service=138/udp proto=17 duration=343060 sent=0 rcvd=0 msg="iprope_in_check() check failed, drop"

Test config (for normal logs)

input {
  file {
    type => "fortigate"
    path => "/home/test/Desktop/test/nocomma.txt"
    sincedb_path => "/dev/null"
    start_position => "beginning"
  }
}


filter {
  if [type] == "fortigate" {
    grok {
      match => ["message", "%{SYSLOG5424PRI:syslog_index}%{GREEDYDATA:message}"]
      overwrite => [ "message" ]
      tag_on_failure => [ "failure_grok_fortigate" ]
    }

    kv {
      value_split => "="
    }
  }
}


output {
  stdout {
    codec => rubydebug
  }
}

Gives the correct result.

/usr/share/logstash# sudo bin/logstash --path.settings /etc/logstash -f /etc/logstash/configs/test.conf
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
{
              "date" => "2017-09-19",
           "src_int" => "wan1",
               "msg" => "iprope_in_check() check failed, drop",
               "dst" => "10.0.0.255",
              "type" => "traffic",
           "dst_int" => "root",
          "duration" => "343060",
              "path" => "/home/test/Desktop/test/nocomma.txt",
          "policyid" => "0",
           "subtype" => "other",
    "syslog5424_pri" => "132",
          "@version" => "1",
              "host" => "ELK-test",
           "devname" => "Fortigatetest",
                "SN" => "11111",
       "dst_country" => "Reserved",
            "log_id" => "0038000007",
         "device_id" => "FGT60C11111111",
               "src" => "10.0.0.165",
               "pri" => "warning",
              "rcvd" => "0",
           "message" => "date=2017-09-19 time=07:06:43 devname=Fortigatetest device_id=FGT60C11111111 log_id=0038000007 type=traffic subtype=other pri=warning vd=root src=10.0.0.165 src_port=138 src_int=\"wan1\" dst=10.0.0.255 dst_port=138 dst_int=\"root\" SN=11111 status=deny policyid=0 dst_country=\"Reserved\" src_country=\"Reserved\" service=138/udp proto=17 duration=343060 sent=0 rcvd=0 msg=\"iprope_in_check() check failed, drop\"",
              "sent" => "0",
                "vd" => "root",
          "src_port" => "138",
        "@timestamp" => 2017-09-19T06:19:34.788Z,
      "syslog_index" => "<132>",
       "src_country" => "Reserved",
           "service" => "138/udp",
             "proto" => "17",
          "dst_port" => "138",
              "time" => "07:06:43",
            "status" => "deny"

HOWEVER, when trying to put this into elastic, no data is ingested. Logstash produces no errors and works fine (other configs are ingesting data).

input {
        syslog {
                type => "fortigate"
                port => 9994
        }
}

filter {
  if [type] == "fortigate" {
    grok {
      match => ["message", "%{SYSLOG5424PRI:syslog_index}%{GREEDYDATA:message}"]
      overwrite => [ "message" ]
      tag_on_failure => [ "failure_grok_fortigate" ]
    }

    kv {
      value_split => "="
    }
  }
}


output {
  if [type] == "fortigate" {
    elasticsearch {
    hosts => localhost
    index => "fortigate-%{+YYYY.MM.dd}"
    }
  }
}

The moment I change the settings on the Fortigate to do comma separated logs, elasticsearch is receiving data but obviously everything is put into the "message" field and the KV filter isn't doing anything.

The same happens when I try things the other way around and change the config to something like this.

        kv {
    field_split => ","
    value_split => "="

With the Fortigate sending comma-separated values, again nothing is sent to elastic. As soon as I turn off comma-separated values, data does get into elastic, but again everything is in the "message" field and the KV filter isn't doing its job.

As logstash is running without errors and stdout runs the config as intended I have absolutely no idea why it isn't working.
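
The stdout test above reads from a file, while the Elasticsearch config listens on a syslog input, so the two runs are not quite the same pipeline. A minimal sketch for comparing them directly, assuming nothing beyond the settings already shown: keep the conditional Elasticsearch output and add an unconditional stdout output next to it, so that every event leaving the filters is printed whether or not it passes the `if`.

```
output {
  # Unconditional: prints every event that leaves the filter stage.
  stdout {
    codec => rubydebug
  }

  # Same conditional Elasticsearch output as in the config above.
  if [type] == "fortigate" {
    elasticsearch {
      hosts => ["localhost"]
      index => "fortigate-%{+YYYY.MM.dd}"
    }
  }
}
```

If an event shows up on stdout but not in Elasticsearch, comparing its fields against the condition around the elasticsearch block narrows down where it is being dropped.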


(Magnus Bäck) #12

Have you looked in the Logstash log for clues? What if you crank up Logstash's log level? Any problems pushing to ES will be reported there.


(Sjaak) #13

Hi Magnus,

I've set the log level to debug. It all looks like this. Let me know if another piece of the debug log would offer more information.

[2017-09-19T16:42:40,334][DEBUG][logstash.pipeline        ] filter received {"event"=>{"@timestamp"=>2017-09-19T07:42:40.325Z, "@version"=>"1", "host"=>"1.1.1.1", "message"=>"<132>date=2017-09-19,time=08:42:32,devname=fortitest,device_id=FGT60C11111,log_id=0038000007,type=traffic,subtype=other,pri=warning,vd=root,src=10.0.0.173,src_port=51452,src_int=\"wan1\",dst=10.0.0.255,dst_port=1947,dst_int=\"root\",SN=211860,status=deny,policyid=0,dst_country=\"Reserved\",src_country=\"Reserved\",service=1947/udp,proto=17,duration=348809,sent=0,rcvd=0,msg=\"iprope_in_check() check failed, drop\"", "type"=>"fortigate"}}
[2017-09-19T16:42:40,335][DEBUG][logstash.filters.grok    ] Running grok filter {:event=>2017-09-19T07:42:40.325Z 1.1.1.1 <132>date=2017-09-19,time=08:42:32,devname=fortitest,device_id=FGT60C11111,log_id=0038000007,type=traffic,subtype=other,pri=warning,vd=root,src=10.0.0.173,src_port=51452,src_int="wan1",dst=10.0.0.255,dst_port=1947,dst_int="root",SN=211860,status=deny,policyid=0,dst_country="Reserved",src_country="Reserved",service=1947/udp,proto=17,duration=348809,sent=0,rcvd=0,msg="iprope_in_check() check failed, drop"}
[2017-09-19T16:42:40,336][DEBUG][logstash.filters.grok    ] Event now:  {:event=>2017-09-19T07:42:40.325Z 1.1.1.1 date=2017-09-19,time=08:42:32,devname=fortitest,device_id=FGT60C11111,log_id=0038000007,type=traffic,subtype=other,pri=warning,vd=root,src=10.0.0.173,src_port=51452,src_int="wan1",dst=10.0.0.255,dst_port=1947,dst_int="root",SN=211860,status=deny,policyid=0,dst_country="Reserved",src_country="Reserved",service=1947/udp,proto=17,duration=348809,sent=0,rcvd=0,msg="iprope_in_check() check failed, drop"}
[2017-09-19T16:42:40,340][DEBUG][logstash.pipeline        ] output received {"event"=>{"date"=>"2017-09-19", "src_int"=>"wan1", "msg"=>"iprope_in_check() check failed, drop", "dst"=>"10.0.0.255", "type"=>"traffic", "dst_int"=>"root", "duration"=>"348809", "policyid"=>"0", "subtype"=>"other", "syslog5424_pri"=>"132", "@version"=>"1", "host"=>"1.1.1.1", "devname"=>"fortitest", "SN"=>"211860", "dst_country"=>"Reserved", "log_id"=>"0038000007", "device_id"=>"FGT60C11111", "src"=>"10.0.0.173", "pri"=>"warning", "rcvd"=>"0", "message"=>"date=2017-09-19,time=08:42:32,devname=fortitest,device_id=FGT60C11111,log_id=0038000007,type=traffic,subtype=other,pri=warning,vd=root,src=10.0.0.173,src_port=51452,src_int=\"wan1\",dst=10.0.0.255,dst_port=1947,dst_int=\"root\",SN=211860,status=deny,policyid=0,dst_country=\"Reserved\",src_country=\"Reserved\",service=1947/udp,proto=17,duration=348809,sent=0,rcvd=0,msg=\"iprope_in_check() check failed, drop\"", "sent"=>"0", "vd"=>"root", "src_port"=>"51452", "@timestamp"=>2017-09-19T07:42:40.325Z, "syslog_index"=>"<132>", "src_country"=>"Reserved", "service"=>"1947/udp", "proto"=>"17", "dst_port"=>"1947", "time"=>"08:42:32", "status"=>"deny"}}
[2017-09-19T16:42:40,945][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2017-09-19T16:42:45,945][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2017-09-19T16:42:50,947][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2017-09-19T16:42:55,946][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline

I don't see any change whether I turn on or off comma separated logs on the Fortigate.


(Sjaak) #14

This one might be easier:

Comma separated ON

kv {

value_split => "="
field_split => ","

[2017-09-19T16:58:52,098][DEBUG][logstash.pipeline        ] filter received {"event"=>{"@timestamp"=>2017-09-19T07:58:52.087Z, "@version"=>"1", "host"=>"1.1.1.1", "message"=>"<132>date=2017-09-19,time=08:58:44,devname=fortitest,device_id=FGT60C11111,log_id=0038000007,type=traffic,subtype=other,pri=warning,vd=root,src=10.0.0.173,src_port=51452,src_int=\"wan1\",dst=10.0.0.255,dst_port=1947,dst_int=\"root\",SN=212789,status=deny,policyid=0,dst_country=\"Reserved\",src_country=\"Reserved\",service=1947/udp,proto=17,duration=349781,sent=0,rcvd=0,msg=\"iprope_in_check() check failed, drop\"", "type"=>"fortigate"}}
[2017-09-19T16:58:52,100][DEBUG][logstash.filters.grok    ] Running grok filter {:event=>2017-09-19T07:58:52.087Z 1.1.1.1 <132>date=2017-09-19,time=08:58:44,devname=fortitest,device_id=FGT60C11111,log_id=0038000007,type=traffic,subtype=other,pri=warning,vd=root,src=10.0.0.173,src_port=51452,src_int="wan1",dst=10.0.0.255,dst_port=1947,dst_int="root",SN=212789,status=deny,policyid=0,dst_country="Reserved",src_country="Reserved",service=1947/udp,proto=17,duration=349781,sent=0,rcvd=0,msg="iprope_in_check() check failed, drop"}
[2017-09-19T16:58:52,105][DEBUG][logstash.filters.grok    ] Event now:  {:event=>2017-09-19T07:58:52.087Z 1.1.1.1 date=2017-09-19,time=08:58:44,devname=fortitest,device_id=FGT60C11111,log_id=0038000007,type=traffic,subtype=other,pri=warning,vd=root,src=10.0.0.173,src_port=51452,src_int="wan1",dst=10.0.0.255,dst_port=1947,dst_int="root",SN=212789,status=deny,policyid=0,dst_country="Reserved",src_country="Reserved",service=1947/udp,proto=17,duration=349781,sent=0,rcvd=0,msg="iprope_in_check() check failed, drop"}
[2017-09-19T16:58:52,108][DEBUG][logstash.pipeline        ] output received {"event"=>{"date"=>"2017-09-19", "src_int"=>"wan1", "msg"=>"iprope_in_check() check failed, drop", "dst"=>"10.0.0.255", "type"=>"traffic", "dst_int"=>"root", "duration"=>"349781", "policyid"=>"0", "subtype"=>"other", "syslog5424_pri"=>"132", "@version"=>"1", "host"=>"1.1.1.1", "devname"=>"fortitest", "SN"=>"212789", "dst_country"=>"Reserved", "log_id"=>"0038000007", "device_id"=>"FGT60C11111", "src"=>"10.0.0.173", "pri"=>"warning", "rcvd"=>"0", "message"=>"date=2017-09-19,time=08:58:44,devname=fortitest,device_id=FGT60C11111,log_id=0038000007,type=traffic,subtype=other,pri=warning,vd=root,src=10.0.0.173,src_port=51452,src_int=\"wan1\",dst=10.0.0.255,dst_port=1947,dst_int=\"root\",SN=212789,status=deny,policyid=0,dst_country=\"Reserved\",src_country=\"Reserved\",service=1947/udp,proto=17,duration=349781,sent=0,rcvd=0,msg=\"iprope_in_check() check failed, drop\"", "sent"=>"0", "vd"=>"root", "src_port"=>"51452", "@timestamp"=>2017-09-19T07:58:52.087Z, "syslog_index"=>"<132>", "src_country"=>"Reserved", "service"=>"1947/udp", "proto"=>"17", "dst_port"=>"1947", "time"=>"08:58:44", "status"=>"deny"}}
[2017-09-19T16:58:54,621][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2017-09-19T16:58:59,620][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline


Nothing logged to elastic.

and

Comma separated OFF

kv {

value_split => "="
field_split => ","

[2017-09-19T16:53:31,506][DEBUG][logstash.pipeline        ] filter received {"event"=>{"@timestamp"=>2017-09-19T07:53:31.490Z, "@version"=>"1", "host"=>"1.1.1.1", "message"=>"<132>date=2017-09-19 time=08:53:25 devname=fortitest device_id=FGT60C11111 log_id=0038000007 type=traffic subtype=other pri=warning vd=root src=10.0.0.208 src_port=17500 src_int=\"wan1\" dst=10.0.0.255 dst_port=17500 dst_int=\"root\" SN=212550 status=deny policyid=0 dst_country=\"Reserved\" src_country=\"Reserved\" service=17500/udp proto=17 duration=349461 sent=0 rcvd=0 msg=\"iprope_in_check() check failed, drop\"", "type"=>"fortigate"}}
[2017-09-19T16:53:31,510][DEBUG][logstash.filters.grok    ] Running grok filter {:event=>2017-09-19T07:53:31.490Z 1.1.1.1 <132>date=2017-09-19 time=08:53:25 devname=fortitest device_id=FGT60C11111 log_id=0038000007 type=traffic subtype=other pri=warning vd=root src=10.0.0.208 src_port=17500 src_int="wan1" dst=10.0.0.255 dst_port=17500 dst_int="root" SN=212550 status=deny policyid=0 dst_country="Reserved" src_country="Reserved" service=17500/udp proto=17 duration=349461 sent=0 rcvd=0 msg="iprope_in_check() check failed, drop"}
[2017-09-19T16:53:31,511][DEBUG][logstash.filters.grok    ] Event now:  {:event=>2017-09-19T07:53:31.490Z 1.1.1.1 date=2017-09-19 time=08:53:25 devname=fortitest device_id=FGT60C11111 log_id=0038000007 type=traffic subtype=other pri=warning vd=root src=10.0.0.208 src_port=17500 src_int="wan1" dst=10.0.0.255 dst_port=17500 dst_int="root" SN=212550 status=deny policyid=0 dst_country="Reserved" src_country="Reserved" service=17500/udp proto=17 duration=349461 sent=0 rcvd=0 msg="iprope_in_check() check failed, drop"}
[2017-09-19T16:53:31,512][DEBUG][logstash.pipeline        ] output received {"event"=>{"date"=>"2017-09-19 time=08:53:25 devname=fortitest device_id=FGT60C11111 log_id=0038000007 type=traffic subtype=other pri=warning vd=root src=10.0.0.208 src_port=17500 src_int=\"wan1\" dst=10.0.0.255 dst_port=17500 dst_int=\"root\" SN=212550 status=deny policyid=0 dst_country=\"Reserved\" src_country=\"Reserved\" service=17500/udp proto=17 duration=349461 sent=0 rcvd=0 msg=\"iprope_in_check() check failed", "@timestamp"=>2017-09-19T07:53:31.490Z, "syslog_index"=>"<132>", "syslog5424_pri"=>"132", "@version"=>"1", "host"=>"1.1.1.1", "message"=>"date=2017-09-19 time=08:53:25 devname=fortitest device_id=FGT60C11111 log_id=0038000007 type=traffic subtype=other pri=warning vd=root src=10.0.0.208 src_port=17500 src_int=\"wan1\" dst=10.0.0.255 dst_port=17500 dst_int=\"root\" SN=212550 status=deny policyid=0 dst_country=\"Reserved\" src_country=\"Reserved\" service=17500/udp proto=17 duration=349461 sent=0 rcvd=0 msg=\"iprope_in_check() check failed, drop\"", "type"=>"fortigate"}}
[2017-09-19T16:53:33,341][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2017-09-19T16:53:38,348][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2017-09-19T16:53:43,352][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline

Everything logged to message field in elastic

Edit: The above is with an actual Fortigate sending data to logstash real time.

Edit2: Changing the test config to output to elastic instead of stdout (reading from .txt) results in the same problem.

With

value_split => "="
field_split => ","

and NO comma separated logs everything is pushed in the message field.

With

value_split => "="
field_split => ","

and comma separated values, which gives the correct results in stdout, nothing gets into elastic.
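
One difference between the two debug traces above may be worth checking. In the comma-separated run, the `output received` event carries "type"=>"traffic" (taken from the `type=traffic` pair in the Fortigate message), whereas in the unparsed run it still carries "type"=>"fortigate". The elasticsearch output shown earlier is wrapped in `if [type] == "fortigate"`. A minimal sketch of one way to avoid that collision, assuming the overwritten `type` field is what keeps parsed events from reaching the output; the `fgt_` prefix is an arbitrary, illustrative choice:

```
filter {
  if [type] == "fortigate" {
    kv {
      source      => "message"
      field_split => ","
      value_split => "="
      # Prepend a prefix to every extracted key, so the message's own
      # type=traffic pair becomes fgt_type and [type] stays "fortigate".
      prefix      => "fgt_"
    }
  }
}
```

The kv filter's `target` option, which nests all extracted pairs under a single field, would be another way to keep the parsed keys from replacing existing event fields.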


(Sjaak) #15

Any hints or tips on what I could look at next?

The silly thing is that with my config looking like this

value_split => "="
field_split => ","

and comma-separated logs turned off, the moment I turn them on, the last log message ingested by elastic is the one that shows comma-separated logs being turned on...


(Christian Dahlqvist) #16

What do the mappings for that index look like?


(Sjaak) #17

I'm having Elastic do the mapping generation.

GET _mapping/fortigate
{
  "fortigate2017.09.20": {
    "mappings": {
      "fortigate": {
        "properties": {
          "@timestamp": {
            "type": "date"
          },
          "facility": {
            "type": "long"
          },
          "facility_label": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "host": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "icmp,proto": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "port": {
            "type": "long"
          },
          "priority": {
            "type": "long"
          },
          "severity": {
            "type": "long"
          },
          "severity_label": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "tags": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "test": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "type": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "udp,proto": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          }
        }
      }
    }
  }
}

Looks like some of those fields should be text instead of long, but most look fine to me?


(Christian Dahlqvist) #18

The fields in the mapping only match some of your fields. If you enable the kv parsing and alter the elasticsearch output to write to a new index, does any data get inserted then? If so, what do the mappings look like?

You could also try to write records to a file using a json_lines codec. This would allow you to inspect the data and see if there are any fields that are generated and might cause a mapping conflict.
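
A minimal sketch of that suggestion, assuming a scratch file is acceptable for debugging; the path below is hypothetical, and the output is deliberately left outside any `if` so that every event is written:

```
output {
  file {
    # Hypothetical path; any location writable by the Logstash user works.
    path  => "/tmp/fortigate-debug.json"
    # One JSON document per line, containing every field on the event.
    codec => json_lines
  }
}
```

Each line in the resulting file is the complete event as it left the filters, which makes it possible to see exactly which field names and values the kv filter produced.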


(Sjaak) #19

If I use the config that outputs correctly to stdout with elastic instead, no mapping is generated and no data gets indexed.

Does it matter whether all fields are in the mapping? The weird thing is that if I switch value_split to "," and field_split to "=", fields do get split; the field value becomes the field name and vice versa, but it does create fields.

I have never created mappings before, and data has always been ingested correctly. I don't understand why it isn't in this case.

The json_lines codec doc page is not very helpful.


(Sjaak) #20

So with reversed (incorrect) values the mapping looks like this

{
  "fortigate-2017.09.20": {
    "mappings": {
      "fortigate": {
        "properties": {
          "0": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "\"iprope_in_check() check failed": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          """"root"""": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          """"wan1"""": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },

With the correct KV values nothing is generated.