Merge multiple inputs into one document

How can I run two SQL queries and merge the results into one single document?

I'm trying to use the jdbc input plugin to merge data into one document; the data comes from two different databases.

In one database I have a table like this:

ID Field1 Field2
1 data1 data2
2 data6 data7

And in the other database I have:

ID Field3 Field4
1 data3 data4
2 data8 data9

I want to create documents like:
{
  "id" : "1",
  "Field1" : "data1",
  "Field2" : "data2",
  "Field3" : "data3",
  "Field4" : "data4"
},
{
  "id" : "2",
  "Field1" : "data6",
  "Field2" : "data7",
  "Field3" : "data8",
  "Field4" : "data9"
}

using a conf file like this:

input {
    jdbc {
        ...
        statement => "SELECT ID, Field1, Field2 from tableDb1"
    }
    jdbc {
        ...
        statement => "SELECT ID, Field3, Field4 from tableDb2"
    }
}

output {
    elasticsearch {
        index => "myIndex"
        document_type => "myDocument"
        document_id => "%{id}"
        hosts => "localhost"
    }
}

the documents are created like:

{
  "id" : "1",
  "Field1" : "data1",
  "Field2" : "data2"
},
{
  "id" : "2",
  "Field3" : "data8",
  "Field4" : "data9"
}

or vice-versa.

But if I run two separate instances of Logstash, the documents do get complemented (the second run fills in the missing fields).

Is it possible to do this with only one conf file?

I'm very new to the ELK stack.

Thanks for any help in advance.

You can use the aggregate filter in Logstash to combine multiple events:
https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html

Or you could query the second database in a filter with the jdbc_streaming or jdbc_static filter (which one depends on how frequently your data changes):
https://www.elastic.co/guide/en/logstash/current/plugins-filters-jdbc_static.html
https://www.elastic.co/guide/en/logstash/current/plugins-filters-jdbc_streaming.html
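
For example with jdbc_static it could look roughly like this (an untested sketch; the connection string, user, password, driver settings and column types are placeholders, so check the docs for the details):

filter {
  jdbc_static {
    # periodically copy tableDb2 from the second database into a local lookup table
    loaders => [
      {
        id => "db2"
        query => "SELECT id, field3, field4 FROM tableDb2"
        local_table => "db2_fields"
      }
    ]
    local_db_objects => [
      {
        name => "db2_fields"
        index_columns => ["id"]
        columns => [
          ["id", "varchar(20)"],
          ["field3", "varchar(255)"],
          ["field4", "varchar(255)"]
        ]
      }
    ]
    # enrich every event from the first jdbc input using the local copy
    local_lookups => [
      {
        id => "local-db2"
        query => "SELECT field3, field4 FROM db2_fields WHERE id = :id"
        parameters => { "id" => "[id]" }
        target => "extra"    # results land as an array under [extra][0]
      }
    ]
    loader_schedule => "*/30 * * * *"    # refresh the local table every 30 minutes
    jdbc_connection_string => "myConnectionStringToDb2"
    jdbc_user => "myDb2User"
    jdbc_password => "myDb2UserPassword"
    jdbc_driver_class => "my.jdbc.DriverClass"          # placeholder
    jdbc_driver_library => "/path/to/jdbc-driver.jar"   # placeholder
  }
}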

@Tiago_Pinto, I don't know the relationship between these tables.

But it is possible to force a join between these tables, like this: statement => "SELECT DB1.ID, DB1.Field1, DB1.Field2, DB2.Field3, DB2.Field4 from tableDb1 DB1 JOIN tableDb2 DB2 ON DB1.ID = DB2.ID"
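
If both tables happened to be reachable from a single connection (a big if, since I don't know your setup; the connection settings below are placeholders), the input would then be just one jdbc block:

input {
    jdbc {
        jdbc_connection_string => "myConnectionString"    # placeholder, must be able to see both tables
        jdbc_user => "myUser"
        jdbc_password => "myPassword"
        statement => "SELECT DB1.ID, DB1.Field1, DB1.Field2, DB2.Field3, DB2.Field4
                      FROM tableDb1 DB1 JOIN tableDb2 DB2 ON DB1.ID = DB2.ID"
    }
}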

I just want to note that I don't know anything about your database model. If you want to discuss it, share your database model with us so we can help you in the best way.

You can use aggregate too, as @Shaoranlaos suggested.

I'm trying to figure out @Shaoranlaos's answer, no success yet.

@camarar I can't do that; the databases are on different servers and are from different vendors.

I have not used it myself, only read about it in the docs and here on the forum, so take all of this with a grain of salt. I also have no test setup at hand at the moment, so even the syntax could be off.
It should only put you on the right track (hopefully).

You would have to set up your Logstash with the input and output that you have in your question (two jdbc inputs and one Elasticsearch output), tagging each event with the input that spawned it.

And then create a filter section with the aggregate filter, which uses the fields of the events with the same id to fill an internal map and publishes this map as a new event when a timeout expires.

input {
    jdbc {
        ...
        statement => "SELECT ID, Field1, Field2 from tableDb1"
        tags => ["in1"]
    }
    jdbc {
        ...
        statement => "SELECT ID, Field3, Field4 from tableDb2"
        tags => ["in2"]
    }
}

output {
  elasticsearch {
    index => "myIndex"
    document_type => "myDocument"
    document_id => "%{id}"
    hosts => "localhost"
  }
}

filter {
  aggregate {
    task_id => "%{id}"
    code => "
        if (event.get('tags').include('in1')) {
          map['field1'] = event.get('field1');
          map['field2'] = event.get('field2');
        } else if (event.get('tags').include('in2')) {
          map['field3'] = event.get('field3');
          map['field4'] = event.get('field4');
        } 
        event.cancel();
    "
    inactivity_timeout => 300  #seconds since last event
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "id"
  }
}
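
One caveat from the aggregate filter docs: it only works reliably with a single pipeline worker, otherwise events with the same id may be processed by different workers. So you would also need something like this (in logstash.yml, or pass -w 1 on the command line):

# logstash.yml
pipeline.workers: 1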

Hello @Shaoranlaos, I didn't manage to make it work with aggregate, but today I managed to do it with jdbc_streaming like this:


input {
    jdbc {
        jdbc_connection_string => "myConnectionStringToDb1"
        jdbc_user => "myDb1User"
        jdbc_password => "myDb1UserPassword"
        statement => "SELECT id, field1, field2 from TableDb1"
    }
}

filter {
    jdbc_streaming {
        # for every row from Db1, look up the matching row in Db2 by id
        jdbc_connection_string => "myConnectionStringToDb2"
        jdbc_user => "myDb2User"
        jdbc_password => "myDb2UserPassword"
        statement => "select field3, field4 from TableDb2 where id = :idparam"
        parameters => { "idparam" => "id" }
        target => "extra"
        add_field => {
            "field3" => "%{[extra][0][field3]}"
            "field4" => "%{[extra][0][field4]}"
        }
        remove_field => ["extra"]
    }
}

output {
    elasticsearch {
        index => "myIndex"
        document_type => "myDocument"
        document_id => "%{id}"
        hosts => "localhost"
    }
}

Thank you for your help.

Does anybody know why jdbc_streaming does not support the jdbc_password_filepath and statement_filepath options?

