Problem with data merging

Hello All,

I am using jdbc input and jdbc_streaming section in my logstash config to bring data from two different table into the same index.

My logstash config is below.

input {
    jdbc {
        jdbc_connection_string => "abc"
        jdbc_user => "user"
        jdbc_password => "xyz"
        jdbc_driver_library => "/solr-index/Logstash/logstash-7.12.1/logstash-core/lib/jars/postgresql-jdbc.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        schedule => "* * * * *" # cronjob schedule format (see "Helpful Links")
        #jdbc_paging_enabled => "true"
        #jdbc_page_size => "300"
        lowercase_column_names => false
        statement => 'SELECT PYID AS "CaseID",3 AS "CaseTypeID" FROM U90CCMWT.Marketing limit 10'
    }
}
    filter {
        jdbc_streaming{
        jdbc_connection_string => "abc"
        jdbc_user => "user"
        jdbc_password => "xyz"
        jdbc_driver_library => "/solr-index/Logstash/logstash-7.12.1/logstash-core/lib/jars/postgresql-jdbc.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        statement => 'select case_type_de AS "CaseType_de",case_type_en AS "CaseType_en",case_type_es AS "CaseType_es",case_type_fi AS "CaseType_fi",case_type_fr AS "CaseType_fr",case_type_hi AS "CaseType_hi",case_type_it AS "CaseType_it",case_type_pl AS "CaseType_pl",case_type_pt AS "CaseType_pt",case_type_ru AS "CaseType_ru",case_type_sv AS "CaseType_sv",case_type_th AS "CaseType_th",case_type_tr AS "CaseType_tr",case_type_zh AS "CaseType_zh" from dbo.dtac_case_type_all_languages where case_type_id = "0"'
        parameters => {"case_type_id" => "CaseTypeID"}
        target => "table1"
    }
        #json { source => "table1" } # didn't work
        split { field => "table1" }
}

output {
    # used to output the values in the terminal (DEBUGGING)
    # once everything is working, comment out this line
    stdout { codec => "json" }
    # used to output the values into elasticsearch
    elasticsearch {
        hosts => ["http://ip_node:9200"]
        index => "vikings_es1"
        #document_id => "{101}"
        #doc_as_upsert => true # upserts documents (e.g. if the document does not exist, creates a new record)
    }
}

Actual indexing:

{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 10,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "vikings_es1",
        "_type" : "doc",
        "_id" : "3B8UvHkBNEGMCKzqWc8g",
        "_score" : 1.0,
        "_source" : {
          "CaseID" : "MKT-2062",
          "@timestamp" : "2021-05-30T07:02:00.408Z",
          "CaseTypeID" : 3,
          "table1" : {
            "CaseType_tr" : "Warranty",
            "CaseType_pl" : "Warranty",
            "CaseType_fr" : "Garantie",
            "CaseType_pt" : "Garantia",
            "CaseType_sv" : "Garanti",
            "CaseType_ru" : "Гарантия",
            "CaseType_th" : "Warranty",
            "CaseType_fi" : "Takuu",
            "CaseType_zh" : "三包",
            "CaseType_es" : "Garantía",
            "CaseType_it" : "Garanzia",
            "CaseType_en" : "Warranty",
            "CaseType_de" : "Garantie",
            "CaseType_hi" : "वारंटी"
          },
          "@version" : "1"
        }
      },

Desired indexing

{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 10,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "vikings_es1",
        "_type" : "doc",
        "_id" : "3B8UvHkBNEGMCKzqWc8g",
        "_score" : 1.0,
        "_source" : {
          "CaseID" : "MKT-2062",
          "@timestamp" : "2021-05-30T07:02:00.408Z",
          "CaseTypeID" : 3,
          "CaseType_tr" : "Warranty",
          "CaseType_pl" : "Warranty",
          "CaseType_fr" : "Garantie",
          "CaseType_pt" : "Garantia",
          "CaseType_sv" : "Garanti",
          "CaseType_ru" : "Гарантия"            
		  "CaseType_th" : "Warranty",
          "CaseType_fi" : "Takuu",
          "CaseType_zh" : "三包",
          "CaseType_es" : "Garantía",
          "CaseType_it" : "Garanzia",
          "CaseType_en" : "Warranty",
		  "CaseType_de" : "Garantie",
		  "CaseType_hi" : "वारंटी",
          "@version" : "1"
        }
      },

Please help in getting the desired indexing.

If you want to move values from [table] to the root then see this.

I pasted that code in the filter section of my logstash config but while executing that config file it is throwing error "Ruby exception occurred: no implicit conversion of Hash into String".

below indexing output is coming.

"hits" : {
"total" : 10,
"max_score" : 1.0,
"hits" : [
{
"_index" : "vikings_es1",
"_type" : "doc",
"_id" : "9R_ewHkBNEGMCKzqPc-A",
"_score" : 1.0,
"_source" : {
"CaseID" : "MKT-2062",
"CaseTypeID" : 3,
"@version" : "1",
"@timestamp" : "2021-05-31T05:21:00.483Z",
"table1" : [
{
"CaseType_it" : "Garanzia",
"CaseType_tr" : "Warranty",
"CaseType_de" : "Garantie",
"CaseType_th" : "Warranty",
"CaseType_hi" : "वारंटी",
"CaseType_fi" : "Takuu",
"CaseType_ru" : "Гарантия",
"CaseType_sv" : "Garanti",
"CaseType_pt" : "Garantia",
"CaseType_pl" : "Warranty",
"CaseType_es" : "Garantía",
"CaseType_en" : "Warranty",
"CaseType_fr" : "Garantie",
"CaseType_zh" : "三包"
}
],
"tags" : [
"_rubyexception"
]
}

any idea why this exception is coming ?

Executing the ruby code is printing the key,value as only one key can be seen in the below actual output and the value is null

Actual
key= {"CaseType_en"=>"Parts"}
value= ""

but the expected output is as below

key= "CaseType_en"
value= "Parts"`Preformatted text`

So table1 is now an array where it was a hash before?

we are using below query in jdbc_streaming and setting the value in target field (table1)

'select case_type_de AS "CaseType_de",case_type_en AS "CaseType_en",case_type_es AS "CaseType_es",case_type_fi AS "CaseType_fi",case_type_fr AS "CaseType_fr",case_type_hi AS "CaseType_hi",case_type_it AS "CaseType_it",case_type_pl AS "CaseType_pl",case_type_pt AS "CaseType_pt",case_type_ru AS "CaseType_ru",case_type_sv AS "CaseType_sv",case_type_th AS "CaseType_th",case_type_tr AS "CaseType_tr",case_type_zh AS "CaseType_zh" from dbo.dtac_case_type_all_languages where case_type_id = "0"'

The target of a jdbc_streaming filter is always an array of hashes (one hash per row that matches).

You could use a split filter to remove the array, or just

mutate { replace => { "table1" => "[table1][0]" } }

Then you should be able to use the code I linked to to move the contents of [table1] to the top level.

it worked through ruby code by iterating and using set method, without having to use mutate filter.

Thanks a lot for your valuable inputs!

Hello Badger,

While indexing some html page i tried removing html tags with the help of strip_tags in my ruby script as below 

ruby {
  code => '
        xyz = strip_tags("<div>Welcome to my website!</div>")
        puts xyz
        '
}


but while executing, its throwing error
Ruby exception occurred: undefined method `strip_tags' for #<LogStash::Filters::Ruby:0x1bd7v80>

Could you please help me why any workaround for this error.

Thanks!

strip_tags is part of ActionView, and that does not ship with logstash. You can use nokogiri

    ruby {
        init => '
            require "nokogiri"
        '
        code => '
            xyz = Nokogiri::HTML("<div>Welcome to my website!</div>")
            puts xyz.text
        '
    }
Thanks Badger!! It worked.

One more point i wanted to check.

I have data coming as RKH6756 and I want to parse it to 6756 before indexing. 

Do I need to write ruby script for this or does Logstash provide some built-in filter to take care of such kind of parsing ?

Are you saying that you have a field containg "RKH6756" and you just want to remove the letters?