Logstash CSV Kibana input error

Dear Logstash community,

I have challenged myself to capture my network data using TShark and make custom dashboards in Kibana.
I have the following setup:

  • 1 Ubuntu 18.04 VM with Elasticsearch, Logstash, and Kibana dockerized
  • 1 Ubuntu 18.04 VM with TShark and Filebeat running on the host

I run TShark with the -T ek flag and export the capture to a rolling CSV file.
Filebeat ships the CSV file to Logstash, Logstash forwards the events to Elasticsearch, and Kibana queries them from there.
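For clarity, a single row of the CSV should look something like this (values made up; the columns match the list in my logstash.conf below):

2020-01-28 14:03:22.123456789,192.168.234.10,192.168.234.134,192.168.234.10,192.168.234.134,51234,5044,TCP,60,"51234 > 5044 [ACK] Seq=1 Ack=1"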

My problem is: Kibana shows the Logstash logs as its main input, and each event carries one whole CSV capture line in a single field. How do I fix this so that the individual CSV columns become the searchable fields?

This is my logstash.conf:

input {
  beats {
    port => 5044
  }
}

filter {
  csv {
    # Split the raw Filebeat "message" line into named columns
    source => "message"
    columns => [ "col.Time", "col.Source", "col.Destination", "ip.src", "ip.dst", "tcp.srcport", "tcp.dstport", "col.Protocol", "ip.len", "col.Info" ]
  }

  mutate {
    # Field name must match the column defined above ("ip.len", not "ip-len")
    convert => [ "ip.len", "integer" ]
  }

  date {
    # Joda pattern: lowercase yyyy/dd; this must match the CSV timestamp format
    match => [ "col.Time", "yyyy-MM-dd HH:mm:ss.SSSSSSSSS" ]
    target => "@timestamp"
  }
}
  
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    # document_type is deprecated on 7.x stacks and can be dropped
    document_type => "%{[@metadata][type]}"
    user => "elastic"
    password => "changeme"
  }
}
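To rule out the filter itself, the csv block can be tested in isolation with a stdin input and rubydebug output, before Filebeat is involved at all (a minimal sketch):

# test.conf -- paste one CSV line on stdin and inspect the parsed event
input { stdin { } }
filter {
  csv {
    source => "message"
    columns => [ "col.Time", "col.Source", "col.Destination", "ip.src", "ip.dst", "tcp.srcport", "tcp.dstport", "col.Protocol", "ip.len", "col.Info" ]
  }
}
output { stdout { codec => rubydebug } }

If the printed event carries a _csvparsefailure tag, the line itself did not parse as CSV.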

This is my filebeat.yml:

filebeat.modules:
- module: system
  syslog:
    enabled: false
  auth:
    enabled: true
    var.paths: ["/home/user/Documents/tsharkcap/tshark.csv"]
output.logstash:
  hosts: ["192.168.234.134:5044"]
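(Side note: I am reusing the system module's auth fileset to pick up the CSV here. A plain log input is probably the more conventional way to ship a custom file; a sketch of that, not what I currently run:)

filebeat.inputs:
- type: log
  paths:
    - /home/user/Documents/tsharkcap/tshark.csv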

I run Elasticsearch with the default config:

sudo docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.5.2

I run Kibana with the default config:

sudo docker run --link docker-cont:elasticsearch -p5601:5601 docker.elastic.co/kibana/kibana:7.5.2
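Logstash runs dockerized as well, with the pipeline config mounted into the image's default pipeline directory, roughly like this (the host path here is illustrative):

sudo docker run -p 5044:5044 \
  -v /home/user/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
  docker.elastic.co/logstash/logstash:7.5.2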

Thanks in advance,
ELK4Life

BTW, this is the mapping as shown in Kibana, which I believe is auto-generated?!

{
  "mapping": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "@version": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "agent": {
        "properties": {
          "ephemeral_id": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "hostname": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "id": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "type": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "version": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          }
        }
      },
      "ecs": {
        "properties": {
          "version": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          }
        }
      },
      "event": {
        "properties": {
          "dataset": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "module": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "timezone": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          }
        }
      },
      "fileset": {
        "properties": {
          "name": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          }
        }
      },
      "host": {
        "properties": {
          "name": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          }
        }
      },
      "input": {
        "properties": {
          "type": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          }
        }
      },
      "log": {
        "properties": {
          "file": {
            "properties": {
              "path": {
                "type": "text",
                "fields": {
                  "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                  }
                }
              }
            }
          },
          "offset": {
            "type": "long"
          }
        }
      },
      "message": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "service": {
        "properties": {
          "type": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          }
        }
      },
      "tags": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      }
    }
  }
}

You need to enable the multiline configuration in your filebeat.yml.

Example:
multiline.pattern: ',\d+,[^",]+$'
multiline.negate: true
multiline.match: before
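These options belong under the input definition in your filebeat.yml, for example (a sketch, adjust the path to yours):

filebeat.inputs:
- type: log
  paths: ["/home/user/Documents/tsharkcap/tshark.csv"]
  multiline.pattern: ',\d+,[^",]+$'
  multiline.negate: true
  multiline.match: before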

See this link for multiline configuration => https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html

You can also check your pattern here =>
https://play.golang.org/p/uAd5XHxscu

Yes, Elasticsearch auto-detects the schema (dynamic mapping). You can also control this yourself via index templates, explicit mappings, etc.
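For example, an index template applied before the first document arrives lets you pin field types up front (a 7.x legacy-template sketch; the index pattern and field are illustrative):

PUT _template/tshark
{
  "index_patterns": ["filebeat-*"],
  "mappings": {
    "properties": {
      "ip.len": { "type": "integer" }
    }
  }
}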

Thank you for your reply!
Personally, I don't think this is a multiline configuration issue: every Kibana entry has its own unique message field consisting of exactly one CSV row.
The issue is that the complete CSV row sits inside this single message field, so I can't filter on any of the CSV columns.

I can see a _csvparsefailure tag in there. You are receiving the data in JSON format, not CSV.
Can you confirm?


You were completely right!

I checked, and I was using the wrong command-line flag for TShark: -T ek emits newline-delimited JSON for the Elasticsearch bulk API, not CSV.
Thanks for pointing that out to me 🙂
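For anyone who finds this later: CSV output from TShark comes from -T fields with -E formatting options instead. Something along these lines (a sketch I have not verified; the -e field list must line up one-to-one with the columns in logstash.conf, and the exact column field names vary by TShark version):

tshark -i eth0 \
  -T fields -E separator=, -E quote=d \
  -e frame.time -e ip.src -e ip.dst \
  -e tcp.srcport -e tcp.dstport \
  -e _ws.col.Protocol -e ip.len -e _ws.col.Info \
  > /home/user/Documents/tsharkcap/tshark.csv

Note that frame.time's default format differs from the date pattern in my filter, so either the pattern or the field (e.g. frame.time_epoch) would need adjusting.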
