How to use multiple inputs in Logstash?

Hi,

My setup: Two Windows boxes; one is for Elasticsearch and Kibana, and the other is for Logstash. I am already shipping Windows events to my ELK setup.

Now, I want to collect Microsoft Exchange logs too. How can I configure multiple inputs in Logstash, and how will they work in the ELK stack? I will use this guide to collect the Exchange logs: https://elijahpaul.co.uk/analysing-exchange-2013-message-tracking-logs-using-elk-elasticsearch-logstash-kibana/

As per the above-mentioned guide, the Exchange logs will be collected in syslog format, but I am collecting Windows events as JSON. Will that be okay?


Yes. In each input, use the type field and/or tags so that you can distinguish between different kinds of messages in subsequent filters and/or outputs.
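As a minimal sketch (the port numbers, type values, and tag names here are illustrative, not prescriptive), each input can set its own type and tags, and later stages can branch on them:

```conf
input {
  tcp {
    port => 5544
    type => "nxlog-json"
    tags => ["windows"]
  }
  udp {
    port => 5141
    type => "Exchange"
    tags => ["exchange"]
  }
}

filter {
  if [type] == "Exchange" {
    # Exchange-specific filters go here
  } else if [type] == "nxlog-json" {
    # Windows eventlog filters go here
  }
}
```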

Thanks Magnus,

So, how will they be distinguished in Kibana? Do I need to create a new index for it? Maybe I am asking a lame question, but please bear with me.

You can use different indexes or distinguish between them using the type and/or tags. I prefer the latter since each index has a fixed RAM overhead.
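If you do go with separate indexes, a rough sketch of a conditional output is below (the index names and the `host` value are placeholders; the `elasticsearch` options match the older syntax used elsewhere in this thread):

```conf
output {
  if [type] == "Exchange" {
    elasticsearch {
      host => "ip_address"
      protocol => "http"
      index => "exchange-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      host => "ip_address"
      protocol => "http"
      index => "logstash-%{+YYYY.MM.dd}"
    }
  }
}
```

With a single shared index, you can instead just filter on `type` or `tags` in Kibana queries.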

Hi Magnus,

Thanks for your response. Can I use the same port when configuring a new input in Logstash for Microsoft Exchange?

If you want to have different kinds of messages arriving on the same port you need to use some other kind of mechanism to distinguish events and modify the type field and/or the event's tags array. This could e.g. be done with the help of regexp matches on the event contents. If that's possible to do then yes you can use the same port.
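For example (a sketch only; the "MSGTRK" pattern is an assumption about what an Exchange message-tracking line might contain, so substitute a regexp that actually matches your payloads):

```conf
filter {
  # Assumption: Exchange message-tracking lines contain "MSGTRK"
  if [message] =~ /MSGTRK/ {
    mutate {
      replace => [ "type", "Exchange" ]
      add_tag => [ "exchange" ]
    }
  }
}
```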

I am a newbie to the ELK stack. Could you please help me write the Logstash indexer config for Windows events and Exchange? Here is my config file:

input {
  tcp {
    codec => "json"
    port => 5544
    tags => ["windows", "nxlog"]
    type => "nxlog-json"
  }
} # end input

filter {
  if [type] == "nxlog-json" {
    date {
      match => ["[EventTime]", "YYYY-MM-dd HH:mm:ss"]
      # timezone => "Europe/London"
    }

    mutate {
      rename => [ "AccountName", "user" ]
      rename => [ "AccountType", "[eventlog][account_type]" ]
      rename => [ "ActivityId", "[eventlog][activity_id]" ]
      rename => [ "Address", "ip6" ]
      rename => [ "ApplicationPath", "[eventlog][application_path]" ]
      rename => [ "AuthenticationPackageName", "[eventlog][authentication_package_name]" ]
      rename => [ "Category", "[eventlog][category]" ]
      rename => [ "Channel", "[eventlog][channel]" ]
      rename => [ "Domain", "domain" ]
      rename => [ "EventID", "[eventlog][event_id]" ]
      rename => [ "EventType", "[eventlog][event_type]" ]
      rename => [ "File", "[eventlog][file_path]" ]
      rename => [ "Guid", "[eventlog][guid]" ]
      rename => [ "Hostname", "hostname" ]
      rename => [ "Interface", "[eventlog][interface]" ]
      rename => [ "InterfaceGuid", "[eventlog][interface_guid]" ]
      rename => [ "InterfaceName", "[eventlog][interface_name]" ]
      rename => [ "IpAddress", "ip" ]
      rename => [ "IpPort", "port" ]
      rename => [ "Key", "[eventlog][key]" ]
      rename => [ "LogonGuid", "[eventlog][logon_guid]" ]
      rename => [ "LogonType", "[eventlog][logon_type]" ]
      rename => [ "Message", "message" ]
      rename => [ "ModifyingUser", "[eventlog][modifying_user]" ]
      rename => [ "NewProfile", "[eventlog][new_profile]" ]
      rename => [ "OldProfile", "[eventlog][old_profile]" ]
      rename => [ "Port", "port" ]
      rename => [ "PrivilegeList", "[eventlog][privilege_list]" ]
      rename => [ "ProcessID", "pid" ]
      rename => [ "ProcessName", "[eventlog][process_name]" ]
      rename => [ "ProviderGuid", "[eventlog][provider_guid]" ]
      rename => [ "ReasonCode", "[eventlog][reason_code]" ]
      rename => [ "RecordNumber", "[eventlog][record_number]" ]
      rename => [ "ScenarioId", "[eventlog][scenario_id]" ]
      rename => [ "Severity", "level" ]
      rename => [ "SeverityValue", "[eventlog][severity_code]" ]
      rename => [ "SourceModuleName", "nxlog_input" ]
      rename => [ "SourceName", "[eventlog][program]" ]
      rename => [ "SubjectDomainName", "[eventlog][subject_domain_name]" ]
      rename => [ "SubjectLogonId", "[eventlog][subject_logonid]" ]
      rename => [ "SubjectUserName", "[eventlog][subject_user_name]" ]
      rename => [ "SubjectUserSid", "[eventlog][subject_user_sid]" ]
      rename => [ "System", "[eventlog][system]" ]
      rename => [ "TargetDomainName", "[eventlog][target_domain_name]" ]
      rename => [ "TargetLogonId", "[eventlog][target_logonid]" ]
      rename => [ "TargetUserName", "[eventlog][target_user_name]" ]
      rename => [ "TargetUserSid", "[eventlog][target_user_sid]" ]
      rename => [ "ThreadID", "thread" ]
    }

    mutate {
      remove_field => [
        "CurrentOrNextState",
        "Description",
        "EventReceivedTime",
        "EventTime",
        "EventTimeWritten",
        "IPVersion",
        "KeyLength",
        "Keywords",
        "LmPackageName",
        "LogonProcessName",
        # "LogonType",
        "Name",
        "Opcode",
        "OpcodeValue",
        "PolicyProcessingMode",
        "Protocol",
        "ProtocolType",
        "SourceModuleType",
        "State",
        "Task",
        "TransmittedServices",
        "Type",
        "UserID",
        "Version"
      ]
    }
  }
}

output {
  elasticsearch {
    host => "ip_address"
    protocol => "http"
  }
} # end output


You're not saying what the input events look like (are they also sent via NXLog?) or what you expect Logstash to do with those events so it's impossible to help.

Have your Exchange events sent to Logstash and use a stdout { codec => rubydebug } output to inspect what you get. Then start applying filters, one by one, to morph the events into what you want to have.
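A minimal debugging config for that step might look like this (the port and type are taken from your planned setup; adjust as needed):

```conf
input {
  udp {
    port => 5141
    type => "Exchange"
  }
}

output {
  # Print every received event to the console in a readable form
  stdout { codec => rubydebug }
}
```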

Yes, they are sent via NXLog. The config file I pasted earlier was for Windows events; it already specifies what to do with them.

For Exchange I am planning the following config, but I don't know how to merge both inputs into one file.

input

#udp syslogs stream via 5141
input {
  udp {
    type => "Exchange"
    port => 5141
  }
}

filter

filter {
  if [type] == "Exchange" {
    csv {
      add_tag => [ 'exh_msg_trk' ]
      columns => ['logdate', 'client_ip', 'client_hostname', 'server_ip', 'server_hostname', 'source_context', 'connector_id', 'source', 'event_id', 'internal_message_id', 'message_id', 'network_message_id', 'recipient_address', 'recipient_status', 'total_bytes', 'recipient_count', 'related_recipient_address', 'reference', 'message_subject', 'sender_address', 'return_path', 'message_info', 'directionality', 'tenant_id', 'original_client_ip', 'original_server_ip', 'custom_data']
      remove_field => [ "logdate" ]
    }
    grok {
      match => [ "message", "%{TIMESTAMP_ISO8601:timestamp}" ]
    }
    mutate {
      convert => [ "total_bytes", "integer" ]
      convert => [ "recipient_count", "integer" ]
      split => [ "recipient_address", ";" ]
      split => [ "source_context", ";" ]
      split => [ "custom_data", ";" ]
    }
    date {
      match => [ "timestamp", "ISO8601" ]
      timezone => "Europe/London"
      remove_field => [ "timestamp" ]
    }
    if "_grokparsefailure" in [tags] {
      drop { }
    }
  }
}

output

output {
  elasticsearch { host => "localhost" }
  stdout { codec => rubydebug }
}

For Exchange I am planning the following config, but I don't know how to merge both inputs into one file.

You mean you don't know how to listen to messages on a single UDP port? It seems the eventlog stuff arrives as JSON and the Exchange messages are CSV. Would a simple condition that checks whether the first character of the payload is a left curly brace do? If there's a curly brace, use the json filter to unmarshal the JSON payload.

Who or what will be sending the Exchange messages?

Hi Again Magnus,

Thanks a lot for your reply but, to be very honest, I didn't understand it completely :slight_smile: I apologize for that. Let me explain again. We have 3 Mailbox and 2 CAS Exchange servers. I want to ship their message tracking logs to our ELK stack servers. I already know which logs need to be collected. The confusion is with the Logstash config file, which is already configured for event logs as I mentioned earlier. Yes, the event logs arrive as JSON via NXLog. The Exchange message tracking logs will also be pushed via NXLog. Here are its output lines:

<Output out_exchange>
    Module    om_udp
    Host      192.168.0.2 # Replace with your Logstash hostname/IP
    Port      5141        # Replace with your desired port
    Exec      $SyslogFacilityValue = 2;
    Exec      $SourceName = 'exchange_msgtrk_log';
    Exec      to_syslog_bsd();
</Output>

<Route exchange>
    Path      in_exchange => out_exchange
</Route>

Now, how can I handle both the incoming Windows event logs and the Exchange logs in a single Logstash config file? I think you now have the whole picture.

So, the Windows servers send datagrams with a single JSON payload, and the Exchange logs are not JSON? In that case it sounds like you'll be receiving UDP datagrams with payloads that more or less look like either of these two:

{"message": "some message", ...}
<123>Dec 13 16:48:11 some-hostname some-daemon[3142]: some message

If this is true you can take a peek at the message payload and see if it looks like JSON. If it does, parse it as JSON. Otherwise assume it's syslog and use a grok filter to extract fields from the syslog message.

filter {
  if [message] =~ /^\{/ {
    json {
      source => "message"
    }
  } else {
    grok {
      ...
    }
  }
}

This would be easier if you'd use different ports or wrap the Exchange messages as JSON instead of sending them as syslog messages.
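If you choose the JSON route, a sketch of the NXLog side is below (assuming NXLog's xm_json extension module is available; host, port, and the SourceName value are carried over from your existing config):

```conf
<Extension json>
    Module    xm_json
</Extension>

<Output out_exchange>
    Module    om_udp
    Host      192.168.0.2 # Replace with your Logstash hostname/IP
    Port      5141        # Replace with your desired port
    Exec      $SourceName = 'exchange_msgtrk_log';
    # Serialize the whole event as one JSON object instead of syslog
    Exec      to_json();
</Output>
```

Then every datagram on that port is JSON and the Logstash side needs only a `json` codec or filter, with no format sniffing.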
