Merge information from separated databases into a single document

I would like to merge information from two different tables in different database.
In table t_a from database db_a I have information about the users, like username, Name, Phone Number, etc.
In table t_b from database db_b I have system-specific information about users. Eg, for that system the user X is of Type Y.
The table t_b has a field indicating the id of the user on table t_a, and this is how I can link the two tables.
I want to group users by Username in Elastic and inside each document there will be an array of users that have specific user information for each existing user with that username.
Eg.
{
"_index" : "users",
...
"source" : {
"user_id" : "sysadmin",
"username" : "sysadmin"
"users": {
{
"name" : null, \ From t_a
"email" : "fabio.lp@abcd.com", \ From t_a
"tenantid" : "e118af1f-6674-4348-bb12-xxxxxxxx27a6", \ From t_a
"user_type" : 1050 \ From t_b
},
{
"name" : "Administrator", \ From t_a
"email" : "joao.lapa@abcd.com", \ From t_a
"tenantid" : "e118af1f-6674-4348-bb12-9032bd2xxxxx", \ From t_a
"user_type" : 1050 \ From t_b
}
}
}
}

Here is what I am trying to do at the moment. Unfortunately is not working:

input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://db_a;databaseName=t_a;user=xx;password=xxx;"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_user => "xx"
	jdbc_password => "xxx"

    statement => "SELECT * FROM Users order by username"
  }
}

filter {
  jdbc_streaming {
		jdbc_connection_string => "jdbc:sqlserver://db_b;databaseName=t_b;user=bbb;password=aaa;"
		jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
		jdbc_user => "aaa"
		jdbc_password => "bbb"
		statement => "select ExternalID, UserTypeID from [User] where ExternalID = :userid"
		parameters => { "userid" => "id" }
		target => "extra"
		add_field => {
			"userType" => "%{[extra][0][UserTypeID]}"
		}
		remove_field => ["extra"]
	}

  aggregate {
    task_id => "%{username}"
	code => "
         map['username'] = event.get('username')
         map['users'] ||= [ ]
         map['users'] << {'name' => event.get('name'), 
						'email' => event.get('email'),
						'tenantid' => event.get('tenantid')}
         event.cancel()
       "
	push_previous_map_as_event => true
	timeout_task_id_field => "user_id"
	timeout => 3600
	inactivity_timeout => 300
	timeout_tags => ['_aggregatetimeout']
	}
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "users"
	}
}

I tried to explain the best I could =/
I want to know if this is possible and if it is, how can I achieve it.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.