Http output plugin json_batch format to Http input plugin


#1

The documentation describes an option to send output through the Http output plugin using the "json_batch" format. It states:

If json_batch, each batch of events received by this output will be placed into a single JSON array and sent in one request. This is particularly useful for high throughput scenarios such as sending data between Logstash instances.

This is exactly what we want to do. However, we cannot find any example of how to handle this on the receiving Logstash instance.

We are sending like so:

http {
    http_compression => true
    http_method => put
    cacert => "../cert/server.pem"
    format => "json_batch"
    content_type => "application/json;charset=UTF-8"
    url => "https://address.xyz"
    headers => ["Authorization", "Basic blaablaablaa"]
}

And receiving like this:

input {
    http {
        port => 8080
        user => blaa
        password => blaablaa
        ssl => true
        keystore => "cert/keystore.jks"
        keystore_password => "blaablaablaa"
    }
}
filter {
    split {
        field => "message"
    }
    json {
        source => "message"
    }
}

The split is probably wrong. The messages arrive fine, but we do not know how to break each batch into separate events for the elasticsearch output plugin.

Thank you for any help that you might provide!

Logstash documentation on json_batch


(Magnus Bäck) #2

So what are you currently getting on the receiving end? Comment out the current filters and add a stdout { codec => rubydebug } output so you can show an example event that you receive.
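A minimal receiving pipeline for that check might look like the following. The input block repeats the settings from the first post. The filter section is left out, and the only output is stdout with the rubydebug codec, which prints every field of each incoming event.

```conf
input {
    http {
        port => 8080
        user => blaa
        password => blaablaa
        ssl => true
        keystore => "cert/keystore.jks"
        keystore_password => "blaablaablaa"
    }
}

# filter section temporarily removed while inspecting raw events

output {
    # Pretty-print each event, with all of its fields, to the console
    stdout { codec => rubydebug }
}
```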


#3

Hi Magnus,

sorry for the delay and thanks for the help.
Here is an example message that we received with json_batch:

{
          "host" => "123.123.123.123",
       "message" => "[{\"fields\":{\"env\":\"TEST\"},\"els_index\":\"metricbeat\",\"metricset\":{\"name\":\"process\",\"rtt\":13999,\"module\":\"system\"},\"fields.org\":\"organization1\",\"tags\":[\"Metricbeat\",\"beats_input_raw_event\"],\"host\":\"server2\",\"@timestamp\":\"2018-05-02T09:45:31.260Z\",\"beat\":{\"hostname\":\"server2\",\"name\":\"server2_Metricbeat\",\"version\":\"6.1.1\"},\"@version\":\"1\",\"system\":{\"process\":{\"name\":\"java.exe\",\"cmdline\":\"\\\"C:\\\\Program Files\\\\Java\\\\jdk1.8.0_151\\\\jre\\\\bin\\\\java.exe\\\" \\\"-Djdk.home=C:\\\\Program Files\\\\Java\\\\jdk1.8.0_151\\\" \\\"-Djruby.home=D:\\\\Elastic\\\\Logstash\\\\vendor\\\\jruby\\\" \\\"-Djruby.script=jruby\\\" \\\"-Djruby.shell=cmd.exe\\\" \\\"-Djffi.boot.library.path=D:\\\\Elastic\\\\Logstash\\\\vendor\\\\jruby\\\\lib\\\\jni;D:\\\\Elastic\\\\Logstash\\\\vendor\\\\jruby\\\\lib\\\\jni\\\\i386-Windows;D:\\\\Elastic\\\\Logstash\\\\vendor\\\\jruby\\\\lib\\\\jni\\\\x86_64-Windows\\\" \\\"-Xss2048k\\\" \\\"-Dsun.java.command=org.jruby.Main\\\" \\\"-Djava.class.path=\\\" \\\"-Xbootclasspath/a:D:\\\\Elastic\\\\Logstash\\\\vendor\\\\jruby\\\\lib\\\\jruby.jar\\\" \\\"-Xms1g\\\" \\\"-Xmx3g\\\" \\\"-XX:+UseParNewGC\\\" \\\"-XX:+UseConcMarkSweepGC\\\" \\\"-XX:CMSInitiatingOccupancyFraction=75\\\" \\\"-XX:+UseCMSInitiatingOccupancyOnly\\\" \\\"-Djava.awt.headless=true\\\" \\\"-Dfile.encoding=UTF-8\\\" \\\"-Djruby.compile.invokedynamic=true\\\" \\\"-Djruby.jit.threshold=0\\\" \\\"-Djava.security.egd=file:/dev/urandom\\\" org/jruby/Main \\\"D:\\\\Elastic\\\\Logstash\\\\lib\\\\bootstrap\\\\environment.rb\\\" \\\"logstash\\\\runner.rb\\\"\",\"state\":\"running\",\"pid\":4788,\"cpu\":{\"total\":{\"norm\":{\"pct\":0.0156},\"pct\":0.0625},\"start_time\":\"2018-05-02T09:39:22.669Z\"},\"username\":\"NT AUTHORITY\\\\SYSTEM\",\"pgid\":0,\"ppid\":9752,\"memory\":{\"rss\":{\"bytes\":999051264,\"pct\":0.0582},\"size\":1505955840,\"share\":0}}}},
                      {\"fields\":{\"env\":\"TEST\"},\"els_index\":\"metricbeat\",\"metricset\":{\"rtt\":13999,\"name\":\"process\",\"module\":\"system\"},\"fields.org\":\"organization1\",\"tags\":[\"Metricbeat\",\"beats_input_raw_event\"],\"host\":\"server2\",\"@timestamp\":\"2018-05-02T09:45:31.260Z\",\"beat\":{\"hostname\":\"server2\",\"name\":\"server2_Metricbeat\",\"version\":\"6.1.1\"},\"@version\":\"1\",\"system\":{\"process\":{\"name\":\"java.exe\",\"cmdline\":\"\\\"C:\\\\Program Files (x86)\\\\Java\\\\jre7\\\\bin\\\\java\\\" -Dactivemq.home=\\\"C:\\\\apache-activemq-5.14.4\\\\bin\\\\win32\\\\..\\\\..\\\" -Dactivemq.base=\\\"C:\\\\apache-activemq-5.14.4\\\\bin\\\\win32\\\\..\\\\..\\\" -Djavax.net.ssl.keyStorePassword=password -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.keyStore=\\\"C:\\\\apache-activemq-5.14.4\\\\bin\\\\win32\\\\..\\\\../conf/broker.ks\\\" -Djavax.net.ssl.trustStore=\\\"C:\\\\apache-activemq-5.14.4\\\\bin\\\\win32\\\\..\\\\../conf/broker.ts\\\" -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=\\\"C:\\\\apache-activemq-5.14.4\\\\bin\\\\win32\\\\..\\\\../conf\\\" -Dactivemq.data=\\\"C:\\\\apache-activemq-5.14.4\\\\bin\\\\win32\\\\..\\\\../data\\\" -Djava.security.auth.login.config=\\\"C:\\\\apache-activemq-5.14.4\\\\bin\\\\win32\\\\..\\\\../conf/login.config\\\" -Xmx1024m -Djava.library.path=\\\"C:\\\\apache-activemq-5.14.4\\\\bin\\\\win32\\\\..\\\\../bin/win32\\\" -classpath \\\"C:\\\\apache-activemq-5.14.4\\\\bin\\\\win32\\\\..\\\\../bin/wrapper.jar;C:\\\\apache-activemq-5.14.4\\\\bin\\\\win32\\\\..\\\\../bin/activemq.jar\\\" -Dwrapper.key=\\\"dcFChKyx60qgkbTs\\\" -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=1236 -Dwrapper.version=\\\"3.2.3\\\" -Dwrapper.native_library=\\\"wrapper\\\" -Dwrapper.service=\\\"TRUE\\\" -Dwrapper.cpu.timeout=\\\"10\\\" -Dwrapper.jvmid=1 
org.tanukisoftware.wrapper.WrapperSimpleApp org.apache.activemq.console.Main start\",\"cpu\":{\"total\":{\"norm\":{\"pct\":0},\"pct\":0},\"start_time\":\"2018-04-22T02:17:57.323Z\"},\"pid\":1380,\"username\":\"NT AUTHORITY\\\\SYSTEM\",\"state\":\"running\",\"pgid\":0,\"ppid\":1236,\"memory\":{\"rss\":{\"bytes\":773529600,\"pct\":0.045},\"size\":795480064,\"share\":0}}}}
                     ]",
      "@version" => "1",
       "headers" => {
              "request_method" => "PUT",
                 "request_uri" => "/organization1/test",
                "content_type" => "application/json;charset=UTF-8",
             "http_connection" => "Keep-Alive",
          "http_authorization" => "Basic blaablaablaa",
              "content_length" => "3698",
                       "https" => "https",
                "request_path" => "/organization1/test",
             "http_user_agent" => "Manticore 0.6.1",
        "http_accept_encoding" => "gzip,deflate",
                "http_version" => "HTTP/1.1",
                   "http_host" => "address.xyz:8080",
                 "remote_user" => "user"
    },
          "tags" => [
        [0] "_jsonparsefailure"
    ],
    "@timestamp" => 2018-05-02T09:46:01.427Z
}

These events are tagged with _jsonparsefailure. Unlike regular JSON messages, their "message" field holds an array of JSON objects instead of a single JSON object. The array needs to be split somehow, but this config does not do that correctly at the moment:

split {
    field => "message"
}

(Magnus Bäck) #4

Since message contains an array, you need to set the json filter's target option to the name of the field where you want to store the array. Then you can point the split filter at that field. The split filter obviously needs to come after the json filter in the configuration file.
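Here is a sketch of that suggestion. The field name batch is a hypothetical choice for illustration. Any name works, as long as the json filter's target and the split filter's field match.

```conf
filter {
    # The root of the payload is a JSON array, so the parsed value
    # has to be stored under a named target field
    json {
        source => "message"
        target => "batch"
    }
    # Clone the event once per array element; in each clone,
    # [batch] holds a single object from the array
    split {
        field => "batch"
    }
}
```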


#5

Thank you for the reply.

The solution you proposed seems to work fine otherwise, but there is still one hurdle.

Because target is not defined, this JSON filter writes the parsed fields to the root (top) level. That is what we want here, since we want to unpack the messages from the "message" field into regular events.

json {
    source => "message"
}

This split filter, however, does not write to the root level. The message data stays inside the message field.

split {
    field => "message"
}

How could we split the field to the root level?


(Magnus Bäck) #6

I don't think the split filter ever stores fields at the root level. If that's where you want the fields to end up, you may have to move them there with a mutate filter (or a ruby filter if you don't want to list each field).
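With a mutate filter, every field has to be listed explicitly. The sketch below makes two assumptions: the array was parsed into a hypothetical field named batch, and the field names come from the sample event in #3. @timestamp is deliberately left out. It arrives as a plain string, so moving it into the event's @timestamp would likely hit a type error.

```conf
filter {
    mutate {
        # Move selected fields from [batch] to the top level of the event;
        # every field that should end up at the root must be listed here
        rename => {
            "[batch][host]"      => "host"
            "[batch][beat]"      => "beat"
            "[batch][fields]"    => "fields"
            "[batch][metricset]" => "metricset"
            "[batch][system]"    => "system"
        }
    }
}
```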


#7

Hmm, ok, I tried my luck with a Ruby filter like so:

json {
    source => "message"
    target => "messageArray"
}
split {
    field => "messageArray"
}
ruby {
    code => '
        if (event.get("messageArray"))
            event.get("messageArray").each { |k,v|
                event.set(k,v);
            }
        end
    '
}

However, this runs into trouble, with the timestamps I assume:

[2018-05-02T17:39:31,370][ERROR][logstash.filters.ruby    ] Ruby exception occurred: wrong argument type String (expected LogStash::Timestamp)

Any idea on fixing this issue?


(Badger) #9
date {
    match => [ "[messageArray][@timestamp]", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" ]
    target => "[messageArray][@timestamp]"
}

will fix that.
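Here is a variant that avoids the same type error without converting the nested copy. It has not been tested in this thread.

The idea is to have the ruby filter skip @timestamp. The date filter then parses [messageArray][@timestamp] directly into the event's own @timestamp, which is the date filter's default target. The sketch assumes the same json and split filters as in #7.

```conf
filter {
    ruby {
        code => '
            fields = event.get("messageArray")
            if fields.is_a?(Hash)
                fields.each do |key, value|
                    # @timestamp must be a LogStash::Timestamp, not a String,
                    # so it is left for the date filter below
                    event.set(key, value) unless key == "@timestamp"
                end
            end
        '
    }
    date {
        # Parse the string and store the result in @timestamp (the default target)
        match => [ "[messageArray][@timestamp]", "ISO8601" ]
    }
    mutate {
        remove_field => [ "messageArray" ]
    }
}
```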


#10

Thank you, Badger. Your idea was correct, but something was wrong with the format pattern. I had to replace it with "ISO8601" to get it to work.

Now everything seems to be working, and the messages are flowing over HTTP as gzip-compressed JSON batches.
Here is the final configuration for documentation purposes.
Sending conf:

output {
    http {
        http_compression => true
        http_method => put
        cacert => "../cert/server.pem"
        format => "json_batch"
        content_type => "application/json;charset=UTF-8"
        url => "https://address.xyz:8080/organization1/env"
        headers => ["Authorization", "Basic blaablaablaa"]
    }
}

Receiving conf:

input {
    http {
        port => 8080
        user => user
        password => password
        ssl => true
        keystore => "/path/to/keystore.jks"
        keystore_password => "keystore_password"
    }
}
filter {
    json {
        source => "message"
        target => "messageArray"
    }
    split {
        field => "messageArray"
    }
    date {
        match => [ "[messageArray][@timestamp]", "ISO8601" ]
        target => "[messageArray][@timestamp]"
    }
    ruby {
        code => '
            if (event.get("messageArray"))
                event.get("messageArray").each { |k,v|
                    event.set(k,v);
                }
            end
        '
    }
    mutate {
        remove_field => [ "messageArray" ]
    }
}
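An alternative worth testing, which was not tried in this thread, is to let the http input decode the batch itself.

The http input picks a codec based on the request's Content-Type, using its additional_codecs option. By default, that option maps only "application/json" to the json codec. If the lookup is an exact string match (an assumption), a body sent as "application/json;charset=UTF-8" falls back to the plain codec. That would explain why the whole array arrived as a string in message.

The json codec creates one event per element when the root of the payload is an array. Mapping this content type to json might therefore make the json, split, date, and ruby filters unnecessary. Sending with content_type => "application/json" from the http output should have a similar effect.

Note that setting additional_codecs replaces the default map, so the plain "application/json" entry is repeated below.

```conf
input {
    http {
        port => 8080
        user => user
        password => password
        ssl => true
        keystore => "/path/to/keystore.jks"
        keystore_password => "keystore_password"
        # Decode these Content-Types with the json codec, which emits
        # one event per element of a top-level JSON array
        additional_codecs => {
            "application/json"               => "json"
            "application/json;charset=UTF-8" => "json"
        }
    }
}
```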
