Logstash split one message into multiple

Hi,

I have the following message, and I managed to produce the output below from just the message field,

{"@timestamp":"2022-12-01T13:30:00.004Z","message":"<190>Dec  1 14:29:59 10.62.161.199 AA-AMG3U: 0950198238 NN [MDA 8/4]: 
LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string1 | LN44 SD 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string2 | LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467679XXXX2_35292011220XXXXX_string1 \n","@version":"1","host":"100.62.161.XXX"}

Output

<190>Dec  1 14:29:59 10.62.161.199 AA-AMG3U: 0950198238 NN [MDA 8/4]: LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string1 | LN44 SD 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string2 | LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467679XXXX2_35292011220XXXXX_string1 

What I need to do is split this into multiple chunks as below: basically pick the AA-AMG3U: from the header and split the message on the pipe separator,

AA-AMG3U: LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string1 
AA-AMG3U: LN44 SD 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string2 
AA-AMG3U: LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467679XXXX2_35292011220XXXXX_string1 

You could start with

    grok { match => { "message" => "<%{NUMBER}>%{SYSLOGTIMESTAMP} %{IPV4} %{HOSTNAME:someField}: %{NUMBER} %{WORD} \[%{DATA}\]: %{GREEDYDATA:[anotherField]} \n" } }
    # Create array of strings
    mutate { split => { "[anotherField]" => "| " } }
    # Create a separate event for each array entry
    split { field => "[anotherField]" }

which will get you events like

"anotherField" => "LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467679XXXX2_35292011220XXXXX_string1",
   "someField" => "AA-AMG3U"

@Badger thanks! Much appreciated!

I want to output 3 lines to a CSV file. Is that possible?

You could use a csv output.
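For example (untested; the path and field names below are placeholders), inside your output section:

    csv {
        path => "/path/to/output.csv"
        fields => [ "someField", "anotherField" ]
    }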


Thanks,

I've tried to add the below config,

input {
file{
	path => "/Users/..../Input/*.txt"
    start_position => beginning
    codec => "json"
    type => "data"
    sincedb_path => "NUL"
   }
}

grok { match => { "message" => "<%{NUMBER}>%{SYSLOGTIMESTAMP} %{IPV4} %{HOSTNAME:someField}: %{NUMBER} %{WORD} \[%{DATA}\]: %{GREEDYDATA:[anotherField]} \n" } }
    # Create array of strings
    mutate { split => { "[anotherField]" => "| " } }
    # Create a separate event for each array entry
    split { field => "[anotherField]" }

   
output {
 file {
   path => "/Users/..../ELK/Logstash/test.csv"
   codec => line { format => "%{message}"}
 }
}

But it shows an error,

 Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"input\", \"filter\", \"output\" at line 11, column 1 (byte 189) 

You need to add filter { and } around the grok, mutate, and split filters.
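That is, something like

    filter {
        grok { match => { "message" => "..." } }
        mutate { split => { "[anotherField]" => "| " } }
        split { field => "[anotherField]" }
    }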

Sorry! yes, silly mistake! Thank you so much.

I am going to send this to either Elasticsearch or to another system. However, I wanted to see if I can merge the first two records into one: except for the "SA"/"SD" flag and the timestamp, everything else is identical. SA and SD events can land in any of the input files; I read about 20B records in the process, spread across several files.

Input data

{"@timestamp":"2022-12-01T13:30:00.004Z","message":"<190>Dec  1 14:29:59 10.62.161.199 AA-AMG3U: 0950198238 NN [MDA 8/4]: LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string1 | LN44 SD 2022 Dec  1 14:29:59:89 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string2 | LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467679XXXX2_35292011220XXXXX_string1 \n","@version":"1","host":"100.62.161.XXX"}

Current output -

AA-AMG3U LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string1 
AA-AMG3U LN44 SD 2022 Dec  1 14:29:59:89 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string2 
AA-AMG3U LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467679XXXX2_35292011220XXXXX_string1

I would like the output to have the first record merged from the SA and SD entries. Is it advisable to do such merges in Logstash, or should/can those be done in Elasticsearch instead?

Desired output

AA-AMG3U LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string1 2022 Dec  1 14:29:59:89 CET
AA-AMG3U LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467679XXXX2_35292011220XXXXX_string1

Config file currently,

input {
file{
	path => "/Users/.../Logstash/Input/*.txt"
    start_position => beginning
    codec => "json"
    type => "data"
    sincedb_path => "NUL"
   }
}

filter { 
	grok { match => { "message" => "<%{NUMBER}>%{SYSLOGTIMESTAMP} %{IPV4} %{HOSTNAME:nodeName}: %{NUMBER} %{WORD} \[%{DATA}\]: %{GREEDYDATA:[anotherField]} \n" } }
	    # Create array of strings
	    mutate { split => { "[anotherField]" => "| " } }
	    # Create a separate event for each array entry
	    split { field => "[anotherField]" }
	}

output {
 file {
   path => "/Users/..../ELK/Logstash/test.csv"
   codec => line { format => "%{nodeName} %{anotherField}"}
 }
}

Cheers,
Shanth Kumar

@Badger I am trying to further parse the strings in the array,

Input -

{"@timestamp":"2022-12-01T13:30:00.004Z","message":"<190>Dec  1 14:29:59 10.62.161.199 AA-AMG3U: 0950198238 NN [MDA 8/4]: LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string1 | LN44 SD 2022 Dec  1 14:29:59:89 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string2 | LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467679XXXX2_35292011220XXXXX_string1 \n","@version":"1","host":"100.62.161.XXX"}

Current Output -

AA-AMG3U,LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string1 
AA-AMG3U,LN44 SD 2022 Dec  1 14:29:59:89 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string2 
AA-AMG3U,LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467679XXXX2_35292011220XXXXX_string1

Desired Output -

AA-AMG3U,LN44,SA,2022 Dec  1 14:29:59:87 CET,17,4001,10.XX.133.XX 56560,401 91.235.10.25 15179,2400160261XXXXX,467000XXXX1,35292011220XXXXX,string1 
AA-AMG3U,LN44,SD,2022 Dec  1 14:29:59:87 CET,17,4001,10.XX.133.XX 56560,401 91.235.10.25 15179,2400160261XXXXX,467000XXXX1,35292011220XXXXX,string1 
AA-AMG3U,LN44,SA,2022 Dec  1 14:29:59:87 CET,17,4001,10.XX.133.XX 56560,401 91.235.10.25 15179,2400160261XXXXX,467000XXXX2,35292011220XXXXX,string1 

Is it possible to parse and match the contents of anotherField?

Config file -

input {
file{
	path => "/Users/.../Work/Projects/ELK/Logstash/Input/*.txt"
    start_position => beginning
    codec => "json"
    type => "data"
    sincedb_path => "NUL"
   }
}

filter { 
	grok { match => { "message" => "<%{NUMBER}>%{SYSLOGTIMESTAMP} %{IPV4} %{HOSTNAME:nodeName}: %{NUMBER} %{WORD} \[%{DATA}\]: %{GREEDYDATA:[anotherField]} \n" } }
	    # Create array of strings
	    mutate { split => { "[anotherField]" => "| " } }
	    # Create a separate event for each array entry
	    split { field => "[anotherField]" }	
	}

output {
stdout{}
 file {
   path => "/Users/.../Work/Projects/ELK/Logstash/test.csv"
   codec => line { format => "%{nodeName},%{anotherField}"}
 }
}

You could start with

    grok {
        pattern_definitions => { "customTimestamp" => "%{YEAR} %{MONTH}\s+%{MONTHDAY} %{TIME} %{WORD}" }
        match => { "anotherField" => "%{WORD:[@metadata][code]} %{customTimestamp:[@metadata][timestamp]}%{DATA} %{NOTSPACE:[@metadata][taskId]}_string\d+\s*$" }
    }
    aggregate {
        task_id => "%{[@metadata][taskId]}"
        timeout => 5
        code => '
            code = event.get("[@metadata][code]")
            if code == "SA"
                # Remember the SA line, keyed by the shared taskId
                map["anotherField"] = event.get("anotherField")
            elsif code == "SD"
                # When the matching SD arrives, append its timestamp to the
                # stored SA line and put the result on this (SD) event
                timestamp = event.get("[@metadata][timestamp]")
                if timestamp
                    anotherField = map["anotherField"]
                    event.set("anotherField", "#{anotherField} #{timestamp}")
                end
            end
        '
    }

It's ugly and probably fragile but just about does what you asked for.
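Note that the aggregate filter only works reliably with a single worker thread, so you would also need

    # logstash.yml (or -w 1 / --pipeline.workers 1 on the command line)
    pipeline.workers: 1

otherwise events may be processed out of order and the SA/SD pairs will not meet in the same map.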


Hi @Badger, thanks, I will try and test that! I'm not sure whether Logstash can/will scan all 20B records from the input source files before merging the two events. I'll experiment a bit.
I am also thinking about alternative solutions.

Below is my config

input {
file{
	path => "/Users/.../Work/Projects/ELK/Logstash/Input/*.txt"
    start_position => beginning
    codec => "json"
    type => "data"
    sincedb_path => "NUL"
   }
}

filter { 
	grok { match => { "message" => "<%{NUMBER}>%{SYSLOGTIMESTAMP} %{IPV4} %{HOSTNAME:nodeName}: %{NUMBER} %{WORD} \[%{DATA}\]: %{GREEDYDATA:[anotherField]} \n" } }
	    # Create array of strings
	    mutate { split => { "[anotherField]" => "| " } }
	    # Create a separate event for each array entry
	    split { field => "[anotherField]" }	
	}

output {
stdout{}
 file {
   path => "/Users/...../Work/Projects/ELK/Logstash/test.csv"
   codec => line { format => "%{nodeName},%{anotherField}"}
 }
}

Below is the input

{"@timestamp":"2022-12-01T13:30:00.004Z","message":"<190>Dec  1 14:29:59 10.62.161.199 AA-AMG3U: 0950198238 NN [MDA 8/4]: LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string1 | LN44 SD 2022 Dec  1 14:29:59:89 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string2 | LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467679XXXX2_35292011220XXXXX_string1 \n","@version":"1","host":"100.62.161.XXX"}

Below is the current output

AA-AMG3U,LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string1 
AA-AMG3U,LN44 SD 2022 Dec  1 14:29:59:89 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467000XXXX1_35292011220XXXXX_string2 
AA-AMG3U,LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.XX.133.XX 56560 401 91.235.10.25 15179 2400160261XXXXX_467679XXXX2_35292011220XXXXX_string1

How do I change the config to get the output below?

AA-AMG3U,LN44,SA,2022 Dec  1 14:29:59:87 CET,17,4001,10.XX.133.XX 56560,401 91.235.10.25 15179,2400160261XXXXX,467000XXXX1,35292011220XXXXX,string1 
AA-AMG3U,LN44,SD,2022 Dec  1 14:29:59:87 CET,17,4001,10.XX.133.XX 56560,401 91.235.10.25 15179,2400160261XXXXX,467000XXXX1,35292011220XXXXX,string1 
AA-AMG3U,LN44,SA,2022 Dec  1 14:29:59:87 CET,17,4001,10.XX.133.XX 56560,401 91.235.10.25 15179,2400160261XXXXX,467000XXXX2,35292011220XXXXX,string1

With this input

{"@timestamp":"2022-12-01T13:30:00.004Z","message":"<190>Dec  1 14:29:59 10.62.161.199 AA-AMG3U: 0950198238 NN [MDA 8/4]: LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.00.133.00 56560 401 91.235.10.25 15179 2400160261XXXXX_4670000001_35292011220XXXXX_string1 | LN44 SD 2022 Dec  1 14:29:59:89 CET 17 4001 10.33.133.00 56560 401 91.235.10.25 15179 2400160261XXXXX_4670000001_35292011220XXXXX_string2 | LN44 SA 2022 Dec  1 14:29:59:87 CET 17 4001 10.00.133.00 56560 401 91.235.10.25 15179 2400160261XXXXX_4670000002_35292011220XXXXX_string1 \n","@version":"1","host":"100.62.161.XXX"}

Config file -

input {
file{
	path => "/Users/..../Work/Projects/ELK/Logstash/Input/*.txt"
    start_position => beginning
    codec => "json"
    type => "data"
    sincedb_path => "NUL"
   }
}

filter { 
	grok { match => { "message" => "<%{NUMBER}>%{SYSLOGTIMESTAMP} %{IPV4} %{HOSTNAME:nodeName}: %{NUMBER} %{WORD} \[%{DATA}\]: %{WORD:R1F1} %{WORD:R1F2} %{GREEDYDATA:R1F3} %{INT:R1F4} %{INT:R1F5} %{IP:R1F6} %{INT:R1F7} %{INT:R1F8} %{IP:R1F9} %{INT:R1F10} %{GREEDYDATA:R1F11}_%{GREEDYDATA:R1F12}_%{GREEDYDATA:R1F13}_%{GREEDYDATA:R1F14} \| %{GREEDYDATA:Record2} \| %{GREEDYDATA:Record3} \n" } }
	    # Create array of strings
	    #mutate { split => { "[anotherField]" => "| " } }
	    # Create a separate event for each array entry
	    #split { field => "[anotherField]" }	
	
	mutate {
		remove_field => ["@version", "@timestamp"]
		remove_field => [ "message" ]
		remove_field => ["[event][original]"]
	}

}

I realize I need to send the output both to Elasticsearch and to a CSV file.
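I'm assuming the output section ends up with both outputs, something like this (the hosts, index, path, and field list below are just placeholders):

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "amg-records"
  }
  csv {
    path => "/path/to/test.csv"
    fields => [ "nodeName", "R1F1", "R1F2", "R1F3" ]
  }
}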

Sample CSV file,

AA-AMG3U:,LN44,SA,2022 Dec  1 14:29:59:87 CET,17,4001,10.00.133.00,56560,401,91.235.10.25,15179,2400160261XXXXX,4670000001,35292011220XXXXX,string1 
AA-AMG3U:,LN44,SD,2022 Dec  1 14:29:59:89 CET,17,4001,10.33.133.00,56560,401,91.235.10.25,15179,2400160261XXXXX,4670000001,35292011220XXXXX,string2 
AA-AMG3U:,LN44,SA,2022 Dec  1 14:29:59:87 CET,17,4001,10.00.133.00,56560,401,91.235.10.25,15179,2400160261XXXXX,4670000002,35292011220XXXXX,string1 

So in Elasticsearch I should be able to search for the numbers "4670000001" and "4670000002" for a specific timestamp. The value after the SA/SD is my timestamp.

How do I modify my config file to support this? In the grok match expression, should I handle Record2 and Record3 the same way as Record1? Much appreciated if you can advise.
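For example, I'm wondering whether I should go back to splitting on the pipe and then grok each anotherField entry into separate fields, something like this (untested; the F1..F14 field names are just placeholders):

filter {
	grok { match => { "message" => "<%{NUMBER}>%{SYSLOGTIMESTAMP} %{IPV4} %{HOSTNAME:nodeName}: %{NUMBER} %{WORD} \[%{DATA}\]: %{GREEDYDATA:[anotherField]} \n" } }
	mutate { split => { "[anotherField]" => "| " } }
	split { field => "[anotherField]" }
	grok {
		pattern_definitions => { "customTimestamp" => "%{YEAR} %{MONTH}\s+%{MONTHDAY} %{TIME} %{WORD}" }
		match => { "anotherField" => "%{WORD:F1} %{WORD:F2} %{customTimestamp:F3} %{INT:F4} %{INT:F5} %{IP:F6} %{INT:F7} %{INT:F8} %{IP:F9} %{INT:F10} %{NOTSPACE:F11}_%{NOTSPACE:F12}_%{NOTSPACE:F13}_%{NOTSPACE:F14}\s*$" }
	}
}

and then list nodeName and F1..F14 in the csv output's fields option.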

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.