How to frame a JSON object in Logstash?

Can anyone tell me how to frame a JSON object in a Logstash configuration?

What do you mean? Perhaps an example would clarify the situation.

Hi Magnusbaeck, I need something like this:

"employee": {
"FirstName":"sai",
"LastName":"Kartheek"
}
So the input is a SQL SELECT statement, and I need to insert the result into Elasticsearch in this shape:

"_source": {
"employee": {
"id": 119,
"accountId": 3,
"firstName": "himaaa",
"lastName": "jk",
"sSNEnc": "iwbYgrIeuOuXh31JzIENXYKZSJnhWt3OT5jmp5sC2lYVxST95mowbXKNTd/PwUmoVXVTgkp2ubl2ohQiPKPpyPeRxdU47JpfQ/5z76rWmClR9IFbuwQbGfC8iKQTCZ+zrgt4ZHMpDiL45sutCk1EZAetS/9JwCr2WO7PjmWTBy0lgDq9Hh3PgpyDxhnqBuh3UMmKODBWuKOtofG2BxJCLgxdYztWhWwpW1A8Q3CzoErzUmRtUPq+PXtV2DBKRZ5a5qmxkx55c8QZ+YP9AvpocfMH7Ei6DuV37c8K6ZefhWKY5rGq0eLTUQMSl6wtrTI8sucV39w5faJ1lViktCGXXw==",
"sSNHash": "sbI+JOA4SmGm9a7/nmE5dbIlZP+H3JDQL07wZywpzD8=",
"sSNLast4": "0000",
"createdOn": "2018-02-02T13:31:39.42",
"eGuid": "31d266f9-dea7-4a5d-8e15-f1a18bf3aad6",
"isInvalidEmail": false
},
"i9s": [
{
"id": 123,
"employeeId": 119,
"firstName": "himaaa",
"lastName": "jk",
"isSSNAppliedFor": true,
"sSNEnc": "dYUkGBhHeN/ux+d0LrV5eEWBC96L22zxPnPH/erh4Cu6MEbqQE3+J9YxRYUkBf4J4/tdHtn5s50HNYew1b4Ql+HNqavrofzDhcm3EE5gqGHBA+3KEqTayHa77sGXrajTtV7iNA1g3XFyRuiNeABN7evzv+0zJMtERsnmuEziR/939h+V0or66RFVfty9nmdHRxxRKaKqobjfzdTrArqOedAaw4mPw6j5oayAvlbzzh19TJT+LzhTTtdArUN3dfWNCKvsRxXWeowqnyT3y3rVDWmvOofiUfcj/T5HLM/N6Dpkvu+Lqax15r4UmtUU/Vornq9+4o67RRCNmNA9pbeFFQ==",
"sSNHash": "sbI+JOA4SmGm9a7/nmE5dbIlZP+H3JDQL07wZywpzD8=",
"sSNLast4": "0000",
"formTemplateId": 1,
"citizenshipTypeId": 4,
"locationId": 10,
"hireDate": "2019-01-09T00:00:00",
"employeeSignatureIP": "172.31.16.254",
"employerRepId": 0,
"createdOn": "2018-02-02T13:31:40.013",
"guid": "49c17d07-2373-4151-bc30-0254f3447afd"
}
],
"locations": [
{
"id": 10,
"locationCode": "CA01",
"addressId": 2,
"accountId": 3,
"name": "Casper",
"fEIN": "020464184",
"locationContactId": 1,
"locationStatusId": 402,
"createdOn": "2017-12-07T19:02:12.377",
"modifiedOn": "2018-05-02T15:18:26.243",
"isActive": true
}
],
"supportDoc": [
{
"id": 212,
"i9FormId": 14,
"docListId": 30,
"docTypeId": 1101,
"docTitle": "Document Under List A",
"docNum": "1234567898",
"expiryDate": "2018-05-05T00:00:00",
"createdOn": "2018-02-19T15:04:45.777",
"passportNumber": "ab1111111",
"hasReceiptReceived": false
}
]
}

In the above code block we have the employee object; that is the part we still need to frame.

I have already framed all the rest. If you need the config file, I will show you.

Here is the configuration file:

# Polls SQL Server for employee / I-9 form combinations whose Employee, I9Form or
# SupportDoc row changed since the previous run. Each result row becomes one event
# carrying the keys that the jdbc_streaming lookups in the filter section use.
input {
  jdbc {
    # Driver jar path: ".jar" extension added and forward slashes used so this
    # matches the path used by the jdbc_streaming filters in this same file.
    jdbc_driver_library => "c:/drivers/sqljdbc4.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://localhost\SAILS-DM29:1433;databasename=I91"

    jdbc_user => "sa"
    # NOTE(review): plaintext credential; consider the Logstash keystore or an
    # environment variable before sharing or committing this file.
    jdbc_password => "sails123"
    # The filter section refers to the empId alias as "empid", i.e. it relies on the
    # jdbc input lowercasing column names (its default behaviour — confirm if the
    # lowercase_column_names setting is ever changed).
    statement => " SELECT DISTINCT(emp.Id) AS empId, emp.AccountId AS accountId, i9.Id AS id, i9.LocationId As locations FROM Employee emp
              INNER JOIN I9Form i9 ON i9.EmployeeId = emp.Id
              INNER JOIN SupportDoc sd ON sd.I9FormId = i9.Id
              WHERE emp.ModifiedOn > :sql_last_value OR 
              i9.ModifiedOn > :sql_last_value OR sd.ModifiedOn > :sql_last_value"
    # With use_column_value => false, :sql_last_value is the time of the previous
    # query run, so tracking_column / tracking_column_type are not consulted
    # (the statement does not select ModifiedOn either).
    use_column_value => false
    tracking_column => "ModifiedOn"
    tracking_column_type => "timestamp"
    # Persist the last run time between restarts; clean_run => false keeps it.
    record_last_run => true
    clean_run => false
  }
}

# The filter part of this file is commented out to indicate that it is optional.

# Enriches each event from the jdbc input with nested arrays looked up per employee /
# location: i9s, supportDoc, locations and section3. Each jdbc_streaming block runs
# its statement with the event's field value bound to the named SQL parameter and
# stores the result rows under "target".
filter {
  #********** Building array of I9s **************************#
  jdbc_streaming {
    jdbc_driver_library => "c:/drivers/sqljdbc4.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://localhost\SAILS-DM29:1433;databasename=I91"
    jdbc_user => "sa"
    jdbc_password => "sails123"
    statement => "SELECT i9.Id 'id' ,i9.EmployeeId, i9.FirstName , i9.LastName ,i9.MiddleName, i9.SSNLast4,
i9.Alias, i9.MaidenName, i9.AddressId, i9.IsSSNAppliedFor,i9.SSNEnc, i9.SSNHash, i9.DOB,
i9.FormTemplateId, i9.PreparerId, i9.Email, i9.Phone, i9.CitizenshipTypeId, i9.LocationId 'LocationId',
i9.AllienWorkAuthzId, i9.USCISNum, i9.HireDate, i9.EmployeeSignature, i9.EmployeeSignatureDate,
i9.EmployeeSignatureIP, i9.EmployerSignature, i9.EmployerSignatureDate, i9.EmployerTitle,
i9.EmployerRepId, i9.EmployerRepFirstName, i9.EmployerRepLastName, i9.EmployerRepBizName,
i9.EmployerRepAddressId, i9.I9Status, i9.CreatedOn, i9.ModifiedOn, i9.EmployerSignatureIP,
i9.TerminationDate, i9.DocumentID, i9.BatchId, i9.EVerifyStatus, i9.Guid,
i9.TokenExpiration,i9.Section1CompletionDate, i9.Section2CompletionDate,
i9.Section3CompletionDate, i9.EVerifyInitiationDate, i9.ReminderStatus,
i9.LateReasonCode, i9.LateReasonOtherDescription, i9.AdditionalInfo,
i9.StoredI9Hash, i9.StoredS3I9FormFile
FROM I9Form AS i9
WHERE i9.EmployeeId = :empId "
    # Bind SQL parameter :empId to the event field "empid".
    parameters => { "empId" => "empid"}
    target => "i9s"
  }
  #********** Building array of Support Documents **************************#
  jdbc_streaming {
    jdbc_driver_library => "c:/drivers/sqljdbc4.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://localhost\SAILS-DM29:1433;databasename=I91"
    jdbc_user => "sa"
    jdbc_password => "sails123"
    statement => "SELECT sd.Id AS Id, sd.I9FormId, sd.DocListId, sd.DocTypeId, sd.DocTitle, sd.IssuingAuth,sd.DocNum,
sd.ExpiryDate, sd.CreatedOn, sd.ModifiedOn, sd.DocNum2, sd.DocNum3, sd.VisaNumber, sd.CountryOfIssuanceCode,
sd.PassportNumber, sd.I94Number, sd.AlienNumber, sd.EVerifySupportDocTypeID, sd.HasNoExpirationDate,
sd.IsDocExpired, sd.HasReceiptReceived, sd.IssuanceDate
FROM SupportDoc sd
INNER JOIN I9Form i9 ON sd.I9FormId = i9.Id
INNER JOIN Employee emp ON emp.Id = i9.EmployeeId WHERE i9.EmployeeId = :empId "
    parameters => { "empId" => "empid"}
    target => "supportDoc"
  }
  #********** Building array of Location Documents **************************#
  jdbc_streaming {
    jdbc_driver_library => "c:/drivers/sqljdbc4.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://localhost\SAILS-DM29:1433;databasename=I91"
    jdbc_user => "sa"
    jdbc_password => "sails123"
    statement => "SELECT loc.Id, loc.LocationCode, loc.AddressId, loc.AccountId, loc.Name, loc.FEIN, loc.LocationContactId,
loc.LocationStatusId, loc.CreatedOn, loc.ModifiedOn, loc.IsActive
FROM [Location] loc WHERE loc.Id = :locations "
    # The event field "locations" (the location id selected by the input query) is
    # used as the lookup key and is then replaced by the looked-up rows, because the
    # target has the same name.
    parameters => { "locations" => "locations"}
    target => "locations"
  }
  #********** Building array of Section3 Documents ************************************#
  jdbc_streaming {
    jdbc_driver_library => "c:/drivers/sqljdbc4.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://localhost\SAILS-DM29:1433;databasename=I91"
    jdbc_user => "sa"
    jdbc_password => "sails123"
    statement => "SELECT s3.Id, s3.I9FormId, s3.SupportDocId, s3.RehireDate, s3.FirstName, s3.LastName, s3.MiddleInitial,
s3.EmployerSignature, s3.EmployerSignatureDate, s3.EmployerName, s3.CreatedOn, s3.ModifiedOn
FROM Section3 s3
INNER JOIN I9Form i9 On s3.I9FormId = i9.Id
INNER JOIN Employee emp ON emp.Id = i9.EmployeeId WHERE i9.EmployeeId = :empId "
    parameters => { "empId" => "empid"}
    target => "section3"
  }
  mutate {
    #Removing Specific fields
    # NOTE(review): no top-level "locationId" field is produced by the visible input
    # query (the alias there is "locations"); removing a missing field is harmless.
    remove_field => ["empid","locationId","@version", "@timestamp", "tags"] # if needed
  }
  date {
    # NOTE(review): "sql_last_value" is a query parameter of the jdbc input, and the
    # visible config never creates an event field with that name, so this filter
    # presumably never matches — confirm whether it is still needed.
    match => [ "sql_last_value", "YYYY-MM-dd HH:mm:ss.SSS" ] #2018-01-29 22:16:59.537
    timezone => "Etc/UTC"
  }
}
# Writes each enriched event to the "i9" Elasticsearch index, keyed by the event's
# "id" field, and echoes the event to stdout as one JSON document per line.
output {
  elasticsearch {
    index => "i9"
    document_type => "i9details"
    #ssl=>true # if node is on SSL
    hosts => ["localhost:9200"]
    # Update the existing document based on the ID column; with doc_as_upsert the
    # event is indexed as a new document when that id does not exist yet.
    # (This setting was previously declared twice in this block; declared once here.)
    action => "update"
    doc_as_upsert => true
    document_id => "%{id}"
  }
  stdout { codec => json_lines }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.