How to Insert array of Json to Elasticsearch from Logstash


(Kartheek Gummaluri) #1
Hi all, 

I'm trying to import data from sql server to elasticsearch using logstash and please find attached conf file.I'm not able to add an array toPreformatted text the existing json.

This is the configuartion file:

input {
  jdbc {
    jdbc_driver_library => "c:\drivers\sqljdbc4" 
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
	 jdbc_connection_string => "jdbc:sqlserver://localhost\SAILS-DM29:1433;databasename=I9"

    jdbc_user => "sa"
    jdbc_password => "sails123"
    statement => "SELECT i9.Id AS i9FormId,
                  emp.AccountId, emp.FirstName, emp.LastName, emp.MiddleName, emp.MaidenName, emp.Alias,
                  emp.AddressId, emp.SSNEnc, emp.SSNHash, emp.SSNLast4, emp.Email, emp.Phone, emp.CreatedOn,
                  emp.ModifiedOn, emp.UserId, emp.EGuid, emp.LocationId, emp.OriginalHireDate, emp.MostRecentHireDate,
                  emp.TerminationDate, emp.DOB, emp.CitizenshipTypeId, emp.StoreId,  emp.WOTCLocationId, emp.PayrollLocationId,
                  emp.UHRR, emp.ClientEmployeeId, emp.IsInvalidEmail, sd.DocListId, sd.I9FormId, sd.Id As supoortId
                  FROM Employee emp
                  INNER JOIN I9Form i9 ON emp.Id = i9.EmployeeId
                  LEFT JOIN SupportDoc sd ON sd.I9FormId = i9.Id
                  WHERE emp.Id = 1 "
		use_column_value => false
    tracking_column => "ModifiedOn"
		tracking_column_type => "timestamp"
    clean_run => false
  }
}
# The filter part of this file is commented out to indicate that it is
# optional.
filter {
    json_encode {
    add_tag => [ "supportDoc" ]
  }

  aggregate {
    task_id => "%{i9FormId}"
    code => "
     map['employee'] = {
        'id' => event.get('Id'), 
        'accountId' => event.get('emp.AccountId'),
        'firstName' => event.get('emp.FirstName'),
        'lastName' => event.get('emp.LastName'),
        'middleName' => event.get('emp.MiddleName'),
        'maidenName' => event.get('emp.MaidenName'),
        'alias' => event.get('emp.Alias'),
        'addressId' => event.get('emp.AddressId'),
        'sSNEnc' => event.get('emp.SSNEnc'),
        'sSNHash' => event.get('emp.SSNHash'),
        'sSNLast4' => event.get('emp.SSNLast4'),
        'email' => event.get('emp.Email'),
        'phone' => event.get('emp.Phone'),
        'createdOn' => event.get('emp.CreatedOn'),
        'modifiedOn' => event.get('emp.ModifiedOn'),
        'userId' => event.get('emp.UserId'),
        'eGuid' => event.get('emp.EGuid'),
        'locationId' => event.get('emp.LocationId'),
        'originalHireDate' => event.get('emp.OriginalHireDate'),
        'mostRecentHireDate' => event.get('emp.MostRecentHireDate'),
        'terminationDate' => event.get('emp.TerminationDate'),
        'dob' => event.get('emp.DOB'),
        'citizenShipTypeId' => event.get('emp.CitizenshipTypeId'),
        'storeId' => event.get('emp.StoreId'),
        'wotcLocationId' => event.get('emp.WOTCLocationId'),
        'payrollLocationId' => event.get('emp.PayrollLocationId'),
        'uhrr' => event.get('emp.UHRR'),
        'clientEmployeeId' => event.get('emp.ClientEmployeeId'),
        'isInvaliedEmail' => event.get('emp.IsInvalidEmail')
      }
       map['supportdocs'] ||= []
       map['supportdocs'] << {'i9FormId' => event.get('sd.I9FormId'),
                              'doclistid' => event.get('sd.DocListId')}
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 10
  }
mutate {
#if needed remove/ delete fields 
#remove_field => ["ssnenc","ssnhash"] # if needed 
remove_field => ["support_docs_List"]

}
date {
     match => [ "sql_last_value", "YYYY-MM-dd HH:mm:ss.SSS" ] #2018-01-29 22:16:59.537
     timezone => "Etc/UTC"
    }
}
output {
  stdout { codec => rubydebug  }
  elasticsearch{
	index => "i91"
  codec => "json"
	#action =>"update"	#if want to update existing index data based on ID column
	#ssl=>true # if node is on SSL
	hosts => ["localhost:9200"]
  manage_template => false
  document_type => "i9details"
	document_id => "%{sd.docListId}"
  doc_as_upsert => true
  action => "update"
	}
}

#2

It is not clear from your posting what problem you want help with.


(Kartheek Gummaluri) #3

supportdocs is an array of jsons and I'm not able to add to existing json


(system) #4

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.