Extract fields from multiline message

Hello!
I am new to the Elastic Stack, and I'm currently trying to set up a simple visualization for our backup system, which runs on rsync scripts. Those scripts provide good logging; a single backup job log message looks like this:

_________________________
Backup name: Testbackup_new (from [192.168.1.1] to [linuxbox01])
Result: SUCCESS
Start time:  Wed Oct 23 02:00:01 UTC 2019
End time:  Wed Oct 23 02:00:02 UTC 2019
Time taken: 0 min  1 sec
Statistics:

Number of files: 7 (reg: 6, dir: 1)
Number of created files: 0
Number of deleted files: 0
Number of regular files transferred: 0
Total file size: 262.15M bytes
Total transferred file size: 0 bytes
Literal data: 0 bytes
Matched data: 0 bytes
File list size: 141
File list generation time: 0.001 seconds
File list transfer time: 0.000 seconds
Total bytes sent: 27
Total bytes received: 152

sent 27 bytes  received 152 bytes  119.33 bytes/sec
total size is 262.15M  speedup is 1,464,504.72

I managed to collect "multiline" messages with the following filebeat.yml configuration:

filebeat.inputs:
- type: log 
  paths:
    - /home/guahos/script/rsync.log
  multiline.pattern: '_________________________'
  multiline.negate: true
  multiline.match: after

So I see a single event in Kibana per backup job log entry, which is what I want.
The entire log event is stored under the message field in Kibana.
What I really want to do now is extract new fields from the existing message field.
I have used other log aggregation tools before where such things can easily be done right from the web interface, but I ran into issues doing this with the Elastic Stack.

What I want exactly is to extract the following fields from the line Backup name: Testbackup_new (from [192.168.1.1] to [linuxbox01]):
Testbackup_new (as backup_name)
192.168.1.1 (as backup_src)
linuxbox01 (as backup_dst)

1 sec (as backup_duration) from the line Time taken: 0 min 1 sec

and

27 (as bytes_sent) and 152 (as bytes_received) from the lines Total bytes sent: 27 and Total bytes received: 152, respectively.

Is it possible to do this with the Elastic Stack, and what is the best way of doing it?

Hey @guahos, welcome to the discussion boards!

There are a couple of ways to do this: you could parse the message with a grok filter in Logstash, or you could use an ingest pipeline in Elasticsearch.

I took the second route (ingest pipeline), as it doesn't require you to download or install any additional software.

Step 1: Define an ingest pipeline:

This pipeline has a number of processors. Some of these could technically be combined, but I find it easier to reason about when they're split apart. The processors run in the order shown.

  • The first (grok) processor parses out backup_name, backup_src, and backup_dst.
  • The second (grok) processor parses out the duration fields (backup_duration_minutes and backup_duration_seconds), which we process further later on.
  • The third (grok) processor parses out the bytes sent/received (bytes_sent and bytes_received).
  • The fourth (script) processor calculates the duration in seconds by converting the backup_duration_minutes and backup_duration_seconds fields to integers and doing the simple math to combine the two. The value is stored in backup_duration.
  • The fifth/sixth (convert) processors convert bytes_sent and bytes_received into integers (grok extracts string values, but you likely want this data stored as integers so you can query, aggregate, and visualize it correctly).
  • The seventh/eighth (remove) processors delete the backup_duration_minutes and backup_duration_seconds fields, as they were only used to perform the backup_duration calculation above and likely don't need to be stored.

PUT _ingest/pipeline/guahos
{
  "description" : "my pipeline",
  "processors" : [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{GREEDYMULTILINE}%{ROW_TITLE}%{WORD:backup_name}%{SRC_START}%{IP:backup_src}%{DST_START}%{WORD:backup_dst}"
        ],
        "pattern_definitions": {
          "GREEDYMULTILINE" : "(.|\n)*",
          "ROW_TITLE": "Backup name: ",
          "SRC_START": " \\(from \\[",
          "DST_START": "\\] to \\["
        }
      }
    },
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{GREEDYMULTILINE}%{TIME_TAKEN_ROW_TITLE}%{NUMBER:backup_duration_minutes}%{GREEDYDATA}%{NUMBER:backup_duration_seconds}"
        ],
        "pattern_definitions": {
          "GREEDYMULTILINE" : "(.|\n)*",
          "TIME_TAKEN_ROW_TITLE": "Time taken: "
        }
      }
    },
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{GREEDYMULTILINE}%{BYTES_SENT_ROW_TITLE}%{NUMBER:bytes_sent}%{GREEDYMULTILINE}%{BYTES_RECEIVED_ROW_TITLE}%{NUMBER:bytes_received}"
        ],
        "pattern_definitions": {
          "NEWLINE": "\n",
          "GREEDYMULTILINE" : "(.|\n)*",
          "BYTES_SENT_ROW_TITLE": "Total bytes sent: ",
          "BYTES_RECEIVED_ROW_TITLE": "Total bytes received: "
        }
      }
    },
    {
      "script": {
        "source": "ctx.backup_duration = (Integer.parseInt(ctx.backup_duration_minutes) * 60) + Integer.parseInt(ctx.backup_duration_seconds)"
      }
    },
    {
      "convert": {
        "field": "bytes_sent",
        "type": "integer"
      }
    },
    {
      "convert": {
        "field": "bytes_received",
        "type": "integer"
      }
    },
    {
      "remove": {
        "field": "backup_duration_minutes"
      }
    },
    {
      "remove": {
        "field": "backup_duration_seconds"
      }
    }
  ]
}
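
If you'd like to sanity-check the grok patterns before hooking anything up, you can optionally run a document through the pipeline with the simulate API. The shortened message below is just a sketch containing the lines the pipeline cares about; you can paste your full log message instead:

POST _ingest/pipeline/guahos/_simulate
{
  "docs": [
    {
      "_source": {
        "message": """
Backup name: Testbackup_new (from [192.168.1.1] to [linuxbox01])
Time taken: 0 min  1 sec
Total bytes sent: 27
Total bytes received: 152
"""
      }
    }
  ]
}

The response shows the document as it would be indexed, with backup_name, backup_src, backup_dst, backup_duration, bytes_sent, and bytes_received already extracted.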

Step 2: Index your data

When you index these documents, tell Elasticsearch to use the ingest pipeline you defined above (I used a pipeline id of guahos):

POST guahos/_doc?pipeline=guahos
{
  "message": """
---------------------------------------------------------------
Backup name: Testbackup_new (from [192.168.1.1] to [linuxbox01])
Result: SUCCESS
Start time:  Wed Oct 23 02:00:01 UTC 2019
End time:  Wed Oct 23 02:00:02 UTC 2019
Time taken: 0 min  1 sec
Statistics:

Number of files: 7 (reg: 6, dir: 1)
Number of created files: 0
Number of deleted files: 0
Number of regular files transferred: 0
Total file size: 262.15M bytes
Total transferred file size: 0 bytes
Literal data: 0 bytes
Matched data: 0 bytes
File list size: 141
File list generation time: 0.001 seconds
File list transfer time: 0.000 seconds
Total bytes sent: 27
Total bytes received: 152

sent 27 bytes  received 152 bytes  119.33 bytes/sec
total size is 262.15M  speedup is 1,464,504.72
  """
}

Step 3: Query your index to verify

GET guahos/_search
{
}

Returns:

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "guahos",
        "_type" : "_doc",
        "_id" : "mPfl-G0BGHnlifmLZXzK",
        "_score" : 1.0,
        "_source" : {
          "bytes_received" : 152,
          "message" : """
---------------------------------------------------------------
Backup name: Testbackup_new (from [192.168.1.1] to [linuxbox01])
Result: SUCCESS
Start time:  Wed Oct 23 02:00:01 UTC 2019
End time:  Wed Oct 23 02:00:02 UTC 2019
Time taken: 0 min  1 sec
Statistics:

Number of files: 7 (reg: 6, dir: 1)
Number of created files: 0
Number of deleted files: 0
Number of regular files transferred: 0
Total file size: 262.15M bytes
Total transferred file size: 0 bytes
Literal data: 0 bytes
Matched data: 0 bytes
File list size: 141
File list generation time: 0.001 seconds
File list transfer time: 0.000 seconds
Total bytes sent: 27
Total bytes received: 152

sent 27 bytes  received 152 bytes  119.33 bytes/sec
total size is 262.15M  speedup is 1,464,504.72
""",
          "bytes_sent" : 27,
          "backup_dst" : "linuxbox01",
          "backup_name" : "Testbackup_new",
          "backup_duration" : 1,
          "backup_src" : "192.168.1.1"
        }
      }
    ]
  }
}


Thank you Larry, I wasn't even expecting to get such a detailed answer!
So, I put the pipeline definition into the Kibana console, then did the same with Step 2 and got the same output as the Step 3 query.
I also created a new index pattern through Management/Index patterns in the Kibana web UI.
After that I was able to see the new index in Discover, with all the desired fields, so that is quite perfect.
However, the only message I see is the one I manually indexed in Step 2; no new messages appear under this index.
What do I have to do to store all new messages under this index?

However, the only message I see is the one I manually indexed in Step 2; no new messages appear under this index.

You will unfortunately need to reindex your data using the pipeline argument in order for the existing messages to get parsed. These pipelines only run when the data is first indexed into Elasticsearch; since those documents were indexed before you created the pipeline, they won't have the new fields extracted.
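
For completeness, if you ever do want to re-process what's already indexed, a minimal sketch would be to reindex into a new index and name the pipeline on the destination (the destination index new-guahos here is just a placeholder):

POST _reindex
{
  "source": {
    "index": "guahos"
  },
  "dest": {
    "index": "new-guahos",
    "pipeline": "guahos"
  }
}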

Since I'm just setting up, I don't actually care about existing messages.
What I really want is to get all new messages through this pipeline, so that I can see them in Kibana Discover with all the desired fields extracted.
In general, how do I have a data source (the backup log file in my case) ingested through a certain pipeline so that it can later be discovered in Kibana under a certain index?

In general, how do I have a data source (the backup log file in my case) ingested through a certain pipeline so that it can later be discovered in Kibana under a certain index?

It looks like you're already using Filebeat, so we can teach Filebeat about your ingest pipeline. In the output.elasticsearch section of filebeat.yml, configure the pipeline setting with the id of your pipeline:

(example taken from Parse data using an ingest pipeline | Filebeat Reference [master] | Elastic)

output.elasticsearch:
  hosts: ["localhost:9200"]
  pipeline: guahos

I use Logstash as the Filebeat output; my filebeat.yml looks like this:

filebeat.inputs:
- type: log 
  paths:
    - /home/guahos/script/rsync.log
  multiline.pattern: '_________________________'
  multiline.negate: true
  multiline.match: after
    
output.logstash:
  hosts: ["192.168.21.24:5044"]
         
setup.kibana:
  host: "192.168.21.24:5601"

In Logstash I use a test_pipeline.conf that looks like this:

input {
  beats {
    port => 5044
  }
}
output {
  elasticsearch { hosts => ["localhost:9200"] }
}

I tried to add pipeline: guahos to filebeat.yml like so:

output.logstash:
  hosts: ["192.168.21.24:5044"]
  pipeline: guahos

but that does not seem to work; no new messages can be discovered in Kibana.

I guess I have to edit the Logstash test_pipeline.conf to be able to see the messages in the proper index, with the proper fields extracted, in Kibana?

Ah, ok. Try this in your test_pipeline.conf:

input {
  beats {
    port => 5044
  }
}
output {
  elasticsearch { 
    hosts => ["localhost:9200"]
    pipeline => "guahos"
  }
}
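
One note on why the earlier attempt didn't work: in Filebeat, pipeline is a setting of the output.elasticsearch output, so it has no effect under output.logstash. With the config above, Logstash itself tells Elasticsearch to run the guahos ingest pipeline when it indexes each event, so the parsing still happens inside Elasticsearch and no grok filters are needed on the Logstash side.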

Ok, I tried to add the pipeline config to my test_pipeline.conf, but Logstash seems to ignore those settings when it starts. Even if I run it manually with the -f test_pipeline.conf option, it still does not load my pipeline config and loads the default main pipeline instead:

Thread.exclusive is deprecated, use Thread::Mutex
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[WARN ] 2019-10-24 10:36:06.187 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
[INFO ] 2019-10-24 10:36:06.210 [LogStash::Runner] runner - Starting Logstash {"logstash.version"=>"7.4.0"}
[INFO ] 2019-10-24 10:36:09.193 [Converge PipelineAction::Create<main>] Reflections - Reflections took 84 ms to scan 1 urls, producing 20 keys and 40 values 
[INFO ] 2019-10-24 10:36:10.754 [[main]-pipeline-manager] elasticsearch - Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[WARN ] 2019-10-24 10:36:11.126 [[main]-pipeline-manager] elasticsearch - Restored connection to ES instance {:url=>"http://localhost:9200/"}
[INFO ] 2019-10-24 10:36:11.413 [[main]-pipeline-manager] elasticsearch - ES Output version determined {:es_version=>7}
[WARN ] 2019-10-24 10:36:11.418 [[main]-pipeline-manager] elasticsearch - Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[INFO ] 2019-10-24 10:36:11.480 [[main]-pipeline-manager] elasticsearch - New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[INFO ] 2019-10-24 10:36:11.643 [Ruby-0-Thread-5: :1] elasticsearch - Using default mapping template
[WARN ] 2019-10-24 10:36:11.757 [[main]-pipeline-manager] LazyDelegatingGauge - A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.
[INFO ] 2019-10-24 10:36:11.767 [[main]-pipeline-manager] javapipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, :thread=>"#<Thread:0x680d126e run>"}
[INFO ] 2019-10-24 10:36:11.846 [Ruby-0-Thread-5: :1] elasticsearch - Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1, "index.lifecycle.name"=>"logstash-policy", "index.lifecycle.rollover_alias"=>"logstash"}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[INFO ] 2019-10-24 10:36:12.983 [[main]-pipeline-manager] beats - Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[INFO ] 2019-10-24 10:36:13.003 [[main]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"main"}
[INFO ] 2019-10-24 10:36:13.130 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[INFO ] 2019-10-24 10:36:13.271 [[main]<beats] Server - Starting server on port: 5044
[INFO ] 2019-10-24 10:36:13.698 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}

I also tried to edit pipelines.yml like so:

- pipeline.id: guahos
  path.config: "/etc/logstash/test_pipeline.conf"

And I tried to edit the systemd Logstash unit like so:

ExecStart=/usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash" "-f" "/etc/logstash/test_pipeline.conf"

But even after that, Logstash loads the default main pipeline config.
Do I have to manually force Logstash to re-read its config files somehow in order for the new config to be applied?

I'm not a Logstash expert, but as far as I can tell --path.settings and -f are not aliases of each other: --path.settings points at the settings directory (the one containing logstash.yml and pipelines.yml), while -f (short for --path.config) points at a pipeline config file such as test_pipeline.conf. Also note the warning near the top of your log: when -f or other command-line options are specified, Logstash ignores pipelines.yml entirely.

Second, it appears (based on the docs) that Logstash expects the multi-pipeline definitions to live in a file called pipelines.yml inside the settings directory. I see you already tried modifying this file, so that's a great start.
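
If you want the guahos pipeline from pipelines.yml to be picked up, a minimal sketch (assuming the /etc/logstash layout you've shown) is to keep your pipelines.yml entry and start Logstash with only the settings directory, dropping -f so pipelines.yml is no longer ignored:

# /etc/logstash/pipelines.yml (unchanged from what you posted)
- pipeline.id: guahos
  path.config: "/etc/logstash/test_pipeline.conf"

# systemd ExecStart without -f, so pipelines.yml is honored
ExecStart=/usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash"

That ExecStart line is what the Logstash package ships with by default, so reverting your systemd change should be enough; if pipelines.yml is actually being used, the startup log will report the pipeline id as guahos instead of main.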

Can you post the full contents of logstash.yml and pipelines.yml? Please redact any sensitive information such as usernames and passwords.
