I have a Logstash integration with a PostgreSQL table.
The table has a self join from incident_parent_id to incident_number:
incident_number (primary key)
incident_parent_id (self join to incident_number)
The requirement is to load each incident together with its parent incident's details into Elasticsearch.
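To make the requirement concrete, the data I need per incident corresponds roughly to a self join like the sketch below. This is only illustrative: the parent_* aliases are names I made up here, and I am assuming the parent columns match the sub-fields of incident_parent_id in the mapping further down.

-- Illustrative self join: each incident plus its parent incident's details
-- (parent_* aliases are hypothetical, not existing columns)
SELECT i.incident_number,
       i.site_id,
       i.sub_site_id,
       i.account_id,
       i.sub_account_id,
       i.closed_at,
       i.state,
       i.short_description,
       i.description,
       p.incident_number AS parent_incident_number,
       p.incident_state  AS parent_incident_state,
       p.state           AS parent_state,
       p.resolved_at     AS parent_resolved_at,
       p.resolved_by     AS parent_resolved_by
FROM customerdata_incident i
LEFT JOIN customerdata_incident p
       ON i.incident_parent_id = p.incident_number;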
This can be achieved with Python/Django, but I have not managed to do it with Logstash. In the database, the incident_parent_id field stores only the parent incident's number, not the full parent record.
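For illustration only (placeholder values, not real data), this is the nested shape I want each document to have in Elasticsearch, matching the mapping below:

{
  "incident_number": "INC0000001",
  "state": "…",
  "short_description": "…",
  "incident_parent_id": {
    "incident_number": "INC0000002",
    "incident_state": "…",
    "resolved_by": "…",
    "state": "…"
  }
}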
I created the index and mapping in Elasticsearch
PUT logstash_incidents/_mapping
{
  "properties": {
    "account_id": {
      "type": "text"
    },
    "closed_at": {
      "type": "date"
    },
    "description": {
      "type": "text"
    },
    "incident_number": {
      "type": "text",
      "fields": {
        "raw": {
          "type": "text",
          "analyzer": "keyword"
        },
        "suggest": {
          "type": "completion",
          "analyzer": "simple",
          "preserve_separators": true,
          "preserve_position_increments": true,
          "max_input_length": 50
        }
      }
    },
    "incident_parent_id": {
      "properties": {
        "account_id": {
          "type": "text"
        },
        "company": {
          "type": "text"
        },
        "impact": {
          "type": "text"
        },
        "incident_number": {
          "type": "text"
        },
        "incident_state": {
          "type": "text"
        },
        "issue_type": {
          "type": "text"
        },
        "resolved_at": {
          "type": "date"
        },
        "resolved_by": {
          "type": "text"
        },
        "site_id": {
          "type": "text"
        },
        "state": {
          "type": "text"
        },
        "sub_account_id": {
          "type": "text"
        },
        "sub_site_id": {
          "type": "text"
        },
        "urgency": {
          "type": "text"
        }
      }
    },
    "incident_state": {
      "type": "text"
    },
    "short_description": {
      "type": "text"
    },
    "site_id": {
      "type": "text"
    },
    "state": {
      "type": "text"
    },
    "sub_account_id": {
      "type": "text"
    },
    "sub_site_id": {
      "type": "text"
    }
  }
}
Here is the config file for the Logstash pipeline:
input {
  jdbc {
    jdbc_driver_library => "/usr/local/Cellar/logstash/8.9.0/libexec/logstash-core/lib/jars/postgresql-jdbc.jar"
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/pl_stg"
    jdbc_user => "postgres"
    jdbc_password => "root"
    jdbc_driver_class => "org.postgresql.Driver"
    tracking_column => "id"
    schedule => "0 * * * *" # cronjob schedule format (see "Helpful Links")
    statement => "SELECT incident_number, site_id, sub_site_id, account_id, sub_account_id, closed_at, state, short_description, description, incident_parent_id from customerdata_incident"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "300"
  }
}
filter {
  mutate {
    copy => {"id" => "[@metadata][_id]"}
    remove_field => ["@timestamp", "@version"]
  }
}
output {
  stdout { codec => "json" }
  elasticsearch {
    hosts => ["https://localhost:9200"]
    ssl => true
    ssl_certificate_verification => false
    cacert => "/Users/ca_logstash.cer"
    user => "elastic"
    password => "+JY"
    index => "logstash_incidents"
    ilm_enabled => true
  }
}
With this setup, incident_parent_id comes through as only the incident number, and Logstash throws the error below:
:response=>{"index"=>{"_index"=>"logstash_incidents", "_id"=>"bdMfB4oBKjuENOHQW44F", "status"=>400, "error"=>{"type"=>"document_parsing_exception", "reason"=>"[1:123] object mapping for [incident_parent_id] tried to parse field [incident_parent_id] as object, but found a concrete value"}}}}
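If I read the error correctly, the document Logstash sends carries incident_parent_id as a plain string (a "concrete value"), roughly like this (placeholder values), whereas the mapping above declares incident_parent_id as an object with sub-fields:

{
  "incident_number": "INC0000001",
  "site_id": "…",
  "account_id": "…",
  "incident_parent_id": "INC0000002"
}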
How can foreign keys and self joins like this be resolved via Logstash?