Logstash configuration

I need to parse MongoDB logs into Elasticsearch using Logstash, and I only want to index the commands that were fired, as they appear in the MongoDB logs. I have the following Logstash config file:

input {
  file {
    path => "C:/Data/log/mongod.log"
    start_position => "beginning"
  }
}

filter {
  grok {
    match => { "message" => "[%{DATA:timestamp} PDT]" }
  }
  date {
    match => ["timestamp", "dd/MM/yy HH:mm:ss:SSS"]
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] index => "logstash2" }
  stdout { codec => "json" }
}

With this I get all the logs indexed in Elasticsearch, but I just want to see the user logs and the commands fired by the user.

What changes do I need to make in this config file so that only the commands fired by the user are read?

Show us what the logs look like, both log entries you want to keep and those you want to skip.

I want to skip these logs:

2017-02-14T14:03:11.569+0530 I CONTROL [main] Hotfix KB2731284 or later update is not installed, will zero-out data files
2017-02-14T14:03:11.569+0530 I CONTROL [initandlisten] MongoDB starting : pid=1584 port=27017 dbpath=C:\data\db\ 64-bit host=Admin-PC
2017-02-14T14:03:11.569+0530 I CONTROL [initandlisten] targetMinOS: Windows Vista/Windows Server 2008
2017-02-14T14:03:11.569+0530 I CONTROL [initandlisten] db version v3.2.1
2017-02-14T14:03:11.569+0530 I CONTROL [initandlisten] git version: a14d55980c2cdc565d4704a7e3ad37e4e535c1b2
2017-02-14T14:03:11.569+0530 I CONTROL [initandlisten] allocator: tcmalloc
2017-02-14T14:03:11.569+0530 I CONTROL [initandlisten] modules: none
2017-02-14T14:03:11.569+0530 I CONTROL [initandlisten] build environment:
2017-02-14T14:03:11.569+0530 I CONTROL [initandlisten] distarch: x86_64
2017-02-14T14:03:11.569+0530 I CONTROL [initandlisten] target_arch: x86_64
2017-02-14T14:03:11.569+0530 I CONTROL [initandlisten] options: { systemLog: { destination: "file", path: "C:\Data\log\mongo.log" } }
2017-02-14T14:03:11.569+0530 I - [initandlisten] Detected data files in C:\data\db\ created by the 'wiredTiger' storage engine, so setting the active storage engine to 'wiredTiger'.
2017-02-14T14:03:11.569+0530 W - [initandlisten] Detected unclean shutdown - C:\data\db\mongod.lock is not empty.
2017-02-14T14:03:11.569+0530 W STORAGE [initandlisten] Recovering data from the last clean checkpoint.
2017-02-14T14:03:11.569+0530 I STORAGE [initandlisten] wiredtiger_open config: create,cache_size=4G,session_max=20000,eviction=(threads_max=4),config_base=false,statistics=(fast),log=(enabled=true,archive=true,path=journal,compressor=snappy),file_manager=(close_idle_time=100000),checkpoint=(wait=60,log_size=2GB),statistics_log=(wait=0),
2017-02-14T14:03:12.696+0530 I NETWORK [HostnameCanonicalizationWorker] Starting hostname canonicalization worker
2017-02-14T14:03:12.696+0530 I FTDC [initandlisten] Initializing full-time diagnostic data capture with directory 'C:/data/db/diagnostic.data'
2017-02-14T14:03:12.697+0530 I NETWORK [initandlisten] waiting for connections on port 27017

2017-02-14T14:03:13.028+0530 I FTDC [ftdc] Unclean full-time diagnostic data capture shutdown detected, found interim file, some metrics may have been lost. OK
2017-02-14T14:04:31.782+0530 I NETWORK [initandlisten] connection accepted from 127.0.0.1:52304 #1 (1 connection now open)
2017-02-14T14:04:31.806+0530 I NETWORK [initandlisten] connection accepted from 127.0.0.1:52305 #2 (2 connections now open)
2017-02-14T14:04:35.377+0530 I NETWORK [initandlisten] connection accepted from 127.0.0.1:52306 #3 (3 connections now open)
2017-02-14T14:04:48.475+0530 I COMMAND [conn2] CMD: drop pooja.grades
2017-02-14T14:04:59.284+0530 I COMMAND [conn2] CMD: drop pooja.restaurants
2017-02-14T14:05:49.978+0530 I NETWORK [initandlisten] connection accepted from 127.0.0.1:52312 #4 (4 connections now open)
2017-02-14T14:05:50.025+0530 I NETWORK [initandlisten] connection accepted from 127.0.0.1:52313 #5 (5 connections now open)
2017-02-14T14:05:50.059+0530 I NETWORK [initandlisten] connection accepted from 127.0.0.1:52314 #6 (6 connections now open)
2017-02-14T14:05:50.175+0530 I COMMAND [conn2] command pooja.zips command: insert { insert: "zips", ordered: false, documents: 1000 } ninserted:1000 keyUpdates:0 writeConflicts:0 numYields:0 reslen:40 locks:{ Global: { acquireCount: { r: 17, w: 17 } }, Database: { acquireCount: { w: 16, W: 1 } }, Collection: { acquireCount: { w: 16, W: 1 } } } protocol:op_query 204ms
2017-02-14T14:05:50.210+0530 I COMMAND [conn4] command pooja.zips command: insert { insert: "zips", ordered: false, documents: 1000 } ninserted:1000 keyUpdates:0 writeConflicts:0 numYields:0 reslen:40 locks:{ Global: { acquireCount: { r: 16, w: 16 } }, Database: { acquireCount: { w: 16 }, acquireWaitCount: { w: 1 }, timeAcquiringMicros: { w: 132255 } }, Collection: { acquireCount: { w: 16 } } } protocol:op_query 188ms
2017-02-14T14:05:50.213+0530 I COMMAND [conn5] command pooja.zips command: insert { insert: "zips", ordered: false, documents: 1000 } ninserted:1000 keyUpdates:0 writeConflicts:0 numYields:0 reslen:40 locks:{ Global: { acquireCount: { r: 16, w: 16 } }, Database: { acquireCount: { w: 16 }, acquireWaitCount: { w: 1 }, timeAcquiringMicros: { w: 92240 } }, Collection: { acquireCount: { w: 16 } } } protocol:op_query 147ms
2017-02-14T14:05:51.104+0530 I COMMAND [conn2] command pooja.zips command: insert { insert: "zips", ordered: false, documents: 1000 } ninserted:1000 keyUpdates:0 writeConflicts:0 numYields:0 reslen:40 locks:{ Global: { acquireCount: { r: 16, w: 16 } }, Database: { acquireCount: { w: 16 } }, Collection: { acquireCount: { w: 16 } } } protocol:op_query 112ms
2017-02-14T14:05:51.148+0530 I COMMAND [conn6] command pooja.zips command: insert { insert: "zips", ordered: false, documents: 1000 } ninserted:1000 keyUpdates:0 writeConflicts:0 numYields:0 reslen:40 locks:{ Global: { acquireCount: { r: 16, w: 16 } }, Database: { acquireCount: { w: 16 } }, Collection: { acquireCount: { w: 16 } } } protocol:op_query 109ms
2017-02-14T14:05:51.350+0530 I COMMAND [conn4] command pooja.zips command: insert { insert: "zips", ordered: false, documents: 1000 } ninserted:1000 keyUpdates:0 writeConflicts:0 numYields:0 reslen:40 locks:{ Global: { acquireCount: { r: 16, w: 16 } }, Database: { acquireCount: { w: 16 } }, Collection: { acquireCount: { w: 16 } } } protocol:op_query 101ms

I want to load these logs. From these, I just want the commands to be indexed.

Okay, then you can use a grok expression that only matches the COMMAND and NETWORK messages. The grok filter will fail for all other messages, and those events will get a _grokparsefailure tag. Those messages can then be dropped. This should be a reasonable starting point for a grok expression:

%{TIMESTAMP_ISO8601:timestamp} %{WORD:severity} (COMMAND|NETWORK) %{GREEDYDATA:message}
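For example, a minimal filter block that applies this expression and drops everything that does not match could look like the following (a sketch only, based on the sample log lines above; adjust the field names as you see fit):

filter {
  grok {
    # Only COMMAND and NETWORK lines match; everything else is tagged _grokparsefailure
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{WORD:severity} (COMMAND|NETWORK) %{GREEDYDATA:message}" }
    # Replace the original message field with the captured remainder instead of appending to it
    overwrite => ["message"]
  }
  if "_grokparsefailure" in [tags] {
    drop { }
  }
}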

Thanks.

If I want the start position to be the 10th log line, what should I put in the config file?
The above solution is running, but in Kibana it shows all the logs.
Please just tell me the start position to use if I want the logs from the 10th line.

input {
  file {
    path => "C:/Data/log/mongo.log"
    start_position => "beginning"
  }
}

filter {
  grok {
    match => { "message" => "[%{TIMESTAMP_ISO8601:timestamp} | (COMMAND|NETWORK) %{GREEDYDATA:message} PDT]" }
  }
  date {
    match => ["insert", "dd/MM/yy HH:mm:ss:SSS"]
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] index => "logstash-log" }
  stdout { codec => "json" }
}

In start_position => "beginning", I want it to start from the 10th line.

That's not supported. If you tell us why you want to start from the 10th line we might be able to suggest something.

I want to ignore those logs because they do not need to be indexed.
If it is not possible, can you please tell me the reason?

Actually, I just want to see the commands fired by the authenticated user on particular days, so I want to skip the logs above and start from the authentication logs.

If it is not possible, can you please tell me the reason?

Perhaps because nobody has needed to do this, or because the feature hasn't been prioritized.

OK, thanks for the help.
Also, I tried the grok expression you suggested, but it fails for all logs:

input {
  file {
    path => "C:/Data/log/mongo.log"
    start_position => "beginning"
  }
}

filter {
  grok {
    match => { "message" => "[%{TIMESTAMP_ISO8601:timestamp} | (COMMAND|NETWORK) %{GREEDYDATA:message} PDT]" }
  }
  date {
    match => ["insert", "dd/MM/yy HH:mm:ss:SSS"]
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] index => "logstash-log" }
  stdout { codec => "json" }
}

I tried this. Is this right?

  • Why do you have PDT] in your grok expression?
  • Your grok expression should begin with ^%{TIMESTAMP... or %{TIMESTAMP..., not [%{TIMESTAMP....
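Putting those corrections together, a filter along these lines might work (a sketch, assuming the log format from the samples earlier in this thread; since the mongod timestamps are ISO8601, the date filter can use the built-in ISO8601 format instead of a custom pattern):

filter {
  grok {
    # Matches lines like: 2017-02-14T14:04:48.475+0530 I COMMAND [conn2] CMD: drop pooja.grades
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{WORD:severity} +(COMMAND|NETWORK) +\[%{DATA:context}\] %{GREEDYDATA:message}" }
    overwrite => ["message"]
  }
  # Drop the startup and housekeeping lines that did not match
  if "_grokparsefailure" in [tags] {
    drop { }
  }
  date {
    match => ["timestamp", "ISO8601"]
  }
}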

OK, I will try it again.

Can you please tell me what _ISO8601 indicates in the grok expression?

That grok pattern is named TIMESTAMP_ISO8601 because it matches timestamps in ISO8601 format.
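For illustration, that is exactly the format at the start of the mongod log lines shown earlier in this thread, so the pattern captures them directly:

Input line:
2017-02-14T14:04:48.475+0530 I COMMAND [conn2] CMD: drop pooja.grades

%{TIMESTAMP_ISO8601:timestamp} captures:
timestamp => "2017-02-14T14:04:48.475+0530"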

OK. I tried it, but it gives the same result. Can I use a different filter for this?