Hello,
I am getting this in my Logstash log:
"{:timestamp=>"2016-04-04T19:27:10.116000+0000", :message=>"Flushing buffer at interval",
:instance=>"#<LogStash::Outputs::Elasticsearch::Buffer:0x635fc4ba operations_mutex=#Mutex:0x6d9836a1,
max_size=500, operations_lock=#Java::JavaUtilConcurrentLocks::ReentrantLock:0x69a7afca,
submit_proc=#Proc:0x314247a0/home/t/logstash-2.2.2/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.5.1-java/lib/logstash/outputs/elasticsearch/common.rb:57,
logger=#<Cabin::Channel:0x7502b298 metrics=#<Cabin::Metrics:0x4234fde metrics_lock=#Mutex:0x24a36adf,
metrics={}, channel=#<Cabin::Channel:0x7502b298 ...>>, subscriber_lock=#Mutex:0x61124ce1,
level=:debug, subscribers={12906=>#<Cabin::Outputs::IO:0x1f274674 io=#<File:/home/t/logstash-2.2.2/var/log/kafka_logstash.log>,
lock=#Mutex:0x20ed32dd>}, data={}>, last_flush=2016-04-04 19:27:09 +0000, flush_interval=1, stopping=#Concurrent::AtomicBoolean:0x1d435217,
buffer=, flush_thread=#<Thread:0x494e778a run>>", :interval=>1, :level=>:debug, :file=>"logstash/outputs/elasticsearch/buffer.rb",
:line=>"90", :method=>"interval_flush"} "
This is my config:
input {
  kafka {
    zk_connect => "blah.com:2181"
    topic_id => "f5-logs-wc1"
    codec => plain {
      format => "%{message}"
    }
  }
}
filter {
  grok {
    break_on_match => false
    match => [ "message", "%{SYSLOGTIMESTAMP} %{HOSTNAME:logsource} %{LOGLEVEL:severity_label} %{SYSLOGPROG}: %{GREEDYDATA:info}" ]
  }
  mutate {
    # anchored, because gsub matches substrings: an unanchored "err"
    # would also match inside "error" and turn it into "erroror"
    gsub => [
      "severity_label", "^err$", "error",
      "severity_label", "^info$", "informational",
      "severity_label", "^crit$", "critical"
    ]
    remove_field => ["message","host"]
  }
}
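For reference, here is a made-up F5 syslog line and the fields I expect the grok pattern above to pull out of it (the line itself is hypothetical):

# hypothetical input line
Apr  4 19:27:09 lb01.example.com err tmm[1234]: 01010028:3: No members available

# expected fields after grok (severity_label then becomes "error" via the gsub)
logsource => "lb01.example.com"
severity_label => "err"
program => "tmm"
pid => "1234"
info => "01010028:3: No members available"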
output {
  elasticsearch {
    hosts => [ "blah1.com:9200", "blah2.com:9200" ]
    index => "f5-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}
I have verified that the topic has data, so writing to Kafka is working; it is the consuming side (Kafka -> Logstash -> Elasticsearch) where I am stuck. What am I missing?
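For what it's worth, this is roughly how I checked that the topic has data, using the Kafka console consumer against the same ZooKeeper:

bin/kafka-console-consumer.sh --zookeeper blah.com:2181 --topic f5-logs-wc1 --from-beginning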