How to combine data from different indices in one view in Kibana?

I am trying to put traffic logs into Elasticsearch.
Suppose there are 100 traffic rows. Suppose there are around 25 columns for each row. Suppose 15 columns of all of the 100 traffic logs are the same; the rest of the columns differ. I have found that there are 3 different types of traffic logs. I have split the different types of traffic logs into 3 different files on a client system.
Using Filebeat I have exported the 3 different files to 3 different indices in the Elasticsearch server.

But while viewing in Kibana I need to view all the 100 logs together, since all are traffic-related logs. Is it possible? For this, should we create another new index with the common 15 columns and export data from the 3 different indices? Is this possible, or is there any other method?

A sample of the 3 different types of rows in the traffic logs is shown below. After the column appcat="unscanned", the columns differ between rows.
row1
2021-09-14T03:16:02.152834+05:30 10.10.10.1 date=2021-09-14 time=03:16:09 devname="PPFW02" devid="PPQQ303686" logid="0000000013" type="traffic" subtype="forward" level="notice" vd="root" eventtime=1638569569 srcip=172.172.172.152 srcport=42754 srcintf="port11" srcintfrole="lan" dstip=142.250.185.74 dstport=443 dstintf="port10" dstintfrole="wan"
poluuid="-06d3-51ea-25d9-16b47fdcee45" sessionid=119262719 proto=6 action="server-rst" policyid=56 policytype="policy" service="HTTPS" dstcountry="Germany" srccountry="Reserved" trandisp="snat" transip=210.210.210.210 transport=42754 duration=6 sentbyte=1809 rcvdbyte=2645 sentpkt=9 rcvdpkt=10 appcat="unscanned"

wanin=2245 wanout=1445 lanin=1763 lanout=1763 utmaction="allow" countweb=1 devtype="Linux PC" mastersrcmac="00:e0:df:68:07:56" srcmac="00:e0:4c:68:07:91" srcserver=0

row2
2021-09-14T03:16:04.102605+05:30 10.10.10.1 date=2021-09-14 time=03:16:11 devname="PPFW02" devid="PPQQ3913686" logid="0000000013" type="traffic" subtype="forward" level="notice" vd="root" eventtime=1631569571 srcip=17.111.110.20 srcport=27793 srcintf="port10" srcintfrole="wan" dstip=117.117.117.19 dstport=53 dstintf="port8" dstintfrole="dmz"
poluuid="-4c13-51e7-8c5c-b4ffe1eba994" sessionid=119261428 proto=17 action="accept" policyid=4 policytype="policy" service="PP_SERVC_DNS_UDP_53" dstcountry="India" srccountry="United States" trandisp="dnat" tranip=10.100.10.10 tranport=53 duration=180 sentbyte=68 rcvdbyte=205 sentpkt=1 rcvdpkt=1 appcat="unscanned"

row3
2021-09-14T03:16:04.852155+05:30 10.10.10.1 date=2021-09-14 time=03:16:12 devname="PPFW02" devid="PPQQ303686" logid="0000000013" type="traffic" subtype="forward" level="notice" vd="root" eventtime=1631569572 srcip=172.172.172.133 srcport=59656 srcintf="port11" srcintfrole="lan" dstip=15.206.9.63 dstport=8080 dstintf="port10" dstintfrole="wan"
poluuid="-677a-51eb-8c18-58dc25683233" sessionid=119262794 proto=6 action="deny" policyid=81 policytype="policy" service="PP_SERVC_8080" dstcountry="India" srccountry="Reserved" trandisp="noop" duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 appcat="unscanned"

crscore=30 craction=131072 crlevel="high" devtype="Router/NAT Device" mastersrcmac="e4:b9:7a:e9:14:d3" srcmac="e4:gh:7a:df:14:d3" srcserver=0

regards
shini

I also read that an Elasticsearch index is schemaless and we can put/post a variable amount of data in each request/row. But how do I make use of this feature while exporting rows from Filebeat to Elasticsearch?

What are your index names for each of the 3 sources?
If they start with something like indexname-source, then you can set up an index pattern in Kibana of indexname-* and it'll read any index that begins with that.

That's its default state, so you don't need to do anything for that.
Ideally though, you'd set up a mapping template.

Do you have any requirement to split your index this way?

You are working with logs from Fortigate devices that have the same format and are generated by the same device; there is no need to create different indices just because some events have different fields. This behaviour is expected.

The events logged by your firewall will not always have the same number of fields, as the number of fields is related to what is being logged; there are some features, like intrusion prevention or data leak prevention, that could add extra fields.

I would say that it is better to have all your traffic logs in the same index.


Sir,
Thanks a lot for both the replies.
I can make all index names start with the same string, so I can address all 3 indices together with something like indexname-*, and I hope it will combine and mix records from all 3 indices together and even sort them by timestamp?
This is a new concept for me; I hope my understanding is correct. I will try this method.

I was confused because I was trying to create an ingest pipeline and index using the "upload a sample file" feature provided by Kibana, where we click "override settings" to correct the generated grok pattern. I found that it was not working when the sample log file had records of variable length. After this step I was using the index name and pipeline name generated in Kibana in my Filebeat configuration at the client for exporting logs to Elasticsearch. This method had worked correctly for logs with fixed-length records. Since these are actually traffic logs, as you have advised I need not separate them at all.
But the grok expression generated while creating the index with the help of Kibana was not working when the sample log had records of variable lengths. Can I create an ingest pipeline and index for the shortest-length record in the log, and if I use that index name and pipeline name in Filebeat, will Elasticsearch accept records of variable length in the same index as well?

thanks and regards
shini

Sir,
I tried the method of creating an ingest pipeline and index for the shortest-length record in the log using the Kibana upload file method and used that index name and pipeline name in Filebeat. But Elasticsearch accepted only records of the same length and pattern.

regards
shini

Filebeat is generating this error message for the rows in the log file whose record/row length does not match:
(status=400): {"type":"illegal_argument_exception","reason":"Provided Grok expressions do not match field value:

============Full error message from filebeat for non matching length records=============

2021-09-22T16:32:44.970+0530 DEBUG [Elasticsearch] Elasticsearch/client.go:411 Bulk item insert failed (i=0, status=500): {type:runtime_exception,reason:grok pattern matching was interrupted after [1000] ms}
2021-09-22T16:32:44.970+0530 WARN [Elasticsearch] Elasticsearch/client.go:405 Cannot index event publisher.Event{Content:beat.Event{Timestamp:time.Time{wall:0xc04ae124f587bed5, ext:3215406854, loc:(*time.Location)(0x55b1a772e320)}, Meta:null, Fields:{agent:{ephemeral_id:0e3c6f0d-6ee3-440c-8f65-eff3377e0cb1,hostname:localhost.localdomain,id:a396dd7a-98a5-47e8-a16b-bcb0b57831cb,name:localhost.localdomain,type:filebeat,version:7.14.1},ecs:{version:1.10.0},fields:{type:PPPraf2},host:{architecture:x86_64,containerized:false,hostname:localhost.localdomain,id:8f072d32b6cd4548a888f2e1e7fe432a,ip:[10.10.10.223,fe80::8140:7927:370e:7221],mac:[e4:43:4b:c7:65:98,e4:43:4b:c7:65:99,e4:43:4b:c7:65:9a,e4:43:4b:c7:65:9b],name:localhost.localdomain,os:{codename:Ootpa,family:redhat,kernel:4.18.0-240.10.1.el8_3.x86_64,name:Red Hat Enterprise Linux,platform:rhel,type:linux,version:8.3 (Ootpa)}},input:{type:filestream},log:{offset:756,path:/var/log/PP_LOGS/DMZ_SERVERS/10.10.10.1/10.10.10.1/10.10.10.1--10.10.10.1--traf-forw.log},message:2021-09-22T03:25:09.461366+05:30 10.10.10.1 date=2021-09-22 time=03:26:32 devname=\QQQQFW02\ devid=\PPP5HD3916803686\ logid=\0000000013\ type=\traffic\ subtype=\forward\ level=\notice\ vd=\root\ eventtime=1632261392 srcip=17.17.17.106 srcport=65484 srcintf=\port11\ srcintfrole=\lan\ dstip=3.6.213.72 dstport=8080 dstintf=\port10\ dstintfrole=\wan\ poluuid=\20a0aee4-677a-51eb-8c18-58dc25683233\ sessionid=132218225 proto=6 action=\deny\ policyid=81 policytype=\policy\ service=\PP_SERVC_VVVV_8080\ dstcountry=\India\ srccountry=\Reserved\ trandisp=\noop\ duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 appcat=\unscanned\ crscore=30 craction=131072 crlevel=\high\ devtype=\Windows PC\ mastersrcmac=\c4:6e:1f:00:96:ad\ srcmac=\c4:6e:1f:00:96:ad\ srcserver=0,tags:[PP_ship_fg3]}, Private:(*input_logfile.updateOp)(0xc0004f6dc0), TimeSeries:false}, Flags:0x1, Cache:publisher.EventCache{m:common.MapStr(nil)}} (status=400): {type:illegal_argument_exception,reason:Provided 
Grok expressions do not match field value: [2021-09-22T03:25:09.461366+05:30 10.10.10.1 date=2021-09-22 time=03:26:32 devname=\QQQQFW02\ devid=\PPP5HD3916803686\ logid=\0000000013\ type=\traffic\ subtype=\forward\ level=\notice\ vd=\root\ eventtime=1632261392 srcip=17.17.17.106 srcport=65484 srcintf=\port11\ srcintfrole=\lan\ dstip=3.6.213.72 dstport=8080 dstintf=\port10\ dstintfrole=\wan\ poluuid=\20a0aee4-677a-51eb-8c18-58dc25683233\ sessionid=132218225 proto=6 action=\deny\ policyid=81 policytype=\policy\ service=\PP_SERVC_VVVV_8080\ dstcountry=\India\ srccountry=\Reserved\ trandisp=\noop\ duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 appcat=\unscanned\ crscore=30 craction=131072 crlevel=\high\ devtype=\Windows PC\ mastersrcmac=\c4:6e:1f:00:96:ad\ srcmac=\c4:6e:1f:00:96:ad\ srcserver=0]}
2021-09-22T16:32:44.971+0530 DEBUG [Elasticsearch] Elasticsearch/client.go:411 Bulk item insert failed (i=2, status=500): {type:runtime_exception,reason:grok pattern matching was interrupted after [1000] ms}
2021-09-22T16:32:44.971+0530 INFO [publisher] pipeline/retry.go:213 retryer: send wait signal to consumer

==========================================

Does anything need to be modified on the Filebeat side as well, or in the grok pattern?
regards
shini

I modified the grok in the ingest pipeline to include the 2 patterns the log file contained, and it worked. It accepted all entries in the log file from Filebeat. I am not using Logstash.
Hope it is OK.

thanks
shini
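The "Grok expressions do not match field value" errors above, and the two-pattern grok that finally worked, both come from treating the whole line as one fixed layout. The Python below is an added example (not code from this thread or from Elastic) of the other common way to handle FortiGate key=value lines: a grok-style pattern for the syslog prefix only, then a key=value split that tolerates quoted values containing spaces and any number of trailing fields; the sample rows are constructed from the rows in the question, and the lookahead split regex is the same idea an ingest `kv` processor's `field_split` can use.

```python
import re

# Constructed sample records modelled on the three FortiGate traffic rows in the
# question: a syslog prefix (timestamp, reporting host) followed by key=value pairs.
# Row 1 carries wanin/utmaction extras, row 2 has no extras, row 3 carries crscore extras.
SAMPLE_ROWS = [
    '2021-09-14T03:16:02.152834+05:30 10.10.10.1 date=2021-09-14 time=03:16:09 '
    'devname="PPFW02" type="traffic" subtype="forward" srcip=172.172.172.152 '
    'dstport=443 action="server-rst" appcat="unscanned" wanin=2245 '
    'utmaction="allow" devtype="Linux PC" srcserver=0',
    '2021-09-14T03:16:04.102605+05:30 10.10.10.1 date=2021-09-14 time=03:16:11 '
    'devname="PPFW02" type="traffic" subtype="forward" srcip=17.111.110.20 '
    'dstport=53 action="accept" appcat="unscanned"',
    '2021-09-14T03:16:04.852155+05:30 10.10.10.1 date=2021-09-14 time=03:16:12 '
    'devname="PPFW02" type="traffic" subtype="forward" srcip=172.172.172.133 '
    'dstport=8080 action="deny" appcat="unscanned" crscore=30 crlevel="high" '
    'devtype="Router/NAT Device" srcserver=0',
]

# Only the fixed part of the line is matched positionally (what grok is good at):
# "<timestamp> <host> <rest>".  Everything after the host is handled as key=value.
PREFIX_RE = re.compile(r'^(?P<syslog_ts>\S+)\s+(?P<syslog_host>\S+)\s+(?P<kv>.*)$')

# Split on a space only when the next token looks like a new key=...; a space inside
# a quoted value such as devtype="Linux PC" is therefore not a split point.
PAIR_SPLIT_RE = re.compile(r' (?=[A-Za-z_][A-Za-z0-9_]*=)')


def parse_fortigate_line(line: str) -> dict:
    """Return one flat document for a FortiGate traffic line; extra fields are kept.

    Rows with more or fewer key=value pairs parse the same way, so all three row
    types can land in a single index without a per-layout grok pattern.
    """
    m = PREFIX_RE.match(line.strip())
    if not m:
        raise ValueError("line does not start with <timestamp> <host>")
    doc = {"syslog_ts": m["syslog_ts"], "syslog_host": m["syslog_host"]}
    for pair in PAIR_SPLIT_RE.split(m["kv"]):
        key, _, value = pair.partition("=")
        doc[key] = value.strip('"')  # drop surrounding quotes; values stay strings
    return doc


def common_fields(docs) -> set:
    """Field names present in every document: the shared part of all row types."""
    return set.intersection(*(set(d) for d in docs))
```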
