ELK final output

Hello community,

I have set up a small ELK cluster with 3 master nodes, 3 data nodes, 3 Logstash nodes, and 1 Kibana node. The 3 Logstash nodes sit behind an F5 load balancer and connect to the data nodes.

Now my concern is to capture all the data that lands on the data nodes for a given day, no matter how much it is; my approximate volume is 1-15 million events a day. For now I have configured Logstash with an HTTP input pipeline, and it also logs every hit into a JSON file while sending it on to the data nodes. Since I have 3 Logstash nodes, each one creates its own file, while the data nodes hold the consolidated data.

My single source of truth for the data will be the data nodes. Is there a way I can schedule something to automatically pull all the data from the data nodes, without missing anything, and dump it into a file?
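For example, I am imagining something like a separate, scheduled Logstash pipeline that queries the previous day's index and writes it to a single file. Below is a rough sketch of what I mean: the hosts, credentials, CA path, and index pattern mirror my setup above, while the cron schedule and the range query are only illustrative. It assumes a recent Logstash where the elasticsearch input plugin supports the same `ssl_*` options as the output plugin.

```
input {
  elasticsearch {
    hosts    => ["https://elkd1.mydomain.com:9200"]
    user     => "myuser"
    password => "abc122"
    ssl_enabled => true
    ssl_certificate_authorities => "/etc/logstash/certs/rootCA.pem"
    index    => "serilog-*"
    # Cron syntax: run once a day at 00:30 and pull yesterday's events
    schedule => "30 0 * * *"
    query    => '{ "query": { "range": { "@timestamp": { "gte": "now-1d/d", "lt": "now/d" } } } }'
  }
}

output {
  file {
    path  => "/data/exports/serilog-dump-%{+YYYY-MM-dd}.json"
    codec => json_lines
  }
}
```

Would something along these lines be reliable enough to not miss documents, or is there a better-suited tool for this?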

Regards

Hello and welcome,

Can you provide more context on this? It is not clear what exactly you are trying to do or what your issue is.

What do you want to consolidate?

Can you share your logstash pipeline configuration so it is easier to understand what it is doing?


@leandrojmp

Here is the config for the Logstash pipeline:

```
############### PiPeLine-Config-Logstash

#########################
# INPUT
#########################
input {
  http {
    host => "0.0.0.0"
    port => 5044
    ssl_enabled     => true
    ssl_certificate => "/etc/logstash/certs/elk.mydomain.cert"
    ssl_key         => "/etc/logstash/certs/elk.mydomain.key"
    ssl_certificate_authorities => ["/etc/logstash/certs/rootCA.pem"]
    #ssl_verify_mode => "force_peer"
    #ssl_verify_mode => "none"
    ssl_client_authentication => "optional"
    threads => 16
    codec   => json
  }
}

#######################
# FILTER
#######################
filter {
  ############################
  # Parse Serilog Timestamp
  ############################
  if [Timestamp] {
    date {
      match    => ["Timestamp", "ISO8601"]
      target   => "@timestamp"
      timezone => "Asia/Karachi"
    }
  }

  ############################
  # Normalize log level
  ############################
  if [Level] {
    mutate {
      lowercase => ["Level"]
      rename    => { "Level" => "[log][level]" }
    }
  }

  ############################
  # Flatten Serilog Properties
  ############################
  if [Properties] {
    ruby {
      code => '
        props = event.get("Properties")
        if props.is_a?(Hash)
          props.each { |k,v| event.set(k, v) }
          event.remove("Properties")
        end
      '
    }
  }

  ############################
  # JWT Masking (OPTIMIZED)
  ############################
  ruby {
    code => '
      jwt = /eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+/
      event.to_hash.each do |k,v|
        if v.is_a?(String) && v.match?(jwt)
          event.set(k, v.gsub(jwt, "[REDACTED_JWT]"))
        end
      end
    '
  }

  ############################
  # ECS Mapping
  ############################
  mutate {
    rename => {
      "User_Id"         => "[user][id]"
      "Session_Id"      => "[session][id]"
      "Prod_Code"       => "[event][code]"
      "IP"              => "[source][ip]"
      "Dest_Id"         => "[dest][id]"
      "imo"             => "[device][imo]"
      "Server_Name"     => "[host][name]"
      "RenderedMessage" => "message"
    }
  }

  ############################
  # Geo fields
  ############################
  mutate {
    convert => {
      "longitude" => "float"
      "latitude"  => "float"
    }
  }

  mutate {
    add_field => {
      "[location][lat]" => "%{latitude}"
      "[location][lon]" => "%{longitude}"
    }
  }

  mutate {
    convert => {
      "[location][lat]" => "float"
      "[location][lon]" => "float"
    }
  }

  ############################
  # Add metadata
  ############################
  mutate {
    add_field => {
      "source"      => "serilog"
      "environment" => "dev"
      "event_type"  => "parsed"
    }
  }

  ############################
  # Cleanup
  ############################
  mutate {
    remove_field => [
      "@version",
      "Timestamp",
      "MessageTemplate"
    ]
  }
}

###########################
# OUTPUT
###########################
output {
  if [event_type] == "parsed" {
    elasticsearch {
      hosts => [
        "https://elkd1.mydomain.com:9200",
        "https://elkd2.mydomain.com:9200",
        "https://elkd3.mydomain.com:9200"
      ]
      ssl_enabled => true
      #cacert => "/etc/logstash/certs/rootCA.pem"
      ssl_certificate_authorities => "/etc/logstash/certs/rootCA.pem"
      user     => "myuser"
      password => "abc122"
      index    => "serilog-%{+YYYY.MM.dd}"
      ilm_enabled => false
      pool_max => 1000
      pool_max_per_route => 200
      timeout => 60
      retry_on_conflict => 3
    }

    ############################
    # FILE OUTPUT (MANDATORY)
    ############################
    file {
      path  => "/data/logstash/parsed/serilog-%{+YYYY-MM-dd}.json"
      codec => json_lines
      flush_interval => 5
    }
  }
}
```
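One thing I am also considering, so that the three per-node files can be reconciled against the consolidated index, is tagging each event with the Logstash node that processed it. A sketch of what I mean is below; it assumes the `HOSTNAME` environment variable is visible to the Logstash process, and `[logstash][node]` is just a field name I picked for illustration.

```
filter {
  # Record which Logstash node handled this event, so the per-node
  # file outputs can later be matched against the index contents.
  mutate {
    add_field => { "[logstash][node]" => "${HOSTNAME}" }
  }
}
```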

Hello, any update on this? Anyone?