Parse and ingest email files using logstash -- help needed


#1

I have multiple files each containing a single email in the following format:

Message-ID: <18435268.1075855378308.JavaMail.evans@thyme>
Date: Mon, 7 May 2001 12:28:00 -0700 (PDT)
From: phillip@xxx.com
To: matthew@xxx.com, jay@xxx.com, matt@xxx.com
Subject: fun and games
Mime-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Transfer-Encoding: 7bit
X-From: Phillip K
X-To: 
X-cc: 
X-bcc: 
X-Folder: \Phillip-Jan2002_1\Sent Mail
X-Origin: Philip
X-FileName: phillip (Non-Privileged).pst
[1]
[2]some text
[3]some more text
[4]
[5]
[6]--- 
[7]
[8]😎😎
[9]
[10]---- 
[11]
[12] even more text  

I have created the following Grok patterns:

Message-ID: <%{NOTSPACE:messageID}>
Date: %{DATA:dateString}
From: %{NOTSPACE:from}
To: %{DATA:to}
Subject: (?<subject>(?:(.*)|\n|\r\n?)((?:(?:\n|\r\n?).*)*))
Mime-Version: %{NOTSPACE:mimeVersion}
Content-Type: %{DATA:contentType}
Content-Transfer-Encoding: %{NOTSPACE:contentTypeTransferEncoding}
X-From: %{DATA:xFrom}
X-To: %{DATA:xTo}
X-cc: %{DATA:xCC}
X-bcc: %{DATA:xBCC}
X-Folder: %{DATA:xFolder}
X-Origin: %{DATA:xOrigin}
X-FileName: %{DATA:xFilename}
(?<messageBody>(?:(.*)|\n|\r\n?)((?:(?:\n|\r\n?).*)*))

which, taken together, successfully parse the above email example in the Grok Debugger without any issues.

Being new to Logstash (and Elasticsearch), I need some help converting this pattern into a Logstash config file that will parse each email file, create a single event with this data, and then ingest it as a JSON document.

I have tried the following, but no joy -- it doesn't seem to output the lines, nor does it give me any errors with the --verbose or --debug options on:

input {
  stdin { 
    codec => multiline {	
      pattern => "(^Message-ID: <%{NOTSPACE:messageID}>)|(^Date: %{DATA:dateString})|(^From: %{NOTSPACE:from})|(^To: %{DATA:to})|(^Subject: (?<subject>(?:(.*)|\n|\r\n?)((?:(?:\n|\r\n?).*)*)))|(^Mime-Version: %{NOTSPACE:mimeVersion})|(^Content-Type: %{DATA:contentType})|(^Content-Transfer-Encoding: %{NOTSPACE:contentTypeTransferEncoding})|(^X-From: %{DATA:xFrom})|(^X-To: %{DATA:xTo})|(^X-cc: %{DATA:xCC})|(^X-bcc: %{DATA:xBCC})|(^X-Folder: %{DATA:xFolder})|(^X-Origin: %{DATA:xOrigin})|(^X-FileName: %{DATA:xFilename})|((?<messageBody>(?:(.*)|\n|\r\n?)((?:(?:\n|\r\n?).*)*)))"
      negate => true
      what => "previous"
    }
  }
}
output {
    stdout { }
    stdout { codec => rubydebug }
}

Any help would be appreciated -- obviously I am missing something here ....


#2

A multiline codec configured like that terminates the first event when it starts the second event. That is, you have to have a second Message-ID: line. If you want to consume a whole file as a single event, then something like an exec input with cat might work.
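
For example (a sketch only -- the file path and interval here are made up, not taken from this thread), an exec input that cats the file delivers the command's whole output as a single event:

input {
  exec {
    # cat the whole file; its entire output becomes one event's message
    # (hypothetical path -- point it at your own email file)
    command => "cat /path/to/email.txt"
    interval => 3600
  }
}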


#3

Here is the output from:

cat ../../test-files/test-example1 | ./bin/logstash -f ../../conf/test-email-v1.conf --debug

... a whole bunch of logging ....

then, not in the right order, I get this on stdout:

2018-02-09T17:52:46.332Z MMBP1.local Mime-Version: 1.0
2018-02-09T17:52:46.333Z MMBP1.local X-Origin: Philip
2018-02-09T17:52:46.335Z MMBP1.local [7]
2018-02-09T17:52:46.332Z MMBP1.local X-From: Phillip K
2018-02-09T17:52:46.334Z MMBP1.local [2]some text
2018-02-09T17:52:46.336Z MMBP1.local [10]---- 
2018-02-09T17:52:46.331Z MMBP1.local Subject: fun and games
2018-02-09T17:52:46.333Z MMBP1.local X-Folder: \Phillip-Jan2002_1\Sent Mail
2018-02-09T17:52:46.335Z MMBP1.local [6]--- 
2018-02-09T17:52:46.329Z MMBP1.local Date: Mon, 7 May 2001 12:28:00 -0700 (PDT)
2018-02-09T17:52:46.332Z MMBP1.local X-To: 
2018-02-09T17:52:46.335Z MMBP1.local [3]some more text
2018-02-09T17:52:46.336Z MMBP1.local [11]
2018-02-09T17:52:46.290Z MMBP1.local Message-ID: <18435268.1075855378308.JavaMail.evans@thyme>
2018-02-09T17:52:46.331Z MMBP1.local From: phillip@xxx.com
2018-02-09T17:52:46.333Z MMBP1.local X-cc: 
2018-02-09T17:52:46.335Z MMBP1.local [4]
2018-02-09T17:52:46.332Z MMBP1.local Content-Transfer-Encoding: 7bit
2018-02-09T17:52:46.334Z MMBP1.local [1]
2018-02-09T17:52:46.336Z MMBP1.local [9]
2018-02-09T17:52:46.331Z MMBP1.local To: matthew@xxx.com, jay@xxx.com, matt@xxx.com
2018-02-09T17:52:46.333Z MMBP1.local X-bcc: 
2018-02-09T17:52:46.335Z MMBP1.local [5]
2018-02-09T17:52:46.332Z MMBP1.local Content-Type: text/plain; charset=us-ascii
2018-02-09T17:52:46.334Z MMBP1.local X-FileName: phillip (Non-Privileged).pst
2018-02-09T17:52:46.336Z MMBP1.local [8]😎😎

then what I am guessing is the parsed data, but the message body doesn't seem to be grouped together -- it comes out as individual (out-of-order) snippets:

{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.332Z,
       "message" => "Mime-Version: 1.0"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.333Z,
       "message" => "X-Origin: Philip"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.335Z,
       "message" => "[7]"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.332Z,
       "message" => "X-From: Phillip K"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.334Z,
       "message" => "[2]some text"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.336Z,
       "message" => "[10]---- "
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.331Z,
       "message" => "Subject: fun and games"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.333Z,
       "message" => "X-Folder: \\Phillip-Jan2002_1\\Sent Mail"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.335Z,
       "message" => "[6]--- "
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.290Z,
       "message" => "Message-ID: <18435268.1075855378308.JavaMail.evans@thyme>"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.331Z,
       "message" => "From: phillip@xxx.com"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.333Z,
       "message" => "X-cc: "
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.335Z,
       "message" => "[4]"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.329Z,
       "message" => "Date: Mon, 7 May 2001 12:28:00 -0700 (PDT)"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.332Z,
       "message" => "X-To: "
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.335Z,
       "message" => "[3]some more text"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.336Z,
       "message" => "[11]"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.332Z,
       "message" => "Content-Transfer-Encoding: 7bit"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.334Z,
       "message" => "[1]"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.336Z,
       "message" => "[9]"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.331Z,
       "message" => "To: matthew@xxx.com, jay@xxx.com, matt@xxx.com"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.333Z,
       "message" => "X-bcc: "
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.335Z,
       "message" => "[5]"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.332Z,
       "message" => "Content-Type: text/plain; charset=us-ascii"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.334Z,
       "message" => "X-FileName: phillip (Non-Privileged).pst"
}
{
      "@version" => "1",
          "host" => "MMBP1.local",
    "@timestamp" => 2018-02-09T17:52:46.336Z,
       "message" => "[8]😎😎"
}

#4

I did the 'cat xx | ..." as you can see in the above output, but it seems to me (with the little knowledge I have) that the events have a messed-up message body (lines [1] ... [12]) and that the actual field names like 'to', 'from', ... 'messageBody' are missing from the output.


#5

Show the input config that resulted in that output. It seems unlikely to me that it was the multiline codec configured as you had it in your first post.


#6

@Badger

I have just changed the email data to add [x] at the start of each message body line for easier debugging, and updated this posting, so please refresh.

The input config is unchanged:

input {
  stdin { 
    codec => multiline {	
      pattern => "(^Message-ID: <%{NOTSPACE:messageID}>)|(^Date: %{DATA:dateString})|(^From: %{NOTSPACE:from})|(^To: %{DATA:to})|(^Subject: (?<subject>(?:(.*)|\n|\r\n?)((?:(?:\n|\r\n?).*)*)))|(^Mime-Version: %{NOTSPACE:mimeVersion})|(^Content-Type: %{DATA:contentType})|(^Content-Transfer-Encoding: %{NOTSPACE:contentTypeTransferEncoding})|(^X-From: %{DATA:xFrom})|(^X-To: %{DATA:xTo})|(^X-cc: %{DATA:xCC})|(^X-bcc: %{DATA:xBCC})|(^X-Folder: %{DATA:xFolder})|(^X-Origin: %{DATA:xOrigin})|(^X-FileName: %{DATA:xFilename})|((?<messageBody>(?:(.*)|\n|\r\n?)((?:(?:\n|\r\n?).*)*)))"
      negate => true
      what => "previous"
    }
  }
}
output {
    stdout { }
    stdout { codec => rubydebug }
}

#7

A stdin input generates events. A basic stdin input creates an event for each line. A multiline codec will (sometimes) concatenate multiple lines to form a single event. Each event is then run through the filters. You are trying to smoosh together a filter and an input, which does not work.

And I think you must have set negate to true to get it to issue each line as a separate event, but no matter.

If your file contains multiple mail messages, you can get all except the last one using

input{
  stdin{
    codec => multiline {
      pattern => "^Message-ID:"
      negate => true
      what => "previous"
    }
  }
}
You cannot get the last one because that event never ends. You might think this is a bug, and that EOF should trigger the emission of an event. I think that too, but such is life. So let's try something else.

We can append a line to the file containing some string that will never occur in the file. Yeah, I know it is hard to say what that string should be. But for now we can do:

(cat file.txt; echo "bonjour, mon cher Hoffmann") | ./logstash -f ...

then use a stdin input with a multiline codec:

input{
  stdin {
    codec => multiline {
      pattern => "^bonjour, mon cher Hoffmann"
      negate => "true"
      what => "previous"
    }
  }
}
That will get us an event that looks like this
       "message" => "Message-ID: <18435268.1075855378308.JavaMail.evans@thyme>\nDate: Mon, 7 May 2001 12:28:00 -0700 (PDT)\nFrom: phillip@xxx.com\nTo: matthew@xxx.com, jay@xxx.com, matt@xxx.com\nSubject: fun and games\nMime-Version: 1.0\nContent-Type: text/plain; charset=us-ascii\nContent-Transfer-Encoding: 7bit\nX-From: Phillip K\nX-To: \nX-cc: \nX-bcc: \nX-Folder: \\Phillip-Jan2002_1\\Sent Mail\nX-Origin: Philip\nX-FileName: phillip (Non-Privileged).pst\n[1]\n[2]some text\n[3]some more text\n[4]\n[5]\n[6]--- \n[7]\n[8]????\n[9]\n[10]---- \n[11]\n[12] even more text",
          "tags" => [
        [0] "multiline"
    ],

You can now start working on a grok filter, or more likely several grok filters to parse items out of that. Start very simple. Get one piece of data out, then try to get a second. Do not try to start with a grok pattern that matches the entire event.
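
For instance, a minimal starting point (just a sketch; it only reuses the Message-ID pattern already shown in this thread) could be a filter block with a single grok:

filter {
  grok {
    # start small: extract just the Message-ID, then add further grok
    # filters for the other headers one at a time
    match => { "message" => "^Message-ID: <%{NOTSPACE:messageID}>" }
  }
}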


#8

@Badger thanks for this - will work through it.


#9

@Badger

I am working through new regular expressions using the Grok Debugger against the whole event string you included in your post. The new input picks up the '$$' marker I appended at the bottom of the file:

input {
  stdin {
    codec => multiline {	
      pattern => "^\$\$"
      negate => true
      what => "previous"
    }
  }
}
filter {
  grok {
    match => { "message" => "^Message-ID: <%{NOTSPACE:messageID}>" }
  }
  grok {
    match => { "message" => "Date: (?<date>.*(?=(\\nFrom:)))" }
  }
  grok {
    match => { "message" => "From: (?<from>.*(?=(\\nTo:)))" }
  }
}
output {
    stdout { }
    stdout { codec => rubydebug }
} 

I am seeing the following output

{
    "@timestamp" => 2018-02-09T21:01:37.228Z,
       "message" => "Message-ID: <18435268.1075855378308.JavaMail.evans@thyme>\nDate: Mon, 7 May 2001 12:28:00 -0700 (PDT)\nFrom: phillip@xxx.com\nTo: matthew@xxx.com, jay@xxx.com, matt@xxx.com\nSubject: fun and games\nMime-Version: 1.0\nContent-Type: text/plain; charset=us-ascii\nContent-Transfer-Encoding: 7bit\nX-From: Phillip K\nX-To: \nX-cc: \nX-bcc: \nX-Folder: \\Phillip-Jan2002_1\\Sent Mail\nX-Origin: Philip\nX-FileName: phillip (Non-Privileged).pst\n[1]\n[2]some text\n[3]some more text\n[4]\n[5]\n[6]--- \n[7]\n[8]😎😎\n[9]\n[10]---- \n[11]\n[12] even more text  ",
     "messageID" => "18435268.1075855378308.JavaMail.evans@thyme",  <<--- WORKING
          "tags" => [
        [0] "multiline",
        [1] "_grokparsefailure"
    ],
          "host" => "MMBP1.local",
      "@version" => "1"
}

As you can see, it matches the first 'Message-ID:' pattern but then, I guess, fails on the next ones? How do you do multiple grok calls, or am I missing something very obvious?

The grok regular expressions all work fine in the Grok Debugger and the Grok Constructor Matcher against the whole string.

Any pointers to what I am obviously doing wrong here?


#10

There are no fancy quoting or escaping features in logstash configs :slight_smile: Try this, with a literal newline embedded in the string:

  grok {
    match => { "message" => "Date: (?<date>.*)
From: " }
  } 

Note that using .* sometimes grabs a lot more than you want. These two variants may help you understand what it is doing. The first one says .* followed by a newline, which ends up consuming the entire message. The second says not-newline followed by a newline, which consumes only the rest of the line. If the order of headers ever varies, you will need this one.

  grok {
    match => { "message" => "Date: (?<date1>.*)
" }
  }
  grok {
    match => { "message" => "Date: (?<date2>[^
]*)
" }
  }
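
For illustration, the same not-newline trick can be applied to the other headers, using the same embedded-literal-newline style (a sketch only, extrapolating from the date2 example above; the field names follow the ones used earlier in this thread):

  grok {
    # the character class with an embedded literal newline stops the capture
    # at the end of the From: header line
    match => { "message" => "From: (?<from>[^
]*)
" }
  }
  grok {
    # same idea for the To: header
    match => { "message" => "To: (?<to>[^
]*)
" }
  }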

#11

The first one fails with an error:

 Error registering plugin {:pipeline_id=>"main", :plugin=>"#<LogStash::FilterDelegator:0x632a0068 @metric_events_out=org.jruby.proxy.org.logstash.instrument.metrics.counter.LongCounter$Proxy2 -  name: out value:0, @metric_events_in=org.jruby.proxy.org.logstash.instrument.metrics.counter.LongCounter$Proxy2 -  name: in value:0, @logger=#<LogStash::Logging::Logger:0x216c42e2 @logger=#<Java::OrgApacheLoggingLog4jCore::Logger:0x43071772>>, @metric_events_time=org.jruby.proxy.org.logstash.instrument.metrics.counter.LongCounter$Proxy2 -  name: duration_in_millis value:0, @id=\"49ef64ac0cae3158f02a4495c3c8fdb3d2f85d583548c9fd0500a77dc0fbf8c2\", @klass=LogStash::Filters::Grok, @metric_events=#<LogStash::Instrument::NamespacedMetric:0x2918084 @metric=#<LogStash::Instrument::Metric:0x4129b966 @collector=#<LogStash::Instrument::Collector:0x3a6e8ab6 @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x54f7fd71 @store=#<Concurrent::Map:0x00000000000fc4 entries=3 default_proc=nil>, @structured_lookup_mutex=#<Mutex:0x1877a935>, @fast_lookup=#<Concurrent::Map:0x00000000000fc8 entries=76 default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :filters, :\"49ef64ac0cae3158f02a4495c3c8fdb3d2f85d583548c9fd0500a77dc0fbf8c2\", :events]>, @filter=<LogStash::Filters::Grok match=>{\"message\"=>\"Date: (?.*)\\nFrom: \"}, id=>\"49ef64ac0cae3158f02a4495c3c8fdb3d2f85d583548c9fd0500a77dc0fbf8c2\", enable_metric=>true, periodic_flush=>false, patterns_files_glob=>\"*\", break_on_match=>true, named_captures_only=>true, keep_empty_captures=>false, tag_on_failure=>[\"_grokparsefailure\"], timeout_millis=>30000, tag_on_timeout=>\"_groktimeout\">>", :error=>"undefined group option: /Date: (?.*)\nFrom: /m", :thread=>"#<Thread:0xc144f65 run>"}
[2018-02-09T13:26:40,044][ERROR][logstash.pipeline        ] Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<RegexpError: undefined group option: /Date: (?.*)
From: /m>, :backtrace=>["org/jruby/RubyRegexp.java:928:in `initialize'", "/Users/mlofquist/Desktop/elastic-enron/servers/logstash-6.1.3/vendor/bundle/jruby/2.3.0/gems/jls-grok-0.11.4/lib/grok-pure.rb:127:in `compile'", "/Users/mlofquist/Desktop/elastic-enron/servers/logstash-6.1.3/vendor/bundle/jruby/2.3.0/gems/logstash-filter-grok-4.0.1/lib/logstash/filters/grok.rb:286:in `block in register'", "org/jruby/RubyArray.java:1734:in `each'", "/Users/mlofquist/Desktop/elastic-enron/servers/logstash-6.1.3/vendor/bundle/jruby/2.3.0/gems/logstash-filter-grok-4.0.1/lib/logstash/filters/grok.rb:280:in `block in register'", "org/jruby/RubyHash.java:1343:in `each'", "/Users/mlofquist/Desktop/elastic-enron/servers/logstash-6.1.3/vendor/bundle/jruby/2.3.0/gems/logstash-filter-grok-4.0.1/lib/logstash/filters/grok.rb:275:in `register'", "/Users/mlofquist/Desktop/elastic-enron/servers/logstash-6.1.3/logstash-core/lib/logstash/pipeline.rb:343:in `register_plugin'", "/Users/mlofquist/Desktop/elastic-enron/servers/logstash-6.1.3/logstash-core/lib/logstash/pipeline.rb:354:in `block in register_plugins'", "org/jruby/RubyArray.java:1734:in `each'", "/Users/mlofquist/Desktop/elastic-enron/servers/logstash-6.1.3/logstash-core/lib/logstash/pipeline.rb:354:in `register_plugins'", "/Users/mlofquist/Desktop/elastic-enron/servers/logstash-6.1.3/logstash-core/lib/logstash/pipeline.rb:744:in `maybe_setup_out_plugins'", "/Users/mlofquist/Desktop/elastic-enron/servers/logstash-6.1.3/logstash-core/lib/logstash/pipeline.rb:364:in `start_workers'", "/Users/mlofquist/Desktop/elastic-enron/servers/logstash-6.1.3/logstash-core/lib/logstash/pipeline.rb:288:in `run'", "/Users/mlofquist/Desktop/elastic-enron/servers/logstash-6.1.3/logstash-core/lib/logstash/pipeline.rb:248:in `block in start'"], :thread=>"#<Thread:0xc144f65 run>"}

#12

I fixed my post (replaced < with &lt; so that it renders as <), can you check what it says now?


#13

The second one gives me everything from the date to the end of the string -- hold on, testing the second variant (date2).

-- my mistake, it worked!!!

@Badger - amazing -- okay, is this the way to do it, or can I make my own custom pattern with my fancy regular expression using Oniguruma?


#14

give me a sec :wink:

Did you amend the first one or the second?


#15

I edited all three, but I think the third one, the not-newline followed by a newline expression, is what you will need.


#16

ok - will try the third one


#17

The third one works well - @Badger, thank you so much for your patience & help


(system) #18

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.