How to send a JSON object to Elasticsearch through Logstash

I am creating a "billing" feature in my Node.js application that will basically save the username in Elasticsearch every time any user accesses any REST service.
I have been struggling with this for the last two days with no success. It is my first time working with ELK.
Here is my latest attempt.

Step 1)
I start Logstash with ./logstash-5.2.2/bin/logstash -f "logstash.conf"
logstash.conf contains:

input {
  tcp {
    port => 5000
    type => "document_type"
  }
}

filter {
  grok {
    match => { "message" => "data=%{GREEDYDATA:request}" }
  }
  json {
    source => "request"
    target => "parsedJson"
    remove_field => ["message"]
  }
  mutate {
    add_field => {
      "firstname" => "%{[parsedJson][firstname]}}"
      "surname" => "%{[parsedJson][surname]}}"
    }
  }
  kv {
    source => "message"
    remove_field => ["message"]
  }
}

output {
  elasticsearch {
    codec => "json"
    hosts => ["127.0.0.1:9200"]
    index => "my_index_previously_mapped"
  }
}

Step 2)
Node.js sends the JSON object to Logstash:

var Logstash = require('logstash-client');

// connect to the Logstash tcp input defined above
var logstash = new Logstash({
  type: 'tcp',
  host: 'localhost',
  port: 5000
});

var user = {
  firstname: req.body.username,
  surname: req.body.surname
};

// serialized as JSON and written to the tcp socket
logstash.send(user);

Step 3)
I was expecting to get two "columns", firstname and surname, plus the datetime from the server when the document was saved, and not to get the message "column", which seems redundant (pointless in my scenario). The next step would be to aggregate and filter, e.g. how many accesses a certain user made between one datetime and another. But I am facing several issues, naturally, because I am a newbie with ELK.

PS: Obviously, I am going to add a bit more information when this is finished, but in order to keep things simple, let's say that logging just firstname and surname is enough.

First issue: why is the message "column" still there?

Second issue: why does the firstname "column" just contain the static string "%{[parsedJson][surname]}}" (surname as well)?

Third issue: why is @timestamp always 3 hours ahead of my system datetime?

Fourth issue: I understand that I have to create a mapping if I want to aggregate. Why is the command below trying to re-create the index?

curl -XPUT 'http://localhost:9200/my_index_previously_mapped/' -d '
{
  "mappings" : {
    "my_document_type" : {
      "properties" : {
        "firstname" : { "type" : "text" }
      }
    }
  }
}'
{"error":{"root_cause":[{"type":"index_already_exists_exception","reason":"index [my_index_previously_mapped/_GiiT8JGSruBt9ytm8L6zQ] already exists","index_uuid":"_GiiT8JGSruBt9ytm8L6zQ","index":"my_index_previously_mapped"}],"type":"index_already_exists_exception","reason":"index [my_index_previously_mapped/_GiiT8JGSruBt9ytm8L6zQ] already exists","index_uuid":"_GiiT8JGSruBt9ytm8L6zQ","index":"my_index_previously_mapped"},"status":400}

*** Added on March 22, 2017 at 11 AM UTC-3
I started Logstash with:

input {
  tcp {
    port => 5000
    type => "document_type"
  }
}

output {
  stdout { codec => rubydebug }
}

Then I got:

{
  "@timestamp" => 2017-03-22T13:43:01.443Z,
  "port" => 58794,
  "@version" => "1",
  "host" => "127.0.0.1",
  "message" => "{"firstname":"a","surname":"a"}",
  "type" => "document_type"
}

As a previous SQL and NoSQL (MongoDB) user, my intention is to get three "columns" in Elasticsearch: one for the timestamp, another for firstname and another for surname. Then I can aggregate how many accesses were done by a certain user in a certain period or, just as a didactic example, query how many users with firstname "john" are in Elasticsearch, ignoring the surname. If I can learn how to achieve these two tasks it will probably be a large step forward.
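To make the first task concrete, this is roughly the kind of query I have in mind once firstname ends up as its own field (just a sketch: the index name is the one from my config, I assume the default text mapping, hence the match query, and hits.total would give the number of accesses):

curl -XGET 'http://localhost:9200/my_index_previously_mapped/_search?pretty' -d '
{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        { "match": { "firstname": "john" } },
        { "range": { "@timestamp": { "gte": "2017-03-22T00:00:00Z", "lte": "2017-03-22T23:59:59Z" } } }
      ]
    }
  }
}'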

*** Added on March 22 at 12 PM UTC-3

{
  "@timestamp" => 2017-03-22T14:56:53.064Z,
  "port" => 33666,
  "@version" => "1",
  "host" => "127.0.0.1",
  "message" => "{"firstname":"a","surname":"a"}",
  "type" => "document_type",
  "tags" => [
    [0] "_grokparsefailure"
  ]
}
This is the result of:

filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:time} \ %{GREEDYDATA:msg}" }
  }
}

*** Added on March 22 at 12:10 PM UTC-3
My filter is:

filter {
  json {
    source => "message"
  }
}

and the output is:

{
  "firstname" => "a",
  "@timestamp" => 2017-03-22T15:04:29.108Z,
  "port" => 34102,
  "surname" => "a",
  "@version" => "1",
  "host" => "127.0.0.1",
  "message" => "{"firstname":"a","surname":"a"}",
  "type" => "document_type"
}

I successfully searched using:

{
  "query": {
    "bool": {
      "must": [
        { "range": { "@timestamp": { "gte": "2017-03-22", "lte": "2017-03-22" } } }
      ],
      "must_not": [],
      "should": []
    }
  },
  "from": 0,
  "size": 10,
  "sort": [],
  "aggs": {}
}

Nevertheless, I get the following error while trying to aggregate:

... Fielddata is disabled on text fields by default. Set fielddata=true on [firstname] in order to load fielddata in memory by uninverting the inverted index...

Then, I tried:

curl -XPUT 'http://localhost:9200/greencard_indice/_mapping/cpfTipo

{
  "properties": {
    "firstname": {
      "type": "text",
      "fielddata": true
    }
  }
}'
curl: (3) [globbing] nested brace in column 75

and I am still getting the same error while aggregating.

Remove codec => "json" from your elasticsearch output.
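In other words, the output section from your config would become just this (same settings as above, minus the codec):

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "my_index_previously_mapped"
  }
}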

Please show exactly what you're posting to Logstash. Suggestion: Comment out all your filters and outputs and add a stdout { codec => rubydebug } output that'll dump the events to one of the log files. Then we can start adding back filters.

Second issue: why does the firstname "column" just contain the static string "%{[parsedJson][surname]}}" (surname as well)?

That indicates that there was no [parsedJson][surname] field, so the string was taken literally, which in turn indicates that the json filter failed, perhaps because the grok filter failed. This is exactly why I advocate a gradual build-up of filters.

Third issue: why is @timestamp always 3 hours ahead of my system datetime?

Perhaps because your local timezone is UTC-3? @timestamp is always UTC.

Fourth issue: I understand that I have to create a mapping if I want to aggregate. Why is the command below trying to re-create the index?

A PUT on the index URL always tries to create the index. To update the mappings of an existing index, use the update mapping API (see the Elasticsearch reference documentation).
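For example, with the index already in place, a mapping for a single field can be added by PUTting to the _mapping endpoint instead of the index root (a sketch using the names from the question and the 5.x URL style with a type name):

curl -XPUT 'http://localhost:9200/my_index_previously_mapped/_mapping/my_document_type' -d '
{
  "properties" : {
    "firstname" : { "type" : "text" }
  }
}'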

I edited my question to add the console output. Basically, it is:
{
  "@timestamp" => 2017-03-22T13:43:01.443Z,
  "port" => 58794,
  "@version" => "1",
  "host" => "127.0.0.1",
  "message" => "{"firstname":"a","surname":"a"}",
  "type" => "document_type"
}

message only contains a JSON object, so your grok filter won't match. Because of that no request field is extracted, so your json filter fails, and because of that your mutate filter fails.

Magnusbaeck, thank you. But how do I achieve what I want? I mean, I want three columns: timestamp, firstname and surname. I guess the most relevant part of my question could be translated as "how to split the JSON message in order to persist each field". Is my mistake on the Node.js side or the Elasticsearch side? I guess not. I guess it is some error in logstash.conf, related to the filter somehow.

Just drop the grok filter and parse the message field. You used the grok filter to strip a "data=" prefix but it didn't actually exist.

Please, can you provide an example? I just added what I tried to my original question.

json {
  source => "message"
}
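If you also want the raw message field gone once it has been parsed (your first issue), the standard remove_field option can be added; it only takes effect when the filter succeeds, so a failed parse still keeps the original event intact:

json {
  source => "message"
  remove_field => ["message"]
}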

Thanks. What is wrong with:

curl -XPUT 'http://localhost:9200/greencard_indice/_mapping/cpfTipo

{
  "properties": {
    "firstname": {
      "type": "text",
      "fielddata": true
    }
  }
}'
curl: (3) [globbing] nested brace in column 75

Basically, I want to aggregate/sum/group by (I want to know how many users have the same firstname).

The JSON stuff is supposed to be posted in the body of the request, i.e. as an argument to a -d option to curl. See examples in the docs.
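With your command, that would look something like this (same body, just passed via -d, with the URL quoted on its own):

curl -XPUT 'http://localhost:9200/greencard_indice/_mapping/cpfTipo' -d '
{
  "properties": {
    "firstname": {
      "type": "text",
      "fielddata": true
    }
  }
}'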

Sorry, I didn't see such a silly error with curl. Now it is working. You gave wonderful help. Please, one last and most important part of my question: can you comment on this design? It is my first time using ELK, and after reading a lot of user cases, I want to use ELK for the "billing" feature. I mean, every time a user reaches any REST service, I will log basically his identification, the time and the service involved from Node.js to Elasticsearch through Logstash.
Are these assumptions a good "rule of thumb"?
1 - I decided not to log straight from Node.js to Elasticsearch mainly because, if I understood correctly, Logstash is not "thread-blocking". I mean, Logstash provides a similar advantage to what we get when we use an asynchronous message queue for logging. Additionally, Logstash will make it easier to manage inputs/outputs, filters, re-processing of the logs, and so on.
2 - I decided not to rely on Kibana to create the per-user visualizations for the invoice, because it seems to me that Kibana is a better fit for analytic studies for business decisions, since it shows graphs, dashboards and tables easily.

I decided not to log straight from Node.js to Elasticsearch mainly because, if I understood correctly, Logstash is not "thread-blocking". I mean, Logstash provides a similar advantage to what we get when we use an asynchronous message queue for logging.

Logstash has a very limited internal queue, so you shouldn't rely on it to not block. I generally recommend having applications log to a file on disk that Filebeat can tail and ship to Logstash. Then the logfile itself becomes the buffer.
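On the Logstash side that would mean swapping the tcp input for a beats input, something like this (a minimal sketch; 5044 is just the conventional Beats port):

input {
  beats {
    port => 5044
  }
}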

I decided not to rely on Kibana to create the per-user visualizations for the invoice, because it seems to me that Kibana is a better fit for analytic studies for business decisions, since it shows graphs, dashboards and tables easily.

I don't know what kind of visualization you're after but yes, showing invoices isn't something Kibana excels at.

Does it make sense, for invoice control based on service access (my focus here is only performance, not business at all), to:
1 - create an index with three fields: user_identification (text), access_time (timestamp), accessed_service (text)
2 - log all three fields to files via Logstash according to each access
3 - periodically ship those files to Elasticsearch (using Filebeat, for instance)
4 - (the main part of my question) create a mapping for user_identification so I can aggregate on it and generate the user's invoice (see the mapping sketch after this list)
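For step 4, the mapping I have in mind looks roughly like this (just a sketch; I am reusing greencard_indice and cpfTipo from above as placeholders, with fielddata enabled the same way as in the curl command that now works):

curl -XPUT 'http://localhost:9200/greencard_indice/_mapping/cpfTipo' -d '
{
  "properties": {
    "user_identification": { "type": "text", "fielddata": true },
    "access_time": { "type": "date" },
    "accessed_service": { "type": "text", "fielddata": true }
  }
}'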

PS. I read:

"Fielddata can consume a lot of heap space, especially when loading high cardinality text fields. Once fielddata has been loaded into the heap, it remains there for the lifetime of the segment. Also, loading fielddata is an expensive process which can cause users to experience latency hits. This is why fielddata is disabled by default"
in https://www.elastic.co/guide/en/elasticsearch/reference/current/fielddata.html
which drives me to the idea that I am doing something wrong in step 4.

Your approach is reasonable.

which drives me to the idea that I am doing something wrong in step 4.

No, you're not doing anything wrong.

Thanks, wonderful help! I still have a lot of new doubts, but I need to study and try things by myself before starting a new question on this forum. Now, at least, I am able to do the basic operations with Logstash + Elasticsearch, and I feel I am headed in the right direction.
