Filters in Logstash for sending logs to different Elasticsearch indices

Hi,
I have configured syslog-ng, which receives syslogs from different sources/devices (Windows, Linux, firewall appliances, etc.) on port 514. syslog-ng then forwards all syslogs in JSON format to a Logstash server, which sends them to a single Elasticsearch index. Now I want to send the logs to different Elasticsearch indices based on their source, so that I can view the index of each device separately.

Now, I can add a tag in the filter section, but based on what? I don't want to separate the syslogs based on IP address or a pattern, but rather on source type and name.

How can I achieve this?

Thanks


What keys are available in your JSON object?

You can parse the JSON object and set tags based on certain key values.

You could then tag events for certain cases and send them to Elasticsearch indices conditionally based on those tags.
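As a rough, untested sketch of that approach (the PROGRAM test is only a placeholder condition; use whatever key value identifies each case):

filter {
  if [PROGRAM] == "some-program" {
    mutate { add_tag => [ "case-a" ] }
  }
}

output {
  if "case-a" in [tags] {
    elasticsearch {
      hosts => ["es-host"]
      index => "case-a-%{+YYYY.MM.dd}"
    }
  }
}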

This is my JSON object:

{
  "_index": "syslog-all-rq-2016.12.17",
  "_type": "syslog-all",
  "_id": "AVkNjl7jus_W3GFgAZNh",
  "_score": null,
  "_source": {
    "TAGS": ".source.syslog_tcp",
    "SOURCEIP": "172.19.8.210",
    "PROGRAM": "369",
    "PRIORITY": "notice",
    "MESSAGE": "<14>1 2016-12-17T17:12:29+01:00 the logs...",
    "LEGACY_MSGHDR": "369 ",
    "HOST_FROM": "172.19.8.210",
    "HOST": "172.19.8.210",
    "FACILITY": "user",
    "DATE": "Dec 17 17:12:29",
    "@version": "1",
    "@timestamp": "2016-12-17T16:12:29.507Z",
    "host": "127.0.0.1",
    "port": 35186,
    "type": "syslog-all",
    "tags": [
      "Syslog-All"
    ]
  },
  "fields": {
    "@timestamp": [
      1481991149507
    ]
  },
  "sort": [
    1481991149507
  ]
}
Maybe based on HOST_FROM?

I guess I did not really need a sample of your JSON, as a generic response will be more useful to others.

You should be able to do something based on the sample config below to make it work (note I have not tested this exact config):

input {...}

filter {
  json {
    source => "message"
    target => "parsedjson"
  }
  mutate {
    add_field => { "somefield" => "%{[parsedjson][fieldkey]}" }
  }
}

output {
  if [somefield] == "X" {
    elasticsearch {
      hosts => ["es-host"]
      index => "logs-X-%{+YYYY.MM.dd}"
    }
  }
  else if [somefield] == "Y" {
    elasticsearch {
      hosts => ["es-host"]
      index => "logs-Y-%{+YYYY.MM.dd}"
    }
  }
  else {
    elasticsearch {
      hosts => ["es-host"]
      index => "logs-%{+YYYY.MM.dd}"
    }
  }
}
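As a side note, you can sanity-check a config like this before running it (the file name below is just a placeholder; --configtest is the Logstash 2.x flag, renamed --config.test_and_exit in 5.x):

/opt/logstash/bin/logstash --configtest -f yourconfig.conf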

Right. What I really want to know is how I can differentiate the syslogs of different devices, maybe hundreds of them, since 'somefield' is not uniform across the huge number of devices whose syslogs are ingested by the syslog-ng server.

Thanks for your reply

This is going to be difficult if you do not have any key, present in all of the JSON objects, that can help identify unique systems.

One approach that is ideal in my mind is to use Filebeat to collect and ship the logs. This would allow you to add tags before sending to Logstash. That being said, I completely understand that you may not be able to do this for a variety of reasons.

I would like to repeat back what my understanding of your situation is so far, so I can continue to help as much as I am able:

  1. You would like to send logs to separate indices based on their host or log type.
  2. Logstash is getting a JSON object from your central syslog server.
  3. The JSON objects are not guaranteed to have an identifying key that can be used for filtering (so you cannot use the key "HOST_FROM", because it will not be in every JSON object).

Edit: I think you may mean something else for 3: is it that there will be a "HOST_FROM" key in every JSON object, but with hundreds of different values for the key? That would make it difficult to target the indices with static conditionals in the output section of the config.

If any of the three statements above is wrong, let us know. Also, please let me know which reading of 3 is correct.
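One more thought: if it is the latter case (the key exists in every object but takes hundreds of values), you could avoid hundreds of static conditionals by interpolating the field directly into the index name. A rough, untested sketch, assuming the value has been copied into a top-level field (here called sourceip):

output {
  elasticsearch {
    hosts => ["es-host"]
    # one index per source host; an event missing the field would land in a
    # literal "%{sourceip}" index, so consider guarding with a conditional
    index => "syslog-%{sourceip}-%{+YYYY.MM.dd}"
  }
}

Keep in mind this creates as many indices as you have sources, which carries its own overhead in Elasticsearch.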

Hi Troy,
First of all, I really appreciate your valuable input :smiley:. Now, coming back to the problem, let me answer these 3 points:

  1. You would like to send logs to separate indices based on their host or log type.
    Yes, I want to send logs to separate indices. All logs are syslogs, coming from Windows 10, AD, and many network devices.
  2. Logstash is getting a JSON object from your central syslog server.
    Yes, the syslog-ng server is sending logs to Logstash as JSON objects.
  3. The JSON objects are not guaranteed to have an identifying key that can be used for filtering.
    I have three fields, SOURCEIP, HOST_FROM, and HOST, which all carry the same value.
    Can I add the field SOURCEIP in the filter as a key for identifying source objects, and then route to the right index in the output section with if/else conditions on SOURCEIP? I would need to write a lot of if/else blocks and know every SOURCEIP in advance, so this way looks inefficient.

Lastly, I have added the add_field in the filter section, but when testing it from the command prompt with

echo 'JSONOBJECT' | /opt/logstash/bin/logstash -f test2.conf

the command places the literal string SOURCEIP in the output instead of its value.
This is the mutate section:

mutate {
  # add_field => { "SOURCEIP" => "%{[1.0][SOURCEIP]}" }
  add_field => { "foo_%{SOURCEIP}" => "Hello world, from %{SOURCEIP}" }
}

What's wrong with the above section?

Thanks again

Great, I think I understand the problem and goal now.

You can extract the SOURCEIP from the JSON object in two steps: first have the filter parse the JSON object, then use that parsed JSON to add the new field. To do this, use the following:

filter {
  json {
    source => "message"
    target => "parsedjson"
  }
  mutate {
    add_field => { "sourceip" => "%{[parsedjson][SOURCEIP]}" }
  }
}

The above will take whatever is in the message field, parse it as a JSON object, and store the parsed JSON in a field named "parsedjson". The mutate section will then get the value of the key "SOURCEIP" from the "parsedjson" field and store it in a new field called "sourceip".
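To illustrate with values borrowed from your earlier sample (a sketch in rubydebug-style notation, not literal output):

# event as it arrives: the whole JSON object is a single string in "message"
"message" => "{ \"SOURCEIP\": \"172.19.8.210\", \"HOST\": \"172.19.8.210\", ... }"

# after the json filter: the string is parsed into a nested field
"parsedjson" => { "SOURCEIP" => "172.19.8.210", "HOST" => "172.19.8.210", ... }

# after the mutate: the value is copied to a simple top-level field
"sourceip" => "172.19.8.210"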

If I were you, I would start by having all of the logs go to a single index. From there, you can use Kibana to filter on the field "sourceip" to view logs from specific hosts. This method avoids creating indices dynamically.
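For example, a query like the following in the Kibana search bar (using the IP from your sample) would show only that host's logs:

sourceip:"172.19.8.210"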

Hi Troy,
I have already sent all syslogs from the different sources to a single ES index, and they are visible in Kibana.
I am now modifying the filter section to create different indices for the different sources based on sourceip. I will let you know the outcome.
Thanks again

Hi Troy,
The method we discussed for differentiating log sources did not work: I can only see the ES index of the last (else) condition in the output section, since the mutate section is unable to place the right IP. Instead, it inserts the literal string "%{[parsedjson][SOURCEIP]}" into the Source_IP field. Is the json section needed in the filter section at all, or will a simple add_field for Source_IP do the job?

input {
  tcp {
    port => 9999
    type => "syslog-all"
    tags => [ "Syslog-All" ]
    codec => json
  }
  udp {
    port => 9999
    type => "syslog-all"
    tags => [ "Syslog-All" ]
    codec => json
  }
}

filter {
  json {
    source => "message"
    target => "parsedjson"
  }
  mutate {
    add_field => { "Source_IP" => "%{[parsedjson][SOURCEIP]}" }
  }
}

output {
  if [Source_IP] == "172.19.117.4" {
    elasticsearch {
      hosts => ["localhost:9200"]
      sniffing => true
      manage_template => false
      index => "syslog-172.19.117.4-%{+YYYY.MM.dd}"
    }
  }
  else if [Source_IP] == "172.19.8.220" {
    elasticsearch {
      hosts => ["localhost:9200"]
      sniffing => true
      manage_template => false
      index => "172.19.8.220-%{+YYYY.MM.dd}"
    }
  }
  else if [Source_IP] == "172.19.241.163" {
    elasticsearch {
      hosts => ["localhost:9200"]
      sniffing => true
      manage_template => false
      index => "172.19.241.163-%{+YYYY.MM.dd}"
    }
  }
  else {
    elasticsearch {
      hosts => ["localhost:9200"]
      sniffing => true
      manage_template => false
      index => "syslog-rest-%{+YYYY.MM.dd}"
    }
  }
}

Thanks

Hi Troy,
The issue remains when Logstash reads the logs from the syslog-ng server: instead of the IP address, this value => 'Source_IP:%{[_source][SOURCEIP]}' is being added, whereas the same config run from the command line (-f option) added the source IP. The difference is that in test.conf the input is:
input {
  stdin { codec => json }
}

as described in the topic "Add field from JSON / logstash filter",
whereas in the dev environment the input section is:

input {
  tcp {
    port => 9999
    type => "syslog-all"
    tags => [ "Syslog-All" ]
    codec => json
  }
  udp {
    port => 9999
    type => "syslog-all"
    tags => [ "Syslog-All" ]
    codec => json
  }
}

Logs did arrive in the Kibana dashboard with the above input, but with this: Source_IP:%{[_source][SOURCEIP]}

Really puzzling

Makra,

Would you please run the same test with the codec => json settings removed from the input? I think this may be the issue: with the json codec on the input, the event arrives already decoded into top-level fields, so there may be no raw message field left for the json filter to parse.

If it works, great. If not, please post the output (just as you did above) with the json codec removed.

Troy,
Once I removed the codec => json, it's still not working.

Now we are actually creating the parsedjson field as expected. So to get the value of SOURCEIP, we should use the following for the mutate section:

mutate {
  add_field => {"Source_IP" => "%{[parsedjson][_source][SOURCEIP]}"}
}

Makra,

I have a test instance up with the exact configuration we have discussed, and it is functioning fine.

If you are continuing to experience problems, here is the next step (I am positive we will solve it with this). Would you please delete all of the filter settings, leave the json codec off of the input section, and send a few logs to Elasticsearch? Then copy the exact string that appears in the "message" field, sanitize it, and share it here.

The goal of this is to capture the input you are receiving from your syslog servers (before it is processed by Logstash), which will allow me to create a correct filter that accomplishes what you need.
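If it helps, here is a minimal sketch of that stripped-down config (ports and hosts taken from your earlier posts; the stdout output is only there so you can copy the raw message straight from the console):

input {
  tcp { port => 9999 type => "syslog-all" }
  udp { port => 9999 type => "syslog-all" }
}

# no filter section at all

output {
  stdout { codec => rubydebug }
  elasticsearch { hosts => ["localhost:9200"] }
}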

Hi Troy,

mutate {
  add_field => { "Source_IP" => "%{[parsedjson][_source][SOURCEIP]}" }
}

Adding the above in the filter section results in the literal string being added to the output:

Source_IP: %{[parsedjson][_source][SOURCEIP]}

After removing the filter section and commenting out the codec => json in the input section, the following JSON appears in the Kibana dashboard:

{
  "_index": "172.18.242.165-2016.12.19",
  "_type": "syslog-all",
  "_id": "AVkVOiM3maokFYeRDl4z",
  "_score": null,
  "_source": {
    "TAGS": ".source.syslog_tcp",
    "SOURCEIP": "172.18.242.165",
    "PROGRAM": "MSWinEventLog\t1\tMicrosoft-Windows-TaskScheduler/Maintenance\t281\tMon",
    "PRIORITY": "info",
    "MESSAGE": "Dec 19 04:57:36 2016\t800\tMicrosoft-Windows-TaskScheduler\tSYSTEM\tUser\tInformation\tvServer01\tMaintenance state has changed\t\tMaintenance state changed to 1 (Last Run: 19-12-2016 04:57).\t89",
    "LEGACY_MSGHDR": "MSWinEventLog\t1\tMicrosoft-Windows-TaskScheduler/Maintenance\t281\tMon ",
    "HOST_FROM": "172.18.242.165",
    "HOST": "vServer01",
    "FACILITY": "user",
    "DATE": "Dec 19 04:57:36",
    "@version": "1",
    "@timestamp": "2016-12-19T03:57:21.613Z",
    "host": "127.0.0.1",
    "port": 35488,
    "type": "syslog-all",
    "tags": [
      "Syslog-All"
    ]
  },
  "fields": {
    "@timestamp": [
      1482119841613
    ]
  },
  "sort": [
    1482119841613
  ]
}


Thanks again

Interesting. By the looks of your output here, it appears that whatever you have sending or ingesting the logs is already creating the fields. This means you may not need to parse the JSON object to work with it.

If Logstash had to parse the object separately, I would have expected to see the entire JSON object in the "message" field of _source. Here is what I would have expected to see if your syslog server were sending a raw JSON object to Logstash:

{
      "_index": "testing-2016.12.19",
      "_type": "logs",
      "_id": "AVkVmmhWcV8jDiSX-JrU",
      "_score": null,
      "_source": {
        "message": "{ \"TAGS\": \".source.syslog_tcp\", \"SOURCEIP\": \"---\", \"PROGRAM\": \"369\", \"PRIORITY\": \"notice\", \"MESSAGE\": \"<14>1 2016-12-17T17:12:29+01:00 TirougaII WinFileService - - [synolog@6574 synotype=\\\"WinFileService\\\" ip=\\\"---\\\" luser=\\\"pc\\\" event=\\\"read\\\" isdir=\\\"File\\\" fsize=\\\"6.00 KB\\\" fname=\\\"/DATA/800 ProductionTST/Thumbs.db\\\"][meta sequenceId=\\\"62\\\"] Event: read, Path: /DATA01/SOC/800 ProductionTST/Thumbs.db, File/Folder: File, Size: 6.00 KB, User: dtu, IP: ---\", \"LEGACY_MSGHDR\": \"369 \", \"HOST_FROM\": \"---\", \"HOST\": \"---\", \"FACILITY\": \"user\", \"DATE\": \"Dec 17 17:12:29\", \"@version\": \"1\", \"@timestamp\": \"2016-12-17T16:12:29.507Z\", \"host\": \"127.0.0.1\", \"port\": 35186, \"type\": \"syslog-all\", \"tags\": [ \"Syslog-All\" ] }",
        "@version": "1",
        "@timestamp": "2016-12-19T05:42:37.237Z",
        "host": "---",
        "Source_IP": "SOURCEIP"
      },
      "fields": {
        "@timestamp": [
          1482126157237
        ]
      },
      "sort": [
        1482126157237
      ]
    }

When you sent this to Elasticsearch without the filter settings and without the json codec on the input section, was there a "SOURCEIP" field in Kibana?

What you provided is what I asked for, and it had some valuable information, but we still do not have the "raw" log format that is being sent to Logstash. If you are able, we need to see the actual raw message that is being sent from syslog to Logstash.

You are right, Troy. I have the field I need; why the add_field section is not working is something I may need to figure out later.

Thanks for your time and replies

That is good news, Makra (I think).

This is what I understand from your response and the earlier screenshot: when you deleted the contents of the filter section and the json codec, then sent a message from the syslog server, you were able to see the SOURCEIP field with an IP address in Kibana. This means you do not need to add the field; it is already there and ready to use.

If the field is already being added by the default behavior, there is no need to do anything special in the filter block; you can just reference the SOURCEIP field in your conditionals, which will look like:

 else if [SOURCEIP] == "<IP ADDRESS>" {...}

The reason we were unable to add the Source_IP field before is that we were telling it to get the value from a place that did not exist.
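For example, your earlier output section could shrink to something like this (an untested sketch, reusing your hosts and index names):

output {
  if [SOURCEIP] == "172.19.117.4" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "syslog-172.19.117.4-%{+YYYY.MM.dd}"
    }
  }
  else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "syslog-rest-%{+YYYY.MM.dd}"
    }
  }
}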

Edit: Let me know if you would like a deeper explanation of why our add_field directives were not working.

Hi Troy,
Can you please tell me why add_field is not working for the following JSON?

{
  "_index": "syslog-other-udp-2016.12.28",
  "_type": "nxlogWin10",
  "_id": "AVlGGGkLql2uOcut2Qba",
  "_score": null,
  "_source": {
    "TAGS": ".source.syslog_nxlog",
    "SOURCEIP": "172.18.242.160",
    "PROGRAM": "{"EventTime"",
    "PRIORITY": "notice",
    "MESSAGE": ""2016-12-28 07:41:59","Hostname":"DESKTOP-7OTLF9V","Keywords":36028797018963968,"EventType":"INFO","SeverityValue":2,"Severity":"INFO","EventID":1,"SourceName":"MYEVENTSOURCE","Task":0,"RecordNumber":2053,"ProcessID":0,"ThreadID":0,"Channel":"Application","Domain":"DESKTOP-7OTLF9V","AccountName":"devtst","UserID":"S-1-5-21-2657980916-529253927-1869581887-1001","AccountType":"User","Message":"My first log","Opcode":"Info","EventReceivedTime":"2016-12-28 07:42:00","SourceModuleName":"eventlog","SourceModuleType":"im_msvistalog","MAC":"00-0c-29-C3-E7-CE","Customer":"Contoso+test","Location":"DK-West"}",
    "LEGACY_MSGHDR": "{"EventTime":",
    "HOST_FROM": "172.18.242.160",
    "HOST": "172.18.242.160",
    "FACILITY": "user",
    "DATE": "Dec 28 16:41:54",
    "@version": "1",
    "@timestamp": "2016-12-28T15:41:59.830Z",
    "host": "127.0.0.1",
    "port": 41241,
    "type": "nxlogWin10",
    "tags": [
      "Windows10",
      "_jsonparsefailure"
    ],
    "macadd": "MAC"
  },
  "fields": {
    "@timestamp": [
      1482939719830
    ]
  },
  "sort": [
    1482939719830
  ]
}

..................
.................
filter {
  json {
    source => "MESSAGE"
    # target => "parsedjson"
  }
  mutate {
    add_field => {
      "macadd" => "%{[MESSAGE][MAC]}"
    }
  }
}


What I get after applying the above filter is
"macadd": "MAC"

but I need "macadd": "00-0c-29-C3-E7-CE".

Where is the problem? Is it because the MESSAGE value starts with a timestamp that does not carry a field name?

Thanks in advance