Logs appear in Kibana with a one-hour delay

Hi friends, I have an ELK cluster that includes 20 Filebeat instances, 6 Logstash instances, 9 Elasticsearch nodes and 2 Kibana instances.
Elasticsearch node roles are :
3 x [ data_hot, data_content ]
3 x [ data_warm, ingest, master ]
1x [ master ]
2 x
logstash.yml:

# Name of this Logstash instance.
node.name: xxxxxxxx

# On-disk locations used by Logstash.
path.data: /data/logstash/data
path.logs: /var/log/logstash

# Automatic pipeline config reload, checked at the given interval.
config.reload.automatic: true
config.reload.interval: 30s

All components are version 7.16.2.
My problem is that the logs in Kibana are not real-time: it takes about an hour for all logs to become visible in Kibana's Discover.
When an incident or an update occurs, we have to wait an hour to see whether we have a problem or everything is fine.
If you have any ideas that could help me, I would appreciate it.

A one-hour delay seems very long, so I would recommend that you first verify that timestamps are parsed/set correctly. It may also be worth verifying that time is set correctly on all hosts. If this does look OK, I would recommend editing your ingest pipelines to add a timestamp indicating exactly when each document was indexed into Elasticsearch. This may provide additional information for troubleshooting.

On another note it does seem odd that the hot nodes where data is being indexed are not the ingest nodes. As it is set now I would expect data to first go to the warm nodes for ingest pipeline processing before being forwarded to the hot nodes for indexing. This sounds like an odd arrangement to me.

Thanks a lot for your answer. Time is set correctly on all hosts, and I have an Elasticsearch timestamp as well as an event time for my logs. For example, my event time is 13:30, but the Elasticsearch timestamp that I see in Kibana is 14:16. I can't find the bottleneck causing this delay.
If you have any suggestions on how to arrange my nodes, I would appreciate it.

Can you show a sample event with this type of delay (at least all the relevant timestamp fields)?

What does your indexing pipeline/flow look like? What Logstash filters are you using?

input {
  # Receive events from the Filebeat shippers over the Beats protocol.
  beats {
    # NOTE(review): the beats input documents this timeout in seconds, so
    # 60000 would be roughly 16.7 hours - confirm that is intended.
    client_inactivity_timeout => 60000
    port => 5044
  }
}
# Parses application log events into structured fields.
# Only events tagged "san" or "pr" are processed; everything else passes
# through this filter section unchanged.
# NOTE(review): the "...." / "........'" sequences inside the patterns below
# look like redactions made by the poster, so this config will not load as-is.
filter {
  if "san" in [tags] or "pr" in [tags]{
    # Classification chain: the first matching branch groks the message and
    # tags the event with its log type plus a class
    # ("essential" / "inessential" / "warn_error") that the outputs route on.
    if [message] =~ "CarbonCoreActivator" {
      grok {
        match => { "message" => "\ATID: \[%{NUMBER:tenantId}\] \[] \[%{TIMESTAMP_ISO8601:eventTime}\]  %{LOGLEVEL:logLevel} \{%{JAVACLASS}\} - ... }" }
      }
      mutate{add_tag => [ "CarbonCoreActivator","inessential" ]}
    # "LogName = ..." branches: after grok, the raw message field is removed by
    # a separate mutate, so it is dropped whether or not the grok matched.
    } else if [message] =~ "LogName = income" {
      grok {
        match => { "message" => "\ATID: \[%{NUMBER:tenantId}\]%{SPACE}\[]%{SPACE}\[%{TIMESTAMP_ISO8601:eventTime}\]%{SPACE}%{LOGLEVEL:logLevel}%{SPACE}\{%{JAVACLASS}\}%{SPACE}-%{SPACE}To: ((%{DATA:toProtocol}:/)?(%{URIPATHPARAM:toURL})?(,%{SPACE}WSAction:%{SPACE}%{DATA:WSAction})?(, SOAPAction:%{SPACE}%{DATA:soapAction})?)?, MessageID: (urn:uuid:)?(%{UUID:messageId})?, Direction: %{GREEDYDATA:direction}....( \{%{JAVACLASS}\})?" }
      }
      mutate{add_tag => [ "income","essential" ]}
      mutate {remove_field => ["message"]}
    } else if [message] =~ "LogName = out" {
      grok {
        match => { "message" => "\ATID: \[%{NUMBER:tenantId}\]%{SPACE}\[]%{SPACE}\[%{TIMESTAMP_ISO8601:eventTime}\]%{SPACE}%{LOGLEVEL:logLevel}%{SPACE}\{%{JAVACLASS}\}%{SPACE}-%{SPACE}To: ((%{DATA:toProtocol}:/)?(%{URIPATHPARAM:toURL})?(,%{SPACE}WSAction:%{SPACE}%{DATA:WSAction})?(, SOAPAction:%{SPACE}%{DATA:SOAPAction})?)?, MessageID: (urn:uuid:)?(%{UUID:messageId})?, Direction: %...( \{%{JAVACLASS}\})?" }
      }
      mutate{add_tag => [ "out","essential" ]}
      mutate {remove_field => ["message"]}
    } else if [message] =~ "LogName = Response" {
      grok {
        match => { "message" => "\ATID: \[%{NUMBER:tenantId}\]%{SPACE}\[]%{SPACE}\[%{TIMESTAMP_ISO8601:eventTime}\]%{SPACE}%{LOGLEVEL:logLevel}%{SPACE}\{%{JAVACLASS}\}%{SPACE}-%{SPACE}To: ((%{DATA:toProtocol}:/)?(%{URIPATHPARAM:toURL})?(,%{SPACE}WSAction:%{SPACE}%{DATA:WSAction})?(, SOAPAction:%{SPACE}%{DATA:SOAPAction})?)?, MessageID: (urn:uuid:)?(%{UUID:messageId})?, Direction: %{GREEDYDATA:direction}, .... ( \{%{JAVACLASS}\})?" }
      }
      mutate{add_tag => [ "Response","essential" ]}
      mutate {remove_field => ["message"]}
    } else if [message] =~ "LogName = Respond" {
      grok {
        match => { "message" => "\ATID: \[%{NUMBER:tenantId}\]%{SPACE}\[]%{SPACE}\[%{TIMESTAMP_ISO8601:eventTime}\]%{SPACE}%{LOGLEVEL:logLevel}%{SPACE}\{%{JAVACLASS}\}%{SPACE}-%{SPACE}To: ((%{DATA:toProtocol}:/)?(%{URIPATHPARAM:toURL})?(,%{SPACE}WSAction:%{SPACE}%{DATA:WSAction})?(....( \{%{JAVACLASS}\})?" }
      }
      mutate{add_tag => [ "Respond","essential" ]}
      mutate {remove_field => ["message"]}
    } else if [message] =~ "LogName = Fault" {
      grok {
        match => { "message" => "\ATID: \[%{NUMBER:tenantId}\]%{SPACE}\[]%{SPACE}\[%{TIMESTAMP_ISO8601:eventTime}\]%{SPACE}%{LOGLEVEL:logLevel}%{SPACE}\{%{JAVACLASS}\}%{SPACE}-%{SPACE}To: ((%{DATA:toProtocol}:/)?(%{URIPATHPARAM:toURL})?(,%{SPACE}WSAction:%{SPACE}%{DATA:WSAction})?(, SOAPAction:%{SPACE}%{DATA:SOAPAction})?)?, MessageID: (urn:uuid:)?(%{UUID:messageId})?, Direction: %{GREEDYDATA:direction}, ...( \{%{JAVACLASS}\})?" }
      }
      mutate{add_tag => [ "FaultLog","essential" ]}
      mutate {remove_field => ["message"]}
    } else if [message] =~ "LogName = ThrottleReject" {
      grok {
        match => { "message" => "\ATID: \[%{NUMBER:tenantId}\]%{SPACE}\[]%{SPACE}\[%{TIMESTAMP_ISO8601:eventTime}\]%{SPACE}%{LOGLEVEL:logLevel}%{SPACE}\{%{JAVACLASS}\}%{SPACE}-%{SPACE}To: ((%{DATA:toProtocol}:/)?(%{URIPATHPARAM:toURL})?(,%{SPACE}WSAction:%{SPACE}%{DATA:WSAction})?(, SOAPAction:%{SPACE}%{DATA:SOAPAction})?)?, MessageID: (urn:uuid:)?(%{UUID:messageId})?, Direction: %{GREEDYDATA:direction}, ....?( \{%{JAVACLASS}\})?" }
      }
      mutate{add_tag => [ "ThrottleReject","essential" ]}
      mutate {remove_field => ["message"]}
    # HTTP access log lines, selected by the [source] field rather than by
    # message content. eventTime is captured here with GREEDYDATA.
    } else if [source] =~ "http_access_management_console" {
      grok {
        match => { "message" => "\A%{IP:clientIp} - - \[%{GREEDYDATA:eventTime}\] \"%{GREEDYDATA:request}\" %{NUMBER:statusCode} %{GREEDYDATA} \"%{GREEDYDATA:url}\" \"%{GREEDYDATA:browser_details}\"" }
      }
      mutate{add_tag => [ "http_access_management_console","inessential" ]}
    } else if [message] =~ "logged" {
      grok {
        match => { "message" => "\[%{TIMESTAMP_ISO8601:eventTime}\]  %{LOGLEVEL:logLevel} -  \'%{GREEDYDATA:UserId} \[%{GREEDYDATA:tenantId}\]\' %{GREEDYDATA:action} at \[%{TIMESTAMP_ISO8601:actionTime}\]"}
      }
      mutate{add_tag => [ "logged","inessential" ]}
    # Fallback branches: selected when the message contains the level name
    # anywhere (unanchored regex match), checked in INFO, WARN, ERROR order.
    } else if [message] =~ "INFO" {
      grok {
        match => { "message" => "\ATID: \[%{NUMBER:tenantId}\] \[] \[%{TIMESTAMP_ISO8601:eventTime}\]  %{LOGLEVEL:logLevel} \{%{JAVACLASS}\} - %{GREEDYDATA:content} \{%{JAVACLASS}\}" }
      }
      mutate{add_tag => [ "INFO","inessential" ]}
    } else if [message] =~ "WARN" {
      grok {
        match => { "message" => "\ATID: \[%{NUMBER:tenantId}\] \[] \[%{TIMESTAMP_ISO8601:eventTime}\]  %{LOGLEVEL:logLevel} \{%{JAVACLASS}\} - %{GREEDYDATA:content} \{%{JAVACLASS}\}" }
      }
      mutate{add_tag => [ "WARN","warn_error" ]}
    } else if [message] =~ "ERROR" {
      grok {
        match => { "message" => "\ATID: \[%{NUMBER:tenantId}\] \[] \[%{TIMESTAMP_ISO8601:eventTime}\] %{LOGLEVEL:logLevel} \{%{GREEDYDATA:errorGenerator}\} - %{GREEDYDATA:errorMessage} \{%{GREEDYDATA}\}" }
      }
      mutate{add_tag => [ "ERROR","warn_error" ]}
    }
    # Post-processing applied to every "san"/"pr" event, whichever branch ran.
    # NOTE(review): x1..x4 look like placeholder field names; lowercasing fields
    # that were just converted to integer is unusual - confirm against the real config.
    mutate {
        convert => ["x1", "integer"]
        convert => ["x2", "integer"]
        convert => ["x3", "integer"]
        convert => ["x4", "integer"]
    }
    mutate {
        lowercase => [ "x1" ]
        lowercase => [ "x2" ]
        lowercase => [ "x3" ]
    }
    # Seven substitutions on the "payload" field; the patterns and replacements
    # were redacted by the poster.
    if "out" in [xx] or "inc" in [y] {
      mutate {
            gsub => [
                "payload", ........',
                "payload", ........',
                "payload", ........',
                "payload", ........',
                "payload", ........',
                "payload", ........',
                "payload", ........'
            ]
      }
    }
    # Parse eventTime (tries "YYYY-MM-dd HH:mm:ss,SSS" first, then ISO8601) and
    # store the parsed value back into eventTime; the "timestamp" field is
    # removed when parsing succeeds.
    # NOTE(review): because target is eventTime, @timestamp is presumably left
    # at the time the event was created/received rather than the log's own
    # time - confirm, since the outputs build index names from @timestamp.
    date {
        match => [ "eventTime" , "YYYY-MM-dd HH:mm:ss,SSS","ISO8601"]
        remove_field => [ "timestamp" ]
        target => "eventTime"
    }
  }
}
# Routes events to Elasticsearch indices based on their tags.
# The first matching condition wins; events that match none of the
# conditions are not sent anywhere.
# The %{+...} date patterns in the index names are rendered from each
# event's @timestamp field.
output {
  if "essential" in [tags] and "w" in [tags] and "san" in [tags] {
    # Essential "san" events: monthly index.
    elasticsearch {
      hosts => ["data01.:9200","data02.:9200"]
      index => "san-%{+YYYY-MM}"
      #template_name => "main"
      user => logstashuser
      password => "xxxxxxxxxxxxx"
    }
  } else if "/test/" in [context] and "a2" in [tags] and "pr" in [tags] {
    # "pr" events whose context contains "/test/": monthly index.
    elasticsearch {
      hosts => ["data01.:9200","data02.:9200"]
      index => "main-%{+YYYY-MM}"
      user => logstashuser
      password => "xxxxxxxxxxxxx"
    }
  } else if "essential" in [tags] and "a2" in [tags] and "pr" in [tags] {
    # Remaining essential "pr" events: daily index.
    elasticsearch {
      hosts => ["data01.:9200","data02.:9200"]
      index => "main-%{+YYYY-MM-dd}"
      #template_name => "main"
      user => logstashuser
      password => "xxxxxxxxxxxxx"
    }
  } else if "a2" in [tags] and "pr" in [tags] {
    # All other "pr" events: weekly index (week-year plus week number).
    elasticsearch {
      hosts => ["data01:9200","data02.:9200"]
      index => "general-%{+xxxx}-w%{+ww}"
      #template_name => "general-logtemplate"
      user => logstashuser
      password => "xxxxxxxxxxxxx"
    }
  }
}

Please show the JSON representation, as Kibana reformats some fields (e.g. it adjusts the timezone). Also, please do not post images, as they can be difficult to read and cannot be searched.

I would also recommend keeping the original parsed timestamp field instead of replacing it, as this will help with troubleshooting. Please also consider adding an ingest timestamp, as I described in my earlier post.