How to create sub-indexes for an index in the Logstash config file

I have two fields, QueueMessage and MessageID, which I defined using the csv filter as shown below.

input {
  log4j {
    mode => "server"
    host => "0.0.0.0"
    port => 3456
    type => "log4j"
  }
}
filter {
  csv {
    columns   => ["QueueMessage","MessageID"]
    separator => ","
  }
}
output {
  stdout {}
  elasticsearch { hosts => ["localhost:9200"] }
}

Example :
QueueMessage="John,A001,$3000"
MessageID= ID:E4EMS-SERVER.1F0578DF8CA60:114

Now my QueueMessage has 3 sub-fields (say Name, PersonID, Salary). How should I change the Logstash config file so that these sub-fields are reflected in Kibana?

Hi @pavana,

I think the best solution at this point is to parse out the remaining values using the Grok filter.

Assuming the QueueMessage has double quotes surrounding the string, the Grok filter would look like this:

grok {
  match => ["QueueMessage","\"%{DATA:Name},%{DATA:PersonID},\$%{NUMBER:Salary:int}\""]
}

Note the int keyword after the salary, and also that we are not including the dollar sign in the parsed variable. This way, you will be able to use the salary value as a number in Kibana.
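
To make the pattern concrete, here is how each piece lines up against the example value above (assuming the surrounding double quotes are literally part of the field):

# QueueMessage = "John,A001,$3000"   (quotes included in the field value)
#
# \"                     matches the opening double quote
# %{DATA:Name}           captures John
# ,                      literal comma
# %{DATA:PersonID}       captures A001
# ,\$                    comma followed by a literal dollar sign
# %{NUMBER:Salary:int}   captures 3000 and stores it as an integer
# \"                     matches the closing double quote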

In my test, the output produced was like this:

{
       "message" => "\"John,A001,$3000\"",
      "@version" => "1",
    "@timestamp" => "2016-07-19T14:40:32.103Z",
          "host" => "MacBook-Pro.local",
          "name" => "John",
      "PersonID" => "A001",
        "Salary" => 3000
}

Let me know if this works for you!

If you meant that the filter should be like this, then I am not getting the required output.
QueueMessage=John
MessageID=A001
This is what I am getting. Also, the Name, PersonID and Salary fields are not created in my Kibana.

filter {
  csv {
    columns   => ["QueueMessage","MessageID"]
    separator => ","
  }
  grok {
    match => ["QueueMessage","\"%{DATA:Name},%{DATA:PersonID},\$%{NUMBER:Salary:int}\""]
  }
}

@pavana,

Can you please share with me a sample of the source input message?

Here "message" has 4 fields....John, A0001, $5000, ID:E4EMS-SERVER.E18578E50F61B:32
I am retrieving first 3 data values from Queue and 4th one is automatically generated.
Indirectly my "message" is made of only 2 fields "QueueMessage"( which has John, A0001, $5000) and "MessageID" (which is ID:E4EMS-SERVER.E18578E50F61B:32).
The whole point is there are sub fields in a field.

message:John,A0001,$5000,ID:E4EMS-SERVER.E18578E50F61B:32
@version:1
@timestamp:July 19th 2016, 21:54:07.050
timestamp:1468945447050
path:QueueReader
priority:DEBUG
logger_name:QueueReader
thread:main
class:?
file:?:?
method:?
application:playground
host:127.0.0.1:50357
type:log4j
tags:_grokparsefailure
QueueMessage:John
MessageID:A0001
column3:$5000
column4:ID:E4EMS-SERVER.E18578E50F61B:32
_id:AVYD-Lw07ZpGBU8wlvJY
_type:log4j
_index:logstash-2016.07.19
_score:

@pavana,

If the original message is always 4 fields, then why bother using the CSV parser? Perhaps just using Grok like this will work?

filter {
  grok {
    match => ["message","%{DATA:Name},%{DATA:PersonID},\$%{NUMBER:Salary:int},%{GREEDYDATA:MessageID}"]
  }
}
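
If you want to check what the parsed event looks like before it is sent to Elasticsearch, one option (a small sketch, reusing the hosts value from your config) is to print each event with the rubydebug codec:

output {
  # print every event with all of its parsed fields to the console
  stdout { codec => rubydebug }
  elasticsearch { hosts => ["localhost:9200"] }
}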

But this won't work if I want to select the whole message excluding MessageID.
I can't keep on selecting the fields present in my data, right?
MessageID is generated by the system and the other 3 fields are sent by an application.
MessageID is required only if there is an error in the data sent.
This is my requirement.
Hope you get me. It's just like Message is a composite attribute which has Name, PersonID and Salary.
First of all, is it possible to include complex attributes and divide them in the Logstash config file?
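
To illustrate, what I am hoping to end up with in Elasticsearch is something like this (just a sketch of the desired document built from the sample values above, not actual output):

{
    "QueueMessage" => {
            "Name" => "John",
        "PersonID" => "A0001",
          "Salary" => 5000
    },
       "MessageID" => "ID:E4EMS-SERVER.E18578E50F61B:32"
}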

@pavana,

I do not understand what you are trying to accomplish. What do you mean by "select the whole message"?