Logstash Configuration File

Hi guys,
First of all, apologies if these are stupid questions. I'm pretty new to the ELK stack and still trying to get my head around it.

At the moment I'm working on a Logstash config file that should import a CSV file so that I can view it in Kibana. The config does not work reliably the way I have written it: sometimes it does upload the CSV file the way I want, but since I added some additional "features" to it, I see no progress whatsoever.

This is the config I'm using:

input 
{
	file 
	{
		path => ["C:/Users/User/Downloads/Logs_Dec_9__2021.csv"]
		start_position => "beginning"
		sincedb_path => "NULL"
	}
}
filter 
{
	csv
	{
		columns => [ "Time","icmp_type","Destination","Interface Direction","icmp","Type","Interface","Policy Date","Action","icmp_code","ID","Interface Name","Product Family","Blade","Sequence Number","Source","Policy Name","id_generated_by_indexer","Database Tag","Log Server Origin","Service","Message Information","Origin","Marker","Protocol","logid","first","Policy Management","Performance Impact","Protection ID","Confidence Level","Attack Information","Industry Reference","Attack Name","Severity","Threat Profile","Protection Type","Suppressed Logs","Protection Name","Total Logs","inspection_settings_log","Rule","rule_uid","Service ID","Layer Name","Source Port","Source Zone","Access Rule Name","Destination Zone","Destination Port" ]
		separator => ","
	}
	grok
	{
		match => 
			{ 
			"message" => ' "%{CATALINA_DATESTAMP:Extra_Timestamp}",%{INT:ICMP_TYPE},%{HOSTNAME:Destination_Server}\(%{IP:Destination_Server_IP}\),%{CISCO_DIRECTION:Interface_Direction},%{DATA:Firewall_Packet_MSG},%{DATA:Type},%{DATA:Src_Interface},%{TIMESTAMP_ISO8601:Policy_Matched_Time},%{CISCO_REASON:Action},%{INT:Icmp_Code},%{DATA:Traffic_ID},%{DATA:Src_Interface_Name},%{DATA:Product_Family},%{DATA:Blade},%{DATA:Sequence_Number},%{IP:Source_Client},%{DATA:Firewall_Policy},.*?,%{DATA:Database_Tag},%{HOSTNAME:Log_Server} \(%{IP:Log_Server_IP}\),%{DATA:Message_from_Service},%{DATA:Message_Information},%{DATA:Origin},%{DATA:Marker},.*?,%{DATA:LogID},.*?,%{DATA:Policy_Management},%{DATA:Performance_Impact},%{DATA:Protection_ID},%{DATA:Confidence_Level},%{DATA:Attack_Information},%{DATA:Industry_Reference},%{DATA:Attack_Name},%{DATA:Severity},%{DATA:Threat_Profile},%{DATA:Protection_Type},%{DATA:Suppressed_Logs},%{DATA:Protection_Name},%{DATA:Total_Logs},%{DATA:Inspection_Settings_Log},%{DATA:Rule},%{DATA:Rule_ID},%{DATA:Service_ID},%{DATA:Layer_Name},%{DATA:Source_Port},%{DATA:Source_Zone},%{DATA:Access_Rule_Name},%{DATA:Destination_Zone},%{INT:Destination_Port}'
			}
	}
	mutate
	{
		remove_field => ["host"]
	}
}
output 
{
  elasticsearch 
  {
    hosts => ["http://localhost:9200"]
    index => "fw_09_12_21"
  }
}

I will also upload a screenshot from PowerShell where we can see that the config seems to run, but the upload to Kibana isn't working. Before I added the grok pattern to this file, it was basically hit or miss whether the CSV was uploaded (most of the time it was, though).

In the attached screenshot, we can see that nothing further happens after "[2022-01-13T12:29:18,631][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[ ]}". Checking http://localhost:9600/_node/stats/pipelines?pretty tells me that at least something is happening, yet in Kibana I do not see the index being created.

I hope this makes sense, and that someone has an idea what I'm missing here or what I'm doing wrong.

Any help will be greatly appreciated.

BR,
Thomas

Just some troubleshooting ideas:

  • Can you see any errors in the Elasticsearch logs?
  • If you simplify your filter, for example, only one column, do you get a different result?
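
One way to try the simplification suggested above, as a rough sketch only: reduce the pipeline to the file input, a csv filter with a single named column, and a stdout output so that parsed events are printed straight to the console. The path is the one from the original post; the choice of "Time" as the only named column and the use of the rubydebug codec are assumptions made for illustration.

```
input {
  file {
    path => ["C:/Users/User/Downloads/Logs_Dec_9__2021.csv"]
    start_position => "beginning"
    # Windows null device, so no read position is kept between runs
    sincedb_path => "NUL"
  }
}
filter {
  csv {
    # Only the first column is named; the rest get default names
    columns => ["Time"]
    separator => ","
  }
}
output {
  # Print every event to the console instead of sending it to Elasticsearch
  stdout { codec => rubydebug }
}
```

If events show up in the console with this config but no index appears in Elasticsearch with the full one, the problem is more likely on the filter or output side than in the file input.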

Are you referring to "logstash-plain.log"? This was the only log that updated the moment I triggered a new upload attempt.

I simplified my filter and also modified the grok pattern to match only 2 entries. The output, however, remains the same: after the last "logstash.agent" line, nothing happens.

I am referring to the Elasticsearch.log located on the Elasticsearch node/server. If there is something wrong with the indexing, it should show up in that log.

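sincedb_path => "NULL" will create a file called NULL in the working directory of Logstash and persist the in-memory sincedb there across restarts, so a file that has already been read is not read again on the next run. On Windows you should use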

sincedb_path => "NUL"
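
For context, a minimal sketch of how the input section from the original post would look with this change. NUL is the Windows null device; on Linux or macOS the equivalent would be "/dev/null", which is mentioned here only as an assumption about other setups and is not part of the original config.

```
input {
  file {
    path => ["C:/Users/User/Downloads/Logs_Dec_9__2021.csv"]
    # Read the file from the top instead of only tailing new lines
    start_position => "beginning"
    # Windows null device: the read position is never persisted,
    # so the file is read again from the beginning on every run
    sincedb_path => "NUL"
  }
}
```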

Thx - I'll have a look in there.

Thx - I'll go ahead and try this.

EDIT: I have changed this to NUL as suggested and amended some other lines in my config file, such as adding skip_header => "true" in the csv section. This has done the trick: the config now works with and without the grok pattern section.
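
The final config was not posted, so the following is only a guess at what the csv section might look like with skip_header enabled. The column list is copied from the original config; it is kept complete because, when the columns are listed explicitly, skip_header drops only rows that exactly match those column names. The boolean is written unquoted here, whereas the EDIT quotes it as "true".

```
filter {
  csv {
    # Drop the header row, i.e. the row whose values equal the column names below
    skip_header => true
    columns => [ "Time","icmp_type","Destination","Interface Direction","icmp","Type","Interface","Policy Date","Action","icmp_code","ID","Interface Name","Product Family","Blade","Sequence Number","Source","Policy Name","id_generated_by_indexer","Database Tag","Log Server Origin","Service","Message Information","Origin","Marker","Protocol","logid","first","Policy Management","Performance Impact","Protection ID","Confidence Level","Attack Information","Industry Reference","Attack Name","Severity","Threat Profile","Protection Type","Suppressed Logs","Protection Name","Total Logs","inspection_settings_log","Rule","rule_uid","Service ID","Layer Name","Source Port","Source Zone","Access Rule Name","Destination Zone","Destination Port" ]
    separator => ","
  }
}
```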