How to process an event multiple times

Summary:
In our monitoring solution, events are reported from the CI environment to the monitoring server over HTTP.
Each event represents a build and contains some information about it, most importantly its start and end dates.
In our Logstash pipeline we want to report these events in two different ways:

  1. Send the event as is to Elasticsearch
  2. Split the event into multiple events using its start and end dateTimes (at a 10-second interval, for example)

To achieve this we implemented our own filter plugin that splits an event using some simple logic and the start and end dateTime fields. The plugin works fine, but we ran into a problem: we need to process the same event twice, first to send it to ES as is, and then to split it and send the pieces to a different ES index.
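
As a rough illustration of the splitting idea (this is not the actual plugin code, which is not shown here), the logic amounts to cutting the span between the start and end times into fixed-size slices. The helper name split_interval and the slice_start / slice_end keys are made up for this sketch; a real Logstash filter would presumably clone the event once per slice instead of building hashes.

```ruby
require "time"

# Hypothetical helper, NOT the real splitter plugin: cut the span between
# `started` and `finished` into slices of at most `interval` seconds.
def split_interval(started, finished, interval = 10)
  slices = []
  slice_start = started
  while slice_start < finished
    # The last slice is shortened so it never runs past the end time.
    slice_end = [slice_start + interval, finished].min
    slices << { "slice_start" => slice_start, "slice_end" => slice_end }
    slice_start = slice_end
  end
  slices
end

# Example values are invented for illustration.
started  = Time.iso8601("2017-10-20T10:00:00Z")
finished = Time.iso8601("2017-10-20T10:00:25Z")
slices = split_interval(started, finished)
slices.each { |s| puts "#{s["slice_start"].iso8601} -> #{s["slice_end"].iso8601}" }
```

With a 25-second build and a 10-second interval this yields three slices, the last one being only 5 seconds long.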

To work around this, on the first pass the filter adds a tag marking the event as original_event, and we send it both to ES and back to Logstash itself for a second pass. On the second pass we detect the tag and, if it is present, split the event and send the result to the other index.

Details:
This is what our pipeline looks like:

input { 
	http {
		port=> "9400"
		threads=> 4
		type=> "teamcity-builds"
	}
	http {
		port=> "9401"
		threads=> 4
		type=> "teamcity-builds-split"
	}
}
filter{
	prune {
		remove_field => ["[testrunner][testrun.user.password]","[testrunner][testrun.user.name]","headers"]
	}
	if "original_event" in [tags]{
		ruby{
			code=>'event.set("IsMaster",event.get("[buildType][id]") != event.get("[testrunner][testrun.child.build.id]"))'
		}
		grok{
			match=>{ "[testrunner][testrun.statistics.team.name]" => "(?<TeamName>^[^_]+)"}
		}
		ruby{
			code=> 'team = event.get("TeamName"); team = team=="TestAutomation" ? "Casa":team; team = team=="AutomationFramework"? "AA": team; event.set("TeamName",team)'
		}
		mutate {
			replace => { "type" => "teamcity-build-splits" }
			remove_tag => [ "original_event" ]
			remove_field => [ "testrunner" ]
		}
		date{
			id=>"start_date_filter"
			match=>[ "[build][started]", "ISO8601" ]
			target=>"[build][started]"
		}
		date{
			id=>"end_date_filter"
			match=>[ "[build][finished]", "ISO8601" ]
			target=>"[build][finished]"
		}
		splitter{
			startDate=>"[build][started]"
			endDate=>"[build][finished]"
			interval=> 10
		}
		mutate {
			remove_field => [ "build" ]
		}
	} else if "original_event" not in [tags]{
		mutate {
			add_tag => [ "original_event"]
		}
	}

}
output {
	if "original_event" in [tags]{
		elasticsearch {
			hosts => ["http://localhost:9200/"]
			index => "testrunner-%{+YYYY-MM-dd}"
		}
		http {
			url=> "http://localhost:9401/"
			http_method => "post"
			codec=> json
		}
	} else {
		elasticsearch {
			hosts => ["http://localhost:9200/"]
			index => "testrunner_split-%{+YYYY-MM-dd}"
		}
	}
}

I have a feeling that we are doing this in a very wrong way, but I'm not sure where or how to change it to achieve our goal, which brings me to the point of this topic.

Problem:
After some random amount of time (anywhere from a few hours to a few weeks, sometimes a few months) Logstash stops processing further requests with the following error:
[2017-10-20T10:02:46,991][ERROR][logstash.outputs.http] [HTTP Output Failure] Could not fetch URL {:url=>"http://localhost:9401/", :method=>:post, :body=>"{someData ... "tags\":[\"original_event\"]}", :headers=>{"Content-Type"=>"application/json"}, :message=>"Read timed out", :class=>"Manticore::SocketTimeout", :backtrace=>nil, :will_retry=>true}

Once Logstash hits this error, it stops processing entirely and just spams the same error for all further requests, and we lose data until we restart it manually. If it crashed, nssm (which we use to host it as a service) would restart it automatically, but it doesn't crash.

Environment:
We use the latest recommended version as of today: 5.6.1.
We host Logstash on a Windows machine.
Elasticsearch is hosted on the same machine but on a different port.
Logstash is launched as a service using nssm.

Please advise on this topic, as we are out of ideas. We have looked through numerous topics that seemed similar to ours, without any results.

Any ideas will be much appreciated. Thanks!

Guys, please advise, there has been no progress on this one :frowning:
