Advice on writing an ES plugin


(wires) #1

Hello,

I have developed a solution for a customer (http://vpro.nl) to consistently
denormalize data while indexing it.
The system allows you to extract graph structure and compute attributes
based on walks on the graph. I tried to explain it on this github
page: https://github.com/0x01/degraphmalizer

VPRO kindly allowed me to contribute the code and I would like to turn this
into an ES plugin (it is currently a Netty pipeline in front of ES index
operations). I would like some guidance on how to do a few things in an ES
plugin:

1). For every document indexed into ES, we need to trigger subgraph
extraction and start jobs to recompute the invalid documents. How do I hook
into this? What about multiple shards? Do I need to install the plugin on
every node?

2). I am not segmenting the graph, so it is on one machine. So I would like
to start one instance of the system; I can probably base this on the river
code?

3). But I would like to replicate the graph to other nodes. Can I use
some existing ES infrastructure for this?

4). I would like to push the configuration to a special index, much like
the rivers. However, the conf is not JSON but true JavaScript. Any hint on
how to achieve this? (I'm currently using Rhino on local files; I can
probably just get a stream from some REST endpoint...)

So,

I understand this is a bit vague, but any pointers to source or examples
would be greatly appreciated. I'll then probably come back with some more
specific questions :-)

Thanks,

bye,
Jelle

--


(sujoysett) #2

Hi,

elasticsearch-mapper-attachments can probably serve as a guide for what you
described:
https://github.com/elasticsearch/elasticsearch-mapper-attachments

There, metadata is extracted from attachments, and that extra
meta-information is indexed as extra fields at index time.

Yes, you need to install the plugin on every node (probably every data node,
to be exact, but I am not entirely sure about this).

Implementing a river would probably be unnecessary, permanently running
overhead for your requirement.

-- Sujoy.

--


(Jörg Prante) #3

I'm very happy to see your "degraphmalizer"; it is exactly what I have in
mind for indexing bibliographic data that references authority file data
into Elasticsearch. In library terms, adding denormalized data is often
referred to as "Verweisungsformen" (i.e. adding reference forms).

I think the best option is to put the graph into an ES index, so that it is
globally visible.

The plugin architecture of Elasticsearch is somewhat complex. There are
several points where developers can hook in their code:

  • "river plugins" operating at cluster level

  • "generic plugins" working at node level

  • "index plugins" working at indices level, having access to mappings and
    indices within a cluster

  • "shard plugins" working at shard level, having access to Lucene index
    structures

  • "transport client plugins" working out-of-scope of a cluster

A river plugin is a single-instance plugin managed by the cluster (in its
metadata), where the main purpose is to connect to a single external
source. A challenge might be that the river lifecycle is maintained and
initiated at server side by ES.

You could try to insert the "degraphmalizer" as a transport client plugin
so it could be initiated at client side by an external application, an
index feeder program or the like. Such a transport client plugin can access
the ES cluster state by transport client methods or sourcing documents from
other indexes.

I would love to see more of such "content plugins". Such plugins could be
hooked into the XContentBuilder to enhance the document content before
it gets indexed. This is similar to analyzers, but analyzers are server-side
and restricted to a single Lucene field. Content plugins should be able to
operate at document level and should have access to the ES field mapper
(for adding new fields).

Jörg

--


(wires) #4

Hello Jörg!

On Monday, October 8, 2012 4:33:18 PM UTC+2, Jörg Prante wrote:

I'm very happy to see your "degraphmalizer", it is exactly what I have in
mind for indexing bibliographic data into Elasticsearch that are
referencing to authority file data. In library terms, adding de-normalized
data is often referred to as "Verweisungsformen" (i.e. adding reference
forms).

Great! :^)

Maybe we can join forces? I will be cleaning up the code from our current
prototype and slowly move it into the github repository, I am very open to
any criticism on the code or other suggestions.

I think the best is to put the graph into an ES index. So it is globally
visible.

Yes, indeed!

I kept this in mind when writing the application: it is based on the tinkerpop
blueprints API https://github.com/tinkerpop/blueprints/wiki so you are
free to use any supported graph db. It should be quite straightforward to
write an ES implementation of blueprints and swap that in.

But I am a bit worried about the performance when storing the graph in ES.
The "obvious" implementation would be to use "adjacency list" format to
store the graph: one document per vertex plus its adjacent edges. It
seems that this would not perform very well, but I am just guessing; I
would need to experiment.
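
To make the "adjacency list" layout concrete, here is a minimal sketch in plain Python. It only shows the document shape (one JSON-like document per vertex, holding its outgoing edges); the vertex ids, edge labels and the field names `id` and `out` are made up for illustration and are not taken from the degraphmalizer code or from any ES mapping.

```python
from collections import defaultdict


def adjacency_documents(edges):
    """Turn (source, label, target) edges into one document per vertex."""
    out_edges = defaultdict(list)
    vertices = set()
    for source, label, target in edges:
        vertices.update((source, target))
        out_edges[source].append({"label": label, "target": target})
    # One JSON-like document per vertex, keyed by the vertex id.
    # Vertices without outgoing edges still get a document with an empty list.
    return {v: {"id": v, "out": out_edges.get(v, [])} for v in sorted(vertices)}


# Hypothetical example data.
edges = [
    ("program:1", "made_by", "person:7"),
    ("program:1", "part_of", "series:3"),
    ("series:3", "made_by", "person:7"),
]
docs = adjacency_documents(edges)
```

Each value in `docs` could then be indexed under its vertex id. Every traversal step becomes one document lookup, which is where the performance worry above comes from.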

In any case, efficiently "sharding" a graph is far from trivial, so there
are limits to what you can do with this.

An alternative is to store the graph on a different box and communicate
over the network. This can be done right now using Rexster:
https://github.com/tinkerpop/blueprints/wiki/Rexster-Implementation

Another option would be to have two identical copies of the graph on two
nodes. Maybe this can somehow be hooked into ES's "sync replication"
mechanism. The indexing request would only return 'success' to the client
after all graphs processed the request..?

The plugin architecture of Elasticsearch is a little bit complex, there are
some points developers can hook up their code:

  • "river plugins" operating at cluster level

  • "generic plugins" working at node level

  • "index plugins" working at indices level, having access to mappings and
    indices within a cluster

  • "shard plugins" working at shard level, having access to Lucene index
    structures

  • "transport client plugins" working out-of-scope of a cluster

Right. Thanks for explaining this, I wish there was slightly more developer
documentation, but I'm slowly getting used to the ES codebase...

A river plugin is a single-instance plugin managed by the cluster (in its
metadata), where the main purpose is to connect to a single external
source. A challenge might be that the river lifecycle is maintained and
initiated at server side by ES.

Initially I thought about this. The "river" could keep a connection to a
Rexster server. It would push changes to the graph over this connection.
This isn't really a river, it's an inverse river :-0. But ok, this would
only work if we can somehow scroll an ES 'changes feed'... does this exist?

You could try to insert the "degraphmalizer" as a transport client plugin
so it could be initiated at client side by an external application, an
index feeder program or the like. Such a transport client plugin can access
the ES cluster state by transport client methods or sourcing documents from
other indexes.

Ah, that is a nice idea. So if I understand correctly, you suggest that
instead of a regular "Client" or "TransportClient" you would use a
DegraphmalizerClient that wraps a regular client and does its work for
every index request.
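
The wrapping idea can be sketched independently of ES. The snippet below is plain Python and does not use the real ES client API: `InMemoryClient`, its `index`/`get` methods, the `DenormalizingClient` name and the `add_author_name` function are all hypothetical stand-ins, chosen only to show the decorator pattern.

```python
class InMemoryClient:
    """Hypothetical stand-in for a real client; stores documents in a dict."""

    def __init__(self):
        self.store = {}

    def index(self, index, doc_id, document):
        self.store[(index, doc_id)] = document
        return {"ok": True}

    def get(self, index, doc_id):
        return self.store[(index, doc_id)]


class DenormalizingClient:
    """Wraps a client and enriches every document before it is indexed."""

    def __init__(self, client, denormalize):
        self._client = client
        self._denormalize = denormalize

    def index(self, index, doc_id, document):
        # Work on a copy so the caller's document is left untouched.
        enriched = self._denormalize(index, doc_id, dict(document))
        return self._client.index(index, doc_id, enriched)

    def __getattr__(self, name):
        # Anything other than index() is delegated to the wrapped client.
        return getattr(self._client, name)


def add_author_name(index, doc_id, document):
    # Hypothetical lookup table standing in for a walk on the graph.
    authors = {"person:7": "Jelle"}
    document["author_name"] = authors.get(document.get("author"))
    return document


client = DenormalizingClient(InMemoryClient(), add_author_name)
client.index("programs", "1", {"title": "Demo", "author": "person:7"})
stored = client.get("programs", "1")
```

The limitation is visible in the sketch: only requests that pass through the wrapper get enriched, so anything that indexes through another client bypasses it.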

While I like this idea, I thought it would be useful to sit in between a
River and ES. That way rivers can still be used to get the data into ES,
while the degraphmalizer sticks to degraphmalizing ;-)

Any thoughts? There is no API to write such plugins, right?

I would love to see more of such "content plugins". Such plugins could be
hooked into the XContentBuilder for enhancing the document content before
it got indexed. Similar to analyzers, but analyzers are server side and
restricted to a single Lucene field. Content plugins should be able to
operate at document level and should have access to the ES field mapper
(for adding new fields).

Yes, this sounds like what I need. Do you know if there is any API to do
this?

I am currently working through your tutorial and the sources of various
other ES plugins.
Hopefully soon I'll understand this a bit better.

Thanks again for your response,

Bye!
Jelle.

--


(wires) #5

On Monday, October 8, 2012 1:02:08 PM UTC+2, Sujoy Sett wrote:

Hi,

elasticsearch-mapper-attachments can probably serve as a guide for what you
described:
https://github.com/elasticsearch/elasticsearch-mapper-attachments
There, metadata is extracted from attachments, and that extra
meta-information is indexed as extra fields at index time.

Yes, you need to install the plugin on every node (probably every data node,
to be exact, but I am not entirely sure about this).

Thanks!
I'm currently looking into this plugin to see if I can adjust it to my
needs.

Thanks for your pointers,

Best,
Jelle.

--


(Jörg Prante) #6

Hi Jelle,

On Friday, October 12, 2012 4:07:25 PM UTC+2, wires wrote:

Maybe we can join forces? I will be cleaning up the code from our current
prototype and slowly move it into the github repository, I am very open to
any criticism on the code or other suggestions.

Sure. In the next few weeks, while Elasticsearch is moving towards Lucene
4, I will focus on connecting RDF graph storage systems to Elasticsearch.
See Apache Stanbol for an approach of the CMS community to store semantic
content.

In any case, efficiently "sharding" a graph is far from trivial, so there
are limits to what you can do with this.

It depends on the granularity. Be aware that storing single nodes and edges
as Lucene documents will be problematic in an inverted-index environment
because it produces a large overhead and does not work efficiently. The
idea is to segment a graph into subgraphs that can be stored as documents,
where documents contain lots of literals...

I wasn't aware of projects like tinkerpop blueprints, but it looks like they
focus on graph traversal operations at node and edge level. Unfortunately
this does not match the Lucene document retrieval model very well.

Ah, that is a nice idea. So if I understand correctly, you suggest that
instead of a regular "Client" or "TransportClient" you would use a
DegraphmalizerClient that wraps a regular client and does its work for
every index request.

I think of an ES Client, wrapped up in a service that knows about the
object model of the graph and that can read and write in both directions...

Any thoughts? There is no API to write such plugins, right?

I don't know if an API is the missing part; it is just a question of
combining some methods that exist in the ES test case examples and in some
river plugins.

A "content API" would help in the sense that it could ease the development
of plugins that construct ES documents, by guiding the developer on how
to interact with existing data in an ES cluster, or by including data from
external sources in the ES XContent (the JSON document).

Best regards,

Jörg

--


(wires) #7

Hello Jörg,

On Friday, October 12, 2012 6:45:51 PM UTC+2, Jörg Prante wrote:

Hi Jelle,

On Friday, October 12, 2012 4:07:25 PM UTC+2, wires wrote:

Maybe we can join forces? I will be cleaning up the code from our
current prototype and slowly move it into the github repository, I am very
open to any criticism on the code or other suggestions.

Sure. In the next few weeks, while Elasticsearch is moving towards Lucene
4, I will focus on connecting RDF graph storage systems to Elasticsearch.
See Apache Stanbol for an approach of the CMS community to store semantic
content.

Very interesting! nice project, didn't know about it.

In any case, efficiently "sharding" a graph is far from trivial, so
there are limits to what you can do with this.

It depends on the granularity. Be aware that storing single nodes and
edges as Lucene documents will be problematic in an inverted indexing
environment because it produces a large overhead that does not work
efficiently. The idea is to segment a graph in subgraphs that can be stored
as documents, where documents contain lots of literals...

Exactly. Do you know how such segmentation can be efficiently computed?

I suppose for my case one could keep track of nodes with a high degree and
select those nodes as "centers" and perform a small 'breadth first' walk
around them. The subgraphs generated this way could overlap a bit. If the
graph consists of many small disconnected components then I can "fatten"
the subgraphs by picking multiple starting nodes for the walk.

But this is speculation, if you know about any algorithms etc, I'm very
interested.
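
To make the idea above concrete, here is a rough sketch in plain Python: pick the highest-degree vertices as centers and do a depth-limited breadth-first walk around each. This is only an illustration of the heuristic described in this post, not a known-good partitioning algorithm; the function name, the parameters `num_centers` and `max_depth`, and the toy graph are assumptions.

```python
from collections import deque


def segment_by_hubs(adjacency, num_centers, max_depth):
    """Return {center: vertices reached by a bounded BFS around that center}."""
    # Rank vertices by degree; ties are broken by name to stay deterministic.
    centers = sorted(adjacency, key=lambda v: (-len(adjacency[v]), v))[:num_centers]
    segments = {}
    for center in centers:
        seen = {center}
        queue = deque([(center, 0)])
        while queue:
            vertex, depth = queue.popleft()
            if depth == max_depth:
                continue  # do not expand beyond the depth limit
            for neighbour in adjacency[vertex]:
                if neighbour not in seen:
                    seen.add(neighbour)
                    queue.append((neighbour, depth + 1))
        segments[center] = seen
    return segments


# Undirected toy graph, stored as symmetric adjacency sets.
graph = {
    "a": {"b", "c", "d"},
    "b": {"a"},
    "c": {"a", "e"},
    "d": {"a"},
    "e": {"c", "f", "g"},
    "f": {"e"},
    "g": {"e"},
}
segments = segment_by_hubs(graph, num_centers=2, max_depth=1)
```

With this toy graph the two segments share vertex `c`, which is the kind of overlap mentioned above. Vertices far from every center are not covered at all, so a real implementation would need a pass for leftovers.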

As an initial implementation, can't I just create an index that doesn't
index but only stores the document, effectively turning it into a key/value
store without the Lucene overhead?
I won't be doing any queries on it anyway, just lots of gets. You can then
run the denormalization process on a dedicated node which has that index
locally.
What is your opinion on this?

Another remark that might be interesting: Documents are identified by
(id,version). To each document I associate its subgraph and label all the
vertices and edges in the subgraph with the id and version of the
document. This way I can just take a subgraph out of the graph and replace
it with a newer one and keep the graph consistent.

Several of these subgraphs together might be natural candidates for the
documents in the "graph index".
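
A minimal sketch of this (id, version) labelling, in plain Python and simplified to edges only. The class and method names are hypothetical, and the rule of rejecting updates whose version is not newer is an assumption added for the example, not something taken from the degraphmalizer code.

```python
class VersionedGraph:
    """Keeps, per document id, the subgraph extracted from its latest version."""

    def __init__(self):
        self._subgraphs = {}  # doc_id -> (version, frozenset of edges)

    def replace_subgraph(self, doc_id, version, edges):
        """Swap in the subgraph of a newer document version; ignore stale ones."""
        current = self._subgraphs.get(doc_id)
        if current is not None and current[0] >= version:
            return False  # assumed policy: older or equal versions are dropped
        self._subgraphs[doc_id] = (version, frozenset(edges))
        return True

    def edges(self):
        """All edges, each labelled with the (doc_id, version) that produced it."""
        return {
            (source, label, target, doc_id, version)
            for doc_id, (version, subgraph) in self._subgraphs.items()
            for source, label, target in subgraph
        }


graph = VersionedGraph()
graph.replace_subgraph("program:1", 1, [("program:1", "made_by", "person:7")])
graph.replace_subgraph("program:1", 2, [("program:1", "made_by", "person:9")])
stale = graph.replace_subgraph("program:1", 1, [("program:1", "made_by", "person:7")])
```

Because every edge carries the label of its source document, removing a document's old contribution is a single lookup by id rather than a search through the graph.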

I wasn't aware of projects like tinkerpop blueprints but it looks like they
focus on graph traversal operations on node and edge level. Unfortunately
this is not matching the Lucene document retrieval model very well.

Yes, indeed.

But a good implementation could hide this overhead by caching subgraphs as
you explained above, right?

Anyway, it is a good fit for our project (I'm only interested in dependency
tracking of denormalized attributes ATM); but it might be a good idea to
see how we can store and process graphs on ES. The distributed nature
should make ES quite suited for BSP style graph processing. I'm interested
in thoughts you might have on this.

Ah, that is a nice idea. So if I understand correctly, you suggest that
instead of a regular "Client" or "TransportClient" you would use a
DegraphmalizerClient that wraps a regular client and does its work for
every index request.

I think of an ES Client, wrap it up into a service that knows about the
object model of the graph, and that can read and write in both directions...

I like this approach, and we could mix it with our ES proxy application.
But I don't think you could still use rivers etc. to get data into ES and
have it automatically denormalized, as you would need to explicitly pass it
through the wrapped client?

Any thoughts? There is no API to write such plugins, right?

I don't know if it is an API that is the missing part, it is just a
question of combining some methods that exist in the ES test case examples
and in some river plugins.

A "content API" would help in the sense that it could ease the development
of plugins that are constructing ES documents by guiding the developer how
to interact with existing data in an ES cluster, or by including data from
external sources into the ES XContent (the JSON document).

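I think I can do this using the IndexingOperationsListener; I've committed
some initial code to the git repository:
https://github.com/0x01/degraphmalizer/blob/030be1c7ae35759f1a6e4bbfbde0ea2684b06b37/src/main/java/org/elasticsearch/plugin/degraphmalizer/DegraphmalizerListener.java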

I think that instead of just returning the object I can modify it before
returning, I'm going to test this this week.

bye,
Jelle.

--


(Jörg Prante) #8

Hi Jelle,

On Tuesday, October 16, 2012 3:10:30 PM UTC+2, wires wrote:

Exactly. Do you know how such segmentation can be efficiently computed?

Graph algorithms are hard in general. For practical cases, there are a lot
of papers on the net (labelled graphs, DAGs, trees). There is little code
related to document retrieval with graph data, but that is understandable;
it's a hard topic.

I suppose for my case one could keep track of nodes with a high degree and
select those nodes as "centers" and perform a small 'breadth first' walk
around them. The subgraphs generated this way could overlap a bit. If the
graph consists of many small disconnected components then I can "fatten"
the subgraphs by picking multiple starting nodes for the walk.

There are two approaches. In the first, you are given a ready-built graph
that you did not create yourself. This requires traversing the graph as you
describe and getting the valuable data out of it. You can segment it or
not, as long as the data can be indexed into ES somehow.

In the other approach, you have only a graph model and build the data
yourself. You start with the simplest data structures (known as tuples and
sequences) and end up at the most complex ones. It's not that complex if
you implement the graph traversal as iterating over an ordered stream of
nodes, going attribute by attribute. In RDF, you have a huge sequence of
such traversed elements, called triples (or statements).

The graphs I work with are RDF graphs (directed edge-labeled graphs, also
known as linked data, LD) and have a lot of predefined identifiers in them
(URIs). The idea is to key on these IDs (the subject IDs) and interpret the
connected nodes as features (or attribute lists) of an object (connected by
value URIs or blank nodes). These features of an object can be iterated and
expressed in JSON syntax. The JSON document can be indexed in ES, carrying
all the hierarchical information of the original RDF.
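
As a rough illustration of grouping triples by subject ID, here is a minimal sketch in plain Python. It is deliberately flat: blank nodes and nested structure are not handled, and the example URIs, the prefixes and the `id` field name are invented for the example rather than taken from any real dataset or library.

```python
def triples_to_documents(triples):
    """Group (subject, predicate, object) triples into one document per subject."""
    documents = {}
    for subject, predicate, obj in triples:
        document = documents.setdefault(subject, {"id": subject})
        # A predicate may occur several times, so values are collected in lists.
        document.setdefault(predicate, []).append(obj)
    return documents


# Hypothetical triples; the order of the stream does not matter for grouping.
triples = [
    ("http://example.org/book/1", "dc:title", "Graphs in Practice"),
    ("http://example.org/book/1", "dc:creator", "http://example.org/person/7"),
    ("http://example.org/person/7", "foaf:name", "A. Author"),
    ("http://example.org/book/1", "dc:creator", "http://example.org/person/8"),
]
documents = triples_to_documents(triples)
```

Each resulting dictionary can be serialized as JSON and indexed under its subject ID. Object values that are themselves URIs are the places where data from other documents could be pulled in for denormalization.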

But this is speculation, if you know about any algorithms etc, I'm very
interested.

As an initial implementation, can't I just create an index that doesn't
index but only stores the document, effectively turning it into a key/value
store without the Lucene overhead?

Exactly. I have done some work on constructing key/value streams, mapping
them to an indexable representation, creating RDF triples, converting them
to JSON (there is now JSON-LD, an evolving W3C standard, which makes the
process easier) and pushing the JSON-LD to an ES cluster.

I won't be doing any queries on it anyway, just lots of gets. You can
then run the denormalization process on a dedicated node which has that
index locally.
What is your opinion on this?

Please consider that querying is the strength of ES. A lot of gets is
challenging, because that style of I/O does not scale. If you mix reads and
writes, the system will soon bog down under heavy I/O.

If you consider a document to be a collection of attributes relating to a
unique ID, all you need to do is build a sequential list of subgraphs
and index them into ES. The unique ID is crucial for the denormalization.

I think of denormalization as being part of an input data processing
pipeline. While using the XContentBuilder, it should be convenient to
connect to external sources via unique IDs/URIs (perhaps to key/value
streams) and fetch additional fields by evaluating the current fields.

A demonstration of SKOS/RDF-based denormalization targeting a single Lucene
field can be found in Bernhard Haslhofer's SKOS for Lucene. It has some
limitations as it requires the SKOS graph in memory. I ported it to ES, see

Another remark that might be interesting: Documents are identified by
(id,version). To each document I associate its subgraph and label all the
vertices and edges in the subgraph with the id and version of the document.
This way I can just take a subgraph out of the graph and replace it with a
newer and keep the graph consistent. Several of these subgraphs together
might be natural candidates for the documents in the "graph index".

Cool idea, versioning of data (documents) is also on my list of challenges
I need to tackle. Right now, I use versioning only ES style, that is, an
atomic counter to prevent collisions.

But a good implementation could hide this overhead by caching subgraphs
as you explained above, right?

Yes, but be aware that ES is not something like neo4j (a graph database),
though the REST GET API makes it very tempting to use it as one. Its
strength is the inverted index and the fast search over large amounts of
text literals.

Anyway, it is a good fit for our project (I'm only interested in
dependency tracking of denormalized attributes ATM); but it might be a good
idea to see how we can store and process graphs on ES. The distributed
nature should make ES quite suited for BSP style graph processing. I'm
interested in thoughts you might have on this.

Hm, BSP is something for Apache Hama. But I agree that distributed
computing makes sense with ES, as long as search and indexing are the main
part of the computation.

So while I like this approach, we could mix it with our ES proxy
application, I don't think you can still use the rivers etc to get data
into ES and have it automatically denormalized? (as you need to explicitly
pass it through the wrapped client)

Rivers are no magic when it comes to indexing; they too need a client (such
as a node client) to get the data into ES. The process of "content
scripting" could run in a river, for example (or just in a normal ingest
client).

I think I can do this using the IndexingOperationsListener; I've
committed some initial code to the git repository:
https://github.com/0x01/degraphmalizer/blob/030be1c7ae35759f1a6e4bbfbde0ea2684b06b37/src/main/java/org/elasticsearch/plugin/degraphmalizer/DegraphmalizerListener.java

Thanks for sharing this. Afaik the indexing operation listener's job is to
catch write/update events inside of a Lucene index within an ES shard
(which is not the scope of a whole index, only a part of it), just as the
translog works. Pretty much low-level stuff. I'm not sure if shard level
operations are the best place for adding denormalized data to documents, I
would prefer the node client level, where you have access to the field
mapping and the cluster state.

Best regards

Jörg

--

