Hi, I'm facing an issue when trying to run this logstash configuration that uses http_poller to run a query and then compare the results from it with a local data table on my pc.
Here's the output and the ERROR in it:
[2024-05-21T16:03:45,523][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:logstash, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \t\r\n], "#", "(" at line 93, column 28 (byte 2170) after output {\r\n if [new_entries] == true ", :backtrace=>["C:/elastic-stack/logstash-8.13.2/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:239:in `initialize'", "org/logstash/execution/AbstractPipelineExt.java:173:in `initialize'", "C:/elastic-stack/logstash-8.13.2/logstash-core/lib/logstash/java_pipeline.rb:48:in `initialize'", "org/jruby/RubyClass.java:931:in `new'", "C:/elastic-stack/logstash-8.13.2/logstash-core/lib/logstash/pipeline_action/create.rb:49:in `execute'", "C:/elastic-stack/logstash-8.13.2/logstash-core/lib/logstash/agent.rb:386:in `block in converge_state'"]}
Here is my logstash config:
input {
http_poller {
urls => {
es_data => {
method => get
url => "http://localhost:9200/earthquake-v2/_search"
headers => {
Accept => "application/json"
}
body => '{
"sort": [
{
"DateTime": {
"order": "desc"
}
}
],
"query": {
"match_all": {}
},
"size": 1
}'
}
}
request_timeout => 60
schedule => { cron => "* * * * * UTC" }
codec => "json"
metadata_target => "http_poller_metadata"
}
file {
path => "C:/elastic-stack/earthquakes.txt"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
if [path] {
mutate {
add_field => {
"message" => "%{message}"
}
}
dissect {
mapping => {
"message" => "%{DateTime},%{Latitude},%{Longitude},%{Depth},%{Magnitude},%{MagType},%{NbStations},%{Gap},%{Distance},%{RMS},%{Source},%{EventID}"
}
}
date {
match => ["DateTime", "yyyy-MM-dd HH:mm:ss"]
}
}
if [http_poller_metadata] {
aggregate {
task_id => "es_data"
code => "map['es_datetime'] ||= event.get('[hits][hits][0][_source][DateTime]')"
map_action => "create"
}
}
if [DateTime] {
aggregate {
task_id => "local_data"
code => "
datetimes = map['local_datetimes'] ||= []
datetimes << event.get('DateTime')
"
map_action => "update"
}
}
if [path] and [@metadata][end_of_file] {
aggregate {
task_id => "local_data"
code => "
es_datetime = execution.get('es_data')['es_datetime']
local_datetimes = map['local_datetimes'] || []
if local_datetimes.any? { |datetime| datetime > es_datetime }
event.set('new_entries', true)
else
event.set('new_entries', false)
end
"
push_previous_map_as_event => true
end_of_task => true
}
}
}
output {
if [new_entries] == true {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "earthquake-v3"
document_type => "_doc"
user => "elastic"
password => ""
template_name => "earthquake-2"
template => "C:/elastic-stack/logstash-8.13.2/conf/earthquake-2.json"
ssl_verification_mode => "none"
ssl_enabled => false
manage_template => false
template_overwrite => true
}
}
stdout {
codec => rubydebug
}
}
I have been through this multiple times and I can't see where I'm missing a character or a syntax error. TIA!