Index Naming

How do I get my index named properly?

I have an index appearing as:
%{[@metadata][beat]}-2018.04.03

While my beats appear properly as:
winlogbeat-2017.12.19
winlogbeat-2017.12.20

My conf:
input {
  beats {
    port => 5044
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    sniffing => true
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}

input {
  jdbc {
    jdbc_driver_library => "/usr/local/sbin/mssql-jdbc-6.2.2.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://brt1-trackit01\BRT1TRACKIT01;databaseName=TRACKIT8_DATA2"
    jdbc_user => "reports_jsp"
    jdbc_password => "letmeinNOW!"
    type => "sql_tk_tasks"
    statement => "select WO_NUM,OPENDATE,CLSDDATE,TYPE,RESPONS from tasks where OPENDATE BETWEEN DATEADD(day,-30,GETDATE()) AND GETDATE()"
  }
}

output {
  if [type] == "sql_tk_tasks" {
    elasticsearch {
      hosts => ["localhost:9200"]
      sniffing => true
      manage_template => false
      index => "tasks2-%{+YYYY.MM.dd}"
      document_type => "tasks2"
    }
  }
}

Thank You!

Hi Mike,

It looks as though this config is running as a single pipeline, rather than using the multiple pipelines feature that became available in Logstash 6 (Multiple Pipelines | Logstash Reference [6.7] | Elastic).

If that's the case, since your first output has no conditional on it, your JDBC events are sent not only to the 'sql tasks' output but also to your 'beats' output. Since those JDBC events presumably don't carry any Beats metadata, Logstash takes the index name literally and creates an index with that name (i.e. %{[@metadata][beat]}-2018.04.03).

I think you can wrap your beats output in something like if "" in [@metadata][beat] so that only beat-related documents hit this output.
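
A minimal sketch of what that conditional could look like, reusing the settings from your existing beats output:

output {
  if "" in [@metadata][beat] {
    elasticsearch {
      hosts => ["localhost:9200"]
      sniffing => true
      manage_template => false
      index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
      document_type => "%{[@metadata][type]}"
    }
  }
}

The "" in [@metadata][beat] test is only true when that field exists as a string, so JDBC events fall through without hitting this output.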

The other option is to split your config into multiple pipelines, as per the link I posted above.

Cheers,
Mike

Thank You.

I built a test ELK stack yesterday afternoon to take advantage of multiple pipelines. I have one Beats server shipping events. Its index gets created by the first pipeline and is named correctly. The second pipeline (the JDBC input) never creates an index. When I run the config with --debug, I can see the database rows being returned as they should be, but they are never indexed. The JDBC plugin is installed.

pipelines.yml:

- pipeline.id: beats
  path.config: "/etc/logstash/conf.d/beats.conf"
- pipeline.id: tasks
  path.config: "/etc/logstash/conf.d/tasks/tasks.conf"

tasks.conf
input {
  jdbc {
    jdbc_driver_library => "/usr/local/sbin/mssql-jdbc-6.2.2.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://brt1-trackit01\BRT1TRACKIT01;databaseName=TRACKIT8_DATA2"
    jdbc_user => "reports_jsp"
    jdbc_password => "letmeinNOW!"
    type => "sql_tk_tasks"
    statement => "select WO_NUM,OPENDATE,CLSDDATE,TYPE,RESPONS from tasks where OPENDATE BETWEEN DATEADD(day,-60,GETDATE()) AND GETDATE()"
  }
}

output {
  if [type] == "sql_tk_tasks" {
    elasticsearch {
      hosts => ["localhost:9200"]
      sniffing => true
      manage_template => false
      index => "tasks2-%{+YYYY.MM.dd}"
      document_type => "tasks2"
    }
  }
}

I'd probably suggest upping the log level for your JDBC input via the Logstash logging API and double-checking that the data is being read correctly: https://www.elastic.co/guide/en/logstash/current/logging.html.
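
For reference, a sketch of raising just the JDBC input's log level through that API, assuming the default Logstash API port of 9600:

curl -XPUT 'localhost:9600/_node/logging?pretty' -H 'Content-Type: application/json' -d '
{
  "logger.logstash.inputs.jdbc" : "DEBUG"
}'

This keeps the rest of Logstash at its normal log level.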

Your output block looks OK to me.

Thanks. Upped to debug. I can see the pipeline starting in logstash-plain.log. Still don't see the index, though.

[2018-04-04T10:22:12,362][INFO ][logstash.pipeline ] Pipeline started succesfully {:pipeline_id=>"tasks", :thread=>"#<Thread:0x48a1a52f@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:246 sleep>"}
[2018-04-04T10:22:12,365][INFO ][logstash.agent ] Pipelines running {:count=>2, :pipelines=>["beats", "tasks"]}
[2018-04-04T10:22:12,873][INFO ][logstash.inputs.jdbc ] (0.038025s) select WO_NUM,OPENDATE,CLSDDATE,TYPE,RESPONS from tasks where OPENDATE BETWEEN DATEADD(day,-60,GETDATE()) AND GETDATE()
[2018-04-04T10:22:14,260][INFO ][logstash.pipeline ] Pipeline has terminated {:pipeline_id=>"tasks", :thread=>"#<Thread:0x48a1a52f@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:246 run>"}
[2018-04-04T10:22:16,512][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[http://localhost:9200/], :added=>[http://127.0.0.1:9200/]}}
[2018-04-04T10:22:16,513][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://127.0.0.1:9200/, :path=>"/"}
[2018-04-04T10:22:16,527][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://127.0.0.1:9200/"}
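
A side note on the "Pipeline has terminated" line above: a jdbc input with no schedule option runs its statement once and the pipeline then stops, so that message by itself is expected. To keep the query re-running on an interval, a cron-style schedule can be set, as in this sketch (the interval is just an example):

input {
  jdbc {
    # ... existing connection settings ...
    schedule => "*/5 * * * *"   # re-run the statement every 5 minutes
  }
}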

Further testing: I peeled one beat off into a pipeline separate from the other beats. I did see the new index in Kibana, which verifies that multiple pipelines are working. MYSTERY: Why is the JDBC input not creating an index? How can I watch for errors on the index creation?

beats.conf:
input {
  beats {
    port => 5044
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    sniffing => true
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}

beats2.conf:
input {
  beats {
    port => 5045
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    sniffing => true
    manage_template => false
    index => "beats2-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}

tasks.conf (no index being created):
input {
  jdbc {
    jdbc_driver_library => "/usr/local/sbin/mssql-jdbc-6.2.2.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://brt1-trackit01\BRT1TRACKIT01;databaseName=TRACKIT8_DATA2"
    jdbc_user => "reports_jsp"
    jdbc_password => "letmeinNOW!"
    type => "sql_tk_tasks"
    statement => "select WO_NUM,OPENDATE,CLSDDATE,TYPE,RESPONS from tasks where OPENDATE BETWEEN DATEADD(day,-60,GETDATE()) AND GETDATE()"
  }
}

output {
  if [type] == "sql_tk_tasks" {
    elasticsearch {
      hosts => ["localhost:9200"]
      sniffing => true
      manage_template => false
      index => "tasks2-%{+YYYY.MM.dd}"
      document_type => "tasks2"
    }
  }
}

pipelines.yml:

- pipeline.id: beats
  path.config: "/etc/logstash/conf.d/beats.conf"
- pipeline.id: beats2
  path.config: "/etc/logstash/conf.d/beats2/beats2.conf"
- pipeline.id: tasks
  path.config: "/etc/logstash/conf.d/tasks/tasks.conf"

If you're using multiple pipelines then you don't need to worry about wrapping the output in if [type] == "sql_tk_tasks", since you want all documents in that pipeline to use that output.
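
For example, the tasks.conf output could be reduced to the following (settings copied from your posted config):

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    sniffing => true
    manage_template => false
    index => "tasks2-%{+YYYY.MM.dd}"
    document_type => "tasks2"
  }
}

With one pipeline per input, events can't leak between outputs, so the type check is redundant.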

I'd try removing that first and see if that gets you anywhere. If you also have Logstash monitoring enabled, you could view the pipeline in Kibana and double-check whether the JDBC input is producing events and whether the output is receiving any: https://www.elastic.co/guide/en/logstash/current/logstash-pipeline-viewer.html
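
As a quick check without the UI, the Logstash node stats API also reports per-pipeline event counts, assuming the default API port of 9600:

curl -XGET 'localhost:9600/_node/stats/pipelines?pretty'

In the response, the tasks pipeline's plugins section shows events in/out for the jdbc input and the elasticsearch output, which should narrow down where the events are getting lost.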
