Keeping ES in sync with MySQL changes - JDBC River?


(Justin Doles) #1

I'm using ES 0.90.7 along with JDBC River 2.2.3. I'm attempting to find a
mechanism to periodically update ES from a MySQL table. I need to ensure
that I have every row from MySQL indexed into ES. Below is my test
structure:

orders table
CREATE TABLE orders (
id bigint(20) NOT NULL,
name varchar(45) DEFAULT NULL,
description text,
guid char(16) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
number int(11) unsigned DEFAULT NULL,
date datetime DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

orders_meta table
CREATE TABLE orders_meta (
id bigint(20) NOT NULL AUTO_INCREMENT,
last_run datetime DEFAULT NULL,
last_id bigint(20) DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=latin1;

orders ES index
"mappings" : {
"order" : {
"properties" : {
"_id" : {
"type" : "long",
"index" : "not_analyzed"
},
"name" : {
"type" : "string"
},
"description" : {
"type" : "string"
},
"guid" : {
"type" : "string",
"index" : "not_analyzed"
},
"number" : {
"type" : "integer",
"index" : "not_analyzed"
},
"date" : {
"type" : "date"
}
}
}
}

orders JDBC river
{
"type" : "jdbc",
"jdbc" : {
"strategy" : "simple",
"poll" : "5s",
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://127.0.0.1:6033/test",
"user" : "username",
"password" : "password",
"sql" : "SELECT orders.id AS _id, orders.name,
orders.description, BinaryToGuid(orders.guid), orders.number,
orders.date FROM test.orders WHERE orders.id > (SELECT
orders_meta.last_id FROM test.orders_meta);",
"acksql" : "REPLACE INTO test.orders_meta (last_run, last_id)
VALUES (UTC_TIMESTAMP(), 0);"
},
"index" : {
"index" : "orders",
"type" : "order"
}
}

As you can see from the river config, my thought was to keep a meta table
that tracks the last id pulled into ES. The trouble is that I don't know
how, or whether, you can get that last (highest) _id back out of ES (note
the hard-coded 0 in the acksql portion).
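
What I'd want is roughly a one-hit search sorted descending on the id,
something like the request below (purely a sketch; I don't know whether
sorting on the mapped _id field actually works, and the index/type names
just match my setup above):

```json
POST /orders/order/_search
{
  "size" : 1,
  "sort" : [ { "_id" : { "order" : "desc" } } ],
  "fields" : []
}
```

If that returned the highest indexed id, the acksql could write it into
orders_meta instead of the hard-coded 0.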

This may not even be the right way to tackle this. I'm open to ideas. My
production tables get ~2-6 million rows inserted daily. I need to update
ES relatively quickly for searching.

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/62bc56a9-52e9-438f-8ca6-e6b22c6810e5%40googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.


(David Pilato) #2

My opinion is that if you need to update elasticsearch as fast as possible, then polling a database every minute is not the best option.
I mean that, if possible, I'd prefer to push documents to elasticsearch as soon as they are stored in the database.

I did that in the past from a service layer. The service layer sent SQL inserts (updates, deletes) to postgresql and, at the same time, the same data to elasticsearch as JSON docs.

My 2 cents.

--
David Pilato | Technical Advocate | Elasticsearch.com
@dadoonet | @elasticsearchfr




(pellyadolfo) #3

I find this a common issue when working with ES. Alternatively, is there
any way to enable distributed query joins between ES and another storage
backend?

I see 2 cases (a rough example):

  • when querying ES, allow fetching the record from the other storage
    backend; in Java, something like (preprocessRecord is a hypothetical
    hook, not an existing API):

    SearchResponse response = getClient().prepareSearch()
        .setIndices("myindice")
        .setTypes("mytype")
        .preprocessRecord(
            // space to fetch this record in another backend by using a handler
            Object remoteRecord = handler.getRecord(id);
            // add fields of this remote record to the output
            this.put("field", remoteRecord.get("field"));
        )
        .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
        .setQuery(QueryBuilders
            .boolQuery()
            .should(QueryBuilders.matchQuery("content", "SQL")))
        .addFields("vtitle", "content")
        .setFrom(0)
        .setSize(25)
        .execute()
        .actionGet();

  • and the other way around: from the secondary backend, allow running
    queries that obtain ES documents and their features:

    // space to fetch this ES document by using a handler
    Object esDoc = ES.getDoc(id);
    // use fields/features of this document in the query
    int countWords = esDoc.countWords();
Would this scheme be feasible, with enough performance, between ES and
another storage backend? Thoughts?



(Jörg Prante) #4

If you insert ~2-6 million rows a day, I'd expect peaks of around 100-1000
docs per second.

Well, that is tough. Selecting 1000 rows per second with a single-instance
river, plus indexing, is a challenge. I have a tweaked application where I
read from 12 Oracle DB connections in parallel and still receive a lousy
400-800 rows per second in the JDBC river, with JSON docs averaging around
1 KB in size. ES is not the bottleneck; I can index >10k docs per second.

I agree with David, a better method would be integrating a doc pusher in
the app layer which can use straight bulk indexing, without the expensive
"select" step of a river.

The "last id" question is an open feature request for the JDBC river and
would have been addressed already, but I don't know a good solution, and
I'm not sure how I could implement it. SQL result rows do not map 1:1 to
the generated JSON docs. The obstacles are: SQL result rows do not offer
row-identity information that can be used as a key for continuation (like
"suspending/resuming an SQL query"). Also, looking at the "last" value of
a given column can't work, because such values are not guaranteed to be
ordered, and I can't force all SQL statements to execute in "order by" mode.

Jörg



(Justin Doles) #5

I think you guys are probably right. The 2-6 million inserts will grow
over the next few years as well. I was hoping to avoid that approach only
because the service that inserts all this data is extremely fast (and has
to be), and this might cause a bit of a slowdown. It's also written in
C/C++, so there's no "native" ES client. Though maybe I could leverage the
bulk API via the REST interface. ...sorry, just thinking out loud here. :slight_smile:

For the JDBC river, I map my auto-increment id column as _id. Wouldn't
that make it nearly 1:1, since we have the doc id to work with? If there
were a way to get the MAX of the result set, that could work. I'm not sure
how that looks on the ES/JDBC-river side though. I know I can't get that
value within my SQL statements (or I haven't been able to yet).
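
For example, if acksql could reference the source table itself, something
like the statement below might do it (just a sketch; it races with rows
inserted between the select and the ack, and it targets a fixed meta row
id of 1 so that REPLACE actually replaces rather than piling up rows):

```sql
REPLACE INTO test.orders_meta (id, last_run, last_id)
SELECT 1, UTC_TIMESTAMP(), MAX(orders.id) FROM test.orders;
```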

Thanks for the tips so far.

Justin



(David Pilato) #6

What I actually did for my project was to send my inserts to a message queue (ActiveMQ), both to keep insertion as fast as possible and to handle Elasticsearch interruptions (think of a cluster upgrade, for example).

--
David :wink:
Twitter : @dadoonet / @elasticsearchfr / @scrutmydocs



(Jörg Prante) #7

In fact, you do not need a "native C client". Check whether you can use
libcurl to write a simple C client for the HTTP REST bulk API.

Jörg



(Justin Doles) #8

Not a bad idea. That would keep the process separate enough not to
interfere. We already have a queuing mechanism, so maybe I can leverage
that.



(system) #9