I'm receiving just one log


(Salma Ait Lhaj) #1

Hi,

Please I don't why Kibana is receinving juste one log and when I refresh the timestamp changes ( It means that he is receing logs but just one and deleting other :o )


(Bill McConaghy) #2

In that screenshot the time filter is set to the last 15 minutes. If you try picking last day do you see more data? If that does not fix your issue, I'll need a lot more details in order to help you find a solution.


(Salma Ait Lhaj) #3

Event when I set the time to see all logs, I m stiil seing juste one log !!


(Bill McConaghy) #4

Seems like maybe you have set up whatever is sending the logs to Elasticsearch to use the same document ID. This would result in the behavior you are seeing (only 1 doc). If you go to console in Kibana and do a search on the index, GET yourindexname/_search, what doc count comes back?


(Christian Dahlqvist) #5

Are you assigning your own ID in the ingest pipeline? Any chance there is something wrong with this logic so all documents gets indexed using the same ID? You should be able to look at the ID of the event you can see to see if this is the case.


(Salma Ait Lhaj) #6

Here is the result :

{
  "error": {
    "root_cause": [
      {
        "type": "index_not_found_exception",
        "reason": "no such index",
        "resource.type": "index_or_alias",
        "resource.id": "yourindexname",
        "index_uuid": "_na_",
        "index": "yourindexname"
      }
    ],
    "type": "index_not_found_exception",
    "reason": "no such index",
    "resource.type": "index_or_alias",
    "resource.id": "yourindexname",
    "index_uuid": "_na_",
    "index": "yourindexname"
  },
  "status": 404
}

(Salma Ait Lhaj) #7

I'm using fingerprint to avoid duplicate data, but it was working very well until yesterday !!


(Salma Ait Lhaj) #8

@dadoonet
I removed :

fingerprint {
          method => "SHA1"
          key => "KEY"
     }

It works I'm receiving all logs but duplicated :confused: !!


(David Pilato) #9

I did not say to remove it but to put it on the top of the filters. But I feel that if the timestamp of the event (when filebeat collected the logs) is different it will generate duplicates.

When back to my keyboard, I'll try to give you some ideas to fix it.

As I told you yesterday it's more important IMO that you fix the date filter problem you are having.


(Christian Dahlqvist) #10

Typically you run the hash based on the content of the message field early on, so that the automatically assigned @timestamp field does not come into play.


(Salma Ait Lhaj) #11

I already put it on the top of the filter, it doesn't change anything, the logs were duplicated :confused: .

Can you show me how to fix the date, please ?


(Salma Ait Lhaj) #12

Do you know how to solve this, beacause i'm stuck !!

do you need my conf of logstash ?


(Christian Dahlqvist) #13

Did you specify to use just the message field as source for the hash: source => "message" ?


(Salma Ait Lhaj) #14

This is my file.conf

input {
     beats {

          port => "5044"
          type => "%{[fields] [log_type]}"
     }
}

filter {
     grok {
          match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{HOSTNAME:hostname} %{GREEDYDATA:data}"}
     }
     date {
          match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
     }     

if [fields][log_type] == "fortigate" {
          kv  {
               source => "data"
          }
          mutate {
               remove_field => [ "data" ]

              rename => {"dstip" => "dst_ip"}
              rename => {"dstport" => "dst_port"}
              rename => {"proto" => "protocol"}
              rename => {"dstintf" => "dst_interface"}
              rename => {"srcport" => "src_port"}
              rename => {"srcip" => "src_ip"}
              rename => {"srcintf" => "src_interface"}
          }

          if [protocol] == "6" {
               mutate { replace => {"protocol" => "tcp" }}
          }
          if [protocol] == "17" {
               mutate { replace => {"protocol" => "udp" }}
          }          

          if [protocol] == "58" {
               mutate { replace => {"protocol" => "ipv6-icmp" }}
          }

     }    
          if [fields][log_type] == "cisco_asa" {
          grok {
               match => { "data" => "%{CISCOTAG:cisco_tag}: %{GREEDYDATA:cisco_message}"}
          }
          grok {
               match => [
        "cisco_message", "",
        "cisco_message", "%{CISCOFW106006_106007_106010}",
        "cisco_message", "%{CISCOFW106014}",
        "cisco_message", "%{CISCOFW106015}",
        "cisco_message", "%{CISCOFW106021}",
        "cisco_message", "%{CISCOFW106023}",
        "cisco_message", "%{CISCOFW106100}",
        "cisco_message", "%{CISCOFW110002}",
        "cisco_message", "%{CISCOFW302010}",
        "cisco_message", "%{CISCOFW302013_302014_302015_302016}",
        "cisco_message", "%{CISCOFW302020_302021}",
        "cisco_message", "%{CISCOFW305011}",
        "cisco_message", "%{CISCOFW313001_313004_313008}",
        "cisco_message", "%{CISCOFW313005}",
        "cisco_message", "%{CISCOFW402117}",
        "cisco_message", "%{CISCOFW402119}",
        "cisco_message", "%{CISCOFW419001}",
        "cisco_message", "%{CISCOFW419002}",
        "cisco_message", "%{CISCOFW500004}",
        "cisco_message", "%{CISCOFW602303_602304}",
        "cisco_message", "%{CISCOFW710001_710002_710003_710005_710006}",
        "cisco_message", "%{CISCOFW713172}",
        "cisco_message", "%{CISCOFW733100}",
        "cisco_message", "%{WORD:action} %{WORD:protocol} %{CISCO_REASON:reason} from %{DATA:src_interface}:%{IP:src_ip}/%{INT:src_port} to %{DATA:dst_interface}:%{IP:dst_ip}/%{INT:dst_port}; %{GREEDYDATA:dnssec_validation}",
        "cisco_message", "%{CISCO_ACTION:action} %{WORD:protocol} %{CISCO_REASON:reason}.*(%{IP:src_ip}).*%{IP:dst_ip} on interface %{GREEDYDATA:interface}",
        "cisco_message", "Connection limit exceeded %{INT:inuse_connections}/%{INT:connection_limit} for input packet from %{IP:src_ip}/%{INT:src_port} to %{IP:dst_ip}/%{INT:dst_port} on interface %{GREEDYDATA:interface}",
        "cisco_message", "TCP Intercept %{DATA:threat_detection} to %{IP:ext_nat_ip}/%{INT:ext_nat_port}.*(%{IP:int_nat_ip}/%{INT:int_nat_port}).*Average rate of %{INT:syn_avg_rate} SYNs/sec exceeded the threshold of %{INT:syn_threshold}.#%{INT}",
        "cisco_message", "Embryonic connection limit exceeded %{INT:econns}/%{INT:limit} for %{WORD:direction} packet from %{IP:src_ip}/%{INT:src_port} to %{IP:dst_ip}/%{INT:dst_port} on interface %{GREEDYDATA:interface}"
      ]
          }
          mutate {
               remove_field => [ "data" ]

               remove_field => [ "cisco_message" ]
               lowercase => [ "protocol" ]
          }

     }

          else if [fields][log_type] == "paloalto" {
               csv {
                    source => "data"
                   columns => ["FUTURE_USE", "Receive Time", "Serial Number", "Type", "Subtype", "FUTURE_USE", "Generated Time", "src_ip", "dst_ip", "NAT Source IP", "NAT Source IP",  "dst_mapped_ip", "Rule Name", "Source User", "Destination User", "Application", "Virtual System", "Source Zone", "Destination Zone", "src_interface", "dst_interface", "Log Forwarding Profile", "FUTURE_USE", "Session ID", "Repeat Count", "src_port", "dst_port", "src_mapped_ip", "NAT Destination Port", "Flags", "protocol", "action", "Bytes", "Bytes Sent", "Bytes Received", "Packets", "Start Time", "Elapsed Time", "Category", "FUTURE_USE", "Sequence Number", "Action Flags", "Source Location", "Destination Location", "FUTURE_USE", "Packets Sent", "Packets Received", "Session End Reason", "Device Group Hierarchy Level 1", "Device Group Hierarchy Level 2", "Device Group Hierarcherarchy Level 3", "Device Group Hierarchy Level 4", "Virtual System Name", "hostname", "Action Source"]
                }
                mutate {
                        remove_field => [ "data" ]
                }
          }

     # Eviter la duplication des logs

     fingerprint {
          method => "SHA1"
          key => "KEY"
     }
}

output {
  elasticsearch {
    hosts => [ "localhost:9200" ]

    document_id => "%{fingerprint}"
  }
}

What do I have to change please !!


(Christian Dahlqvist) #15

Try this:

fingerprint {
  method => "SHA1"
  key => "KEY"
  source => "message"
}

(Salma Ait Lhaj) #16

I still have the same problem :confused: !! I don't understand why !!

Or is there an other way to avoid duplicated logs ?


(Christian Dahlqvist) #17

That screen shot shows a single document, not duplicates. Not sure I understand.


(Salma Ait Lhaj) #18

Yes I know !!

When I use : fingerprint : I have one log

When I remove it : I receive all logs duplicated !!


(Christian Dahlqvist) #19

Can you show us the full event you have? Are you removing the message field somewhere in your config (did not see it in what you posted)?


(Salma Ait Lhaj) #20

I didn't get you, do you mean I have to remove this line ?

mutate {
                        remove_field => [ "data" ]
                }

I also have this WARN when testing the conf :

[root@frghcslnetv10 conf.d]# /usr/share/logstash/bin/logstash --config.test_and_exit --path.settings /etc/logstash -f /etc/logstash/conf.d/firewalls.conf
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
[2018-07-06T15:03:25,517][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
Configuration OK
[2018-07-06T15:03:35,257][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash