Adding new fields from an nvarchar(max) timestamp with the grok filter in Logstash

Hey everyone,
I want to create two new fields (month and year) from an existing timestamp field that comes out of a json filter (on the payload column), where the timestamp type in SQL Server is nvarchar(max).
I tried going in several directions, including a grok filter and creating an index template for the specific index (I did not try KQL). I would love to know which approach you think I should follow.
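For reference, the index template direction would be something like this sketch (the template name and index pattern are placeholders; log.timestamp matches the field layout in my logs):

PUT _index_template/cbm_template
{
  "index_patterns": ["cbm*"],
  "template": {
    "mappings": {
      "properties": {
        "log": {
          "properties": {
            "timestamp": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss.SSSSSS" }
          }
        }
      }
    }
  }
}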
Here is an example of the conf:

input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://10.211.2.9:1433;databaseName=SNJ_CAP_CBM;integratedSecurity=false;encrypt=false;trustServerCertificate=true;"
    jdbc_user => "SO"
    jdbc_password => "1234"
    jdbc_driver_library => "/etc/logstash/conf.d/logstash-core/lib/jars/mssql-jdbc-11.2.0.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    statement => "select payload from dbo.tbl_partly_log"
  }
}

filter {
  json {
    source => "payload"
  }
}

output {
  elasticsearch {
    hosts => ["http://10.211.3.164:9200"]
    index => "cbm"
    doc_as_upsert => true
  }
}

It is important to note that the logs come from the SQL Server DB: I pull the JSON from it and split its fields out into independent fields.
Thanks to anyone who can help :slight_smile:

Welcome to the community!
Can you show what the data looks like? I mean the value you need to extract from; paste a sample.

Here is an example of the log:

{"log": {"process_guid": "0042CF45-A073-4E19-9352-60A0609CB12E", "operation_type": "act", "timestamp": "2020-08-23 14:07:56.264521", "source_system": "crm", "target_system": "magam", "process_type": "addfile", "process_type_heb": "הוספת קובץ", "sub_process": "", "sub_process_heb": "", "app_version": "", "machine_name": "PARTLY", "status": "OK", "severity": "INFO", "payload": {"header": {"process_type": "addfile", "user_identifier": "guy"}, "payload": {"research_details": {"research_id": 10077}, "files": [{"zip_file_name": "be559412-95be-11ed-aafe-02ca.zip"}, {"zip_file_name": "be559413-95be-11ed-92d9-02ca47.zip"}, {"zip_file_name": "be559414-95be-11ed-8dd3-02ca47.zip"}]}}}}

So you can see that I am using the timestamp inside log.

This is an example of what the select statement in the input returns (the payload JSON column).

	grok {
		match => { "timestamp" =>  "%{YEAR:year}-%{MONTHNUM:month}-%{MONTHDAY}%{GREEDYDATA}" }
	}

Result:

{
          "month" => "08",
           "year" => "2020"
}

You should also convert it to a date field; otherwise Elasticsearch will map the string as text:

date {
  match => ["timestamp", "YYYY-MM-dd HH:mm:ss.SSSSSS"]
  target => "timestamp"
}

PS: Don't forget to recreate the index pattern.
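If you keep the same index name, you may also need to delete the existing index first, since field mappings on an existing index cannot be changed in place. In Kibana Dev Tools (cbm is the index from your config):

DELETE /cbm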

I added your changes, as you can see below:

input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://10.211.2.9:1433;databaseName=SNJ_CAP_CBM;integratedSecurity=false;encrypt=false;trustServerCertificate=true;"
    jdbc_user => "SO"
    jdbc_password => "1234"
    jdbc_driver_library => "/etc/logstash/conf.d/logstash-core/lib/jars/mssql-jdbc-11.2.0.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    statement => "select payload from dbo.tbl_partly_log"
  }
}

filter {
  json {
    source => "payload"
  }
  grok {
    match => { "timestamp" => "%{YEAR:year}-%{MONTHNUM:month}-%{MONTHDAY}%{GREEDYDATA}" }
  }
  date {
    match => ["timestamp", "YYYY-MM-dd HH:mm:ss.SSSSSS"]
    target => "timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["http://10.211.3.164:9200"]
    index => "cbm"
    doc_as_upsert => true
  }
}

Can you tell me if the syntax is OK?

I will update soon if it works :slight_smile:
Thanks

Yes, this is it.

Optionally, you can add a stdout output to see how your data will look before it is inserted:

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    ...
  }
}

Appreciate the help!


When I run the conf file with the changes, it creates an index, but there are no year and month fields.

As you can see in the mapping of the index (it does not include those fields):

{
  "mappings": {
    "properties": {
      "@timestamp": { "type": "date" },
      "@version": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
      "log": {
        "properties": {
          "app_version": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "machine_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "operation_type": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "payload": {
            "properties": {
              "header": {
                "properties": {
                  "process_type": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
                  "user_identifier": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }
                }
              },
              "payload": {
                "properties": {
                  "files": {
                    "properties": {
                      "zip_file_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }
                    }
                  },
                  "research_details": {
                    "properties": {
                      "research_id": { "type": "long" },
                      "research_name_heb": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }
                    }
                  },
                  "software": {
                    "properties": {
                      "software_component": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
                      "software_version": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }
                    }
                  },
                  "users": {
                    "properties": {
                      "email": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
                      "first_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
                      "first_name_heb": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
                      "id": { "type": "long" },
                      "last_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
                      "last_name_heb": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
                      "phone": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }
                    }
                  }
                }
              }
            }
          },
          "process_guid": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "process_type": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "process_type_heb": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "severity": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "source_system": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "status": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "sub_process": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "sub_process_heb": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "target_system": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "timestamp": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }
        }
      },
      "payload": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
      "tags": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }
    }
  }
}

It seems log.timestamp was not converted to the date type.

Change index => "cbm" to index => "test" or whatever you want, reload the data, and create the index pattern. Field mappings on an existing index cannot be changed in place, so a fresh index will pick up the new types. It should work.

I found the solution: because the timestamp is nested in the JSON, I had to reference it as [log][timestamp], like this (note the plain field-reference syntax, without the %{} sprintf wrapper):

grok {
  match => { "[log][timestamp]" => "%{YEAR:year}-%{MONTHNUM:month}-%{MONTHDAY}%{GREEDYDATA}" }
}
date {
  match => ["[log][timestamp]", "YYYY-MM-dd HH:mm:ss.SSSSSS"]
  target => "[log][timestamp]"
}
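For completeness, with the sample log above, the rubydebug output should now look roughly like this (trimmed to the relevant fields; the values come from the sample, and the parsed timestamp precision and timezone depend on your setup):

{
     "year" => "2020",
    "month" => "08",
      "log" => {
        "timestamp" => 2020-08-23T14:07:56.264Z,
        ...
    },
    ...
}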

Thanks for the help,
it's working :)


Good catch, well done. :+1: I had focused only on the time field. :upside_down_face:
