Now I want to visualize in Kibana, for example, how many historical data entries there are per ID with Status = open from table A.
Both tables are updated regularly in the database, and I want the visualization in Kibana to stay relatively up to date.
So far I'm loading the data into Elasticsearch using the JDBC input plugin in Logstash.
What is the most efficient way to achieve this?
So far I have explored the following options:

1. Joining both tables in the SQL query in the Logstash config file and writing the result into an Elasticsearch index. This is what I'm doing right now, but it can take quite some time for very big tables.
2. Using transforms, as @Hendrik_Muhs has mentioned several times in this forum, e.g. here. What I don't quite understand yet: how do I do this? How do I merge all data from table A and table B into one destination index without using any aggregation, since I want the aggregations to happen in Kibana and therefore only want to keep the raw data in my index?
3. Using the enrich setup. As far as I understand, this might be feasible, but it is not recommended since both tables are updated regularly.

Is there any other option that would solve my use case?
What is the most (time-)efficient/recommended way to do this?
You understood correctly: if both of your indices regularly receive new data, a transform is a better solution than an enrich setup (which is recommended when at least one of the two indices is static).
To join documents using transforms, you can write a latest, continuous transform that takes table A and historical table B as source indices, so something like:
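(A minimal sketch, not tested against your data: it assumes both source indices share the ID field and contain a date field such as event.ingested that the transform can sort and synchronize on; the index names, transform id, delay and frequency are placeholders.)

PUT _transform/my-latest-transform
{
  "source": {
    "index": [ "table_a", "table_b" ]
  },
  "dest": {
    "index": "joined"
  },
  "latest": {
    "unique_key": [ "ID" ],
    "sort": "event.ingested"
  },
  "sync": {
    "time": {
      "field": "event.ingested",
      "delay": "60s"
    }
  },
  "frequency": "1m"
}

The latest function keeps, for every unique ID, the most recent document across both source indices, and the sync block turns it into a continuous transform that picks up new documents as they are ingested.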
Thanks for your reply!
The principle of your approach is pretty much exactly what I was looking for.
However, the result of the merged tables is not what I expected: the number of documents in the target index matches that of table A, while the available fields are those of table B.
I would like to achieve a full join, so that the target index contains all fields from both source indices as well as all documents.
To stick with my example, this is the result I get using your suggested transform:
| ID (mutual) | Data (from Table B) |
|---|---|
| 1 | Some old Data |
| 2 | Other Data |
This is what I want:
| ID (mutual) | Data (from Table A) | Status (from Table A) | Data (from Table B) |
|---|---|---|---|
| 1 | Some Data | open | Some old Data |
| 1 | Some Data | open | Some older Data |
| 2 | Other Data | open | Other Data |
| 2 | Other Data | open | Other older Data |
| 2 | Other Data | open | Other very old Data |
Can you help me adjust the transform job accordingly?
I think the main problem is that an ID can only appear once in the target index.
In my Logstash config, I have so far solved this by creating a new composite key for each document, which combines the primary key of table A with the primary key of table B.
Hi!
I was able to hack together the solution below using a Painless script.
It produces the results you mentioned (5 rows).
You can treat it as a base for further experimentation.
PUT a
{
"mappings": {
"properties": {
"event.ingested": {
"type": "date"
},
"ID": {
"type": "long"
},
"Data": {
"type": "keyword"
},
"Status": {
"type": "keyword"
}
}
}
}
PUT b
{
"mappings": {
"properties": {
"event.ingested": {
"type": "date"
},
"ID": {
"type": "long"
},
"Data": {
"type": "keyword"
}
}
}
}
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"description": "Index the ingest timestamp as 'event.ingested'",
"field": "event.ingested",
"value": "{{{_ingest.timestamp}}}"
}
}
]
}
POST a/_doc?pipeline=my-pipeline
{
"ID": 1,
"Data": "Some Data",
"Status": "open"
}
POST a/_doc?pipeline=my-pipeline
{
"ID": 2,
"Data": "Other Data",
"Status": "open"
}
POST b/_doc?pipeline=my-pipeline
{
"ID": 1,
"Data": "Some older Data"
}
POST b/_doc?pipeline=my-pipeline
{
"ID": 1,
"Data": "Some old Data"
}
POST b/_doc?pipeline=my-pipeline
{
"ID": 2,
"Data": "Other very old Data"
}
POST b/_doc?pipeline=my-pipeline
{
"ID": 2,
"Data": "Other older Data"
}
POST b/_doc?pipeline=my-pipeline
{
"ID": 2,
"Data": "Other Data"
}
GET _transform/_preview
{
"source": {
"index": [ "a", "b" ]
},
"dest": {
"index": "joined"
},
"pivot": {
"group_by": {
"ID": {
"terms": {
"field": "ID"
}
}
},
"aggs": {
"buckets": {
"scripted_metric": {
"init_script": """
state.docsA = [];
state.docsB = [];
""",
"map_script": """
if (doc.containsKey('Status')) {
state.docsA.add(new HashMap(params['_source']));
} else {
state.docsB.add(new HashMap(params['_source']));
}
""",
"combine_script": """
return state;
""",
"reduce_script": """
def docsA = [];
def docsB = [];
for (s in states) {
docsA.addAll(s.docsA);
docsB.addAll(s.docsB);
}
def ret = [];
for (a in docsA) {
for (b in docsB) {
def joined = new HashMap(b);
joined['StatusA'] = a.Status;
joined['DataA'] = a.Data;
ret.add(joined);
}
}
return ret;
"""
}
}
}
}
}
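If you want the joined index to stay up to date while a and b keep receiving data, you could create the same pivot as a continuous transform and start it. Just a sketch to experiment with (the transform id and the delay/frequency values are placeholders; event.ingested from the pipeline above is used as the sync field):

PUT _transform/my-join-transform
{
  "source": {
    "index": [ "a", "b" ]
  },
  "dest": {
    "index": "joined"
  },
  "frequency": "1m",
  "sync": {
    "time": {
      "field": "event.ingested",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {
      "ID": { "terms": { "field": "ID" } }
    },
    "aggs": {
      "buckets": {
        "scripted_metric": {
          "init_script": "state.docsA = []; state.docsB = [];",
          "map_script": """
            if (doc.containsKey('Status')) {
              state.docsA.add(new HashMap(params['_source']));
            } else {
              state.docsB.add(new HashMap(params['_source']));
            }
          """,
          "combine_script": "return state;",
          "reduce_script": """
            def docsA = [];
            def docsB = [];
            for (s in states) {
              docsA.addAll(s.docsA);
              docsB.addAll(s.docsB);
            }
            def ret = [];
            for (a in docsA) {
              for (b in docsB) {
                def joined = new HashMap(b);
                joined['StatusA'] = a.Status;
                joined['DataA'] = a.Data;
                ret.add(joined);
              }
            }
            return ret;
          """
        }
      }
    }
  }
}

POST _transform/my-join-transform/_start

With the sync block in place, the transform only recomputes the ID buckets that have changed since the last checkpoint instead of running once over everything.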
Thanks for the solution, that helped.
Is it also possible to find a denormalised solution, where all the data is stored in fields of the document instead of using buckets?
I realise that my example was not sufficient for this use case, so I'll try to describe it in more detail:
Both tables, A and B, come from an Oracle SQL database. The primary key of table A is ID.
In table B there is at least one entry for each ID from table A, but in most cases there are several entries. The primary key of table B is KEY_B.
I would now like to use the transform to create a destination index that contains every document from index B with all of its fields, enriched with the fields from the document in index A that has the same ID.
I have adapted my example tables accordingly:
Table A:

| ID | Data | Status |
|---|---|---|
| 1 | Some Data | open |
| 2 | Other Data | open |
Table B:

| KEY_B | ID | Data |
|---|---|---|
| a | 1 | Some old Data |
| b | 1 | Some older Data |
| c | 2 | Other Data |
| d | 2 | Other older Data |
| e | 2 | Other very old Data |
Desired destination index:

| Destination_key | ID (Primary Key from Table A) | Data (from Table A) | Status (from Table A) | Data (from Table B) |
|---|---|---|---|---|
| a_1 | 1 | Some Data | open | Some old Data |
| b_1 | 1 | Some Data | open | Some older Data |
| c_2 | 2 | Other Data | open | Other Data |
| d_2 | 2 | Other Data | open | Other older Data |
| e_2 | 2 | Other Data | open | Other very old Data |
As the Destination_key I have used a composite field built from the matching primary keys of the respective entries in tables A and B.
Unfortunately, I have so far failed to adapt your solution to produce this result. Can you help me again?
As I have since noticed, the bucket structure works well for my needs. I hadn't originally understood how it works, which is why I thought the denormalised structure with one document per row, as an SQL join would produce, was necessary.