[Ann] JDBC River Plugin for ElasticSearch


(Jörg Prante) #1

Hi,

I'd like to announce a JDBC river implementation.

It can be found at https://github.com/jprante/elasticsearch-river-jdbc

I hope it is useful for all of you who need to index data from SQL
databases into ElasticSearch.

Suggestions, corrections, improvements are welcome!

Introduction

The Java Database Connectivity (JDBC) river lets you select data from JDBC
sources and index it into ElasticSearch.

It is implemented as an Elasticsearch plugin.

The relational data is internally transformed into structured JSON objects
for ElasticSearch schema-less indexing.

Setting it up is as simple as executing something like the following
against ElasticSearch:

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from orders"
}
}'

This HTTP PUT statement will create a river named my_jdbc_river
that fetches all the rows from the orders table in the MySQL database
test at localhost.
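
Hand-written JSON inside a shell string is easy to get wrong (trailing commas, unescaped quotes inside the SQL). As a minimal sketch, assuming Python 3 and only the standard library, the same river definition can be built from a dictionary so that json.dumps takes care of the escaping. The helper names build_river_definition and put_river are hypothetical and not part of the plugin, and put_river needs a running node, so it is only defined here, not called.

```python
import json
import urllib.request


def build_river_definition(driver, url, sql, user="", password=""):
    """Return the river definition as a JSON string (same fields as the curl example)."""
    definition = {
        "type": "jdbc",
        "jdbc": {
            "driver": driver,
            "url": url,
            "user": user,
            "password": password,
            "sql": sql,
        },
    }
    return json.dumps(definition)


def put_river(name, body, host="localhost:9200"):
    """PUT the definition to /_river/<name>/_meta (hypothetical helper, needs a live node)."""
    request = urllib.request.Request(
        "http://%s/_river/%s/_meta" % (host, name),
        data=body.encode("utf-8"),
        method="PUT",
    )
    with urllib.request.urlopen(request) as response:
        return response.read().decode("utf-8")


body = build_river_definition(
    "com.mysql.jdbc.Driver",
    "jdbc:mysql://localhost:3306/test",
    "select * from orders",
)
print(body)
```

Calling put_river("my_jdbc_river", body) would then send the same request as the curl command above.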

You have to install the JDBC driver jar of your favorite database manually
into the plugins directory where the jar file of the JDBC river plugin resides.

By default, the JDBC river re-executes the SQL statement every 60 minutes.

In case of a failover, the JDBC river will automatically be restarted
on another ElasticSearch node, and continue indexing.

Many JDBC rivers can run in parallel. Each river opens one thread to select
the data.

Installation

To install the plugin, run:

bin/plugin -install jprante/elasticsearch-river-jdbc/1.0.0

Log example of river creation

[2012-06-16 18:50:10,035][INFO ][cluster.metadata ] [Anomaly] [_river] update_mapping [my_jdbc_river] (dynamic)
[2012-06-16 18:50:10,046][INFO ][river.jdbc ] [Anomaly] [jdbc][my_jdbc_river] starting JDBC connector: URL [jdbc:mysql://localhost:3306/test], driver [com.mysql.jdbc.Driver], sql [select * from orders], indexing to [jdbc]/[jdbc], poll [1h]
[2012-06-16 18:50:10,129][INFO ][cluster.metadata ] [Anomaly] [jdbc] creating index, cause [api], shards [5]/[1], mappings []
[2012-06-16 18:50:10,353][INFO ][cluster.metadata ] [Anomaly] [_river] update_mapping [my_jdbc_river] (dynamic)
[2012-06-16 18:50:10,714][INFO ][river.jdbc ] [Anomaly] [jdbc][my_jdbc_river] got 5 rows
[2012-06-16 18:50:10,719][INFO ][river.jdbc ] [Anomaly] [jdbc][my_jdbc_river] next run, waiting 1h, URL [jdbc:mysql://localhost:3306/test] driver [com.mysql.jdbc.Driver] sql [select * from orders]

Configuration

The SQL statements used for selecting can be configured as follows.

Star query

Star queries are the simplest variant of selecting data. They can be used
to dump tables into ElasticSearch.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from orders"
}
}'

For example, given this table:

mysql> select * from orders;
+----------+-----------------+---------+----------+---------------------+
| customer | department | product | quantity | created |
+----------+-----------------+---------+----------+---------------------+
| Big | American Fruits | Apples | 1 | 0000-00-00 00:00:00 |
| Large | German Fruits | Bananas | 1 | 0000-00-00 00:00:00 |
| Huge | German Fruits | Oranges | 2 | 0000-00-00 00:00:00 |
| Good | German Fruits | Apples | 2 | 2012-06-01 00:00:00 |
| Bad | English Fruits | Oranges | 3 | 2012-06-01 00:00:00 |
+----------+-----------------+---------+----------+---------------------+
5 rows in set (0.00 sec)

The JSON objects are flat. The id of each document is generated
automatically from the row number.

id=0 {"product":"Apples","created":null,"department":"American Fruits","quantity":1,"customer":"Big"}
id=1 {"product":"Bananas","created":null,"department":"German Fruits","quantity":1,"customer":"Large"}
id=2 {"product":"Oranges","created":null,"department":"German Fruits","quantity":2,"customer":"Huge"}
id=3 {"product":"Apples","created":1338501600000,"department":"German Fruits","quantity":2,"customer":"Good"}
id=4 {"product":"Oranges","created":1338501600000,"department":"English Fruits","quantity":3,"customer":"Bad"}

Labeled columns

In SQL, each column may be labeled with a name. The JDBC river uses this
name to construct the JSON object.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select products.name as \"product.name\", orders.customer as \"product.customer.name\", orders.quantity * products.price as \"product.customer.bill\" from products, orders where products.name = orders.product"
}
}'

In this query, the columns selected are described as product.name,
product.customer.name, and product.customer.bill.

mysql> select products.name as "product.name", orders.customer as
"product.customer", orders.quantity * products.price as
"product.customer.bill" from products, orders where products.name =
orders.product ;
+--------------+------------------+-----------------------+
| product.name | product.customer | product.customer.bill |
+--------------+------------------+-----------------------+
| Apples | Big | 1 |
| Bananas | Large | 2 |
| Oranges | Huge | 6 |
| Apples | Good | 2 |
| Oranges | Bad | 9 |
+--------------+------------------+-----------------------+
5 rows in set, 5 warnings (0.00 sec)

The JSON objects are

id=0 {"product":{"name":"Apples","customer":{"bill":1.0,"name":"Big"}}}
id=1 {"product":{"name":"Bananas","customer":{"bill":2.0,"name":"Large"}}}
id=2 {"product":{"name":"Oranges","customer":{"bill":6.0,"name":"Huge"}}}
id=3 {"product":{"name":"Apples","customer":{"bill":2.0,"name":"Good"}}}
id=4 {"product":{"name":"Oranges","customer":{"bill":9.0,"name":"Bad"}}}
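
The mapping from dotted column labels to nested objects can be sketched as follows. This is an illustration in Python, not the plugin's actual code; the function name row_to_document is hypothetical, and the sketch assumes that no label is a prefix of another label (for example "product.customer" together with "product.customer.bill"), which it does not handle.

```python
import json


def row_to_document(row):
    """Turn a mapping of dotted column labels into a nested dict."""
    document = {}
    for label, value in row.items():
        parts = label.split(".")
        node = document
        # Walk (and create) one nested object per label segment except the last.
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return document


row = {
    "product.name": "Apples",
    "product.customer.name": "Big",
    "product.customer.bill": 1.0,
}
print(json.dumps(row_to_document(row)))
```

A label without a dot simply becomes a top-level field, which matches the flat objects produced by a star query.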

There are three column labels with an underscore as prefix
that are mapped to the Elasticsearch index/type/id.

_id
_type
_index

Structured objects

One of the advantages of SQL queries is the join operation. From many
tables, new tuples can be formed.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select \"relations\" as \"_index\", orders.customer as \"_id\", orders.customer as \"contact.customer\", employees.name as \"contact.employee\" from orders left join employees on employees.department = orders.department"
}
}'

For example, these rows from SQL

mysql> select "relations" as "_index", orders.customer as "_id",
orders.customer as "contact.customer", employees.name as "contact.employee"
from orders left join employees on employees.department =
orders.department;
+-----------+-------+------------------+------------------+
| _index | _id | contact.customer | contact.employee |
+-----------+-------+------------------+------------------+
| relations | Big | Big | Smith |
| relations | Large | Large | Müller |
| relations | Large | Large | Meier |
| relations | Large | Large | Schulze |
| relations | Huge | Huge | Müller |
| relations | Huge | Huge | Meier |
| relations | Huge | Huge | Schulze |
| relations | Good | Good | Müller |
| relations | Good | Good | Meier |
| relations | Good | Good | Schulze |
| relations | Bad | Bad | Jones |
+-----------+-------+------------------+------------------+
11 rows in set (0.00 sec)

will generate fewer JSON objects for the index relations.

index=relations id=Big {"contact":{"employee":"Smith","customer":"Big"}}
index=relations id=Large {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Large"}}
index=relations id=Huge {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Huge"}}
index=relations id=Good {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Good"}}
index=relations id=Bad {"contact":{"employee":"Jones","customer":"Bad"}}

Note how the employee column is collapsed into a JSON array. The repeated
occurrence of the _id column controls how values are folded into arrays,
making use of the ElasticSearch JSON data model.
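
One way to picture the folding is the Python sketch below. It is an illustration only: the function name fold_rows is hypothetical, and it assumes that rows with the same _id arrive consecutively and that only values that differ are collected into an array. Whether the plugin behaves exactly like this is not stated here.

```python
def fold_rows(rows):
    """Merge consecutive rows sharing the same _id; differing values become lists."""
    documents = []
    for row in rows:
        fields = dict(row)
        doc_id = fields.pop("_id")
        if documents and documents[-1][0] == doc_id:
            merged = documents[-1][1]
            for key, value in fields.items():
                if key not in merged:
                    merged[key] = value
                elif isinstance(merged[key], list):
                    if value not in merged[key]:
                        merged[key].append(value)
                elif merged[key] != value:
                    # Second distinct value for this field: start an array.
                    merged[key] = [merged[key], value]
        else:
            documents.append((doc_id, fields))
    return documents


rows = [
    {"_id": "Big", "contact.customer": "Big", "contact.employee": "Smith"},
    {"_id": "Large", "contact.customer": "Large", "contact.employee": "Müller"},
    {"_id": "Large", "contact.customer": "Large", "contact.employee": "Meier"},
    {"_id": "Large", "contact.customer": "Large", "contact.employee": "Schulze"},
]
for doc_id, fields in fold_rows(rows):
    print(doc_id, fields)
```

The four input rows yield two documents: the customer value stays a single string because it never changes within one _id, while the employee values accumulate.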

Bind parameter

Bind parameters are useful for selecting rows according to a matching
condition where the match criteria are not known beforehand.

For example, only rows matching certain conditions can be indexed into
ElasticSearch.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select products.name as \"product.name\", orders.customer as \"product.customer.name\", orders.quantity * products.price as \"product.customer.bill\" from products, orders where products.name = orders.product and orders.quantity * products.price > ?",
"params" : [ 5.0 ]
}
}'

Example result

id=0 {"product":{"name":"Oranges","customer":{"bill":6.0,"name":"Huge"}}}
id=1 {"product":{"name":"Oranges","customer":{"bill":9.0,"name":"Bad"}}}
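
The effect of the bind parameter can be reproduced outside the river. The sketch below uses Python's sqlite3 module with an in-memory database instead of MySQL and JDBC, which is an assumption made purely to keep it self-contained. The product prices (Apples 1, Bananas 2, Oranges 3) are not given in this post; they are inferred from the bill values shown in the labeled columns example.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.executescript("""
    create table products (name text, price real);
    create table orders (customer text, product text, quantity integer);
    insert into products values ('Apples', 1.0), ('Bananas', 2.0), ('Oranges', 3.0);
    insert into orders values
        ('Big', 'Apples', 1), ('Large', 'Bananas', 1), ('Huge', 'Oranges', 2),
        ('Good', 'Apples', 2), ('Bad', 'Oranges', 3);
""")

# The "?" is filled from the parameter tuple, like the "params" array of the river.
sql = (
    "select products.name, orders.customer, orders.quantity * products.price "
    "from products, orders "
    "where products.name = orders.product "
    "and orders.quantity * products.price > ? "
    "order by orders.rowid"
)
rows = connection.execute(sql, (5.0,)).fetchall()
for row in rows:
    print(row)
```

Only the two Oranges orders have a bill above 5.0, which agrees with the example result above.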

Time-based selecting

Because the JDBC river runs repeatedly, time-based selection is useful.
The current time is represented by the parameter value $now.

In this example, all rows beginning with a certain date up to now are
selected.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select products.name as \"product.name\", orders.customer as \"product.customer.name\", orders.quantity * products.price as \"product.customer.bill\" from products, orders where products.name = orders.product and orders.created between ? - 14 and ?",
"params" : [ "2012-06-01", "$now" ]
}
}'

Example result:

id=0 {"product":{"name":"Apples","customer":{"bill":2.0,"name":"Good"}}}
id=1 {"product":{"name":"Oranges","customer":{"bill":9.0,"name":"Bad"}}}
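
Conceptually, the $now value is a placeholder that is swapped for the current time before the statement runs. A minimal Python sketch of that idea follows; the function name resolve_params is hypothetical, and how the plugin actually resolves the placeholder (type, time zone, precision) is an assumption here.

```python
from datetime import datetime


def resolve_params(params, now=None):
    """Replace the placeholder string "$now" with a timestamp; keep other values."""
    if now is None:
        now = datetime.now()
    return [now if value == "$now" else value for value in params]


# A fixed timestamp is passed in so the result is reproducible.
fixed = datetime(2012, 6, 16, 18, 50, 10)
print(resolve_params(["2012-06-01", "$now"], now=fixed))
```

Because the placeholder is resolved on every run, each re-execution of the statement sees a later upper bound.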

Index

Each river can index into a specified index. Example:

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from orders"
},
"index" : {
"index" : "jdbc",
"type" : "jdbc"
}
}'

Bulk indexing

Bulk indexing is automatically used in order to speed up the indexing
process.

If no bulk size is specified, each SQL result set is indexed in a single
bulk request.

You can define a bulk size, and also a maximum number of active bulk
requests to cope with high load situations.
A bulk timeout defines the time period after which bulk feeds continue.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from orders"
},
"index" : {
"index" : "jdbc",
"type" : "jdbc",
"bulk_size" : 100,
"max_bulk_requests" : 30,
"bulk_timeout" : "60s"
}
}'
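
The effect of bulk_size can be pictured as splitting the result set into chunks. The Python sketch below is only an illustration: the function name split_into_bulks is hypothetical, and it assumes that bulk_size counts documents per bulk request.

```python
def split_into_bulks(rows, bulk_size=None):
    """Split rows into lists of at most bulk_size items; one list if no size is given."""
    rows = list(rows)
    if not rows:
        return []
    if not bulk_size:
        # No bulk size configured: the whole result set goes into a single bulk.
        return [rows]
    return [rows[start:start + bulk_size] for start in range(0, len(rows), bulk_size)]


bulks = split_into_bulks(range(250), bulk_size=100)
print([len(bulk) for bulk in bulks])
```

With 250 rows and a bulk size of 100, the sketch produces three bulks, the last one being partially filled.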

Stopping/deleting the river

curl -XDELETE 'localhost:9200/_river/my_jdbc_river/'

Best regards,

Jörg


(David Pilato) #2

Hi Jörg!

That's a really cool new feature!
Thanks for sharing.

I think that you will get many users (and many questions) for it. :wink:

David :wink:
Twitter : @dadoonet / @elasticsearchfr

On 16 June 2012 at 22:59, Jörg Prante joergprante@gmail.com wrote:



(Otis Gospodnetić) #3

Like David said - I hope you are ready for the avalanche of questions....
at least I think that's what will happen based on what happened when Solr
got DataImportHandler a few years ago.

And speaking of DIH, how does one handle deletion of DB rows and how does
one select+index incrementally (only rows that changed since last river
run)?

Thanks,
Otis

Search Analytics - http://sematext.com/search-analytics/index.html
Scalable Performance Monitoring - http://sematext.com/spm/index.html

On Saturday, June 16, 2012 4:59:18 PM UTC-4, Jörg Prante wrote:

Hi,

I'd like to announce a JDBC river implementation.

It can be found at https://github.com/jprante/elasticsearch-river-jdbc

I hope it is useful for all of you who need to index data from SQL
databases into ElasticSearch.

Suggestions, corrections, improvements are welcome!

Introduction

The Java Database Connection (JDBC) river allows to select data from JDBC
sources for indexing into ElasticSearch.

It is implemented as an Elasticsearch plugin.

The relational data is internally transformed into structured JSON objects
for ElasticSearch schema-less indexing.

Setting it up is as simple as executing something like the following
against ElasticSearch:

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from orders",
}
}'

This HTTP PUT statement will create a river named my_jdbc_river
that fetches all the rows from the orders table in the MySQL database
test at localhost.

You have to install the JDBC driver jar of your favorite database manually
into
the plugins directory where the jar file of the JDBC river plugin
resides.

By default, the JDBC river re-executes the SQL statement on a regular
basis (60 minutes).

In case of a failover, the JDBC river will automatically be restarted
on another ElasticSearch node, and continue indexing.

Many JDBC rivers can run in parallel. Each river opens one thread to select
the data.

Installation

In order to install the plugin, simply run: bin/plugin -install jprante/elasticsearch-river-jdbc/1.0.0.

Log example of river creation

[2012-06-16 18:50:10,035][INFO ][cluster.metadata ] [Anomaly]
[_river] update_mapping [my_jdbc_river] (dynamic)
[2012-06-16 18:50:10,046][INFO ][river.jdbc ] [Anomaly]
[jdbc][my_jdbc_river] starting JDBC connector: URL
[jdbc:mysql://localhost:3306/test], driver [com.mysql.jdbc.Driver], sql [select

  • from orders], indexing to [jdbc]/[jdbc], poll [1h]
    [2012-06-16 18:50:10,129][INFO ][cluster.metadata ] [Anomaly]
    [jdbc] creating index, cause [api], shards [5]/[1], mappings []
    [2012-06-16 18:50:10,353][INFO ][cluster.metadata ] [Anomaly]
    [_river] update_mapping [my_jdbc_river] (dynamic)
    [2012-06-16 18:50:10,714][INFO ][river.jdbc ] [Anomaly]
    [jdbc][my_jdbc_river] got 5 rows
    [2012-06-16 18:50:10,719][INFO ][river.jdbc ] [Anomaly]
    [jdbc][my_jdbc_river] next run, waiting 1h, URL
    [jdbc:mysql://localhost:3306/test] driver [com.mysql.jdbc.Driver] sql
    [select * from orders]

Configuration

The SQL statements used for selecting can be configured as follows.

Star query

Star queries are the simplest variant of selecting data. They can be used
to dump tables into ElasticSearch.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from orders"
}
}'

For example

mysql> select * from orders;
+----------+-----------------+---------+----------+---------------------+
| customer | department | product | quantity | created |
+----------+-----------------+---------+----------+---------------------+
| Big | American Fruits | Apples | 1 | 0000-00-00 00:00:00 |
| Large | German Fruits | Bananas | 1 | 0000-00-00 00:00:00 |
| Huge | German Fruits | Oranges | 2 | 0000-00-00 00:00:00 |
| Good | German Fruits | Apples | 2 | 2012-06-01 00:00:00 |
| Bad | English Fruits | Oranges | 3 | 2012-06-01 00:00:00 |
+----------+-----------------+---------+----------+---------------------+
5 rows in set (0.00 sec)

The JSON objects are flat, the id
of the documents is generated automatically, it is the row number.

id=0 {"product":"Apples","created":null,"department":"American
Fruits","quantity":1,"customer":"Big"}
id=1 {"product":"Bananas","created":null,"department":"German
Fruits","quantity":1,"customer":"Large"}
id=2 {"product":"Oranges","created":null,"department":"German
Fruits","quantity":2,"customer":"Huge"}
id=3 {"product":"Apples","created":1338501600000,"department":"German
Fruits","quantity":2,"customer":"Good"}
id=4 {"product":"Oranges","created":1338501600000,"department":"English
Fruits","quantity":3,"customer":"Bad"}

Labeled columns

In SQL, each column may be labeled with a name. This name is used by the
JDBC river to JSON object construction.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select products.name as "product.name",
orders.customer as "product.customer.name", orders.quantity *
products.price as "product.customer.bill" from products, orders where
products.name = orders.product"
}
}'

In this query, the columns selected are described as product.name,
product.customer.name, and product.customer.bill.

mysql> select products.name as "product.name", orders.customer as
"product.customer", orders.quantity * products.price as
"product.customer.bill" from products, orders where products.name =
orders.product ;
+--------------+------------------+-----------------------+
| product.name | product.customer | product.customer.bill |
+--------------+------------------+-----------------------+
| Apples | Big | 1 |
| Bananas | Large | 2 |
| Oranges | Huge | 6 |
| Apples | Good | 2 |
| Oranges | Bad | 9 |
+--------------+------------------+-----------------------+
5 rows in set, 5 warnings (0.00 sec)

The JSON objects are

id=0 {"product":{"name":"Apples","customer":{"bill":1.0,"name":"Big"}}}
id=1 {"product":{"name":"Bananas","customer":{"bill":2.0,"name":"Large"}}}
id=2 {"product":{"name":"Oranges","customer":{"bill":6.0,"name":"Huge"}}}
id=3 {"product":{"name":"Apples","customer":{"bill":2.0,"name":"Good"}}}
id=4 {"product":{"name":"Oranges","customer":{"bill":9.0,"name":"Bad"}}}

There are three column labels with an underscore as prefix
that are mapped to the Elasticsearch index/type/id.

_id
_type
_index

Structured objects

One of the advantage of SQL queries is the join operation. From many
tables, new tuples can be formed.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select "relations" as "_index", orders.customer as
"_id", orders.customer as "contact.customer", employees.name as
"contact.employee" from orders left join employees on
employees.department = orders.department"
}
}'

For example, these rows from SQL

mysql> select "relations" as "_index", orders.customer as "_id",
orders.customer as "contact.customer", employees.name as
"contact.employee" from orders left join employees on employees.department
= orders.department;
+-----------+-------+------------------+------------------+
| _index | _id | contact.customer | contact.employee |
+-----------+-------+------------------+------------------+
| relations | Big | Big | Smith |
| relations | Large | Large | Müller |
| relations | Large | Large | Meier |
| relations | Large | Large | Schulze |
| relations | Huge | Huge | Müller |
| relations | Huge | Huge | Meier |
| relations | Huge | Huge | Schulze |
| relations | Good | Good | Müller |
| relations | Good | Good | Meier |
| relations | Good | Good | Schulze |
| relations | Bad | Bad | Jones |
+-----------+-------+------------------+------------------+
11 rows in set (0.00 sec)

will generate fewer JSON objects for the index relations.

index=relations id=Big {"contact":{"employee":"Smith","customer":"Big"}}
index=relations id=Large
{"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Large"}}
index=relations id=Huge
{"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Huge"}}
index=relations id=Good
{"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Good"}}
index=relations id=Bad {"contact":{"employee":"Jones","customer":"Bad"}}

Note how the employee column is collapsed into a JSON array. The
repeated occurence of the _id column
controls how values are folded into arrays for making use of the
ElasticSearch JSON data model.

Bind parameter

Bind parameters are useful for selecting rows according to a matching
condition
where the match criteria is not known beforehand.

For example, only rows matching certain conditions can be indexed into
ElasticSearch.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "driver" : "com.mysql.jdbc.Driver",
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select products.name as \"product.name\", orders.customer as \"product.customer.name\", orders.quantity * products.price as \"product.customer.bill\" from products, orders where products.name = orders.product and orders.quantity * products.price > ?",
        "params" : [ 5.0 ]
    }
}'

Example result

id=0 {"product":{"name":"Oranges","customer":{"bill":6.0,"name":"Huge"}}}
id=1 {"product":{"name":"Oranges","customer":{"bill":9.0,"name":"Bad"}}}
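
To try the bind parameter outside the river, here is a minimal Python sketch that runs the same kind of query with a `?` placeholder. It uses an in-memory SQLite database instead of MySQL, and the product prices (1, 2 and 3) are inferred from the bills shown in this thread, so treat both as assumptions. The function name select_bills is made up for illustration.

```python
import sqlite3

# In-memory stand-in for the MySQL "test" database (assumption: SQLite is
# close enough to demonstrate the "?" placeholder).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table products (name text, price real);
    create table orders (customer text, product text, quantity integer);
    insert into products values ('Apples', 1.0), ('Bananas', 2.0), ('Oranges', 3.0);
    insert into orders values ('Big', 'Apples', 1), ('Large', 'Bananas', 1),
        ('Huge', 'Oranges', 2), ('Good', 'Apples', 2), ('Bad', 'Oranges', 3);
""")

SQL = """
    select products.name, orders.customer, orders.quantity * products.price
    from products, orders
    where products.name = orders.product
      and orders.quantity * products.price > ?
    order by orders.rowid
"""


def select_bills(threshold):
    """Bind the threshold to the single '?' placeholder and fetch all rows."""
    return conn.execute(SQL, (threshold,)).fetchall()


rows = select_bills(5.0)
```

Only the two Oranges orders (Huge and Bad) have a bill above 5.0, which matches the example result above.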

Time-based selecting

Because the JDBC river is running repeatedly, time-based selecting is
useful.
The current time is represented by the parameter value $now.

In this example, all rows beginning with a certain date up to now are
selected.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "driver" : "com.mysql.jdbc.Driver",
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select products.name as \"product.name\", orders.customer as \"product.customer.name\", orders.quantity * products.price as \"product.customer.bill\" from products, orders where products.name = orders.product and orders.created between ? - 14 and ?",
        "params" : [ "2012-06-01", "$now" ]
    }
}'

Example result:

id=0 {"product":{"name":"Apples","customer":{"bill":2.0,"name":"Good"}}}
id=1 {"product":{"name":"Oranges","customer":{"bill":9.0,"name":"Bad"}}}
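
As a rough illustration of the $now idea, the following Python sketch replaces the placeholder value with a timestamp before the parameters are bound. The function name resolve_params and the timestamp format are assumptions made for this example; the river may well bind a proper SQL timestamp object instead of a string.

```python
from datetime import datetime


def resolve_params(params, now=None):
    """Return a copy of params with every "$now" replaced by a timestamp string."""
    now = now or datetime.now()
    stamp = now.strftime("%Y-%m-%d %H:%M:%S")
    return [stamp if p == "$now" else p for p in params]


# Each river run would call this again, so "$now" moves forward with every poll.
resolved = resolve_params(["2012-06-01", "$now"], now=datetime(2012, 6, 16, 18, 50, 10))
```

All other values, such as the fixed start date, pass through unchanged.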

Index

Each river can index into a specified index. Example:

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "driver" : "com.mysql.jdbc.Driver",
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select * from orders"
    },
    "index" : {
        "index" : "jdbc",
        "type" : "jdbc"
    }
}'

Bulk indexing

Bulk indexing is automatically used in order to speed up the indexing
process.

Each SQL result set will be indexed by a single bulk request if the bulk
size is not specified.

A bulk size can be defined, as well as a maximum number of active bulk
requests, to cope with high-load situations.
A bulk timeout defines the time period after which bulk feeds continue.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "driver" : "com.mysql.jdbc.Driver",
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select * from orders"
    },
    "index" : {
        "index" : "jdbc",
        "type" : "jdbc",
        "bulk_size" : 100,
        "max_bulk_requests" : 30,
        "bulk_timeout" : "60s"
    }
}'
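
The effect of bulk_size can be pictured with a short Python sketch that cuts a stream of documents into batches. The generator name bulks is made up, and the sketch only covers batching: it does not model max_bulk_requests or bulk_timeout, and it is not the river's actual implementation.

```python
from itertools import islice


def bulks(docs, bulk_size=None):
    """Yield lists of at most bulk_size documents; one single list if no size is given."""
    docs = iter(docs)
    if bulk_size is None:
        batch = list(docs)
        if batch:
            yield batch
        return
    while True:
        batch = list(islice(docs, bulk_size))
        if not batch:
            return
        yield batch


# 250 documents with a bulk size of 100 give batches of 100, 100 and 50.
sizes = [len(batch) for batch in bulks(range(250), bulk_size=100)]
```

Without a bulk size, the whole result set ends up in one batch, which mirrors the default behaviour described above.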

Stopping/deleting the river

curl -XDELETE 'localhost:9200/_river/my_jdbc_river/'

Best regards,

Jörg


(Karel Minarik) #4

Hi Jörg,

the interface/configuration looks great, I love the way you use SELECT x as y (“labeled columns”) to construct the JSON.

Karel



(Jörg Prante) #5

Sorry, there was a glitch in 1.0.0: the data did not get indexed. The
fixed version 1.0.1 has just been released.

Jörg



(Jörg Prante) #6

For future versions, I am thinking about some incremental update/delete
options. I don't know which one will work best.

Selected rows from SQL do not have an identity, as required for objects.
This leads to the requirement to store object identities, manage these
identities (in a different index?), and compare the last saved state of
indexed objects with the current state. This is quite resource-intensive.
An internal ElasticSearch river index could be used as persistent state
memory.

More elegant is another option, to let the SQL DB provide object
identities. This is possible by using the "_id" column label, so
Elasticsearch river indexing will simply overwrite old objects with new
objects.

Deletion of objects could be performed in two variants. The first is true
deletion, i.e. Elasticsearch uses something like delete by query based upon
a list of ids. How to maintain such a list of ids is the question: either
the SQL DB provides a separate table or an extra SQL request that gives
back an id list with timestamps or markers (so the SQL DB must manage
additional object identities), or Elasticsearch serves as persistent
state memory (maybe with the help of the version feature?). The second
variant is not to delete documents, but to mark them as deleted, e.g.
{ "deleted": true }. This is a more straightforward process together
with data updates. A housekeeper job could run periodically to clean
such deleted documents from the index.
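
To make the second variant more concrete, here is a small Python sketch of the idea, using a plain dictionary in place of an Elasticsearch index. Everything in it (the class SketchIndex and its method names) is hypothetical and only illustrates the flow: overwrite by _id, mark as deleted, then let a housekeeper remove the marked documents.

```python
class SketchIndex:
    """Dictionary-backed stand-in for an index, keyed by the _id from SQL."""

    def __init__(self):
        self.docs = {}

    def upsert(self, doc_id, source):
        # A document with a known _id simply overwrites the old version.
        self.docs[doc_id] = {**source, "deleted": False}

    def mark_deleted(self, doc_id):
        # Soft delete: keep the document, flag it for the housekeeper.
        if doc_id in self.docs:
            self.docs[doc_id]["deleted"] = True

    def housekeep(self):
        # Periodic cleanup: drop everything flagged as deleted.
        stale = [doc_id for doc_id, doc in self.docs.items() if doc["deleted"]]
        for doc_id in stale:
            del self.docs[doc_id]
        return stale


index = SketchIndex()
index.upsert("Bad", {"product": "Oranges", "quantity": 3})
index.upsert("Bad", {"product": "Oranges", "quantity": 4})  # overwrites by _id
index.upsert("Good", {"product": "Apples", "quantity": 2})
index.mark_deleted("Bad")
removed = index.housekeep()
```

In a real setup, searches would additionally have to filter out documents flagged as deleted until the housekeeper has run.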

Best regards,

Jörg

On Sunday, June 17, 2012 6:26:21 AM UTC+2, Otis Gospodnetic wrote:

Like David said - I hope you are ready for the avalanche of questions....
at least I think that's what will happen based on what happened when Solr
got DataImportHandler a few years ago.

And speaking of DIH, how does one handle deletion of DB rows and how does
one select+index incrementally (only rows that changed since last river
run)?

Thanks,
Otis

Search Analytics - http://sematext.com/search-analytics/index.html
Scalable Performance Monitoring - http://sematext.com/spm/index.html

On Saturday, June 16, 2012 4:59:18 PM UTC-4, Jörg Prante wrote:

Hi,

I'd like to announce a JDBC river implementation.

It can be found at https://github.com/jprante/elasticsearch-river-jdbc

I hope it is useful for all of you who need to index data from SQL
databases into ElasticSearch.

Suggestions, corrections, improvements are welcome!

Introduction

The Java Database Connection (JDBC) river allows to select data from JDBC
sources for indexing into ElasticSearch.

It is implemented as an Elasticsearch plugin.

The relational data is internally transformed into structured JSON
objects for ElasticSearch schema-less indexing.

Setting it up is as simple as executing something like the following
against ElasticSearch:

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from orders",
}
}'

This HTTP PUT statement will create a river named my_jdbc_river
that fetches all the rows from the orders table in the MySQL database
test at localhost.

You have to install the JDBC driver jar of your favorite database
manually into
the plugins directory where the jar file of the JDBC river plugin
resides.

By default, the JDBC river re-executes the SQL statement on a regular
basis (60 minutes).

In case of a failover, the JDBC river will automatically be restarted
on another ElasticSearch node, and continue indexing.

Many JDBC rivers can run in parallel. Each river opens one thread to
select
the data.

Installation

In order to install the plugin, simply run: bin/plugin -install jprante/elasticsearch-river-jdbc/1.0.0.

Log example of river creation

[2012-06-16 18:50:10,035][INFO ][cluster.metadata ] [Anomaly]
[_river] update_mapping [my_jdbc_river] (dynamic)
[2012-06-16 18:50:10,046][INFO ][river.jdbc ] [Anomaly]
[jdbc][my_jdbc_river] starting JDBC connector: URL
[jdbc:mysql://localhost:3306/test], driver [com.mysql.jdbc.Driver], sql [select

  • from orders], indexing to [jdbc]/[jdbc], poll [1h]
    [2012-06-16 18:50:10,129][INFO ][cluster.metadata ] [Anomaly]
    [jdbc] creating index, cause [api], shards [5]/[1], mappings []
    [2012-06-16 18:50:10,353][INFO ][cluster.metadata ] [Anomaly]
    [_river] update_mapping [my_jdbc_river] (dynamic)
    [2012-06-16 18:50:10,714][INFO ][river.jdbc ] [Anomaly]
    [jdbc][my_jdbc_river] got 5 rows
    [2012-06-16 18:50:10,719][INFO ][river.jdbc ] [Anomaly]
    [jdbc][my_jdbc_river] next run, waiting 1h, URL
    [jdbc:mysql://localhost:3306/test] driver [com.mysql.jdbc.Driver] sql
    [select * from orders]

Configuration

The SQL statements used for selecting can be configured as follows.

Star query

Star queries are the simplest variant of selecting data. They can be used
to dump tables into ElasticSearch.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from orders"
}
}'

For example

mysql> select * from orders;
+----------+-----------------+---------+----------+---------------------+
| customer | department | product | quantity | created |
+----------+-----------------+---------+----------+---------------------+
| Big | American Fruits | Apples | 1 | 0000-00-00 00:00:00 |
| Large | German Fruits | Bananas | 1 | 0000-00-00 00:00:00 |
| Huge | German Fruits | Oranges | 2 | 0000-00-00 00:00:00 |
| Good | German Fruits | Apples | 2 | 2012-06-01 00:00:00 |
| Bad | English Fruits | Oranges | 3 | 2012-06-01 00:00:00 |
+----------+-----------------+---------+----------+---------------------+
5 rows in set (0.00 sec)

The JSON objects are flat, the id
of the documents is generated automatically, it is the row number.

id=0 {"product":"Apples","created":null,"department":"American
Fruits","quantity":1,"customer":"Big"}
id=1 {"product":"Bananas","created":null,"department":"German
Fruits","quantity":1,"customer":"Large"}
id=2 {"product":"Oranges","created":null,"department":"German
Fruits","quantity":2,"customer":"Huge"}
id=3 {"product":"Apples","created":1338501600000,"department":"German
Fruits","quantity":2,"customer":"Good"}
id=4 {"product":"Oranges","created":1338501600000,"department":"English
Fruits","quantity":3,"customer":"Bad"}

Labeled columns

In SQL, each column may be labeled with a name. The JDBC river uses this
label to construct the JSON object; dots in the label denote nesting.
Note that double quotes inside the SQL string must be escaped for JSON.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select products.name as \"product.name\", orders.customer as \"product.customer.name\", orders.quantity * products.price as \"product.customer.bill\" from products, orders where products.name = orders.product"
}
}'

In this query, the columns selected are described as product.name,
product.customer.name, and product.customer.bill.

mysql> select products.name as "product.name", orders.customer as
"product.customer.name", orders.quantity * products.price as
"product.customer.bill" from products, orders where products.name =
orders.product;
+--------------+-----------------------+-----------------------+
| product.name | product.customer.name | product.customer.bill |
+--------------+-----------------------+-----------------------+
| Apples       | Big                   |                     1 |
| Bananas      | Large                 |                     2 |
| Oranges      | Huge                  |                     6 |
| Apples       | Good                  |                     2 |
| Oranges      | Bad                   |                     9 |
+--------------+-----------------------+-----------------------+
5 rows in set, 5 warnings (0.00 sec)

The JSON objects are

id=0 {"product":{"name":"Apples","customer":{"bill":1.0,"name":"Big"}}}
id=1 {"product":{"name":"Bananas","customer":{"bill":2.0,"name":"Large"}}}
id=2 {"product":{"name":"Oranges","customer":{"bill":6.0,"name":"Huge"}}}
id=3 {"product":{"name":"Apples","customer":{"bill":2.0,"name":"Good"}}}
id=4 {"product":{"name":"Oranges","customer":{"bill":9.0,"name":"Bad"}}}
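
How dotted labels could become nested objects can be sketched in a few lines of Python. This is only an illustration of the idea, not the river's implementation; the helper name nest is hypothetical.

```python
def nest(labeled_row):
    """Turn labels such as "a.b.c" into nested dictionaries."""
    doc = {}
    for label, value in labeled_row.items():
        parts = label.split(".")
        node = doc
        # Walk (and create) the intermediate objects for all but the last part.
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return doc


row = {
    "product.name": "Apples",
    "product.customer.name": "Big",
    "product.customer.bill": 1.0,
}
doc = nest(row)
print(doc)
```
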

Three column labels with an underscore prefix are special: they are
mapped to the Elasticsearch index, type, and id of the document.

_id
_type
_index
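
As a rough sketch of the idea, the control labels can be separated from the ordinary columns as follows. The function name split_control_labels is hypothetical, the defaults "jdbc"/"jdbc" are taken from the log example, and removing the control labels from the document source is an assumption of this sketch, not a statement about the river's behavior.

```python
CONTROL_LABELS = ("_index", "_type", "_id")


def split_control_labels(labeled_row, default_index="jdbc", default_type="jdbc"):
    """Separate the _index/_type/_id labels from the ordinary columns."""
    meta = {"_index": default_index, "_type": default_type, "_id": None}
    source = {}
    for label, value in labeled_row.items():
        if label in CONTROL_LABELS:
            meta[label] = value
        else:
            source[label] = value
    return meta, source


meta, source = split_control_labels(
    {"_index": "relations", "_id": "Big", "contact.customer": "Big"}
)
print(meta, source)
```
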

Structured objects

One of the advantages of SQL queries is the join operation. From many
tables, new tuples can be formed.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select \"relations\" as \"_index\", orders.customer as \"_id\", orders.customer as \"contact.customer\", employees.name as \"contact.employee\" from orders left join employees on employees.department = orders.department"
}
}'

For example, these rows from SQL

mysql> select "relations" as "_index", orders.customer as "_id",
orders.customer as "contact.customer", employees.name as
"contact.employee" from orders left join employees on employees.department
= orders.department;
+-----------+-------+------------------+------------------+
| _index    | _id   | contact.customer | contact.employee |
+-----------+-------+------------------+------------------+
| relations | Big   | Big              | Smith            |
| relations | Large | Large            | Müller           |
| relations | Large | Large            | Meier            |
| relations | Large | Large            | Schulze          |
| relations | Huge  | Huge             | Müller           |
| relations | Huge  | Huge             | Meier            |
| relations | Huge  | Huge             | Schulze          |
| relations | Good  | Good             | Müller           |
| relations | Good  | Good             | Meier            |
| relations | Good  | Good             | Schulze          |
| relations | Bad   | Bad              | Jones            |
+-----------+-------+------------------+------------------+
11 rows in set (0.00 sec)

will generate fewer JSON objects for the index relations.

index=relations id=Big {"contact":{"employee":"Smith","customer":"Big"}}
index=relations id=Large {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Large"}}
index=relations id=Huge {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Huge"}}
index=relations id=Good {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Good"}}
index=relations id=Bad {"contact":{"employee":"Jones","customer":"Bad"}}

Note how the employee column is collapsed into a JSON array. Rows that
repeat the same _id value are folded into a single document, and their
differing values are collected into arrays, making use of the
ElasticSearch JSON data model.
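
The folding can be sketched in Python as follows. This is a simplified illustration under the assumption that rows with equal _id are merged field by field; the name fold_rows is hypothetical, and the dotted labels are kept flat here to keep the example short.

```python
def fold_rows(rows):
    """Merge rows sharing an _id; differing values of a field become a list."""
    docs = {}  # _id -> {field: single value or list of values}
    for row in rows:
        doc = docs.setdefault(row["_id"], {})
        for field, value in row.items():
            if field == "_id":
                continue
            if field not in doc:
                doc[field] = value
            elif isinstance(doc[field], list):
                if value not in doc[field]:
                    doc[field].append(value)
            elif doc[field] != value:
                # Second distinct value seen: promote the field to a list.
                doc[field] = [doc[field], value]
    return docs


rows = [
    {"_id": "Big", "contact.customer": "Big", "contact.employee": "Smith"},
    {"_id": "Large", "contact.customer": "Large", "contact.employee": "Müller"},
    {"_id": "Large", "contact.customer": "Large", "contact.employee": "Meier"},
    {"_id": "Large", "contact.customer": "Large", "contact.employee": "Schulze"},
]
folded = fold_rows(rows)
print(folded)
```

Repeated identical values (the customer name) stay scalar, while distinct values (the employees) are collected into an array.
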

Bind parameter

Bind parameters are useful for selecting rows according to a matching
condition where the match criterion is not known beforehand.

For example, only rows matching certain conditions can be indexed into
ElasticSearch.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select products.name as \"product.name\", orders.customer as \"product.customer.name\", orders.quantity * products.price as \"product.customer.bill\" from products, orders where products.name = orders.product and orders.quantity * products.price > ?",
"params" : [ 5.0 ]
}
}'

Example result

id=0 {"product":{"name":"Oranges","customer":{"bill":6.0,"name":"Huge"}}}
id=1 {"product":{"name":"Oranges","customer":{"bill":9.0,"name":"Bad"}}}
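
The effect of the bind parameter can be reproduced outside the river. The following Python sketch uses an in-memory SQLite database instead of MySQL; the product prices are not given in this post and are inferred here from the bill values shown earlier, so treat them as assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Prices are inferred from the example bills (assumption, not from the post).
conn.executescript("""
    create table products (name text, price real);
    create table orders (customer text, product text, quantity integer);
    insert into products values ('Apples', 1.0), ('Bananas', 2.0), ('Oranges', 3.0);
    insert into orders values ('Big', 'Apples', 1), ('Large', 'Bananas', 1),
        ('Huge', 'Oranges', 2), ('Good', 'Apples', 2), ('Bad', 'Oranges', 3);
""")

sql = """
    select products.name, orders.customer, orders.quantity * products.price
    from products, orders
    where products.name = orders.product
      and orders.quantity * products.price > ?
    order by orders.rowid
"""
params = [5.0]  # plays the role of the river's "params" array
result = conn.execute(sql, params).fetchall()
print(result)
```

Only the two orange orders exceed the threshold of 5.0, matching the example result above.
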

Time-based selecting

Because the JDBC river is running repeatedly, time-based selecting is
useful.
The current time is represented by the parameter value $now.

In this example, all rows beginning with a certain date up to now are
selected.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select products.name as \"product.name\", orders.customer as \"product.customer.name\", orders.quantity * products.price as \"product.customer.bill\" from products, orders where products.name = orders.product and orders.created between ? - 14 and ?",
"params" : [ "2012-06-01", "$now" ]
}
}'

Example result:

id=0 {"product":{"name":"Apples","customer":{"bill":2.0,"name":"Good"}}}
id=1 {"product":{"name":"Oranges","customer":{"bill":9.0,"name":"Bad"}}}
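
How the $now placeholder might be resolved before each run can be sketched as below. This is a hypothetical illustration in Python; the function name resolve_params and the use of a UTC timestamp are assumptions, since the post does not say how the river substitutes the value.

```python
from datetime import datetime, timezone


def resolve_params(params, now=None):
    """Replace the "$now" placeholder with a timestamp; pass other values through."""
    now = now or datetime.now(timezone.utc)
    return [now if p == "$now" else p for p in params]


# A fixed timestamp keeps the example reproducible.
fixed = datetime(2012, 6, 16, 18, 50, 10, tzinfo=timezone.utc)
resolved = resolve_params(["2012-06-01", "$now"], now=fixed)
print(resolved)
```

Because the placeholder is resolved on every run, each poll selects the rows up to the time of that run.
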

Index

Each river can index into a specified index. Example:

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from orders"
},
"index" : {
"index" : "jdbc",
"type" : "jdbc"
}
}'

Bulk indexing

Bulk indexing is automatically used in order to speed up the indexing
process.

Each SQL result set is indexed by a single bulk request if no bulk size
is specified.

A bulk size can be defined, as well as a maximum number of active bulk
requests, to cope with high-load situations.
A bulk timeout defines the time period after which bulk feeds continue.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from orders"
},
"index" : {
"index" : "jdbc",
"type" : "jdbc",
"bulk_size" : 100,
"max_bulk_requests" : 30,
"bulk_timeout" : "60s"
}
}'
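
The effect of bulk_size can be pictured with a small Python sketch. It only shows how rows might be split into bulks; the function name chunk is hypothetical, and max_bulk_requests and bulk_timeout are not modeled here.

```python
def chunk(rows, bulk_size=None):
    """Split rows into bulks; without a bulk_size everything goes into one bulk."""
    rows = list(rows)
    if not bulk_size:
        return [rows] if rows else []
    return [rows[i:i + bulk_size] for i in range(0, len(rows), bulk_size)]


bulks = chunk(range(250), bulk_size=100)
print([len(b) for b in bulks])
```
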

Stopping/deleting the river

curl -XDELETE 'localhost:9200/_river/my_jdbc_river/'

Best regards,

Jörg


(phoenix) #7

Really nice !

I'm waiting to see the update process.
I think two options for the object id should be available: using the db id
of the object (but how to manage compound ids?), or letting elasticsearch
generate one (but how to manage update/insert?).
Maybe letting the user provide a primary key comparator would be nice:
elasticsearch could handle the id generation but still recognize when
an object is being updated rather than inserted.

Frederic

On Saturday, 16 June 2012 22:59:18 UTC+2, Jörg Prante wrote:

Hi,

I'd like to announce a JDBC river implementation.

It can be found at https://github.com/jprante/elasticsearch-river-jdbc

I hope it is useful for all of you who need to index data from SQL
databases into ElasticSearch.

Suggestions, corrections, improvements are welcome!


(Curt Hu) #8

Hi, I wonder: after loading some data into Elasticsearch, how do I query the indices? Do we have some examples? Thanks


(Jörg Prante) #9

Right now, I'm working towards having the JDBC river use the versioning
feature of Elasticsearch.

The basic idea: on its first run, the river sets the version of all indexed
ES documents to 1 and saves this version to a river-internal index before
waiting for the next run. Subsequent river runs are updates and use
versions 2, 3, and so on. At the end of each cycle, the documents in
the JDBC index with version "version - 1" (one lower than the current
version) are deleted, so that only the current state of the SQL DB is
indexed and searchable.
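
A minimal Python sketch of this cycle, with plain dictionaries standing in for the JDBC index and the river-internal index (the name run_river and the data layout are hypothetical, for illustration only):

```python
def run_river(index, rows, state):
    """One river cycle: index all rows under a new version, then purge stale docs."""
    state["version"] = state.get("version", 0) + 1
    current = state["version"]
    for doc_id, source in rows.items():
        index[doc_id] = {"version": current, "source": source}
    # Documents still carrying the previous version were not in this result set.
    stale = [doc_id for doc_id, doc in index.items() if doc["version"] == current - 1]
    for doc_id in stale:
        del index[doc_id]
    return stale


index, state = {}, {}
first_removed = run_river(index, {"1": {"customer": "Big"}, "2": {"customer": "Large"}}, state)
# Row "2" has disappeared from the SQL DB before the second run.
second_removed = run_river(index, {"1": {"customer": "Big"}}, state)
print(first_removed, second_removed, index)
```
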

Best regards,

Jörg

On Monday, June 18, 2012 6:09:27 PM UTC+2, Frederic Esnault wrote:

Really nice !

I'm waiting to see the update process.
I think to options for object id should be available : using db id of
object (but how to manage compound ids), or let elasticsearch generate one
(but how to manage update/insert).
Maybe letting the user provide a primary key comparator may be nice,
allowing to let elasticsearch handle the id generation but recognize when
an object is being updated rather than inserted.

Frederic


(David Pilato) #10

Hi Jörg,

Have you seen this thread?
https://groups.google.com/forum/?hl=fr&fromgroups#!topic/elasticsearch/A4791hc_x2Q

You cannot sort on _version, and I don't know whether you can search on
_version. If not, you will have to scan & scroll over each doc on the ES
side to detect its version. That could be a huge overhead for the
river...

David.

On Wednesday, 20 June 2012 22:47:20 UTC+2, Jörg Prante wrote:

Right now, I'm heading towards the versioning feature of Elasticsearch to
get used by the JDBC river.

Basic idea is, let the river first run, and set version of all indexed ES
documents to 1, then save this version to a river internal index, before
waiting for the next run. Subsequent river runs are updates and will use
version 2,3 ... and so on. And, at the end of each cycle, the documents in
the JDBC index with version of "version - 1" (one lower than the current
version) are deleted, making only the current state of the SQL DB being
indexed and searchable.

Best regards,

Jörg


(Jörg Prante) #11

Hi,

a new version of the JDBC river is available.

Version 1.1.0 comes with an update/delete feature and fixes an issue with
the JDBC username.

The update/delete feature comes with an overhead of scanning through the
index when the SQL DB has changed data relevant to the JSON object ID set.

More information at https://github.com/jprante/elasticsearch-river-jdbc

Best regards,

Jörg


(Jörg Prante) #12

Yes, unfortunately it is a huge overhead. Improvements are welcome.

There is one option not implemented, where the SQL DB could manage
updates/deletes in a special table on its own. This option would require
structural changes on the SQL DB side, which is often not possible.
Consider that the DBA team's opinions may differ from those of the search
engineering team.

Best regards,

Jörg

On Thursday, June 21, 2012 11:52:23 AM UTC+2, David Pilato wrote:

Hi Jörg,

Have you seen this thread :
https://groups.google.com/forum/?hl=fr&fromgroups#!topic/elasticsearch/A4791hc_x2Q
?

You can not sort on _version. I don't know if you can search on _version ?
If not, that means that you will have to make a scan & scroll on each doc
on ES side to detect the version ? It could be a huge overhead for the
river...

David.


(David Pilato) #13

Yes. I think that it could be a nice option (not mandatory) for those who can
provide such a table.
For others, there will always be some overhead... As you said, it will be a
debate between teams with different interests.

David.

On 22 June 2012 at 15:01, "Jörg Prante" joergprante@gmail.com wrote:

Yes, unfortunately it is a huge overhead. Improvements are welcome.

There is one option not implemented, where the SQL DB could manage
updates/deletes in a special table on its own. This option would enforce
structural changes at the side of the SQL DB which is often not possible.
Consider DBA team opinions being different from the search engineer team
opinions.

Best regards,

Jörg

--
David Pilato
http://dev.david.pilato.fr/
Twitter : @dadoonet


(Jörg Prante) #14

Yes, I am thinking about a handshake mechanism so that the SQL DB gets
notified about successful Elasticsearch river runs, with three separate
channels: new objects, modified objects, and deleted objects. Each channel
will provide the ID sets, so updating them in ES is straightforward. It will
require additional SQL statement configuration of the river and a special
"object indexing table" on the SQL DB side.

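To make the idea more concrete, here is a rough Python sketch of how the three channels could be applied. Nothing of this exists yet: the function name apply_changes and the data shapes are hypothetical, a dictionary stands in for the ES index, and the returned ID set stands in for the acknowledgement sent back to the SQL DB.

```python
def apply_changes(index, new, modified, deleted):
    """Apply the three ID-keyed channels and return the IDs to acknowledge."""
    for doc_id, source in new.items():
        index[doc_id] = source
    for doc_id, source in modified.items():
        index[doc_id] = source
    for doc_id in deleted:
        index.pop(doc_id, None)
    return set(new) | set(modified) | set(deleted)


index = {"1": {"customer": "Big"}, "2": {"customer": "Large"}}
acknowledged = apply_changes(
    index,
    new={"3": {"customer": "Huge"}},
    modified={"1": {"customer": "Bigger"}},
    deleted=["2"],
)
print(index, acknowledged)
```
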
Best regards,

Jörg

On Friday, June 22, 2012 3:27:32 PM UTC+2, David Pilato wrote:

Yes. I think that it could be a nice option (not mandatory) for those
that can provide such a table.

For others, there will always have some overhead... As you said, it will
be a debate between teams without the same interest.


(dgabm) #15

Hi Jörg,

1) I'm using an Oracle XE server on Win7 x64, where I've done:
1.1) unzip elasticsearch-0.20.2.zip
1.2) copy ojdbc14.jar to %ES_HOME%/lib/ojdbc14.jar
1.3) ./bin/plugin -url http://bit.ly/U75w1N -install river-jdbc

2) Within the Oracle SQuirreL client:
2.1)
Create Table ORDERS(
ORDER_ID Number(7) Primary Key,
description VARCHAR2(100));

Create Table OPTIONS(
OPTION_ID Number(7) Primary Key,
order_id Number(7),
option_name VARCHAR2(50),
CONSTRAINT fk_order
FOREIGN KEY (order_id)
REFERENCES ORDERS(order_id));

INSERT INTO ORDERS(ORDER_ID, description) values(1, '1st order');

INSERT INTO OPTIONS(OPTION_ID, ORDER_ID, option_name) values(1, 1, '1st order option1');
INSERT INTO OPTIONS(OPTION_ID, ORDER_ID, option_name) values(2, 1, '1st order option2');

2.2) sqlplus ->

SQL> select * from orders;

ORDER_ID  DESCRIPTION
       1  1st order

SQL> select * from options;

OPTION_ID  ORDER_ID  OPTION_NAME
        1         1  1st order option1
        2         1  1st order option2

SQL> select 'order' as "_index", ord.order_id as "_id",ord.order_id as "order.oId",option_name as "order.options" from orders ord inner join options opt on ord.order_id = opt.order_id;

_index  _id  order.oId  order.options
order     1          1  1st order option1
order     1          1  1st order option2

3) On an Ubuntu VM box, hosted on the same Win7 x64, I executed:
ubuntu@ubuntu-VirtualBox:~$ uname -a
Linux ubuntu-VirtualBox 3.2.0-24-generic-pae #37-Ubuntu SMP Wed Apr 25 10:47:59 UTC 2012 i686 i686 i386 GNU/Linux

ubuntu@ubuntu-VirtualBox:~$ curl -XPUT '192.168.56.1:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "oracle.jdbc.OracleDriver",
"url" : "jdbc:oracle:thin:@localhost:1521/XE",
"user" : "",
"password" : "",
"poll" : "10s",
"sql" : "select \u0027order\u0027 as \"_index\", ord.order_id as \"_id\",ord.order_id as \"order.oId\",option_name as \"order.option\" from orders ord inner join options opt on ord.order_id = opt.order_id"
}
}'

4) I'm searching for:
ubuntu@ubuntu-VirtualBox:~$ curl -XGET '192.168.56.1:9200/order/jdbc/_search?pretty'
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "order",
      "_type" : "jdbc",
      "_id" : "1",
      "_score" : 1.0, "_source" : {"order":{"oId":1,"option":"1st order option2"},"_id":1,"_index":"order"}
    } ]
  }
}

Obviously it is fetching only the 2nd record of the one-to-many relation; the "1st order option1" is missing. What have I done wrong?

5) I would like to have the following result:

"_score" : 1.0, "_source" : {"order":{"oId":1,"options":["1st order option1","1st order option2"]},"_id":1,"_index":"order"}

which should be equivalent to the result from your example:

index=relations id=Good {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Good"}}

Can you please help me with any hint?
Thanks
GM


(Vasanth) #16

Hi

I would like to introduce myself to this site. I am a beginner in this topic, so kindly help me get started.
1. First of all, I need to know a few things about Elasticsearch: can it be used for all kinds of Java applications (web applications and desktop applications)? If so, please clarify how.
2. I could not get a proper picture of how to integrate Elasticsearch into my Java Swing application (a JDBC application).
3. What is meant by curl here? How is it used? What needs to be set up first?

Note:
Please give me one small example of how Elasticsearch is adopted.
Can I develop the same in Eclipse? How? Is there software or are there plugins available?

Regards,
Vasanth


(Santosh B) #17

Hi,
It's a very good feature.
I was trying to use a JDBC driver to import from Hive/Impala, but it never
works, whereas the MySQL connector works perfectly fine.
Was it designed specifically to work with MySQL, MSSQL, and a few others,
or with any database that supports JDBC?

Thanks,
Santosh B



(Jörg Prante) #18

From the docs

https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients

I conclude that Hive2 is not a JDBC Type 4 driver. Only JDBC Type 4 drivers
are supported by the JDBC plugin. A JDBC Type 4 driver no longer needs Class.forName.
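
To check whether a driver on the classpath registers itself without Class.forName, a small test like the one below may help. It is only a sketch: the class name DriverCheck and the default Hive URL are made up for illustration, and it does not show how the plugin itself loads drivers. It relies on the JDBC 4.0 behaviour of java.sql.DriverManager, which discovers drivers whose jar ships a META-INF/services/java.sql.Driver entry; this appears to be what the remark about Class.forName refers to.

```java
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;

// Hypothetical helper class, not part of the JDBC river plugin.
class DriverCheck {

    // True if a driver already registered with DriverManager accepts the URL.
    // No Class.forName call is made, so only self-registering drivers count.
    static boolean hasDriverFor(String url) {
        try {
            DriverManager.getDriver(url);
            return true;
        } catch (SQLException e) {
            // DriverManager reports "No suitable driver" this way.
            return false;
        }
    }

    public static void main(String[] args) {
        // List every driver that DriverManager found on the classpath.
        List<Driver> drivers = Collections.list(DriverManager.getDrivers());
        for (Driver d : drivers) {
            System.out.println(d.getClass().getName()
                    + " jdbcCompliant=" + d.jdbcCompliant());
        }

        // The default URL is an assumption; pass your own as the first argument.
        String url = args.length > 0 ? args[0]
                : "jdbc:hive2://localhost:10000/default";
        System.out.println(url + " -> "
                + (hasDriverFor(url) ? "driver found" : "no suitable driver"));
    }
}
```

Run it with the driver jar on the classpath. If the URL is reported as having no suitable driver, the jar most likely does not register itself and would need an explicit Class.forName call.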

Jörg



(Santosh B) #19

Thanks a lot for sharing....

On Mon, Jul 28, 2014 at 6:16 PM, joergprante@gmail.com <
joergprante@gmail.com> wrote:

From the docs

https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients

I conclude that Hive2 is not a JDBC Type 4 driver. Only JDBC Type 4
drivers are supported by JDBC plugin. JDBC Type 4 does no longer need
Class.forName.

Jörg

On Mon, Jul 28, 2014 at 1:27 PM, Santosh B contactsantoshb@gmail.com
wrote:

Hi,
Its a very good feature.
I was trying to use JDBC driver to import from hive/Impala but it never
works whereas mysql connector works perfectly fine.
Is it something it was specifically designed to work for
mysql,MSSQl...and few of them or any other databases which supports JDBC.

Thanks,
Santosh B

On Sunday, 17 June 2012 02:29:18 UTC+5:30, Jörg Prante wrote:

Hi,

I'd like to announce a JDBC river implementation.

It can be found at https://github.com/jprante/elasticsearch-river-jdbc

I hope it is useful for all of you who need to index data from SQL
databases into ElasticSearch.

Suggestions, corrections, improvements are welcome!

Introduction

The Java Database Connection (JDBC) river allows to select data from
JDBC sources for indexing into ElasticSearch.

It is implemented as an Elasticsearch plugin.

The relational data is internally transformed into structured JSON
objects for ElasticSearch schema-less indexing.

Setting it up is as simple as executing something like the following
against ElasticSearch:

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from orders",
}
}'

This HTTP PUT statement will create a river named my_jdbc_river
that fetches all the rows from the orders table in the MySQL database
test at localhost.

You have to install the JDBC driver jar of your favorite database
manually into
the plugins directory where the jar file of the JDBC river plugin
resides.

By default, the JDBC river re-executes the SQL statement on a regular
basis (60 minutes).

In case of a failover, the JDBC river will automatically be restarted
on another ElasticSearch node, and continue indexing.

Many JDBC rivers can run in parallel. Each river opens one thread to
select
the data.

Installation

In order to install the plugin, simply run: bin/plugin -install jprante/elasticsearch-river-jdbc/1.0.0.

Log example of river creation

[2012-06-16 18:50:10,035][INFO ][cluster.metadata ] [Anomaly]
[_river] update_mapping [my_jdbc_river] (dynamic)
[2012-06-16 18:50:10,046][INFO ][river.jdbc ] [Anomaly]
[jdbc][my_jdbc_river] starting JDBC connector: URL
[jdbc:mysql://localhost:3306/test], driver [com.mysql.jdbc.Driver], sql [select

  • from orders], indexing to [jdbc]/[jdbc], poll [1h]
    [2012-06-16 18:50:10,129][INFO ][cluster.metadata ] [Anomaly]
    [jdbc] creating index, cause [api], shards [5]/[1], mappings []
    [2012-06-16 18:50:10,353][INFO ][cluster.metadata ] [Anomaly]
    [_river] update_mapping [my_jdbc_river] (dynamic)
    [2012-06-16 18:50:10,714][INFO ][river.jdbc ] [Anomaly]
    [jdbc][my_jdbc_river] got 5 rows
    [2012-06-16 18:50:10,719][INFO ][river.jdbc ] [Anomaly]
    [jdbc][my_jdbc_river] next run, waiting 1h, URL
    [jdbc:mysql://localhost:3306/test] driver [com.mysql.jdbc.Driver] sql
    [select * from orders]

Configuration

The SQL statements used for selecting can be configured as follows.

Star query

Star queries are the simplest variant of selecting data. They can be used
to dump tables into ElasticSearch.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from orders"
}
}'

For example

mysql> select * from orders;
+----------+-----------------+---------+----------+---------
------------+
| customer | department | product | quantity | created
|
+----------+-----------------+---------+----------+---------
------------+
| Big | American Fruits | Apples | 1 | 0000-00-00
00:00:00 |
| Large | German Fruits | Bananas | 1 | 0000-00-00 00:00:00 |
| Huge | German Fruits | Oranges | 2 | 0000-00-00
00:00:00 |
| Good | German Fruits | Apples | 2 | 2012-06-01 00:00:00 |
| Bad | English Fruits | Oranges | 3 | 2012-06-01
00:00:00 |
+----------+-----------------+---------+----------+---------
------------+
5 rows in set (0.00 sec)

The JSON objects are flat, the id
of the documents is generated automatically, it is the row number.

id=0 {"product":"Apples","created":null,"department":"American
Fruits","quantity":1,"customer":"Big"}
id=1 {"product":"Bananas","created":null,"department":"German
Fruits","quantity":1,"customer":"Large"}
id=2 {"product":"Oranges","created":null,"department":"German
Fruits","quantity":2,"customer":"Huge"}
id=3 {"product":"Apples","created":1338501600000,"department":"German
Fruits","quantity":2,"customer":"Good"}
id=4 {"product":"Oranges","created":1338501600000,"department":"English
Fruits","quantity":3,"customer":"Bad"}

Labeled columns

In SQL, each column may be labeled with a name. This name is used by the
JDBC river to JSON object construction.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select products.name as "product.name",
orders.customer as "product.customer.name", orders.quantity *
products.price as "product.customer.bill" from products, orders where
products.name = orders.product"
}
}'

In this query, the columns selected are described as product.name,
product.customer.name, and product.customer.bill.

mysql> select products.name as "product.name", orders.customer as
"product.customer", orders.quantity * products.price as
"product.customer.bill" from products, orders where products.name =
orders.product ;
+--------------+------------------+-----------------------+
| product.name | product.customer | product.customer.bill |
+--------------+------------------+-----------------------+
| Apples | Big | 1 |
| Bananas | Large | 2 |
| Oranges | Huge | 6 |
| Apples | Good | 2 |
| Oranges | Bad | 9 |
+--------------+------------------+-----------------------+
5 rows in set, 5 warnings (0.00 sec)

The JSON objects are

id=0 {"product":{"name":"Apples","customer":{"bill":1.0,"name":"Big"}}}
id=1 {"product":{"name":"Bananas","customer":{"bill":2.0,"name":"
Large"}}}
id=2 {"product":{"name":"Oranges","customer":{"bill":6.0,"name":"
Huge"}}}
id=3 {"product":{"name":"Apples","customer":{"bill":2.0,"name":"
Good"}}}
id=4 {"product":{"name":"Oranges","customer":{"bill":9.0,"name":"Bad"}}}

There are three column labels with an underscore as prefix
that are mapped to the Elasticsearch index/type/id.

_id
_type
_index

Structured objects

One of the advantage of SQL queries is the join operation. From many
tables, new tuples can be formed.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select "relations" as "_index", orders.customer as
"_id", orders.customer as "contact.customer", employees.name as
"contact.employee" from orders left join employees on
employees.department = orders.department"
}
}'

For example, these rows from SQL

mysql> select "relations" as "_index", orders.customer as "_id",
orders.customer as "contact.customer", employees.name as
"contact.employee" from orders left join employees on employees.department
= orders.department;
+-----------+-------+------------------+------------------+
| _index | _id | contact.customer | contact.employee |
+-----------+-------+------------------+------------------+
| relations | Big | Big | Smith |
| relations | Large | Large | Müller |
| relations | Large | Large | Meier |
| relations | Large | Large | Schulze |
| relations | Huge | Huge | Müller |
| relations | Huge | Huge | Meier |
| relations | Huge | Huge | Schulze |
| relations | Good | Good | Müller |
| relations | Good | Good | Meier |
| relations | Good | Good | Schulze |
| relations | Bad | Bad | Jones |
+-----------+-------+------------------+------------------+
11 rows in set (0.00 sec)

will generate fewer JSON objects for the index relations.

index=relations id=Big {"contact":{"employee":"Smith","customer":"Big"}}
index=relations id=Large {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Large"}}
index=relations id=Huge {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Huge"}}
index=relations id=Good {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Good"}}
index=relations id=Bad {"contact":{"employee":"Jones","customer":"Bad"}}

Note how the employee column is collapsed into a JSON array. Rows that
repeat the same _id value are merged into one document, and the differing
values are folded into arrays to make use of the ElasticSearch JSON data
model.
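
To make the folding concrete, here is a rough Python sketch of how rows sharing an _id could be merged. It is an assumption about the behaviour inferred from the example output above (repeated equal values such as the customer stay scalar, differing values become an array), not the plugin's implementation, and fold_rows is a made-up name. Labels are kept flat here to keep the sketch short.

```python
def fold_rows(rows):
    """Merge rows with the same _id; differing values per label become a list."""
    docs = {}  # _id -> merged document, in first-seen order
    for row in rows:
        doc = docs.setdefault(row["_id"], {})
        for label, value in row.items():
            if label.startswith("_"):
                continue  # control columns are not part of the document body
            if label not in doc:
                doc[label] = value
            elif isinstance(doc[label], list):
                if value not in doc[label]:
                    doc[label].append(value)
            elif doc[label] != value:
                # second distinct value seen: turn the scalar into an array
                doc[label] = [doc[label], value]
    return docs


rows = [
    {"_id": "Big", "contact.customer": "Big", "contact.employee": "Smith"},
    {"_id": "Large", "contact.customer": "Large", "contact.employee": "Müller"},
    {"_id": "Large", "contact.customer": "Large", "contact.employee": "Meier"},
    {"_id": "Large", "contact.customer": "Large", "contact.employee": "Schulze"},
    {"_id": "Bad", "contact.customer": "Bad", "contact.employee": "Jones"},
]
docs = fold_rows(rows)
for doc_id, doc in docs.items():
    print(doc_id, doc)
```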

Bind parameter

Bind parameters are useful for selecting rows according to a matching
condition where the match criteria are not known beforehand.

For example, only rows matching certain conditions can be indexed into
ElasticSearch.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select products.name as \"product.name\", orders.customer as \"product.customer.name\", orders.quantity * products.price as \"product.customer.bill\" from products, orders where products.name = orders.product and orders.quantity * products.price > ?",
"params" : [ 5.0 ]
}
}'

Example result

id=0 {"product":{"name":"Oranges","customer":{"bill":6.0,"name":"Huge"}}}
id=1 {"product":{"name":"Oranges","customer":{"bill":9.0,"name":"Bad"}}}
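
The effect of the ? placeholder can be tried without a MySQL server. The following sketch uses Python's built-in sqlite3 module with an in-memory database as a stand-in. The table contents follow the orders table shown in this thread; the product prices (1, 2, 3) are an assumption inferred from the bills in the example output.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table products (name text, price real);
    create table orders (customer text, product text, quantity integer);
    insert into products values ('Apples', 1.0), ('Bananas', 2.0), ('Oranges', 3.0);
    insert into orders values
        ('Big', 'Apples', 1), ('Large', 'Bananas', 1), ('Huge', 'Oranges', 2),
        ('Good', 'Apples', 2), ('Bad', 'Oranges', 3);
""")

sql = """
    select products.name, orders.customer, orders.quantity * products.price
    from products, orders
    where products.name = orders.product
      and orders.quantity * products.price > ?
    order by orders.rowid
"""

# The value 5.0 is bound to the single ? at execution time.
rows = conn.execute(sql, (5.0,)).fetchall()
print(rows)
```

Only the two Oranges orders with a bill above 5.0 come back, which matches the example result above.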

Time-based selecting

Because the JDBC river is running repeatedly, time-based selecting is
useful.
The current time is represented by the parameter value $now.

In this example, all rows beginning with a certain date up to now are
selected.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select products.name as \"product.name\", orders.customer as \"product.customer.name\", orders.quantity * products.price as \"product.customer.bill\" from products, orders where products.name = orders.product and orders.created between ? - 14 and ?",
"params" : [ "2012-06-01", "$now" ]
}
}'

Example result:

id=0 {"product":{"name":"Apples","customer":{"bill":2.0,"name":"Good"}}}
id=1 {"product":{"name":"Oranges","customer":{"bill":9.0,"name":"Bad"}}}
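
How the river replaces $now internally is not spelled out here, so the following is only a guess at the general idea, written as a small Python sketch. The helper name resolve_params and the choice of a UTC timestamp are assumptions made for illustration.

```python
from datetime import datetime, timezone


def resolve_params(params, now=None):
    """Return a copy of params with the "$now" marker replaced by a timestamp.

    Hypothetical helper; all other values pass through untouched.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return [now if p == "$now" else p for p in params]


fixed = datetime(2012, 6, 16, 18, 50, tzinfo=timezone.utc)
resolved = resolve_params(["2012-06-01", "$now"], now=fixed)
print(resolved)
```

Because the marker is resolved on every run, the upper bound of the "between" clause moves forward each time the river polls.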

Index

Each river can index into a specified index. Example:

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from orders"
},
"index" : {
"index" : "jdbc",
"type" : "jdbc"
}
}'
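
River definitions written by hand are easy to get wrong: a trailing comma or an unescaped double quote inside the sql string makes the JSON invalid. One way around this, shown here as a sketch and not as anything the plugin requires, is to build the body in Python and let json.dumps take care of the escaping. The SQL is the join query from the "Structured objects" section; the variable names are arbitrary.

```python
import json

definition = {
    "type": "jdbc",
    "jdbc": {
        "driver": "com.mysql.jdbc.Driver",
        "url": "jdbc:mysql://localhost:3306/test",
        "user": "",
        "password": "",
        # double quotes inside the SQL are escaped by json.dumps
        "sql": 'select "relations" as "_index", orders.customer as "_id", '
               'orders.customer as "contact.customer", '
               'employees.name as "contact.employee" '
               'from orders left join employees '
               'on employees.department = orders.department',
    },
    "index": {"index": "jdbc", "type": "jdbc"},
}

body = json.dumps(definition, indent=2)
print(body)
```

The printed body can be saved to a file and sent with curl -XPUT ... -d @file to the same _river/my_jdbc_river/_meta URL.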

Bulk indexing

Bulk indexing is automatically used in order to speed up the indexing
process.

Each SQL result set will be indexed in a single bulk request if the bulk
size is not specified.

A bulk size can be defined, as well as a maximum number of active bulk
requests, to cope with high-load situations.
A bulk timeout defines the time period after which bulk feeds continue.

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from orders"
},
"index" : {
"index" : "jdbc",
"type" : "jdbc",
"bulk_size" : 100,
"max_bulk_requests" : 30,
"bulk_timeout" : "60s"
}
}'
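
What bulk_size means for a result set can be pictured with a short Python sketch. It only models the splitting of rows into bulks; max_bulk_requests and bulk_timeout are not modelled, and the generator name bulks is invented for this example.

```python
from itertools import islice


def bulks(rows, bulk_size=None):
    """Yield lists of rows: one list per bulk, or a single list if no size is set."""
    if bulk_size is None:
        batch = list(rows)
        if batch:
            yield batch
        return
    it = iter(rows)
    while True:
        batch = list(islice(it, bulk_size))
        if not batch:
            return
        yield batch


# 250 rows with bulk_size 100 give bulks of 100, 100 and 50 rows.
sizes = [len(b) for b in bulks(range(250), bulk_size=100)]
print(sizes)
```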

Stopping/deleting the river

curl -XDELETE 'localhost:9200/_river/my_jdbc_river/'

Best regards,

Jörg

--
You received this message because you are subscribed to the Google Groups
"elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to elasticsearch+unsubscribe@googlegroups.com.

To view this discussion on the web visit
https://groups.google.com/d/msgid/elasticsearch/17dc1eed-3a7c-4b7c-992d-bd0c65b2e3ff%40googlegroups.com
https://groups.google.com/d/msgid/elasticsearch/17dc1eed-3a7c-4b7c-992d-bd0c65b2e3ff%40googlegroups.com?utm_medium=email&utm_source=footer
.
For more options, visit https://groups.google.com/d/optout.



(Madhavan Ramachandran) #20

Hi JP

I set up a single-node ES 1.2.1 on Windows 2008 R2. I need to know which
JDBC river version supports importing data from Microsoft SQL Server 2012.

Regards
Madhavan.TR

On Monday, July 28, 2014 8:59:45 AM UTC-5, Santosh B wrote:

Thanks a lot for sharing....

On Mon, Jul 28, 2014 at 6:16 PM, joerg...@gmail.com wrote:

From the docs

https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients

I conclude that Hive2 is not a JDBC Type 4 driver. Only JDBC Type 4
drivers are supported by the JDBC plugin. JDBC Type 4 no longer needs
Class.forName.

Jörg

On Mon, Jul 28, 2014 at 1:27 PM, Santosh B <contact...@gmail.com> wrote:

Hi,
It's a very good feature.
I was trying to use a JDBC driver to import from Hive/Impala, but it never
works, whereas the MySQL connector works perfectly fine.
Was it specifically designed to work with MySQL, MSSQL and a few others,
or with any database that supports JDBC?

Thanks,
Santosh B

On Sunday, 17 June 2012 02:29:18 UTC+5:30, Jörg Prante wrote:

Hi,

I'd like to announce a JDBC river implementation.

It can be found at https://github.com/jprante/elasticsearch-river-jdbc

I hope it is useful for all of you who need to index data from SQL
databases into ElasticSearch.

Suggestions, corrections, improvements are welcome!

Best regards,

Jörg
