Indexing sysstat data from sar and sadf

I am already collecting syslog data via logstash into elasticsearch. My customer now wants sysstat data as well, so I created a new index and type (using, as an example, the field names and types from sar -d -p 1 1):

PUT /monitor/_mapping/disk
{
  "disk": {
    "properties": {
      "event_time": {"type": "date"},
      "logsource": {"type": "string"},
      "DEV": {"type": "string"},
      "avgqu-sz": {"type": "float"},
      "avgrq-sz": {"type": "float"},
      "await": {"type": "float"},
      "%util": {"type": "float"},
      "rd_sec/s": {"type": "float"},
      "svctm": {"type": "float"},
      "tps": {"type": "float"},
      "wr_sec/s": {"type": "float"}
    }
  }
}

Populating data with curl directly to elasticsearch works...
curl -XPOST http://127.0.0.1:9200/monitor/disk -d '{"logsource":"intense","event_time":"1455723240000","DEV":"sda","tps":"25.82","rd_sec/s":"119.09","wr_sec/s":"4836.90","avgrq-sz":"191.98","avgqu-sz":"1.08","await":"41.82","svctm":"4.14","%util":"10.70"}'

I have a script that runs periodically that parses the output of sadf into json.
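For context, the conversion step looks roughly like this (a sketch, not my actual script; the field order assumes the semicolon-separated format of sadf -d, and it assumes the timestamp has already been converted to epoch milliseconds — check the "# ..." header line sadf prints on your own system):

```python
#!/usr/bin/env python3
"""Sketch: turn one `sadf -d` disk line into the JSON body posted above.

Assumed line format (semicolon-separated, per the sadf -d header):
  hostname;interval;timestamp;DEV;tps;rd_sec/s;wr_sec/s;avgrq-sz;avgqu-sz;await;svctm;%util
"""
import json

# Metric column names, in the order sadf is assumed to emit them.
FIELDS = ["tps", "rd_sec/s", "wr_sec/s", "avgrq-sz",
          "avgqu-sz", "await", "svctm", "%util"]

def sadf_disk_to_json(line):
    """Map one sadf -d data line to the document shape used in the mapping."""
    parts = line.strip().split(";")
    host, _interval, timestamp, dev = parts[0], parts[1], parts[2], parts[3]
    doc = {"logsource": host, "event_time": timestamp, "DEV": dev}
    # Pair the remaining columns with their metric names.
    doc.update(zip(FIELDS, parts[4:]))
    return json.dumps(doc)

if __name__ == "__main__":
    sample = ("intense;60;1455723240000;sda;25.82;119.09;"
              "4836.90;191.98;1.08;41.82;4.14;10.70")
    print(sadf_disk_to_json(sample))
```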

I am also forwarding the data via syslog, so the message part of the event is the above JSON prefixed with the sar data type, and the program field is the index name (monitor)...

monitor disk '{"logsource":"intense","event_time":"1455723240000","DEV":"sda","tps":"25.82","rd_sec/s":"119.09","wr_sec/s":"4836.90","avgrq-sz":"191.98","avgqu-sz":"1.08","await":"41.82","svctm":"4.14","%util":"10.70"}'

I also have a very plain vanilla logstash config file for syslog...

input {
  syslog {
    type => "syslog"
    port => 5544
    facility_labels => ["kernel", "user-level", "mail", "system", "sec/auth", "syslogd", "line printer", "network news", "UUCP", "clock", "sec/auth", "FTP", "NTP", "log audit", "log alert", "clock", "local0", "local1", "local2", "local3", "local4", "local5", "local6", "local7"]
  }
}

output {
  if [type] == "syslog" and "_grokparsefailure_sysloginput" in [tags] {
    file {
      path => "/var/log/logstash/syslog_failures-%{+YYYY-MM-dd}"
    }
  } else {
    elasticsearch {
      index => "logstash-%{+YYYY.MM.dd.HH}"
    }
  }
}

What I want to do (and don't know how) is to add a conditional to the logstash config that checks for program == "monitor" and, if true, does the right thing with the JSON in the syslog message field. Any help would be greatly appreciated.

filter {
  if [program] == "monitor" {
    json {
      # the syslog input puts the raw payload in the "message" field
      source => "message"
    }
  }
}
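Note that in your case the message still carries the sar data type ("disk ") in front of the JSON, so a json filter alone won't parse it. One way to handle that (untested sketch; the field names sar_type and sar_json are just illustrative) is to grok the prefix off first, parse the remainder, and use the embedded epoch-millisecond timestamp as the event date:

```
filter {
  if [program] == "monitor" {
    # split "disk {...json...}" into a data type and the JSON payload
    grok {
      match => { "message" => "%{WORD:sar_type} %{GREEDYDATA:sar_json}" }
    }
    json {
      source => "sar_json"
    }
    # event_time in the payload is epoch milliseconds
    date {
      match => [ "event_time", "UNIX_MS" ]
    }
  }
}
```

If the single quotes around the JSON survive into the message, you would also need to strip those before the json filter sees the payload.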

Thanks a lot. I think I misunderstood the whole filter concept. After I saw your response I redid my solution using the kv filter, which also works fine. This makes it possible to encapsulate just about any kind of data within syslog.

Rob
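For anyone landing here later, a kv-based variant of the filter might look like this (a sketch, assuming the message has been reformatted as space-separated key=value pairs such as "DEV=sda tps=25.82 ..." rather than JSON):

```
filter {
  if [program] == "monitor" {
    kv {
      source => "message"
      field_split => " "
      value_split => "="
    }
  }
}
```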