Logstash configuration for a self-join field: object mapping "found a concrete value" error

I have a Logstash integration with a PostgreSQL table.
The table has a self join from incident_parent_id to incident_number:

incident_number (primary key)
incident_parent_id (self join with incident_number)

The requirement is to load each incident number and its parent incident's details into Elasticsearch.

I can achieve this requirement via Python/Django, but not via Logstash.

Here is the sample data, queried via Python, for the incident_parent_id DB field.
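In SQL terms, the parent lookup boils down to a self join; a minimal sketch against the table described above (the column list is trimmed to a few of the fields used later in this thread):

SELECT child.incident_number,
       child.incident_parent_id,
       parent.incident_state AS parent_incident_state,
       parent.resolved_at    AS parent_resolved_at
FROM customerdata_incident child
LEFT JOIN customerdata_incident parent
    ON parent.incident_number = child.incident_parent_id;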

I created the index and mapping in Elasticsearch:

PUT logstash_incidents/_mapping
{
"properties": {
        "account_id": {
          "type": "text"
        },
        "closed_at": {
          "type": "date"
        },
        "description": {
          "type": "text"
        },
        "incident_number": {
          "type": "text",
          "fields": {
            "raw": {
              "type": "text",
              "analyzer": "keyword"
            },
            "suggest": {
              "type": "completion",
              "analyzer": "simple",
              "preserve_separators": true,
              "preserve_position_increments": true,
              "max_input_length": 50
            }
          }
        },
        "incident_parent_id": {
          "properties": {
            "account_id": {
              "type": "text"
            },
            "company": {
              "type": "text"
            },
            "impact": {
              "type": "text"
            },
            "incident_number": {
              "type": "text"
            },
            "incident_state": {
              "type": "text"
            },
            "issue_type": {
              "type": "text"
            },
            "resolved_at": {
              "type": "date"
            },
            "resolved_by": {
              "type": "text"
            },
            "site_id": {
              "type": "text"
            },
            "state": {
              "type": "text"
            },
            "sub_account_id": {
              "type": "text"
            },
            "sub_site_id": {
              "type": "text"
            },
            "urgency": {
              "type": "text"
            }
          }
        },
        "incident_state": {
          "type": "text"
        },
        "short_description": {
          "type": "text"
        },
        "site_id": {
          "type": "text"
        },
        "state": {
          "type": "text"
        },
        "sub_account_id": {
          "type": "text"
        },
        "sub_site_id": {
          "type": "text"
        }
      }
    }

The config file for the Logstash pipeline:

input {
    jdbc {
        jdbc_driver_library => "/usr/local/Cellar/logstash/8.9.0/libexec/logstash-core/lib/jars/postgresql-jdbc.jar"
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/pl_stg"
        jdbc_user => "postgres"
        jdbc_password => "root"
        jdbc_driver_class => "org.postgresql.Driver"
        tracking_column => "id"
        schedule => "0 * * * *" # cronjob schedule format (see "Helpful Links")
        statement => "SELECT incident_number, site_id, sub_site_id, account_id, sub_account_id, closed_at, state, short_description, description, incident_parent_id from customerdata_incident"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "300"
    }
}

filter {
    mutate {
        copy => { "id" => "[@metadata][_id]" }
        remove_field => ["@timestamp", "@version"]
    }
}
output {
    stdout { codec => "json" }
    elasticsearch {
        hosts => ["https://localhost:9200"]
        ssl => true
        ssl_certificate_verification => false
        cacert => "/Users/ca_logstash.cer"
        user => "elastic"
        password => "+JY"
        index => "logstash_incidents"
        ilm_enabled => true
    }
}

In the table, incident_parent_id holds only the parent's incident number. The mapping above declares it as an object, but the event carries a plain string, so Logstash throws the error below:

:response=>{"index"=>{"_index"=>"logstash_incidents", "_id"=>"bdMfB4oBKjuENOHQW44F", "status"=>400, "error"=>{"type"=>"document_parsing_exception", "reason"=>"[1:123] object mapping for [incident_parent_id] tried to parse field [incident_parent_id] as object, but found a concrete value"}}}}

How do I resolve foreign keys and joins via Logstash?

Hi,

You could add a jdbc_streaming filter for that: if incident_parent_id is set, have the filter query the details for the parent id and store the results in the same field.

Best regards
Wolfram


Thanks. I used the jdbc_streaming filter, but incident_parent_id comes back empty.

(The username and password are faked in the conf below.)

input {
    jdbc {
        jdbc_driver_library => "/usr/local/Cellar/logstash/8.9.0/libexec/logstash-core/lib/jars/postgresql-jdbc.jar"
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/pl_itsm_stg"
        jdbc_user => "ss"
        jdbc_password => "ttt"
        jdbc_driver_class => "org.postgresql.Driver"
        tracking_column => "incident_number"
        schedule => "0 * * * *" # cronjob schedule format (see "Helpful Links")
        statement => "SELECT incident_number, site_id, sub_site_id, account_id, sub_account_id, closed_at, state, short_description, description, incident_parent_id from customerdata_incident"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "300"
    }
}

filter {
    if [incident_parent_id] {
        jdbc_streaming {
            jdbc_driver_library => "/usr/local/Cellar/logstash/8.9.0/libexec/logstash-core/lib/jars/postgresql-jdbc.jar"
            jdbc_connection_string => "jdbc:postgresql://localhost:5432/pl_itsm_stg"
            jdbc_user => "ss"
            jdbc_password => "ttt"
            jdbc_driver_class => "org.postgresql.Driver"
            statement => "select incident_number, company, sub_site_id, site_id, account_id, sub_account_id, issue_type, incident_state, resolved_at, resolved_by, impact, state, urgency from cusotmerdata_incident WHERE incident_number = :parent_id"
            parameters => { "parent_id" => "incident_parent_id" }
            target => "incident_parent_id"
        }
    }
    mutate {
        copy => { "id" => "[@metadata][_id]" }
        remove_field => ["@timestamp", "@version"]
    }
}
output {
    stdout { codec => "json" }
    elasticsearch {
        hosts => ["https://localhost:9200"]
        ssl => true
        ssl_certificate_verification => false
        cacert => "/Users/cdev/a_logstash.cer"
        user => "elastic"
        password => "+K"
        index => "logstash_itsm_incidents_parent"
        ilm_enabled => true
    }
}

The incident_parent_id only shows up under _source. How do I expand incident_parent_id without _source? As it stands, the incident_parent_id value is dropped.

Can you please check the logs? I see that the document has a _jdbcstreamingfailure tag, which shows that the jdbc_streaming filter ran into an error.
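One quick way to surface those failures is to route tagged events to stdout; a minimal sketch (the tag name is the filter's default tag_on_failure, and rubydebug is chosen here just for readability):

output {
    if "_jdbcstreamingfailure" in [tags] {
        stdout { codec => rubydebug }
    }
}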

Thanks again.

I found it in the log: the table name had a typo. Now I can get the parent data, but it comes back as an array.

I tried to get the first value of the array using a ruby filter and mutate, but it throws an exception. How do I fix it?

input {
    jdbc {
        jdbc_driver_library => "/usr/local/Cellar/logstash/8.9.0/libexec/logstash-core/lib/jars/postgresql-jdbc.jar"
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/pl_itsm_stg"
        jdbc_user => "postgres"
        jdbc_password => "tt"
        jdbc_driver_class => "org.postgresql.Driver"
        tracking_column => "incident_number"
        schedule => "0 * * * *" # cronjob schedule format (see "Helpful Links")
        statement => "SELECT incident_number, site_id, sub_site_id, account_id, sub_account_id, closed_at, state, short_description, description, incident_parent_id from customerdata_incident"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "300"
    }
}

filter {
    if [incident_parent_id] {
        jdbc_streaming {
            jdbc_driver_library => "/usr/local/Cellar/logstash/8.9.0/libexec/logstash-core/lib/jars/postgresql-jdbc.jar"
            jdbc_connection_string => "jdbc:postgresql://localhost:5432/pl_itsm_stg"
            jdbc_user => "postgres"
            jdbc_password => "tt"
            jdbc_driver_class => "org.postgresql.Driver"
            statement => "select incident_number, company, sub_site_id, site_id, account_id, sub_account_id, issue_type, incident_state, resolved_at, resolved_by, impact, state, urgency from customerdata_incident WHERE incident_number = :parent_id"
            parameters => { "parent_id" => "incident_parent_id" }
            target => "incident_parent_id"
        }
        ruby {
            code => '
                incident_parent = event.get("incident_parent_id")
                if incident_parent.is_a? Array
                    event.set("parent_id", incident_parent_id[0])
                end
            '
        }
    }
    if [parent_id] {
        mutate {
            rename => { "parent_id" => "incident_parent_id" }
        }
    }
    mutate {
        copy => { "id" => "[@metadata][_id]" }
    }
}
output {
    stdout { codec => "json" }
    elasticsearch {
        hosts => ["https://localhost:9200"]
        ssl => true
        ssl_certificate_verification => false
        cacert => "/Users/dev/ca_logstash.cer"
        user => "elastic"
        password => "+K"
        index => "logstash_itsm_incidents_parent"
        ilm_enabled => true
    }
}

The exception from the log:

Executing JDBC query {:statement=>"select incident_number, company, sub_site_id, site_id, account_id, sub_account_id, issue_type, incident_state, resolved_at, resolved_by, impact, state, urgency from customerdata_incident WHERE incident_number = :parent_id", :parameters=>{:parent_id=>"INC0035280"}}
[2023-08-24T15:00:02,935][ERROR][logstash.filters.ruby    ][main][7fdb4a58b5147713b3c52681c4cdfadae8fdd7f628d57acac94a7c0169a54a9f] Ruby exception occurred: undefined local variable or method `incident_parent_id' for #<LogStash::Filters::Ruby:0x1d63abf9> {:class=>"NameError", :backtrace=>["(ruby filter code):5:in `block in filter_method'", "/usr/local/Cellar/logstash/8.9.0/libexec/vendor/bundle/jruby/2.6.0/gems/logstash-filter-ruby-3.1.8/lib/logstash/filters/ruby.rb:96:in `inline_script'", "/usr/local/Cellar/logstash/8.9.0/libexec/vendor/bundle/jruby/2.6.0/gems/logstash-filter-ruby-3.1.8/lib/logstash/filters/ruby.rb:89:in `filter'", "/usr/local/Cellar/logstash/8.9.0/libexec/logstash-core/lib/logstash/filters/base.rb:159:in `do_filter'", "/usr/local/Cellar/logstash/8.9.0/libexec/logstash-core/lib/logstash/filters/base.rb:178:in `block in multi_filter'", "org/jruby/RubyArray.java:1865:in `each'", "/usr/local/Cellar/logstash/8.9.0/libexec/logstash-core/lib/logstash/filters/base.rb:175:in `multi_filter'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:133:in `multi_filter'", "/usr/local/Cellar/logstash/8.9.0/libexec/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"]}

The expected output is below (PS: I've replaced sensitive column values with ""):

   "hits": [
      {
        "_index": "itsm_incidents",
        "_id": "INC0042109",
        "_score": 9.765795,
        "_source": {
          "incident_number": "INC0042109",
          "incident_parent_id": {
            "incident_number": "INC0042108",
            "company": "",
            "sub_site_id": "",
            "site_id": "",
            "account_id": "",
            "sub_account_id": "",
            "issue_type": "Device is Down",
            "incident_state": "In Progress",
            "resolved_at": null,
            "resolved_by": "",
            "impact": "3 - Low",
            "state": "In Progress",
            "urgency": "3 - Low"
          },
          "incident_state": "Cancelled",
          "site_id": "",
          "sub_site_id": "",
          "account_id": "",
          "sub_account_id": "",
          "closed_at": "2022-02-04T06:38:15+00:00",
          "state": "Cancelled",
          "short_description": "Device is down",
          "description": "Device is down"
        }
      }
    ]

But the output comes out as follows:

    "hits": [
      {
        "_index": "logstash_itsm_incidents_parent",
        "_id": "7gbhJooBMK2vIdtfIlbD",
        "_score": 9.765795,
        "_source": {
          "sub_account_id": "",
          "sub_site_id": "",
          "tags": [
            "_rubyexception"
          ],
          "@timestamp": "2023-08-24T09:30:02.272630Z",
          "site_id": "",
          "description": "Device is down",
          "@version": "1",
          "short_description": "Device is down",
          "account_id": "",
          "closed_at": "2022-02-04T06:38:15.000Z",
          "state": "Cancelled",
          "incident_number": "INC0042109",
          "incident_parent_id": [
            {
              "sub_account_id": "",
              "issue_type": "Device is Down",
              "sub_site_id": "",
              "urgency": "3 - Low",
              "site_id": "",
              "resolved_by": "",
              "incident_state": "In Progress",
              "account_id": "",
              "state": "In Progress",
              "incident_number": "INC0042108",
              "impact": "3 - Low",
              "company": "",
              "resolved_at": null
            }
          ]
        }
      }
    ]

I am sorry, I cannot help you here as I am no expert in Ruby.

But in the second line of the Ruby code, remove the trailing _id from incident_parent_id; the local variable is incident_parent.
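That is, the corrected ruby filter becomes:

ruby {
    code => '
        incident_parent = event.get("incident_parent_id")
        if incident_parent.is_a? Array
            event.set("parent_id", incident_parent[0])
        end
    '
}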


Thanks, it works! 🙂

The working conf file, for reference:

input {
    jdbc {
        jdbc_driver_library => "/usr/local/Cellar/logstash/8.9.0/libexec/logstash-core/lib/jars/postgresql-jdbc.jar"
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/pl_itsm_stg"
        jdbc_user => "postgres"
        jdbc_password => "ttt"
        jdbc_driver_class => "org.postgresql.Driver"
        tracking_column => "incident_number"
        schedule => "0 * * * *" # cronjob schedule format (see "Helpful Links")
        statement => "SELECT incident_number, site_id, sub_site_id, account_id, sub_account_id, closed_at, state, short_description, description, incident_parent_id from customerdata_incident"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "300"
    }
}

filter {
    if [incident_parent_id] {
        jdbc_streaming {
            jdbc_driver_library => "/usr/local/Cellar/logstash/8.9.0/libexec/logstash-core/lib/jars/postgresql-jdbc.jar"
            jdbc_connection_string => "jdbc:postgresql://localhost:5432/pl_itsm_stg"
            jdbc_user => "postgres"
            jdbc_password => "ttt"
            jdbc_driver_class => "org.postgresql.Driver"
            statement => "select incident_number, company, sub_site_id, site_id, account_id, sub_account_id, issue_type, incident_state, resolved_at, resolved_by, impact, state, urgency from customerdata_incident WHERE incident_number = :parent_id"
            parameters => { "parent_id" => "incident_parent_id" }
            target => "incident_parent_id"
        }
    }
    ruby {
        code => '
            incident_parent = event.get("incident_parent_id")
            if incident_parent.is_a? Array
                event.set("parent_id", incident_parent[0])
            end
        '
    }
    if [parent_id] {
        mutate {
            rename => { "parent_id" => "incident_parent_id" }
        }
    }
    mutate {
        copy => { "id" => "[@metadata][_id]" }
    }
}
output {
    stdout { codec => "json" }
    elasticsearch {
        hosts => ["https://localhost:9200"]
        ssl => true
        ssl_certificate_verification => false
        cacert => "/Users/dev/ca_logstash.cer"
        user => "elastic"
        password => "+K"
        index => "logstash_itsm_incidents_parent"
        ilm_enabled => true
    }
}

I would like to know: once the data is imported into Elasticsearch via Logstash, do I have to reindex to get the expected mapping properties?

I created one index via django-elasticsearch-dsl and loaded data into it; its mapping is as follows:

{
  "itsm_incidents": {
    "mappings": {
      "properties": {
        "account_id": {
          "type": "text"
        },
        "closed_at": {
          "type": "date"
        },
        "description": {
          "type": "text"
        },
        "incident_number": {
          "type": "text",
          "fields": {
            "raw": {
              "type": "text",
              "analyzer": "keyword"
            },
            "suggest": {
              "type": "completion",
              "analyzer": "simple",
              "preserve_separators": true,
              "preserve_position_increments": true,
              "max_input_length": 50
            }
          }
        },
        "incident_parent_id": {
          "properties": {
            "account_id": {
              "type": "text"
            },
            "company": {
              "type": "text"
            },
            "impact": {
              "type": "text"
            },
            "incident_number": {
              "type": "text"
            },
            "incident_state": {
              "type": "text"
            },
            "issue_type": {
              "type": "text"
            },
            "resolved_at": {
              "type": "date"
            },
            "resolved_by": {
              "type": "text"
            },
            "site_id": {
              "type": "text"
            },
            "state": {
              "type": "text"
            },
            "sub_account_id": {
              "type": "text"
            },
            "sub_site_id": {
              "type": "text"
            },
            "urgency": {
              "type": "text"
            }
          }
        },
        "incident_state": {
          "type": "text"
        },
        "short_description": {
          "type": "text"
        },
        "site_id": {
          "type": "text"
        },
        "state": {
          "type": "text"
        },
        "sub_account_id": {
          "type": "text"
        },
        "sub_site_id": {
          "type": "text"
        }
      }
    }
  }
}

The index Logstash created has a different mapping, shown below. How shall I make it use the mapping properties above?

{
  "logstash_itsm_incidents_parent": {
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "@version": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "account_id": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "closed_at": {
          "type": "date"
        },
        "description": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "incident_number": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "incident_parent_id": {
          "properties": {
            "account_id": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "company": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "impact": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "incident_number": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "incident_state": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "issue_type": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "resolved_at": {
              "type": "date"
            },
            "resolved_by": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "site_id": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "state": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "sub_account_id": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "sub_site_id": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "urgency": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            }
          }
        },
        "short_description": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "site_id": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "state": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "sub_account_id": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "sub_site_id": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        }
      }
    }
  }
}

Do I have to create the index first in Elasticsearch and reference the same index in the Logstash conf file?
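For reference, creating the index with an explicit mapping up front avoids the dynamic-mapping defaults (text plus a keyword subfield for every string), which is exactly the difference between the two mappings above. A minimal sketch, trimmed to two fields; the rest of the properties block from the django index would go in their place:

PUT logstash_itsm_incidents_parent
{
  "mappings": {
    "properties": {
      "incident_number": {
        "type": "text",
        "fields": {
          "raw": { "type": "text", "analyzer": "keyword" }
        }
      },
      "incident_parent_id": {
        "properties": {
          "incident_number": { "type": "text" }
        }
      }
    }
  }
}

Documents already indexed keep the old mapping, so you would delete and recreate (or reindex into a new index) before the next scheduled Logstash run.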

I have Elasticsearch 7.17 in the prod environment and my PC has an 8.x version; I see document_type is deprecated. My mapping should support fuzzy search for this index. How shall I achieve it?
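(For what it's worth, fuzziness in Elasticsearch is a query-time option rather than a mapping property, and it works the same on 7.17 and 8.x. A sketch of a fuzzy match against this index; the field and the misspelled search text are chosen purely for illustration:)

GET logstash_itsm_incidents_parent/_search
{
  "query": {
    "match": {
      "short_description": {
        "query": "devise down",
        "fuzziness": "AUTO"
      }
    }
  }
}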
