Add the configuration for reference
logstash config file
input {
file {
path => "/var/log/xxxxxx/*_pcap.json"
sincedb_path => "/var/log/logstash/tshark-sincedb"
#interval => 300
type => "Check_OK"
start_position=> "beginning"
}
file {
path => "/var/log/xxxxxx/*_err.json"
sincedb_path => "/var/log/logstash/tshark-err-sincedb"
#interval => 300
type => "Check_Fail"
start_position=> "beginning"
}
}
filter{
json{
source => "message"
}
fingerprint{
concatenate_sources => true
method => "SHA1"
key => "xxxxxx-elasticsearch"
source => [ "timestamp","Source_IP","Destination_IP","Application_ID","Command_Code","Flags_Request","Session_Id","Origin_Realm","Origin_Host","Destination_Realm","Destination_Host","User_Name","Result_Code","Experimental_Result_Code","RAT_Type","CC_Request_Type","CC_Request_Number","Service_Context_Id","User_Equipment_Info_Value","PDP_Address_IPv4","Rule_Space_Decision","MME_Name","missing" ]
}
date {
match => [ "timestamp", "MMM dd, YYYY HH:mm:ss.SSSSSSSSS ZZ", "MMM dd, YYYY HH:mm:ss.SSSSSSSSS ZZ" ]
}
if [type] == "Check_OK" {
mutate {
add_field => {
"Command_Name_temp" => "%{Command_Code}%{Flags_Request}"
}
}
translate {
field => "Application_ID"
destination => "Application_Name"
fallback => "Unknown"
dictionary_path => "/var/log/logstash/xxxxxx_dictionary.yml"
}
translate {
field => "Command_Name_temp"
destination => "Command_Name"
fallback => "Unknown"
dictionary_path => "/var/log/logstash/xxxxxx_dictionary.yml"
remove_field => [ "Command_Name_temp" ]
}
translate {
exact => true
regex => true
field => "Origin_Realm"
destination => "Origin_Provider"
fallback => "Unknown"
dictionary_path => "/var/log/logstash/MNC-MCC-PLNM.yaml"
}
translate {
exact => true
regex => true
field => "Destination_Realm"
destination => "Destination_Provider"
fallback => "Unknown"
dictionary_path => "/var/log/logstash/MNC-MCC-PLNM.yaml"
}
translate {
exact => true
regex => true
field => "Origin_Realm"
destination => "Origin_Country"
fallback => "Unknown"
dictionary_path => "/var/log/logstash/MCC-Country.yaml"
}
translate {
exact => true
regex => true
field => "Destination_Realm"
destination => "Destination_Country"
fallback => "Unknown"
dictionary_path => "/var/log/logstash/MCC-Country.yaml"
}
translate {
exact => true
field => "Origin_Country"
destination => "Origin_Location_temp"
fallback => "Unknown"
dictionary_path => "/var/log/logstash/Country-LatLong.json"
}
translate {
exact => true
field => "Destination_Country"
destination => "Destination_Location_temp"
fallback => "Unknown"
dictionary_path => "/var/log/logstash/Country-LatLong.json"
}
translate {
exact => true
regex => true
field => "Source_IP"
destination => "Source_Host_temp"
fallback => "Unknown,Unknown"
dictionary_path => "/var/log/logstash/xxxxxx-hosts.yaml"
}
translate {
exact => true
regex => true
field => "Destination_IP"
destination => "Destination_Host_temp"
fallback => "Unknown,Unknown"
dictionary_path => "/var/log/logstash/xxxxxx-hosts.yaml"
}
if ("" in [Source_Host_temp]) {
mutate {
split => { "Source_Host_temp" => "," }
add_field => {
"Source_Host_Name" => "%{[Source_Host_temp][0]}"
}
add_field => {
"Source_Host_Type" => "%{[Source_Host_temp][1]}"
}
remove_field => [ "Source_Host_temp" ]
}
}
if [Source_Host_Type] == "IPX" {
translate {
exact => true
regex => true
field => "Source_Host_Name"
destination => "Source_IPX"
fallback => "Unknown"
dictionary_path => "/var/log/logstash/xxxxxx-ipx.yaml"
}
}
if ("" in [Destination_Host_temp]) {
mutate {
split => { "Destination_Host_temp" => "," }
add_field => {
"Destination_Host_Name" => "%{[Destination_Host_temp][0]}"
}
add_field => {
"Destination_Host_Type" => "%{[Destination_Host_temp][1]}"
}
remove_field => [ "Destination_Host_temp" ]
}
}
if [Destination_Host_Type] == "IPX" {
translate {
exact => true
regex => true
field => "Destination_Host_Name"
destination => "Destination_IPX"
fallback => "Unknown"
dictionary_path => "/var/log/logstash/xxxxxx-ipx.yaml"
}
}
if ("" in [Origin_Country]) {
mutate {
split => { "Origin_Location_temp" => "," }
add_field => [ "[Origin_Location][lat]", "%{[Origin_Location_temp][0]}" ]
add_field => [ "[Origin_Location][lon]", "%{[Origin_Location_temp][1]}" ]
remove_field => [ "Origin_Location_temp" ]
}
}
if ("" in [Destination_Country]) {
mutate {
split => { "Destination_Location_temp" => "," }
add_field => [ "[Destination_Location][lat]", "%{[Destination_Location_temp][0]}" ]
add_field => [ "[Destination_Location][lon]", "%{[Destination_Location_temp][1]}" ]
remove_field => [ "Destination_Location_temp" ]
}
# remove_field => [ "Origin_Location_temp", "Destination_Location_temp" ]
}
}
}
output {
if [type] == "Check_OK" {
elasticsearch {
action => "index"
hosts => [ "10.192.0.178:9200","10.192.0.147:9200" ]
index => "tshark-%{+YYYYMMdd}"
document_id => "%{fingerprint}"
flush_size => 5000
}
file {
path => "/var/log/xxxxxx/output_test_%{+YYYYMMdd}.txt"
codec => "json_lines"
}
}
if [type] == "Check_Fail" {
elasticsearch {
action => "index"
hosts => [ "10.192.0.178:9200","10.192.0.147:9200" ]
index => "tshark-err-%{+YYYYMMdd}"
document_id => "%{fingerprint}"
flush_size => 5000
}
file {
path => "/var/log/xxxxxx/output_err_test_%{+YYYYMMdd}.txt"
codec => "json_lines"
}
}
}