How to map a JSON field when using Logstash?

Hi everybody!!!

I have a Postgres table that has a json field. It looks like this:

customer_id ==> integer
categories ==> json (please note that this field is of type json)

and the table has some records like this:
1 [{"first_level":359,"second_level":null}]
2 [{"first_level":62,"second_level":null}]
3 null
4 [{"first_level":585,"second_level":[1559,2445]},{"first_level":987,"second_level":[2]}]
5 [{"first_level":592,"second_level":[20521]},{"first_level":335,"second_level":null}]
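
For reference, a minimal sketch of the table definition (the table and column names are assumptions, based on the description above and the queries later in this thread):

CREATE TABLE customer (
  customer_id integer PRIMARY KEY,
  categories  json
);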

Now I want to load this data into an Elasticsearch index. To do this I have created a mapping like this:
POST index_to_test/
{
  "settings": {
    "number_of_shards": 3
  },
  "mappings": {
    "docu": {
      "properties": {
        "customer_id": {
          "type": "integer"
        },
        "categories": {
          "type": "nested",
          "properties": {
            "first_level": {
              "type": "integer"
            },
            "second_level": {
              "type": "integer"
            }
          }
        }
      }
    }
  }
}

I need the categories field to be of type "nested", because I will need to search on first_level and second_level separately.
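
For example, I want to be able to run nested queries like this (a sketch with made-up values):

GET index_to_test/_search
{
  "query": {
    "nested": {
      "path": "categories",
      "query": {
        "term": { "categories.first_level": 585 }
      }
    }
  }
}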

The Logstash config looks like this:
input {
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://dev:5432/database"
    jdbc_user => "myuser"
    jdbc_password => "mypassword"
    jdbc_validate_connection => true
    jdbc_driver_library => "/usr/share/elasticsearch/lib/postgresql-9.4.1208.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    statement => "SELECT * FROM table"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    type => "docu"
  }
}

output {
  elasticsearch {
    index => "index_to_test"
    document_id => "%{customer_id}"
  }
}

When I execute the process to load the data, the mapping for the categories field gets changed. What gets created is:
"categories": {
"type": "nested",
"properties": {
"first_level": {
"type": "long"
},
"second_level": {
"type": "long"
},
"type": {
"type": "string"
},
"value": {
"type": "string"
}
}
},

and the document looks like this:

{
  "customer_id": "1",
  "categories": {
    "type": "json",
    "value": "[{\"first_level\":26,\"second_level\":[342]},{\"first_level\":826,\"second_level\":null}]"
  },
  "@version": "1",
  "@timestamp": "2016-04-18T08:02:06.243Z",
  "type": "docu"
}

So, can anyone tell me how I have to map this field in order to get it as a nested field?

Thanks a lot in advance for your support.

Jorge von Rudno

Use a json filter to parse the [categories][value] field.

Hi Magnus, thanks a lot for your reply.
I have reviewed the settings of the json filter plugin (https://www.elastic.co/guide/en/logstash/current/plugins-filters-json.html) and unfortunately I don't understand very well how to use it. Could you be a little more specific, or perhaps do you have an example to illustrate the filter?

In advance many thanks for your help.

Regards.

Jorge von Rudno

Example:

filter {
  json {
    source => "[categories][value]"
  }
}

You may want to add a target option to store the parsed contents under the categories field, and probably also a remove_field option to remove the [categories][value] field that you're probably no longer interested in.
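
Something like this (a sketch; the target field name is just an example):

filter {
  json {
    # parse the JSON string stored under [categories][value]
    source => "[categories][value]"
    # put the parsed array in a field of its own
    target => "parsed_categories"
    # drop the original string, which is no longer needed
    remove_field => ["[categories][value]"]
  }
}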

Hi Magnus!!

Thanks for your quick reply!!
I am sorry, but I am new to Logstash and don't understand very well how it works yet. Can you recommend some documentation?
1) I am using your last post to create the filter, but I am confused: you define the source with a fixed value, but isn't it dynamic for every record (document)?
2) If I understand correctly, I should parse the json object into another type. Which one? How can I do this?

Perhaps if you give me an example, it would help me a lot!!

Thanks a lot for your patience!!

Regards

Jorge

I am sorry, but I am new to Logstash and don't understand very well how it works yet. Can you recommend some documentation?

Have you read what's available on elastic.co?

I am using your last post to create the filter, but I am confused: you define the source with a fixed value, but isn't it dynamic for every record (document)?

The source option contains the name of the field that should be parsed. As documented, subfields are accessed via the [field][subfield] notation.
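
For example, if an event contains (made-up values):

{
  "categories": {
    "type": "json",
    "value": "[{\"first_level\":26,\"second_level\":null}]"
  }
}

then [categories][value] always refers to the value subfield, regardless of what string it happens to contain for a particular event.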

If I understand correctly, I should parse the json object into another type. Which one? How can I do this?

I don't understand this question.

Perhaps if you give me an example, it would help me a lot!!

I have given you an example that I believe should work or at least be very close to what you want. Did you try it?

Hi Magnus, please give me a hand. I am trying to follow your suggestions, but I get an error. This is my config file:

input {
  jdbc {
  }
}

filter {
  json {
    source => "[categories]"
    add_field => {"{categories_by_level}" => source => "[categories]"}
    remove_field => [ "%{categories}" ]
  }
}

output {
  elasticsearch {
    index => "customers_v1"
    document_type => "myType"
    document_id => "%{myId}"
  }
}

The error says:
Error: Expected one of #, {, } at line 23, column 57 (byte 700) after filter {

Thanks a lot for all your help!!!

Regards

Jorge

Hi Magnus, I have changed the file a little bit and that solved the last problem. Now my filter looks like this:

filter {
  json {
    source => "categories"
    target => "categories_by_level"
    remove_field => ["categories_by_level"]
  }
}

but now, while I expected a field "categories_by_level" in the document, I still get the same field "categories", and the data is still a string, not an object (nested field).

Any suggestions?

Thanks a lot for your patience

Regards.

Jorge

In addition, when I run Logstash I get the following error message:

Error parsing json {:source=>"categories", :raw=>#<Java::OrgPostgresqlUtil::PGobject:0x14cae44b>, :exception=>java.lang.ClassCastException: org.jruby.java.proxies.ConcreteJavaProxy cannot be cast to org.jruby.RubyIO, :level=>:warn}

filter {
  json {
    source => "categories"
    target => "categories_by_level"
    remove_field => ["categories_by_level"]
  }
}

This is backwards. The field you want to remove after the filter is done is categories, not categories_by_level.
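
In other words, something like this (sketch):

filter {
  json {
    source => "categories"
    target => "categories_by_level"
    # remove the original string field, not the parse target
    remove_field => ["categories"]
  }
}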

Error parsing json {:source=>"categories", :raw=>#<Java::OrgPostgresqlUtil::PGobject:0x14cae44b>, :exception=>java.lang.ClassCastException: org.jruby.java.proxies.ConcreteJavaProxy cannot be cast to org.jruby.RubyIO, :level=>:warn}

I don't think I've seen this before. What does your JSON look like?

Good morning Magnus,

My Logstash config file looks like this:

input {
  jdbc {
    jdbc_connection_string => "mystringconection"
    jdbc_user => "myuser"
    jdbc_password => "mypassword"
    jdbc_validate_connection => true
    jdbc_driver_library => "mypath"
    jdbc_driver_class => "org.postgresql.Driver"
    statement => "SELECT * FROM customer WHERE customerno = '1'"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
  }
}

filter {
  mutate {
    add_field => {"[@metadata][index_type]" => "%{index_type}"}
    remove_field => ["index_type"]
  }

  json {
    source => "categories_by_level"
    target => "categories_by_level_json"
    remove_field => ["categories_by_level"]
  }
}

output {
  elasticsearch {
    index => "customers_v1"
    document_type => "%{[@metadata][index_type]}"
    document_id => "%{customerno}"
  }
}

The field categories_by_level that comes from Postgres looks like this (this field in the database table is of type json):

[{"first_level":592,"second_level":[20521]},{"first_level":335,"second_level":null},{"first_level":380,"second_level":null},{"first_level":661,"second_level":null},{"first_level":391,"second_level":[662,20277]}]

The error message when I execute Logstash is:

Error parsing json {:source=>"categories_by_level", :raw=>#<Java::OrgPostgresqlUtil::PGobject:0x53b3a282>, :exception=>java.lang.ClassCastException: org.jruby.java.proxies.ConcreteJavaProxy cannot be cast to org.jruby.RubyIO, :level=>:warn}

Best regards

Jorge von Rudno

Hi magnusbaeck!!

In your first reply you suggested using a json filter to parse the [categories][value] field. I have included the json filter, something like this:

filter {
  json {
    source => "[categories][value]"
    target => "categories_nested"
    remove_field => ["categories"]
  }
}

and now I get this error:

Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash. {"exception"=>#<NoMethodError: undefined method `[]' for #<Java::OrgPostgresqlUtil::PGobject:0x3bc10a4>>, :level=>:error}

NoMethodError: undefined method `[]' for #<Java::OrgPostgresqlUtil::PGobject:0x3bc10a4>
  get at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-event-2.3.2-java/lib/logstash/util/accessors.rb:56
  [] at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-event-2.3.2-java/lib/logstash/event.rb:122
  filter at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-filter-json-2.0.6/lib/logstash/filters/json.rb:69
  multi_filter at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.2-java/lib/logstash/filters/base.rb:151
  each at org/jruby/RubyArray.java:1613
  multi_filter at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.2-java/lib/logstash/filters/base.rb:148
  filter_func at (eval):41
  filter_batch at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.2-java/lib/logstash/pipeline.rb:267
  each at org/jruby/RubyArray.java:1613
  inject at org/jruby/RubyEnumerable.java:852
  filter_batch at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.2-java/lib/logstash/pipeline.rb:265
  worker_loop at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.2-java/lib/logstash/pipeline.rb:223
  start_workers at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.2-java/lib/logstash/pipeline.rb:201

Regards

Jorge
