Hi all,
I'm trying to import data from sql server to elasticsearch using logstash and please find attached conf file.I'm not able to add an array toPreformatted text the existing json.
This is the configuartion file:
input {
jdbc {
jdbc_driver_library => "c:\drivers\sqljdbc4"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://localhost\SAILS-DM29:1433;databasename=I9"
jdbc_user => "sa"
jdbc_password => "sails123"
statement => "SELECT i9.Id AS i9FormId,
emp.AccountId, emp.FirstName, emp.LastName, emp.MiddleName, emp.MaidenName, emp.Alias,
emp.AddressId, emp.SSNEnc, emp.SSNHash, emp.SSNLast4, emp.Email, emp.Phone, emp.CreatedOn,
emp.ModifiedOn, emp.UserId, emp.EGuid, emp.LocationId, emp.OriginalHireDate, emp.MostRecentHireDate,
emp.TerminationDate, emp.DOB, emp.CitizenshipTypeId, emp.StoreId, emp.WOTCLocationId, emp.PayrollLocationId,
emp.UHRR, emp.ClientEmployeeId, emp.IsInvalidEmail, sd.DocListId, sd.I9FormId, sd.Id As supoortId
FROM Employee emp
INNER JOIN I9Form i9 ON emp.Id = i9.EmployeeId
LEFT JOIN SupportDoc sd ON sd.I9FormId = i9.Id
WHERE emp.Id = 1 "
use_column_value => false
tracking_column => "ModifiedOn"
tracking_column_type => "timestamp"
clean_run => false
}
}
# The filter part of this file is commented out to indicate that it is
# optional.
filter {
json_encode {
add_tag => [ "supportDoc" ]
}
aggregate {
task_id => "%{i9FormId}"
code => "
map['employee'] = {
'id' => event.get('Id'),
'accountId' => event.get('emp.AccountId'),
'firstName' => event.get('emp.FirstName'),
'lastName' => event.get('emp.LastName'),
'middleName' => event.get('emp.MiddleName'),
'maidenName' => event.get('emp.MaidenName'),
'alias' => event.get('emp.Alias'),
'addressId' => event.get('emp.AddressId'),
'sSNEnc' => event.get('emp.SSNEnc'),
'sSNHash' => event.get('emp.SSNHash'),
'sSNLast4' => event.get('emp.SSNLast4'),
'email' => event.get('emp.Email'),
'phone' => event.get('emp.Phone'),
'createdOn' => event.get('emp.CreatedOn'),
'modifiedOn' => event.get('emp.ModifiedOn'),
'userId' => event.get('emp.UserId'),
'eGuid' => event.get('emp.EGuid'),
'locationId' => event.get('emp.LocationId'),
'originalHireDate' => event.get('emp.OriginalHireDate'),
'mostRecentHireDate' => event.get('emp.MostRecentHireDate'),
'terminationDate' => event.get('emp.TerminationDate'),
'dob' => event.get('emp.DOB'),
'citizenShipTypeId' => event.get('emp.CitizenshipTypeId'),
'storeId' => event.get('emp.StoreId'),
'wotcLocationId' => event.get('emp.WOTCLocationId'),
'payrollLocationId' => event.get('emp.PayrollLocationId'),
'uhrr' => event.get('emp.UHRR'),
'clientEmployeeId' => event.get('emp.ClientEmployeeId'),
'isInvaliedEmail' => event.get('emp.IsInvalidEmail')
}
map['supportdocs'] ||= []
map['supportdocs'] << {'i9FormId' => event.get('sd.I9FormId'),
'doclistid' => event.get('sd.DocListId')}
event.cancel()
"
push_previous_map_as_event => true
timeout => 10
}
mutate {
#if needed remove/ delete fields
#remove_field => ["ssnenc","ssnhash"] # if needed
remove_field => ["support_docs_List"]
}
date {
match => [ "sql_last_value", "YYYY-MM-dd HH:mm:ss.SSS" ] #2018-01-29 22:16:59.537
timezone => "Etc/UTC"
}
}
output {
stdout { codec => rubydebug }
elasticsearch{
index => "i91"
codec => "json"
#action =>"update" #if want to update existing index data based on ID column
#ssl=>true # if node is on SSL
hosts => ["localhost:9200"]
manage_template => false
document_type => "i9details"
document_id => "%{sd.docListId}"
doc_as_upsert => true
action => "update"
}
}