I have several Logstash configurations, for example one that pulls AWS CloudWatch logs and one for AWS Athena. I have therefore created multiple configuration files, and my pipelines.yml file is set to:
- pipeline.id: pipeline1
  pipeline.workers: 1
  queue.type: persisted
  path.config: "/usr/share/logstash/pipeline/*.conf"
However, all of my configurations end up writing to a single index, which doesn't make sense, because I have assigned different index names to the Elasticsearch output plugins in my Logstash configuration. Does anyone know what the source of the problem might be?
Before you read the configurations: all of my exec input plugins are supposed to write to ES_PYTHON_INDEX, because that is where the results of all my Python scripts go. For the JDBC connection, however, I don't see why that data should end up in ES_PYTHON_INDEX, given my configuration.
Configuration 1:
input {
  exec {
    command => "python3 /usr/share/logstash/script/customer_model/run.py"
    schedule => "${SCHEDULER}"
    type => "CS"
  }
}
output {
  if [type] == "CS" {
    elasticsearch {
      hosts => ["${ES_HOST}"]
      user => "${ES_USERNAME}"
      password => "${ES_PASSWORD}"
      index => "${ES_PYTHON_INDEX}"
      workers => 1
    }
    stdout { }
  }
}
Configuration 2:
input {
  exec {
    command => "python3 ${LOGSTASH_HOME}/script/firmware/backend_s3_download/s3_file_download_script.py"
    schedule => "${SCHEDULER}"
    type => "Backend"
  }
  exec {
    command => "python2.7 ${LOGSTASH_HOME}/script/firmware/firmware_error_parser/parsers/log_parser/eis_logparser.py -pp true -lf ${LOGSTASH_HOME}/firmware_data"
    schedule => "${FIRMWARE_SCHEDULER}"
    type => "Firmware"
  }
  file {
    path => "${LOGSTASH_HOME}/firmware_data/errorcode_diagnostics/*.diag"
    codec => json
    start_position => "beginning"
    sincedb_path => "${LOGSTASH_HOME}/file/.sincedb*"
    type => "S3"
  }
}
output {
  if [type] == "S3" {
    elasticsearch {
      hosts => ["${ES_HOST}"]
      user => "${ES_USERNAME}"
      password => "${ES_PASSWORD}"
      index => "${ES_FIRMWARE_ERROR_CODES_INDEX}"
      workers => 1
    }
  } else if [type] == "Backend" {
    elasticsearch {
      hosts => ["${ES_HOST}"]
      user => "${ES_USERNAME}"
      password => "${ES_PASSWORD}"
      index => "${ES_PYTHON_INDEX}"
      workers => 1
    }
    stdout { }
  } else if [type] == "Firmware" {
    elasticsearch {
      hosts => ["${ES_HOST}"]
      user => "${ES_USERNAME}"
      password => "${ES_PASSWORD}"
      index => "${ES_PYTHON_INDEX}"
      workers => 1
    }
    stdout { }
  }
}
Configuration 3:
input {
  jdbc {
    jdbc_driver_library => ""
    jdbc_driver_class => "${JDBC_DRIVER_CLASS}"
    jdbc_connection_string => "${JDBC_CONNECTION_STRING}"
    jdbc_user => "${JDBC_USER_ATHENA}"
    schedule => "${SCHEDULER}"
    statement => "${CUSTOMER_SUCCESS_QUERY_1}"
    type => "Customer_success_1"
  }
}
output {
  if [type] == "Customer_success_1" {
    elasticsearch {
      hosts => ["${ES_HOST}"]
      user => "${ES_USERNAME}"
      password => "${ES_PASSWORD}"
      index => "${ES_CUSTOMER_SUCCESS_INDEX_1}"
      workers => 1
    }
  }
}
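For comparison: with path.config set to a glob, Logstash loads every matching file into the one pipeline (pipeline1), so the inputs and outputs of all three configurations share the same event stream and are separated only by the [type] conditionals. A layout that gives each configuration file its own pipeline might look like the sketch below. The pipeline ids and file names are hypothetical placeholders, not names from my setup, and I have not confirmed that this removes the problem.

```yaml
# pipelines.yml sketch: one isolated pipeline per configuration file.
# Pipeline ids and .conf file names are assumptions for illustration.
- pipeline.id: customer_model
  pipeline.workers: 1
  queue.type: persisted
  path.config: "/usr/share/logstash/pipeline/customer_model.conf"   # Configuration 1
- pipeline.id: firmware
  pipeline.workers: 1
  queue.type: persisted
  path.config: "/usr/share/logstash/pipeline/firmware.conf"         # Configuration 2
- pipeline.id: customer_success
  pipeline.workers: 1
  queue.type: persisted
  path.config: "/usr/share/logstash/pipeline/customer_success.conf" # Configuration 3
```

With this layout, events from the jdbc input could only reach the outputs defined in the same file, regardless of what their type field contains.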