Add field from CSV to event log sent by Logstash

Hi friends, I have a question about filters.

I have a conf file that works perfectly for sending Active Directory logs to Elasticsearch.
In my log, I have a field named "event_data.TargetUserName" and it contains a registration number.

In my company, all users have a registration number.

So, this is a sample of my CSV file:

sAMAccountName;displayName
C583;Jane Doe
C090;John Doe
C587;Michael Jackson

and my Logstash conf file (I use Logstash 2.4 and Elasticsearch 5.2.2):

input {
  kafka {
    zk_connect => "192.168.18.15:2181"
    group_id => "logstash-application"
    topic_id => "ActiveDirectory-Application-Logs"
    reset_beginning => "false"
    consumer_threads => 1
    codec => json {}
  }
}

output {
  elasticsearch {
    hosts => ["192.168.18.15:9200"]
    index => "logstash-application-%{+YYYY.MM.dd}"
  }
}

I don't know which filter to use to match the log field "event_data.TargetUserName" against the "sAMAccountName" column of my CSV file, and how to add a field named "userName".

I tried this, but without effect:

filter {
  if [event_data.TargetUserName] == "*" {
    csv {
      source => "/etc/logstash/mutate/ExportADLDS.csv"
      columns => ["sAMAccountName","displayName"]
      separator => ";"
    }
    add_tag => ["userName"]
    source => "[event_data][TargetUserName]"
    target => "userName"
    add_field => ["{[sAMAccountName]}", "%{[displayName]}"]
  }
}

Many thanks for your help.

Fayce

So you have events with [event_data][TargetUserName] set to e.g. C090 and you want to put "John Doe" into another field? Use the translate filter.
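
A minimal sketch of what that could look like (the translate filter accepts YAML, JSON, and CSV dictionaries, but as far as I know its CSV support expects comma-separated columns, so your semicolon-separated export would need converting, or a YAML dictionary instead):

filter {
  translate {
    # two-column lookup file: key in the first column, value in the second
    dictionary_path => "/etc/logstash/mutate/ExportADLDS.csv"
    # the value to look up, e.g. "C090"
    field => "[event_data][TargetUserName]"
    # receives the matching displayName, e.g. "John Doe"
    destination => "userName"
  }
}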

translate instead of csv?

Yes.

Is there a way to make an offline install of that plugin? There is an error:

./logstash-plugin install logstash-filter-translate

Validating logstash-filter-translate
Unable to download data from https://rubygems.org - Errno::ECONNREFUSED: Connection refused - Connection refused (https://api.rubygems.org/latest_specs.4.8.gz)
ERROR: Installation aborted, verification failed for logstash-filter-translate

https://www.elastic.co/guide/en/logstash/current/offline-plugins.html
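
For Logstash 5.x the workflow described there is roughly the following (a sketch; the archive path is an example, and older 2.x releases use a different pack/unpack mechanism):

# on a machine with internet access
bin/logstash-plugin prepare-offline-pack logstash-filter-translate
# copy the generated zip to the offline machine, then install from it
bin/logstash-plugin install file:///tmp/logstash-offline-plugins.zip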

Hi, I finally succeeded in installing the logstash-filter-translate plugin.

Here is my conf file:

input {
  elasticsearch {
    hosts => ["192.168.18.15:9200"]
    index => "logstash-security-2017.03.07"
  }
}

filter {
  mutate {
    add_field => {
      "[userName_string]" => "%{sAMAccountName},%{displayName}"
    }
  }
  translate {
    dictionary_path => "/etc/logstash/mutate/ExportADLDS.csv"
    field => "[event_data.TargetUserName]"
    destination => "userName"
  }
}

output {
  elasticsearch {
    hosts => ["192.168.18.15:9200"]
    index => "logstash-security-test"
  }
  stdout {}
}

And the error:

A plugin had an unrecoverable error. Will restart this plugin.
Plugin: <LogStash::Inputs::Elasticsearch hosts=>["192.168.18.15:9200"], index=>"logstash-security-2017.03.07", codec=><LogStash::Codecs::JSON charset=>"UTF-8">, query=>"{"query": { "match_all": {} } }", scan=>true, size=>1000, scroll=>"1m", docinfo=>false, docinfo_target=>"@metadata", docinfo_fields=>["_index", "_type", "_id"], ssl=>false>
Error: [400] {"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"No search type for [scan]"}],"type":"illegal_argument_exception","reason":"No search type for [scan]"},"status":400} {:level=>:error}

I am close to finding the solution. A little help pleeaaasee :slight_smile:
Thanks

Fayce

I tried this, with no more success:

input {
  elasticsearch {
    hosts => ["192.168.18.15:9200"]
    index => "logstash-security-2017.03.07"
  }
}

filter {
  translate {
    dictionary_path => "/etc/logstash/mutate/ExportADLDS.csv"
    add_field => {"userName_string%{TargetUserName}" => "%{displayName}"}
    field => "event_data.TargetUserName"
    destination => "event_data.userName"
    remove_field => [ "@version", "@timestamp" ]
    override => true
  }
}

output {
  elasticsearch {
    hosts => ["192.168.18.15:9200"]
    index => "logstash-security-2017.03.07"
  }
  stdout {}
}

:sob: :sob: :sob:

Last attempt before suicide :slight_smile:

input {
  elasticsearch {
    hosts => ["192.168.18.15:9200"]
    index => "logstash-security-2017.03.07"
    query => '{"query": { "match_all": {"event_data": {"TargetUserName": true} } }'
  }
}

filter {
  mutate {
    add_field => {"[@metadata][userName_string]" => "%{event_data.TargetUserName}"}
  }
  translate {
    dictionary_path => "/etc/logstash/mutate/ExportADLDS.yml"
    field => "[@metadata][userName_string]"
    destination => "userName"
    remove_field => [ "@version", "@timestamp" ]
    override => true
  }
}

output {
  elasticsearch {
    hosts => ["192.168.18.15:9200"]
    index => "logstash-security-2017.03.07"
  }
  stdout {}
}

And always the same error; I don't know where to put the expected character:

fetched an invalid config {:config=>"input{\r\n elasticsearch {\r\n hosts => ["192.168.18.15:9200"]\r\n index => "logstash-security-2017.03.07"\r\n query=>"{"query": { "match_all": {"event_data.TargetUserName"} } }"\r\n }\r\n}\r\nfilter{ \r\n mutate{\r\n add_field => {"[@metadata][userName_string]" => "%{event_data.TargetUserName}"}\r\n }\r\n translate {\r\n dictionary_path => "/etc/logstash/mutate/ExportADLDS.yml"\r\n field => "[@metadata][userName_string]"\r\n destination => "userName"\r\n remove_field => [ "@version", "@timestamp" ]\r\n override => true\r\n }\r\n }\r\noutput {\r\n elasticsearch {\r\n hosts => ["192.168.18.15:9200"]\r\n index => "logstash-security-2017.03.07"\r\n }\r\n stdout {}\r\n}\r\n\n", :reason=>"Expected one of #, {, } at line 5, column 15 (byte 124) after input{\r\n elasticsearch {\r\n hosts => ["192.168.18.15:9200"]\r\n index => "logstash-security-2017.03.07"\r\n query=>"{"", :level=>:error}
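
The "Expected one of #, {, }" message is the config parser stopping at the unescaped double quotes inside a double-quoted query string. Wrapping the query in single quotes, and giving it valid query DSL (here an exists query, on the assumption that the goal is "only events that have event_data.TargetUserName"), would look roughly like this:

input {
  elasticsearch {
    hosts => ["192.168.18.15:9200"]
    index => "logstash-security-2017.03.07"
    # single quotes let the inner double quotes through without escaping
    query => '{ "query": { "exists": { "field": "event_data.TargetUserName" } } }'
  }
}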

And here is a part of my log stored in ES:

{
  "_index": "logstash-security-2017.03.07",
  "_type": "wineventlog",
  "_id": "AVqq_VmXc49FTvkI5kKg",
  "_score": null,
  "_source": {
    "@timestamp": "2017-03-07T22:59:59.998Z",
    "beat": {
      "hostname": "VMDC",
      "name": "VMDC",
      "version": "5.1.1"
    },
    "computer_name": "VMDC.mycompany.fr",
    "event_data": {
      "LogonType": "3",
      "TargetDomainName": "DOMAINXXXXX",
      "TargetLogonId": "0xea4f87c4",
      "TargetUserName": "M0123345",
      "TargetUserSid": "S-1-5-21-117609710-1482476501-xxxxxxxx-17782"
    },
    "event_id": 4634,
    "keywords": [
      "Audit Success"

I forgot to include the exact mapping of the field event_data.TargetUserName in ES:

{
  "logstash-security-2017.03.08": {
    "mappings": {
      "wineventlog": {
        "event_data.TargetUserName": {
          "full_name": "event_data.TargetUserName",
          "mapping": {
            "TargetUserName": {
              "type": "text",
              "norms": false,
              "fields": {
                "raw": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            }
          }
        }
      }
    }
  }
}

You can comment out lines in your configuration to narrow down which line is resulting in the error.

Thank you Magnus, I tried that too. The difficulty is setting the exact query. Here is the mapping of my field "event_data.TargetUserName":

GET /logstash-security-2017.03.08/_mapping/field/event_data.TargetUserName

{
  "logstash-security-2017.03.08": {
    "mappings": {
      "wineventlog": {
        "event_data.TargetUserName": {
          "full_name": "event_data.TargetUserName",
          "mapping": {
            "TargetUserName": {
              "type": "text",
              "norms": false,
              "fields": {
                "raw": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            }
          }
        }
      }
    }
  }
}

I used that mapping for my query in Logstash. Here is my conf file:

input {
  elasticsearch {
    hosts => ["192.168.18.15:9200"]
    index => "logstash-security-2017.03.08"
    codec => "json"
    ssl => false
    docinfo_fields => ["_index", "_type", "_id"]
    docinfo_target => "@metadata"
    query => "{ "query": {"mappings":{"wineventlog":{"event_data.TargetUserName":{"full_name":"event_data.TargetUserName","mapping":{"TargetUserName":{"type":"text","norms":false,"fields":{"raw":{"type":"keyword","ignore_above":256}}}}}}}}} }"
  }
}

filter {
  mutate {
    add_field => {"[@metadata][userName_string]" => "%{event_data.TargetUserName}"}
  }
  translate {
    dictionary_path => "/etc/logstash/mutate/ExportADLDS.yml"
    field => "event_data.TargerUserName"
    destination => "userName"
    remove_field => [ "@version", "@timestamp" ]
    override => true
  }
}

output {
  elasticsearch {
    hosts => ["192.168.18.15:9200"]
    manage_template => false
    index => "logstash-security-user-2017.03.08"
    document_type => "new-type"
  }
  stdout {}
}

And the error loop:

A plugin had an unrecoverable error. Will restart this plugin.
Plugin: <LogStash::Inputs::Elasticsearch hosts=>["192.168.18.15:9200"], index=>"logstash-security-2017.03.08", codec=><LogStash::Codecs::JSON charset=>"UTF-8">, ssl=>false, docinfo_fields=>["_index", "_type", "_id"], docinfo_target=>"@metadata", query=>"{ \"query\": {\"mappings\":{\"wineventlog\":{\"event_data.TargetUserName\":{ \"full_name\":\"event_data.TargetUserName\",\"mapping\":{\"TargetUserName\":{\"type\":\"text\",\"norms\":false,\"fields\":{\"raw\":{\"type\":\"keyword\",\"ignore_above\":256}}}}}}}}} }", scan=>true, size=>1000, scroll=>"1m", docinfo=>false>
Error: [500] {"error":{"root_cause":[{"type":"json_parse_exception","reason":"Unexpected character ('\' (code 92)): was expecting double-quote to start field name\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@873f129; line: 1, column: 4]"}],"type":"json_parse_exception","reason":"Unexpected character ('\' (code 92)): was expecting double-quote to start field name\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@873f129; line: 1, column: 4]"},"status":500} {:level=>:error}
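
The 500 here is Elasticsearch failing to parse the request body: the output of _mapping is a field mapping, not a query, so it cannot go into the query option. The input expects search DSL, for example simply:

query => '{ "query": { "match_all": {} } }'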

Hi, here is some news. I upgraded Logstash to 5.2.2, created a new conf file for importing my Active Directory logs into ES, and installed logstash-filter-translate from an offline package (thanks Magnus for the link).

I am trying to add a field to an old index (for the moment), matching an existing field (event_data.TargetUserName), which holds the user's registration number, against my YAML dictionary. If the field is present, I want to add a new field containing the displayName from the YAML file.

YAML file:
sAMAccountName;displayName
C583: Jane Doe
C090: John Doe
C587: Michael Jackson
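
Note that the first line, sAMAccountName;displayName, is a leftover CSV header and is not valid YAML; if it is really present in the file, the translate filter will most likely fail to load the dictionary at all. A loadable dictionary contains only key: value pairs:

C583: Jane Doe
C090: John Doe
C587: Michael Jackson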

In a terminal, when I run this command:

/opt/logstash/bin# ./logstash -f /etc/logstash/mutate/Add_userName.conf

I can see that Logstash is parsing my index (all the logs scroll by very fast), but when the parsing is over, my new field isn't there.

What am I missing??

Here is my conf file for translating the old index:

input {
  elasticsearch {
    hosts => "192.168.18.15:9200"
    index => "logstash-security-2017.03.09"
    codec => "json"
    query => '{"query": {"match_all": {}}}'
  }
}

filter {
  translate {
    dictionary_path => "/etc/logstash/mutate/ExportADLDS.yml"
    field => "event_data.TargetUsername"
    destination => "userName"
    remove_field => [ "@version", "@timestamp" ]
    override => true
  }
}

output {
  elasticsearch {
    hosts => "192.168.18.15:9200"
    manage_template => false
    index => "logstash-security-2017.03.09"
  }
  stdout {}
}

Thank you dear friends.

Fayce

Please show an example event, either produced by Logstash's stdout { codec => rubydebug } output or the actual Elasticsearch event (copy/paste from the JSON tab in Kibana).

Thanks Magnus, here is an event with TargetUserName in it:

{
  "_index": "logstash-security-2017.03.09",
  "_type": "wineventlog",
  "_id": "AVrMXpFlR-S19PVZbRjo",
  "_score": null,
  "_source": {
    "computer_name": "VMNTXXXXXDC.company.fr",
    "process_id": 628,
    "keywords": [
      "Audit Success"
    ],
    "level": "Information",
    "log_name": "Security",
    "record_number": "4146914779",
    "event_data": {
      "Status": "0x0",
      "Workstation": "12-U0054",
      "PackageName": "MICROSOFT_AUTHENTICATION_PACKAGE_V1_0",
      "TargetUserName": "N130042"
    },
    "message": "The computer attempted to validate the credentials for an account.\n\nAuthentication Package:\tMICROSOFT_AUTHENTICATION_PACKAGE_V1_0\nLogon Account:\tN130042\nSource Workstation:\t12-U0054\nError Code:\t0x0",
    "opcode": "Info",
    "type": "wineventlog",
    "tags": [
      "ActiveDirectory"
    ],
    "thread_id": 4928,
    "@timestamp": "2017-03-09T15:56:45.381Z",
    "event_id": 4776,
    "task": "Credential Validation",
    "provider_guid": "{54849625-5478-4994-A5BA-3E3B0328C30D}",
    "beat": {
      "hostname": "VMNTXXXDC",
      "name": "VMNTXXXDC",
      "version": "5.1.1"
    },
    "@version": "1",
    "source_name": "Microsoft-Windows-Security-Auditing"
  },
  "fields": {
    "@timestamp": [
      1489075005381
    ]
  },
  "sort": [
    1489075005381
  ]
}

Another one, a little bit different

{
  "_index": "logstash-security-2017.03.09",
  "_type": "wineventlog",
  "_id": "AVrMbOuUR-S19PVZcaDq",
  "_score": null,
  "_source": {
    "computer_name": "VMNTXXXDC.company.fr",
    "process_id": 628,
    "keywords": [
      "Audit Success"
    ],
    "level": "Information",
    "log_name": "Security",
    "record_number": "4146914776",
    "event_data": {
      "TargetLogonId": "0xe8f3149a",
      "LogonType": "3",
      "TargetUserName": "N129244",
      "TargetDomainName": "COMPANY",
      "TargetUserSid": "S-1-5-21-117609710-1482476501-1801674531-60335"
    },
    "message": "An account was logged off.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-21-117609710-1482476501-1801674531-60335\n\tAccount Name:\t\tN129244\n\tAccount Domain:\t\tINTRANICE\n\tLogon ID:\t\t0xE8F3149A\n\nLogon Type:\t\t\t3\n\nThis event is generated when a logon session is destroyed. It may be positively correlated with a logon event using the Logon ID value. Logon IDs are only unique between reboots on the same computer.",
    "opcode": "Info",
    "type": "wineventlog",
    "tags": [
      "ActiveDirectory"
    ],
    "thread_id": 1360,
    "@timestamp": "2017-03-09T15:56:45.253Z",
    "event_id": 4634,
    "task": "Logoff",
    "provider_guid": "{54849625-5478-4994-A5BA-3E3B0328C30D}",
    "beat": {
      "hostname": "VMNTXXXDC",
      "name": "VMNTXXXDC",
      "version": "5.1.1"
    },
    "@version": "1",
    "source_name": "Microsoft-Windows-Security-Auditing"
  },
  "fields": {
    "@timestamp": [
      1489075005253
    ]
  },
  "sort": [
    1489075005253
  ]
}

You have no event_data.TargetUserName field, but you have an event_data field which has a TargetUserName subfield. The correct syntax for addressing that field is [event_data][TargetUserName]. See https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#logstash-config-field-references.
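
Applied to your translate filter, that would be something like this (note also the capital N: your config has TargetUsername):

filter {
  translate {
    dictionary_path => "/etc/logstash/mutate/ExportADLDS.yml"
    # bracket notation addresses the nested subfield; a dotted name is read as a literal top-level field
    field => "[event_data][TargetUserName]"
    destination => "userName"
  }
}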

It creates a new event with the tag "_grokparsefailure" for each event present in the index:

{
  "_index": "logstash-security-2017.03.09",
  "_type": "wineventlog",
  "_id": "AVrMwF1CR-S19PVZhdBp",
  "_score": null,
  "_source": {
    "computer_name": "VMNT656DC.intranice.ville-nice.fr",
    "process_id": 628,
    "keywords": [
      "Audit Success"
    ],
    "level": "Information",
    "log_name": "Security",
    "record_number": "4146914779",
    "event_data": {
      "Status": "0x0",
      "Workstation": "12-U0054",
      "PackageName": "MICROSOFT_AUTHENTICATION_PACKAGE_V1_0",
      "TargetUserName": "N130042"
    },
    "message": "The computer attempted to validate the credentials for an account.\n\nAuthentication Package:\tMICROSOFT_AUTHENTICATION_PACKAGE_V1_0\nLogon Account:\tN130042\nSource Workstation:\t12-U0054\nError Code:\t0x0",
    "opcode": "Info",
    "type": "wineventlog",
    "tags": [
      "ActiveDirectory",
      "_grokparsefailure"
    ],
    "thread_id": 4928,
    "@timestamp": "2017-03-09T15:56:45.381Z",
    "event_id": 4776,
    "task": "Credential Validation",
    "provider_guid": "{54849625-5478-4994-A5BA-3E3B0328C30D}",
    "beat": {
      "hostname": "VMNT656DC",
      "name": "VMNT656DC",
      "version": "5.1.1"
    },
    "@version": "1",
    "source_name": "Microsoft-Windows-Security-Auditing"
  },
  "fields": {
    "@timestamp": [
      1489075005381
    ]
  },
  "sort": [
    1489075005381
  ]
}

I don't understand what you're asking.

Excuse me, English is not my native language.

My dictionary.yml contains 15,000 entries.

YAML file example:
sAMAccountName: displayName
C583: Jane Doe
C090: John Doe
C587: Michael Jackson

My filter file:

filter {
  translate {
    dictionary_path => "/etc/logstash/mutate/ExportADLDS.yml"
    field => "[event_data][TargetUserName]"
    destination => "[username]"
    remove_field => [ "@version", "@timestamp" ]
    override => true
  }
}

The difficulty I have is matching the field "[event_data][TargetUserName]" of my event in ES against the "sAMAccountName" keys of my dictionary file. If the value is present, I want a new field "[username]" added to my event, holding the corresponding "displayName" from my dictionary.

EXAMPLE: if the event has "[event_data][TargetUserName]": "C587", I want a new field "[username]" to be added to my event with the value "Michael Jackson".
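
If the lookup succeeds, the indexed event should then contain, roughly:

"event_data": {
  "TargetUserName": "C587"
},
"username": "Michael Jackson"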

Thanks for your help; it is very hard to use filters.