Difference between Doc count and Number of events sent by a collector

For sizing purposes, I am studying how much storage different logs take up. I have ingested more than 10,000 events (PAN-OS events generated automatically by a script), but I only see 4,472 documents in my index. The reason I am asking is that, while I was slowly increasing the number of ingested events, the number of events I generated was always equal to the number of documents:
yellow open traffic 7rMZiJ6BS5W4aW0zNEH6Jg 1 1 4472 0 2.7mb 2.7mb

But when I started ingesting more and more events, the number of documents stopped matching the number of events.

You should know that my script only changes the receive time and the destination IP (both randomly); the rest of each event is the same. I want to know whether there is something else going on inside Elasticsearch that limits the number of documents.

Thank you

Are you by any chance specifying a document ID when you index?

No, I am not specifying a document ID anywhere in my Logstash config file.

What does your Logstash config file look like?

# Read the generated PAN-OS test logs from disk and tag every event so the
# filter section can recognise them.
input {
    file {
        tags           => ["paloalto"]
        start_position => "beginning"
        path           => ["/home/elastic/test/*.log"]
    }
}
# Parse PAN-OS TRAFFIC log lines into named fields, enrich public source and
# destination addresses with GeoIP data, and compute a flow fingerprint.
filter {
    # Everything below applies only to events carrying the "paloalto" tag.
    if "paloalto" in [tags] {
        if [message] =~ /TRAFFIC/ {
            grok {
                patterns_dir => ["/home/elastic/logstash/patterns"]
                # PANDATE and UPWORD are not stock grok patterns; they are expected
                # to be defined in patterns_dir.
                # The source address is captured as "SourceIP" so that it matches the
                # field name used by the conditionals, geoip and fingerprint below
                # (it was previously captured under the misspelled name "SoucreIP",
                # so none of those ever saw it).
                # The pattern is kept on a single line: a line break inside the string
                # becomes part of the pattern.
                # The spellings "OuboundInterface" and "ParentSartTime" are retained
                # as-is so the resulting field names do not change.
                match => { "message" => "ReceiveTime=%{PANDATE:ReceiveTime}\|SerialNumber=%{NUMBER:SerialNumber}\|Type=%{UPWORD:Type}\|Subtype=%{DATA:Subtype}\|devTime=%{DATA:devTime}\|src=%{IP:SourceIP}\|dst=%{IP:DestinationIP}\|srcPostNAT=%{IP:NATSourceIP}\|dstPostNAT=%{IP:NATDestinationIP}\|RuleName=%{DATA:RuleName}\|usrName=%{DATA:userName}\|SourceUser=%{DATA:SourceUser}\|DestinationUser=%{DATA:DestinationUser}\|Application=%{DATA:Application}\|VirtualSystem=%{DATA:VirtualSystem}\|SourceZone=%{DATA:SourceZone}\|DestinationZone=%{DATA:DestinationZone}\|IngressInterface=%{DATA:InboundInterface}\|EgressInterface=%{DATA:OuboundInterface}\|LogForwardingProfile=%{DATA:LogForwardingProfile}\|SessionID=%{NUMBER:SessionID}\|RepeatCount=%{NUMBER:RepeatCount}\|srcPort=%{NUMBER:SourcePort}\|dstPort=%{NUMBER:DestinationPort}\|srcPostNATPort=%{NUMBER:NATSourcePort}\|dstPostNATPort=%{NUMBER:NATDestinationPort}\|Flags=%{DATA:Flags}\|proto=%{DATA:Protocol}\|action=%{DATA:Action}\|totalBytes=%{NUMBER:Bytes}\|dstBytes=%{NUMBER:BytesSent}\|srcBytes=%{NUMBER:BytesReceived}\|totalPackets=%{NUMBER:Packets}\|StartTime=%{PANDATE:StartTime}\|ElapsedTime=%{NUMBER:ElapsedTime}\|URLCategory=%{DATA:URLCategory}\|sequence=%{NUMBER:SequenceNumber}\|ActionFlags=%{DATA:ActionFlags}\|SourceLocation=%{DATA:SourceLocation}\|DestinationLocation=%{DATA:DestinationLocation}\|dstPackets=%{NUMBER:PacketsSent}\|srcPackets=%{NUMBER:PacketsReceived}\|SessionEndReason=%{DATA:SessionEndReason}\|DeviceGroupHierarchyL1=%{DATA:DeviceGroupHierarchyL1}\|DeviceGroupHierarchyL2=%{DATA:DeviceGroupHierarchyL2}\|DeviceGroupHierarchyL3=%{DATA:DeviceGroupHierarchyL3}\|DeviceGroupHierarchyL4=%{DATA:DeviceGroupHierarchyL4}\|vSrcName=%{DATA:VirtualSystemName}\|DeviceName=%{DATA:DeviceName}\|ActionSource=%{DATA:ActionSource}\|SrcUUID=%{DATA:SourceVMUUID}\|DstUUID=%{DATA:DestinationVMUUID}\|TunnelID=%{DATA:TunnelID_IMSI}\|MonitorTag=%{DATA:MonitorTag_IMEI}\|ParentSessionID=%{DATA:ParentSessionID}\|ParentStartTime=%{DATA:ParentSartTime}\|TunnelType=%{DATA:TunnelType}" }
            }
            mutate {
                # grok captures are strings; cast the counters that should be numeric.
                # NOTE(review): no filter in this section creates "GeoIP.*" fields
                # (geoip below writes to SourceIPGeo / DestinationIPGeo), so those
                # three conversions look like leftovers -- confirm before removing.
                convert => {
                    "Bytes"              => "integer"
                    "BytesReceived"      => "integer"
                    "BytesSent"          => "integer"
                    "ElapsedTime"        => "integer"
                    "GeoIP.dma_code"     => "integer"
                    "GeoIP.latitude"     => "float"
                    "GeoIP.longitude"    => "float"
                    "NATDestinationPort" => "integer"
                    "NATSourcePort"      => "integer"
                    "Packets"            => "integer"
                    "PacketsReceived"    => "integer"
                    "PacketsSent"        => "integer"
                    "SequenceNumber"     => "integer"
                }

                # The output section routes on this tag.
                add_tag => [ "PAN-OS_traffic" ]
            }
        }

        mutate {
            # Original message has been fully parsed, so remove it.
            # NOTE(review): this runs unconditionally, i.e. also when the grok above
            # did not match, in which case the raw line is discarded.
            remove_field => [ "message" ]
        }

        # GeoIP lookup for the source address, skipping loopback, RFC 1918 and
        # link-local ranges.
        if [SourceIP] and [SourceIP] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
            geoip {
                source => "SourceIP"
                target => "SourceIPGeo"
            }
            # NOTE(review): "[SourceIPGeo.location]" refers to a top-level field with a
            # dot in its name, not to the "location" sub-field of SourceIPGeo -- confirm
            # which one is intended.
            if [SourceIPGeo.location] and [SourceIPGeo.location] =~ "0,0" {
                mutate {
                    replace => [ "SourceIPGeo.location", "" ]
                }
            }
        }

        # Same enrichment for the destination address.
        if [DestinationIP] and [DestinationIP] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
            geoip {
                source => "DestinationIP"
                target => "DestinationIPGeo"
            }
            # NOTE(review): same dotted-field-reference caveat as for SourceIPGeo above.
            if [DestinationIPGeo.location] and [DestinationIPGeo.location] =~ "0,0" {
                mutate {
                    replace => [ "DestinationIPGeo.location", "" ]
                }
            }
        }

        # Keyed SHA1 over the flow 5-tuple. The result is not used as a document
        # ID anywhere in this filter section.
        if [SourceIP] and [DestinationIP] {
            fingerprint {
                concatenate_sources => true
                method => "SHA1"
                key => "logstash"
                source => [ "SourceIP", "SourcePort", "DestinationIP", "DestinationPort", "Protocol" ]
            }
        }
    }
}
# Ship only events that carry the PAN-OS_traffic tag to the local
# Elasticsearch node; anything else is not sent by this output.
output {
    if "PAN-OS_traffic" in [tags] {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "traffic"
        }
    }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.