[SOLVED] Chaining Pipelines Together

Hey All,

Has anyone tried to chain multiple pipelines together on a single host? I'm trying to send events from http -> A -> B -> C -> logfile, connecting the pipelines together via the tcp input/output plugins.

I have debug-level logging turned on and can see that pipeline A receives the input on http:8080 and sends it out on tcp:8081, but nothing happens after that. Pipeline B is listening on tcp:8081 and should send out on tcp:8082, and so on.
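For reference, the A -> B link looks roughly like this (a simplified sketch rather than my full configs; the ports and the rule_matched field are taken from the setup above, everything else is illustrative):

```
# Pipeline A (sketch): http in, tcp out to pipeline B
input {
  http { port => 8080 }
}
filter {
  # stand-in for the real filter logic; the debug logs show this field being added
  mutate { add_field => { "rule_matched" => "NONE" } }
}
output {
  tcp { host => "127.0.0.1" port => 8081 mode => "client" }
}

# Pipeline B (sketch): tcp in from A, tcp out to pipeline C
input {
  tcp { port => 8081 mode => "server" }
}
output {
  tcp { host => "127.0.0.1" port => 8082 mode => "client" }
}
```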

Debug Logs:

[2017-12-20T19:38:13,635][DEBUG][logstash.pipeline        ] filter received {"event"=>{"start_time"=>"1483228800", "network"=>"network_a", "duration"=>60, "node"=>"node_a", "severity"=>"critical", "@timestamp"=>2017-12-20T19:38:13.613Z, "creation_time"=>1513798693, "headers"=>{"http_connection"=>"TE, close", "content_length"=>"206", "request_uri"=>"/", "http_user_agent"=>"libwww-perl/6.05", "http_te"=>"deflate,gzip;q=0.3", "request_path"=>"/", "content_type"=>"application/json", "http_host"=>"127.0.0.1:8080", "http_version"=>"HTTP/1.1", "request_method"=>"PUT"}, "end_time"=>"1491004800", "device"=>"device_0", "@version"=>"1", "description"=>"The description_0", "host"=>"127.0.0.1"}}
[2017-12-20T19:38:13,644][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Mutate: adding value to field {"field"=>"rule_matched", "value"=>["NONE"]}
[2017-12-20T19:38:13,648][DEBUG][logstash.pipeline        ] output received {"event"=>{"start_time"=>"1483228800", "network"=>"network_a", "duration"=>60, "node"=>"node_a", "severity"=>"critical", "@timestamp"=>2017-12-20T19:38:13.613Z, "creation_time"=>1513798693, "headers"=>{"http_connection"=>"TE, close", "content_length"=>"206", "request_uri"=>"/", "http_user_agent"=>"libwww-perl/6.05", "http_te"=>"deflate,gzip;q=0.3", "request_path"=>"/", "content_type"=>"application/json", "http_host"=>"127.0.0.1:8080", "http_version"=>"HTTP/1.1", "request_method"=>"PUT"}, "end_time"=>"1491004800", "rule_matched"=>"NONE", "device"=>"device_0", "@version"=>"1", "description"=>"The description_0", "host"=>"127.0.0.1"}}
[2017-12-20T19:38:13,663][DEBUG][logstash.outputs.tcp     ] Opened connection {:client=>"127.0.0.1:8081"}
[2017-12-20T19:38:13,678][DEBUG][logstash.codecs.line     ] config LogStash::Codecs::Line/@id = "line_1300f797-5be3-4bda-b825-3f847bd821c1"
[2017-12-20T19:38:13,678][DEBUG][logstash.codecs.line     ] config LogStash::Codecs::Line/@enable_metric = true
[2017-12-20T19:38:13,679][DEBUG][logstash.codecs.line     ] config LogStash::Codecs::Line/@charset = "UTF-8"
[2017-12-20T19:38:13,679][DEBUG][logstash.codecs.line     ] config LogStash::Codecs::Line/@delimiter = "\n"
[2017-12-20T19:38:13,687][DEBUG][io.netty.util.Recycler   ] -Dio.netty.recycler.maxCapacity: 262144
[2017-12-20T19:38:13,687][DEBUG][io.netty.util.Recycler   ] -Dio.netty.recycler.maxSharedCapacityFactor: 2
[2017-12-20T19:38:13,687][DEBUG][io.netty.util.Recycler   ] -Dio.netty.recycler.linkCapacity: 16
[2017-12-20T19:38:13,704][DEBUG][io.netty.buffer.AbstractByteBuf] -Dio.netty.buffer.bytebuf.checkAccessible: true
[2017-12-20T19:38:13,707][DEBUG][io.netty.util.ResourceLeakDetectorFactory] Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@4f57aac0

Configs:

What if you add a stdout to each section to see what is happening? Also debug logging may help.
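For example, something like this in each output block (sketch):

```
output {
  # print each event to the console in a readable form for debugging
  stdout { codec => rubydebug }
  tcp { host => "127.0.0.1" port => 8081 }
}
```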

I'll try adding the stdout to the sections. Debug logging is already enabled; I've included the output in the post above.

OK, I got a little closer by applying the json_lines codec to the output block of the first pipeline. Now I see the event received and forwarded by the second pipeline. Setting this codec on each output lets the event get forwarded through all the pipelines, but it looks like the JSON gets escaped again at each hop.
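Concretely, the change was just adding the codec to the tcp output (sketch; host/port as above):

```
output {
  tcp {
    host  => "127.0.0.1"
    port  => 8081
    mode  => "client"
    # serialize each event as one newline-delimited JSON object
    codec => json_lines
  }
}
```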

So it seems like I have to explicitly set some codec to get this to work correctly. Is there a codec I can use to output just the decorated event to the next pipeline for processing? I've included the debug logs of the event going through two pipelines below.

Thanks for your help!

[2017-12-21T15:05:03,122][DEBUG][logstash.pipeline        ] filter received {"event"=>{"end_time"=>"1491004800", "description"=>"The description_0", "headers"=>{"http_connection"=>"TE, close", "http_user_agent"=>"libwww-perl/6.05", "request_method"=>"PUT", "request_path"=>"/", "request_uri"=>"/", "http_host"=>"127.0.0.1:8080", "content_length"=>"206", "http_version"=>"HTTP/1.1", "http_te"=>"deflate,gzip;q=0.3", "content_type"=>"application/json"}, "network"=>"network_a", "start_time"=>"1483228800", "creation_time"=>1513868703, "host"=>"127.0.0.1", "node"=>"node_a", "@timestamp"=>2017-12-21T15:05:03.111Z, "severity"=>"critical", "duration"=>60, "device"=>"device_0", "@version"=>"1"}}
[2017-12-21T15:05:03,122][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Mutate: adding value to field {"field"=>"rule_matched", "value"=>["NONE"]}
[2017-12-21T15:05:03,123][DEBUG][logstash.pipeline        ] output received {"event"=>{"end_time"=>"1491004800", "description"=>"The description_0", "headers"=>{"http_connection"=>"TE, close", "http_user_agent"=>"libwww-perl/6.05", "request_method"=>"PUT", "request_path"=>"/", "request_uri"=>"/", "http_host"=>"127.0.0.1:8080", "content_length"=>"206", "http_version"=>"HTTP/1.1", "http_te"=>"deflate,gzip;q=0.3", "content_type"=>"application/json"}, "network"=>"network_a", "start_time"=>"1483228800", "creation_time"=>1513868703, "rule_matched"=>"NONE", "host"=>"127.0.0.1", "node"=>"node_a", "@timestamp"=>2017-12-21T15:05:03.111Z, "severity"=>"critical", "duration"=>60, "device"=>"device_0", "@version"=>"1"}}
[2017-12-21T15:05:03,135][DEBUG][logstash.pipeline        ] filter received {"event"=>{"message"=>"{\"end_time\":\"1491004800\",\"description\":\"The description_0\",\"headers\":{\"http_connection\":\"TE, close\",\"http_user_agent\":\"libwww-perl/6.05\",\"request_method\":\"PUT\",\"request_path\":\"/\",\"request_uri\":\"/\",\"http_host\":\"127.0.0.1:8080\",\"content_length\":\"206\",\"http_version\":\"HTTP/1.1\",\"http_te\":\"deflate,gzip;q=0.3\",\"content_type\":\"application/json\"},\"network\":\"network_a\",\"start_time\":\"1483228800\",\"creation_time\":1513868703,\"rule_matched\":\"NONE\",\"host\":\"127.0.0.1\",\"node\":\"node_a\",\"@timestamp\":\"2017-12-21T15:05:03.111Z\",\"severity\":\"critical\",\"duration\":60,\"device\":\"device_0\",\"@version\":\"1\"}", "@timestamp"=>2017-12-21T15:05:03.124Z, "host"=>"localhost", "@metdata"=>{"ip_address"=>"127.0.0.1"}, "port"=>44098, "@version"=>"1"}}
[2017-12-21T15:05:03,135][DEBUG][logstash.pipeline        ] output received {"event"=>{"message"=>"{\"end_time\":\"1491004800\",\"description\":\"The description_0\",\"headers\":{\"http_connection\":\"TE, close\",\"http_user_agent\":\"libwww-perl/6.05\",\"request_method\":\"PUT\",\"request_path\":\"/\",\"request_uri\":\"/\",\"http_host\":\"127.0.0.1:8080\",\"content_length\":\"206\",\"http_version\":\"HTTP/1.1\",\"http_te\":\"deflate,gzip;q=0.3\",\"content_type\":\"application/json\"},\"network\":\"network_a\",\"start_time\":\"1483228800\",\"creation_time\":1513868703,\"rule_matched\":\"NONE\",\"host\":\"127.0.0.1\",\"node\":\"node_a\",\"@timestamp\":\"2017-12-21T15:05:03.111Z\",\"severity\":\"critical\",\"duration\":60,\"device\":\"device_0\",\"@version\":\"1\"}", "@timestamp"=>2017-12-21T15:05:03.124Z, "host"=>"localhost", "@metdata"=>{"ip_address"=>"127.0.0.1"}, "port"=>44098, "@version"=>"1"}}

And after more trial and error I found that setting json_lines as the codec in both the input and output blocks allows this to work the way I would expect, although I'm not entirely sure why that is needed.
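If I understand the defaults correctly, the tcp input's default codec treats each incoming line as plain text, which is why the second pipeline's logs above show the whole serialized event stuffed into a "message" field as an escaped string. Putting json_lines on the input parses each line back into event fields, and putting it on the output guarantees a newline-delimited JSON stream for the next hop. A sketch of one intermediate pipeline with the codec on both sides (ports as above, everything else illustrative):

```
# Pipeline B (sketch): json_lines on both the input and the output
input {
  tcp {
    port  => 8081
    mode  => "server"
    # parse each incoming newline-delimited JSON line back into event fields
    codec => json_lines
  }
}
output {
  tcp {
    host  => "127.0.0.1"
    port  => 8082
    mode  => "client"
    # re-serialize as one JSON object per line for the next pipeline
    codec => json_lines
  }
}
```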


This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.