Elasticsearch index/result aggregation with multiple datasources

Hello,

I am currently researching the functionalities and features of the Elastic Stack, more precisely Elasticsearch and Logstash, and I need some advice regarding a scenario I have to implement. I have searched the forums here, Stack Overflow, and other sites as well, but sadly could not find any relevant information.

So, here goes:
we have a microservice architecture that we migrated from a monolith, but the database is still shared, as the project is huge and we need time to migrate everything. However, each microservice has its own schema, to make it easier to move to separate databases in the future.
We have a complex search mechanism in the app that is not yet migrated; it uses data from 3 different "databases" (currently schemas), making ugly joins between them.
Our idea is to set up an Elasticsearch microservice that will take over this search; however, I am having difficulty imagining or understanding how this "data aggregation" should work.

What I have so far is a one-way data sync from a schema to Elasticsearch with Logstash and the JDBC plugin, meaning I can sync the data from the "main table" to Elasticsearch.

How can I accomplish this? My thoughts so far:

  1. Set up 3 JDBC connections and sync the data to 3 indices. -> How can I aggregate the results when searching? Also, how do I search 3 different indices?
  2. Can Logstash somehow aggregate the data from 3 different sources for me, so I would have just 1 index with all the data I need for the search?
  3. I have seen something with 1 JDBC connection and then something with JDBC streaming; however, I could not figure out how exactly it works, not even from the documentation... Would this be an option? If so, how exactly?
  4. I am open to other suggestions, since I am quite new to this and don't know all the capabilities of the stack.
  5. Almost forgot: how can I "bulk-import" 15,000+ entries into Elasticsearch for the "initial load"? I know Elasticsearch has a limit of 10,000 per transaction...
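Regarding points 1 and 5: several indices can be searched in one request by listing them comma-separated (e.g. `GET /orders,sell,supplier/_search`), and the 10,000 figure is actually the default *search* result window (`index.max_result_window`), not an ingest limit; the `_bulk` API takes batches of whatever size you chunk yourself. A minimal sketch of that chunking, with hypothetical index and field names:

```python
import json

def bulk_chunks(docs, index, chunk_size=1000):
    """Yield newline-delimited _bulk payloads of at most chunk_size docs each."""
    for start in range(0, len(docs), chunk_size):
        lines = []
        for doc in docs[start:start + chunk_size]:
            # action line, then the document source line
            lines.append(json.dumps({"index": {"_index": index, "_id": doc["id"]}}))
            lines.append(json.dumps(doc))
        yield "\n".join(lines) + "\n"  # a _bulk body must end with a newline

# hypothetical documents for the initial load
docs = [{"id": i, "orders": i % 7} for i in range(15000)]
payloads = list(bulk_chunks(docs, "orders"))
print(len(payloads))  # 15 batches, each one a POST body for /_bulk
```

Each yielded payload would then be POSTed to `/_bulk` with `Content-Type: application/x-ndjson`; 1,000 docs per batch is a conservative default, since `_bulk` is really bounded by request size rather than document count.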

Are the 3 databases in the same server?

If so, what we have done to accomplish this is: the SQL query in the JDBC connection does the joining and aggregation using CTEs. Then, when the data gets to Logstash, it is already aggregated and can go into 1 index.
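A rough sketch of that kind of pre-joined JDBC statement, using made-up schema, table, and column names that follow the examples later in this thread:

```sql
-- Hypothetical schemas/columns; the point is that the JDBC statement itself
-- joins the 3 schemas, so Logstash receives one flat row per order.
WITH sells AS (
  SELECT s.id, s.order_id, sp.name AS supplier_name
  FROM schema2.sell s
  JOIN schema3.supplier sp ON sp.sell_id = s.id
)
SELECT o.id        AS order_id,
       o.created_at,
       sells.supplier_name
FROM schema1.orders o
JOIN sells ON sells.order_id = o.id
WHERE o.created_at > :sql_last_value
```

The `:sql_last_value` placeholder is filled in by the Logstash jdbc input when `use_column_value`/`tracking_column` are configured, so only new rows are fetched on each run.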

Another way is to keep your 3 indexes as they are right now, and have another Logstash pipeline that reads those indexes and does the aggregation in the filter.
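That second pipeline might look roughly like the following sketch (host, index names, and schedule are hypothetical; the elasticsearch input plugin re-reads documents that are already indexed, and the merge logic would live in the filter section):

```
input {
  elasticsearch {
    hosts    => ["http://localhost:9200"]
    index    => "orders,sell,supplier"
    query    => '{ "query": { "match_all": {} } }'
    docinfo  => true                  # keep _index/_id so you can tell the sources apart
    schedule => "0 2 * * *"           # once a day, after the sync pipelines have run
  }
}
filter {
  # combine the three document types here, e.g. with the
  # aggregate or elasticsearch filter plugins
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "search"
  }
}
```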

There are a bunch of different ways to do this, but we chose to do it within the SQL query.

Well, currently we only have one database, so yes, the schemas are all in the same database. In the future, when we separate them, they probably won't be on the same server anymore, so while I can understand why you went that way, it is not an option (at least not a future-proof one) for us...
I like the 2-pipeline idea; however, won't the second one interfere with the first one that syncs the data from the DB? I would like to trigger this on a schedule, so new entries in the tables used for the search are transported automatically to Elasticsearch... If pipe1 syncs some data while pipe2 is aggregating the indices, won't that lead to corrupted data?

The pipeline way just seemed overcomplicated, so we stopped going that route.

But yes, in order for it to work, it would need a trigger for when to run. If your pipelines are constantly pulling data, then it might not be a good choice. If you are only ingesting once a day, it can work.

Does the aggregated data from the 3 schemas have the same meaning?
I mean, even if the field names are different, is the meaning the same?
For example:
Schema 1 has the field orders
Schema 2 has the field cust_buy
Schema 3 has the field sell
but do all 3 of them have the same meaning that you want to aggregate?

Yes, the data is connected through FKs, meaning in this case order_id for example. And the aggregated data should refer to the same thing, yes.

I think you can use an approach like UNION

  1. Build a view in each schema with the same names for the fields you are interested in; don't forget the timestamp field!
  2. Test the views and make sure the resulting fields are the same in each view
  3. Build the Logstash input configuration for each schema, e.g. 1001-schema1-input.conf, containing only the input block
input {
  jdbc {
    jdbc_driver_library => "/opt/elasticsearch-jdbc-2.3.4.0/lib/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://database:3306/database_name"
    jdbc_user => "user"
    jdbc_password => "password"
    jdbc_paging_enabled => true
    use_column_value => true
    tracking_column => "transaction_date"
    tracking_column_type => "timestamp"
    schedule => "*/5 * * * *"
    last_run_metadata_path => "/data/metadata/schema1-view1.txt"
    statement => "SELECT * FROM schema1.view1 WHERE transaction_date > :sql_last_value"
  }
}
  4. Do the same thing for the other 2 views, with the names 1002-schema2-view2.conf, and so on...
  5. Build the next conf file for the filter, e.g. 2001-filter.conf, and put filters there if needed, such as mutate or something.
filter {
  mutate {
    # rename / convert fields here if needed
  }
}
  6. Build the last file for the output, e.g. 3001-output.conf
output {
  elasticsearch {
    hosts => ["http://elastic01:9200"]
    index => "orders-%{+YYYY.MM.dd}"
  }
}

  7. Test first in a development env; if you are satisfied, go to production... :rofl:

This approach will give you flexibility if you want to separate the database connections when the second and third schemas move to another database. You also still have the detailed information per timestamp, and you can aggregate using Elasticsearch's capabilities.

On the first run, the Logstash configuration will get all data up to the time Logstash starts; on the following rounds it will only get the delta since the last run, which makes your query more efficient because it only fetches the delta from the previous run until now().

I hope this helps...

Regards,
Fadjar Tandabawana


Thanks, that definitely got me started in the (hopefully) right direction; however, my biggest problem is the filter part... I have seen there is also the aggregate filter, but what I am not getting is how I can access the 3 different types of data and how I can tell which is which? I don't have a column in the database that tells the type of the object (although, now that I think of it, I would introduce one if absolutely necessary)...

My question:
Why do you need aggregation at this stage?
The aggregation will be done in Elasticsearch; with the detailed time-series data, you get 2 advantages:

  1. The detail data, I mean the original data
  2. Aggregation in Elasticsearch can do whatever you need: sum, avg, max, min, derivative, etc.
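For instance, a per-day sum over the detail data could be a plain Elasticsearch aggregation query (index and field names follow the earlier examples; `calendar_interval` is the 7.2+ parameter name, older versions use `interval`):

```
GET orders-*/_search
{
  "size": 0,
  "aggs": {
    "per_day": {
      "date_histogram": {
        "field": "transaction_date",
        "calendar_interval": "day"
      },
      "aggs": {
        "total_orders": { "sum": { "field": "orders" } }
      }
    }
  }
}
```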

That's my 5 cents...

Regards,
Fadjar T

True, I am still new to this, so I am figuring things out gradually.
I guess the config is not predefined, so I could say, for example:

input {
    jdbc {
        type => "orders"
        # Add rest of the config
    }
    jdbc {
        type => "sell"
        # Add rest of the config
    }
}

filter {
    if [type] == "orders" {
        # map the order fields to whatever I need them, remove fields, etc.
    }
}

and then, in the filter section, check for the type and map the attributes the way I need them in Elasticsearch, right?
I mean, the filter produces one output for these, I guess?

Or did you mean something completely different with your post that details the configs?

If the meaning of "orders" and "sell" is the same and you want to aggregate them into the same field, such as "orders", create views in all schemas with the same field name.

create view schema1_view1 as select orders from schema1.table;

for second view

create view schema2_view2 as select sell as orders from schema2.table;

This makes sure the fields are the same for every view, across different tables with the same meaning.
It's simpler than adding a filter in Logstash, which would consume resources; a plain view in the origin database has a small footprint...

Regards,
Fadjar Tandabawana

No, they are not the same; my "final" data is/should be composed of 3 separate and different parts that come from 3 different inputs (3 different schemas -> 3 different databases in the future). They are, however, connected through foreign keys, for example:

order:
    id
    ...

sell:
    id
    order_id
    ...

supplier:
    id
    sell_id
    ...

Just a very simple example, might not be 100% correct, but just to give an idea of what I have.

What is the field that you want to aggregate across those 3 tables?
If you have Slack, we can discuss it in detail...

I ended up doing the following:

  • created my own custom Docker image of Logstash with the DB2 driver included, pushed it to my registry, and used it in the Logstash config
  • I have 2 separate Logstash config files (I know they get merged no matter what; it's just so I can keep things separate in the code)
  • the first one imports the master data I need later on when importing my main objects
  • the second one is my main pipeline; it looks up the "aggregate data" in Elasticsearch with the elasticsearch filter instead of in the DB with jdbc_streaming, which was way too slow

Anyway, here's my pipeline:

    logstashPipeline:
      logstash.conf: |
        input {
          jdbc {
            jdbc_driver_library => "/usr/share/logstash/lib/db2jcc4.jar"
            jdbc_driver_class => "Java::com.ibm.db2.jcc.DB2Driver"
            jdbc_connection_string => "<connection string>"
            jdbc_user => "${username}"
            jdbc_password => "${password}"
            sequel_opts => {
              jdbc_properties => {
                "encryptionAlgorithm" => "2"
                "securityMechanism" => "7"
              }
            }
            schedule => "30 22 * * * Etc/GMT"
            statement => "SELECT plant_name, plant_number from ELASTIC_POC_PLANT"
            tags => "plant"
          }
          jdbc {
            jdbc_driver_library => "/usr/share/logstash/lib/db2jcc4.jar"
            jdbc_driver_class => "Java::com.ibm.db2.jcc.DB2Driver"
            jdbc_connection_string => "<connection string>"
            jdbc_user => "${username}"
            jdbc_password => "${password}"
            sequel_opts => {
              jdbc_properties => {
                "encryptionAlgorithm" => "2"
                "securityMechanism" => "7"
              }
            }
            schedule => "45 22 * * * Etc/GMT"
            statement => "SELECT id, index, number, name, city from ELASTIC_POC_SUPPLIER"
            tags => "supplier"
          }
          jdbc {
            jdbc_driver_library => "/usr/share/logstash/lib/db2jcc4.jar"
            jdbc_driver_class => "Java::com.ibm.db2.jcc.DB2Driver"
            jdbc_connection_string => "<connection string>"
            jdbc_user => "${username}"
            jdbc_password => "${password}"
            sequel_opts => {
              jdbc_properties => {
                "encryptionAlgorithm" => "2"
                "securityMechanism" => "7"
              }
            }
            schedule => "15 23 * * * Etc/GMT"
            statement => "SELECT part_id, p_part_number_base, part_number, p_part_number_es1, part_number_es2, partname, color_designation from ELASTIC_POC_PART"
            tags => "part"
          }
        }
        output {
          if "supplier" in [tags] {
            elasticsearch {
              hosts => ["http://complaint-elk-elasticsearch-master:9200"]
              index => "supplier"
              document_id => "%{id}"
            }
          }
          if "plant" in [tags] {
            elasticsearch {
              hosts => ["http://complaint-elk-elasticsearch-master:9200"]
              index => "plant"
              document_id => "%{plant_number}"
            }
          }
          if "part" in [tags] {
            elasticsearch {
              hosts => ["http://complaint-elk-elasticsearch-master:9200"]
              index => "part"
              document_id => "%{part_id}"
            }
          }
        }
 
      complaint.conf: |
        input {
          jdbc {
            jdbc_driver_library => "/usr/share/logstash/lib/db2jcc4.jar"
            jdbc_driver_class => "Java::com.ibm.db2.jcc.DB2Driver"
            jdbc_connection_string => "<connection string>"
            jdbc_user => "${username}"
            jdbc_password => "${password}"
            sequel_opts => {
              jdbc_properties => {
                "encryptionAlgorithm" => "2"
                "securityMechanism" => "7"
              }
            }
            schedule => "15 1 * * * Etc/GMT"
            statement => "SELECT id, status, part_number, supplier_id, fault_description, creator_id, modifier_id, editor_supplier_id, editor_id, modified_at, created_at, closed_at, plant_number, p_part_number_base from complaint"
            tags => "complaint"
          }
        }
        filter {
          if "complaint" in [tags] {
            elasticsearch {
              hosts => ["http://complaint-elk-elasticsearch-master:9200"]
              index => "supplier"
              query => "id:%{[supplier_id]}"
              fields => {
                "name" => "[supplier][name]"
                "index" => "[supplier][supplierIndex]"
                "number" => "[supplier][supplierNumber]"
                "city" => "[supplier][city]"
              }
            }
            elasticsearch {
              hosts => ["http://complaint-elk-elasticsearch-master:9200"]
              index => "plant"
              query => "plant_number:%{[plant_number]}"
              fields => {
                "plant_name" => "[comp][plant]"
              }
            }
            elasticsearch {
              hosts => ["http://complaint-elk-elasticsearch-master:9200"]
              index => "part"
              query => "p_part_number_base:%{[p_part_number_base]} AND part_number:%{[part_number]}"
              fields => {
                "p_part_number_base" => "[part][partNumberBase]"
                "part_number" => "[part][partNumber]"
                "p_part_number_es1" => "[part][es1]"
                "p_part_number_es2" => "[part][es2]"
                "partname" => "[part][partName]"
                "color_designation" => "[part][colorDesignation]"
              }
            }
            # keep the mutate inside the "complaint" guard: the config files get
            # merged, so an unguarded mutate would also hit the masterdata events
            mutate {
              rename => {
                "id" => "[comp][complaintId]"
                "status" => "[comp][status]"
                "plant_number" => "[comp][plantNumber]"
                "supplier_id" => "[comp][supplierId]"
                "fault_description" => "[comp][faultDescription]"
                "creator_id" => "[comp][creatorId]"
                "modifier_id" => "[comp][modifierId]"
                "editor_supplier_id" => "[comp][editorSupplierId]"
                "editor_id" => "[comp][editorId]"
                "modified_at" => "[comp][modifiedAt]"
                "created_at" => "[comp][createdAt]"
                "closed_at" => "[comp][closedAt]"
              }
              remove_field => ["supplier_id", "supplier_data", "plant_data", "part_number", "p_part_number_base", "part_data", "@version", "@timestamp"]
            }
          }
        }
        output {
          if "complaint" in [tags] {
            elasticsearch {
              hosts => ["http://complaint-elk-elasticsearch-master:9200"]
              index => "complaint"
              document_id => "%{[comp][complaintId]}"
            }
          }
        }

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.