Output file configuration


(Athanasios Antonopoulos) #1

Hi,

I have this configuration in 30-elasticsearch-output.conf:

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    sniffing => true
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
  file {
    path => "/tmp/log/Syslog_Server_Logs/%{host[name]}/%{host[name]}-%{+YYYY-MM-dd}.log"
    codec => line { format => "%{source} - %{message}" }
  }
}

My output files are named something like my_servers_hostname-2018-06-18.log. I would like to have application_name-date.log instead. How can I modify my file output to do this?

My filter configuration is:

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

I saw that in the message, the application name corresponds to %{DATA:syslog_program}.
I changed the file path to
path => "/tmp/log/Syslog_Server_Logs/%{host[name]}/%{DATA:syslog_program}-%{+YYYY-MM-dd}.log"

but it didn't bring the name of the application into the output .log file.

Do you have any idea on that?

I use Elasticsearch 6.3, Kibana 6.3, and Logstash 6.3.

Best Regards,
Thanos


(Magnus Bäck) #2

If the program name is stored in the syslog_program field you should include %{syslog_program} in the path option of your file output.

Also, %{host[name]} isn't the right syntax. It might work now but you should change it to %{[host][name]}. See https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#logstash-config-field-references.
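
In other words, something like this (a sketch, assuming your grok filter has populated the syslog_program field):

```
file {
  # %{[host][name]} is the correct field-reference syntax for the
  # nested host.name field; %{syslog_program} comes from the grok filter.
  path => "/tmp/log/Syslog_Server_Logs/%{[host][name]}/%{syslog_program}-%{+YYYY-MM-dd}.log"
  codec => line { format => "%{source} - %{message}" }
}
```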


(Athanasios Antonopoulos) #3

Thanks a lot for your reply,

I made the change:

file {
  path => "/tmp/log/Syslog_Server_Logs/%{[host][name]}/%{syslog_program}-%{+YYYY-MM-dd}.log"
  codec => line { format => "%{source} - %{message}" }
}

but now I get the file named
%{syslog_program}-2018-06-18.log

Do I have to write it as %{[DATA][syslog_program]}?


(Magnus Bäck) #4

but now I get the file named
%{syslog_program}-2018-06-18.log

Then the event didn't have a syslog_program field.


(Athanasios Antonopoulos) #5

But as far as I can see from my filter settings
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }

this filter should match a message like this:

Jun 18 16:30:01 atmonxh01 systemd: Started Session 7431 of user apache.

From that message I understand that "systemd" comes from %{DATA:syslog_program} in my filter.
Am I right?

How can I see the fields?

BR,
Thanos


(Magnus Bäck) #6

From that message I understand that "systemd" comes from %{DATA:syslog_program} in my filter.

Yes, judging by the configuration you've shown us I'd expect events to have a syslog_program field if the grok filter is successful.

How i can see the fields?

You're storing the complete events in Elasticsearch. During debugging I always recommend that people use a stdout { codec => rubydebug } output.


(Athanasios Antonopoulos) #7

Ah, ok, thanks a lot for this!
And where do I add this code?


(Magnus Bäck) #8

And where do I add this code?

As I said, it's an output. It goes in the output block.
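
For example (a sketch; the stdout output is temporary and can be removed once you're done debugging):

```
output {
  # ... your existing elasticsearch and file outputs ...

  # Temporary debugging output: prints every processed event,
  # with all its fields, in a human-readable format.
  stdout {
    codec => rubydebug
  }
}
```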


(Athanasios Antonopoulos) #9

Hi again,

I modified my filter a little bit, to this:

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

And the result is the same.
The application name is stored inside %{[message]}. I don't know how to extract just that piece of information and put it in the file output .log name.


(Magnus Bäck) #10

???

You had a grok filter that looked like it would extract e.g. the syslog_program field, and now you have replaced it with a grok filter that doesn't do anything useful. Reinstate your old grok filter and report what the resulting events look like.


(Athanasios Antonopoulos) #11

I had this one:

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

and now I changed it to this one:

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

but the result in Kibana is the same as before.
I get the messages just as I did with the first filter, without any problems.

That is strange.

That means that all the information, for example
"Jun 19 12:03:58 atdevxh104 nrpe[11297]: Client request was invalid, bailing out..."
is stored in the "message" field.

The problem is that from the "message" I want to take the application name "nrpe" and put it in the output .log file name.


(Magnus Bäck) #12

So what does the stdout { codec => rubydebug } output produce, i.e. what exactly do the processed events look like?


(Athanasios Antonopoulos) #13

For example, this is what I get from /var/log/messages on the ELK server:

jun 19 13:46:05 atdevxhv03 logstash: [2018-06-19T13:46:05,115][INFO ][logstash.outputs.file ] Opening file {:path=>"/tmp/log/Syslog_Server_Logs/atdevxh104.emea.nsn-net.net/atdevxh104.emea.nsn-net.net-2018-06-19.log"}
Jun 19 13:46:05 atdevxhv03 logstash: {
Jun 19 13:46:05 atdevxhv03 logstash: "message" => "Jun 19 13:46:01 atdevxh104 nrpe[21148]: Error: Request packet type/version was invalid!",
Jun 19 13:46:05 atdevxhv03 logstash: "@timestamp" => 2018-06-19T10:46:05.070Z,
Jun 19 13:46:05 atdevxhv03 logstash: "input" => {
Jun 19 13:46:05 atdevxhv03 logstash: "type" => "log"
Jun 19 13:46:05 atdevxhv03 logstash: },
Jun 19 13:46:05 atdevxhv03 logstash: "beat" => {
Jun 19 13:46:05 atdevxhv03 logstash: "name" => "atdevxh104.emea.nsn-net.net",
Jun 19 13:46:05 atdevxhv03 logstash: "hostname" => "atdevxh104.emea.nsn-net.net",
Jun 19 13:46:05 atdevxhv03 logstash: "version" => "6.3.0"
Jun 19 13:46:05 atdevxhv03 logstash: },
Jun 19 13:46:05 atdevxhv03 logstash: "offset" => 612897,
Jun 19 13:46:05 atdevxhv03 logstash: "tags" => [
Jun 19 13:46:05 atdevxhv03 logstash: [0] "beats_input_codec_plain_applied"
Jun 19 13:46:05 atdevxhv03 logstash: ],
Jun 19 13:46:05 atdevxhv03 logstash: "prospector" => {
Jun 19 13:46:05 atdevxhv03 logstash: "type" => "log"
Jun 19 13:46:05 atdevxhv03 logstash: },
Jun 19 13:46:05 atdevxhv03 logstash: "source" => "/var/log/messages",
Jun 19 13:46:05 atdevxhv03 logstash: "@version" => "1",
Jun 19 13:46:05 atdevxhv03 logstash: "host" => {
Jun 19 13:46:05 atdevxhv03 logstash: "name" => "atdevxh104.emea.nsn-net.net"
Jun 19 13:46:05 atdevxhv03 logstash: }
Jun 19 13:46:05 atdevxhv03 logstash: }
Jun 19 13:46:05 atdevxhv03 logstash: {
Jun 19 13:46:05 atdevxhv03 logstash: "message" => "Jun 19 13:46:01 atdevxh104 nrpe[21148]: Client request was invalid, bailing out...",
Jun 19 13:46:05 atdevxhv03 logstash: "@timestamp" => 2018-06-19T10:46:05.070Z,
Jun 19 13:46:05 atdevxhv03 logstash: "offset" => 612980,
Jun 19 13:46:05 atdevxhv03 logstash: "input" => {
Jun 19 13:46:05 atdevxhv03 logstash: "type" => "log"
Jun 19 13:46:05 atdevxhv03 logstash: },
Jun 19 13:46:05 atdevxhv03 logstash: "beat" => {
Jun 19 13:46:05 atdevxhv03 logstash: "version" => "6.3.0",
Jun 19 13:46:05 atdevxhv03 logstash: "name" => "atdevxh104.emea.nsn-net.net",
Jun 19 13:46:05 atdevxhv03 logstash: "hostname" => "atdevxh104.emea.nsn-net.net"
Jun 19 13:46:05 atdevxhv03 logstash: },
Jun 19 13:46:05 atdevxhv03 logstash: "tags" => [
Jun 19 13:46:05 atdevxhv03 logstash: [0] "beats_input_codec_plain_applied"
Jun 19 13:46:05 atdevxhv03 logstash: ],
Jun 19 13:46:05 atdevxhv03 logstash: "prospector" => {
Jun 19 13:46:05 atdevxhv03 logstash: "type" => "log"
Jun 19 13:46:05 atdevxhv03 logstash: },
Jun 19 13:46:05 atdevxhv03 logstash: "source" => "/var/log/messages",
Jun 19 13:46:05 atdevxhv03 logstash: "@version" => "1",
Jun 19 13:46:05 atdevxhv03 logstash: "host" => {
Jun 19 13:46:05 atdevxhv03 logstash: "name" => "atdevxh104.emea.nsn-net.net"
Jun 19 13:46:05 atdevxhv03 logstash: }
Jun 19 13:46:05 atdevxhv03 logstash: }


(Magnus Bäck) #14

As you can see the type field doesn't contain "syslog" (in fact, the type field isn't set at all) so your filters aren't used.


(Athanasios Antonopoulos) #15

So I have to set the type field to "log" and not to "syslog"?


(Magnus Bäck) #16

You can set the fields to any values you like as long as you're being consistent with the rest of your configuration.


(Athanasios Antonopoulos) #17

So if my clients' filebeat.yml configuration has

  • type: log

I have to write in the filter
if [type] == "log"

Do I need the filter?


(Magnus Bäck) #18

type: log in the Filebeat configuration doesn't have anything to do with the type field in the Logstash messages. If you want to set the type field to e.g. syslog you can do this:

fields:
  type: syslog
fields_under_root: true

See a complete example at https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-log.html.
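
In context, the Filebeat input configuration might look something like this (a sketch; the path is just an example):

```yaml
filebeat.inputs:
- type: log
  paths:
    - /var/log/messages
  # Add a custom "type" field at the top level of each event,
  # so a Logstash conditional like: if [type] == "syslog" will match.
  fields:
    type: syslog
  fields_under_root: true
```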


(Athanasios Antonopoulos) #19

Removing just the line if [type] == "syslog" solved my problem.

Now I am getting this JSON document in Kibana from my clients:

{
"_index": "filebeat-2018.06.20",
"_type": "doc",
"_id": "f6MPHGQBa73D38yOumhb",
"_version": 1,
"_score": null,
"_source": {
"syslog_timestamp": "Jun 20 10:18:59",
"host": {
"name": "atdevxh104.emea.nsn-net.net"
},
"syslog_facility": "user-level",
"source": "/var/log/messages",
"syslog_pid": "21558",
"syslog_program": "nrpe",
"offset": 757242,
"input": {
"type": "log"
},
"received_from": "{"name":"atdevxh104.emea.nsn-net.net"}",
"syslog_facility_code": 1,
"message": "Jun 20 10:18:59 atdevxh104 nrpe[21558]: Error: Request packet type/version was invalid!",
"received_at": "2018-06-20T07:19:00.884Z",
"syslog_message": "Error: Request packet type/version was invalid!",
"tags": [
"beats_input_codec_plain_applied"
],
"syslog_severity": "notice",
"@version": "1",
"beat": {
"name": "atdevxh104.emea.nsn-net.net",
"hostname": "atdevxh104.emea.nsn-net.net",
"version": "6.3.0"
},
"syslog_severity_code": 5,
"@timestamp": "2018-06-20T07:18:59.000Z",
"syslog_hostname": "atdevxh104",
"prospector": {
"type": "log"
}
},
"fields": {
"received_at": [
"2018-06-20T07:19:00.884Z"
],
"@timestamp": [
"2018-06-20T07:18:59.000Z"
]
},
"sort": [
1529479139000
]

So I can use syslog_program in my output configuration like this:

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    sniffing => true
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
  file {
    path => "/data/Syslog_Server_Logs/%{[host][name]}/%{[syslog_program]}-%{+YYYY-MM-dd}.log"
    codec => line { format => "%{[source]} - %{[message]}" }
  }
}

and I am able to write the logs to separate files, using the syslog_program name in the log file name, like this:

kernel-2018-06-20.log nrpe-2018-06-20.log rtvscand-2018-06-20.log

The only problem I have now is with the Winlogbeat logs from the Windows Event Viewer.

The file that stores the Winlogbeat logs is named like this:
%{[syslog_program]}-2018-06-20.log

This happens because there is no syslog_program field in the Winlogbeat logs.

Can I use an if statement to separate the file outputs? For example, if input.type == "log" use this file output, and if type == "wineventlog" use a different file output.

How can I use these if statements in my output file configuration?

Thanks a lot for all the replies


(Magnus Bäck) #20

Removing just the line if [type] == "syslog" solved my problem.

For now yes, but you don't want to apply syslog filtering for your Winlogbeat messages.

Can I use an if statement to separate the file outputs? For example, if input.type == "log" use this file output, and if type == "wineventlog" use a different file output.

How can I use these if statements in my output file configuration?

See https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html for some examples. That page also describes the notation used for subfields ([input][type] rather than input.type).
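
A conditional file output could look something like this (a sketch; it assumes your events carry a type field with these values, and the paths are just examples — adjust them to your setup):

```
output {
  if [type] == "syslog" {
    # Syslog events: name the file after the program that logged the message.
    file {
      path => "/data/Syslog_Server_Logs/%{[host][name]}/%{[syslog_program]}-%{+YYYY-MM-dd}.log"
      codec => line { format => "%{[source]} - %{[message]}" }
    }
  } else if [type] == "wineventlog" {
    # Winlogbeat events have no syslog_program field, so use a fixed name.
    file {
      path => "/data/Syslog_Server_Logs/%{[host][name]}/wineventlog-%{+YYYY-MM-dd}.log"
    }
  }
}
```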