LogStash - Upload using MySQL

I am new to the world of ELK and trying to learn, but Logstash is proving to be a big hurdle. I tried loading data from a CSV file, ran into a lot of issues, and was unable to upload via CSV. So I am trying a new option: uploading via MySQL tables.

I am using Windows Server 2012 Datacenter with ELK 7.9.1 installed. My conf file is below:

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/elk"
    # The user we wish to execute our statement as
    jdbc_user => "root"
    jdbc_password => "root"
    # The path to our downloaded jdbc driver
    jdbc_driver_library => "D:\ELK\mysql-connector-java-8.0.22\mysql-connector-java-8.0.22.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # our query
    statement => "SELECT * FROM elk.unhealthydata where queue ='tez1' LIMIT 1"
  }
}

output {
  stdout { codec => rubydebug { metadata => true } }
  elasticsearch {
    hosts => "localhost:9600"
    index => "Unhealthy"
    document_type => "data"
  }
}

I tried to execute it as below from cmd, but I am unable to view anything in Kibana:
logstash -f C:\ELK\LogStash\unhealthyMutateData.conf

Output after executing the above:

C:\ELK\logstash-7.9.1\bin>logstash -f C:\ELK\LogStash\unhealthyMutateData.conf
Sending Logstash logs to C:/ELK/logstash-7.9.1/logs which is now configured via log4j2.properties
[2020-12-04T08:51:42,524][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.9.1", "jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc Java HotSpot(TM) 64-Bit Server VM 25.271-b09 on 1.8.0_271-b09 +indy +jit [mswin32-x86_64]"}
[2020-12-04T08:51:42,765][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-12-04T08:51:44,641][INFO ][org.reflections.Reflections] Reflections took 38 ms to scan 1 urls, producing 22 keys and 45 values
[2020-12-04T08:51:46,351][WARN ][logstash.outputs.elasticsearch] You are using a deprecated config setting "document_type" set in elasticsearch. Deprecated settings will continue to work, but are scheduled for removal from logstash in the future. Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature If you have any questions about this, please visit the #logstash channel on freenode irc. {:name=>"document_type", :plugin=><LogStash::Outputs::ElasticSearch index=>"Unhealthy", id=>"f65c92e777326c95923ea953e665f42f9b63b1458f5f817c1c7671e57e344b37", hosts=>[//localhost:9600], document_type=>"data", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_c9d47435-b1a4-426a-a354-e74d23eee920", enable_metric=>true, charset=>"UTF-8">, workers=>1, manage_template=>true, template_overwrite=>false, doc_as_upsert=>false, script_type=>"inline", script_lang=>"painless", script_var_name=>"event", scripted_upsert=>false, retry_initial_interval=>2, retry_max_interval=>64, retry_on_conflict=>1, ilm_enabled=>"auto", ilm_pattern=>"{now/d}-000001", ilm_policy=>"logstash-policy", ecs_compatibility=>:disabled, action=>"index", ssl_certificate_verification=>true, sniffing=>false, sniffing_delay=>5, timeout=>60, pool_max=>1000, pool_max_per_route=>100, resurrect_delay=>5, validate_after_inactivity=>10000, http_compression=>false>}
[2020-12-04T08:51:46,853][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9600/]}}
[2020-12-04T08:51:49,063][WARN ][logstash.outputs.elasticsearch][main] Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"http://localhost:9600/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://localhost:9600/][Manticore::SocketException] Connection refused: connect"}
[2020-12-04T08:51:49,084][INFO ][logstash.outputs.elasticsearch][main] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9600"]}
[2020-12-04T08:51:49,165][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["C:/ELK/LogStash/unhealthyMutateData.conf"], :thread=>"#<Thread:0x59199d7f run>"}
[2020-12-04T08:51:49,965][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.79}
[2020-12-04T08:51:50,136][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2020-12-04T08:51:50,199][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-12-04T08:51:50,527][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
[2020-12-04T08:51:51,459][INFO ][logstash.inputs.jdbc     ][main][45b0db6a18f5f59857375d42054b05f80c1cecf282c326a949854a619ef9b350] (0.021069s) SELECT * FROM elk.unhealthydata where queue ='tez1' LIMIT 1
{
         "running_containers" => 121,
               "cluster_name" => "South",
            "unhealthy_nodes" => "axxxxxxxxxx01w1.prod.dluk.cloud.aws.com",
           "application_type" => "SPARK",
      "queue_utilization_pct" => 43.6,
                     "userid" => "hxxxxxxxxxxatch",
                 "@timestamp" => 2020-12-04T03:21:51.512Z,
    "cluster_utilization_pct" => 5.2,
          "running_memory_mb" => 990208,
               "elapsed_time" => 7.87,
                   "@version" => "1",
                "impact_host" => "259724784\t/mnt/resource/yarn/local/usercache/hdp_batch/appcache/application_1594440874827_252403",
              "applicationid" => "applicatioxxxxxxxxxx827_252403",
             "tech_datestamp" => "7/29/2020",
                      "queue" => "tez1"
}
[2020-12-04T08:51:54,665][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://localhost:9600/"}
[2020-12-04T08:51:54,859][WARN ][logstash.outputs.elasticsearch][main] Error while performing resurrection {:error_message=>"undefined method `split' for nil:NilClass", :class=>"NoMethodError", :backtrace=>["C:/ELK/logstash-7.9.1/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.6.2-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:194:in `major_version'", "C:/ELK/logstash-7.9.1/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.6.2-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:277:in `block in healthcheck!'", "org/jruby/ext/thread/Mutex.java:164:in `synchronize'", "C:/ELK/logstash-7.9.1/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.6.2-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:275:in `block in healthcheck!'", "org/jruby/RubyHash.java:1415:in `each'", "C:/ELK/logstash-7.9.1/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.6.2-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:266:in `healthcheck!'", "C:/ELK/logstash-7.9.1/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.6.2-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:243:in `block in start_resurrectionist'", "C:/ELK/logstash-7.9.1/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.6.2-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:137:in `until_stopped'", "C:/ELK/logstash-7.9.1/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.6.2-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:242:in `block in start_resurrectionist'"]}
[2020-12-04T08:52:10,077][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://localhost:9600/"}

Help would be appreciated.

Drop `document_type`, you don't need or want it.

The rest has a few connection warnings, but it seems to be ok as it reconnects.
What is not happening that you are expecting?

I should be seeing the output in Kibana on the Index Management page, right? However, I don't see it appearing there. The end result I need is to create a visualization in Kibana.

Are you getting more in stdout?
Otherwise, the query has retrieved all results. You might want to look at https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html#_scheduling_2 to make sure it's re-running the query.
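For example, a minimal sketch of what scheduling would look like in the jdbc input above (the every-minute cron value is just an illustration):

```
input {
  jdbc {
    # ... existing connection and statement settings ...
    # re-run the statement every minute (rufus-scheduler cron syntax)
    schedule => "* * * * *"
  }
}
```

Without a `schedule`, the jdbc input runs the statement exactly once and the pipeline then sits idle.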

I keep getting the following:
Data from the Select Query
{
"running_containers" => 121,
"cluster_name" => "South",
"unhealthy_nodes" => "axxxxxxxxxx01w14.prod.dluk.cloud.aws.com",
"application_type" => "SPARK",
"queue_utilization_pct" => 43.6,
"userid" => "hxxxxxxxxxxatch",
"@timestamp" => 2020-12-04T06:05:04.401Z,
"cluster_utilization_pct" => 5.2,
"running_memory_mb" => 990208,
"@version" => "1",
"elapsed_time" => 7.87,
"impact_host" => "244480476\t/mnt/resource/yarn/local/usercache/hdp_batch/appcache/application_1594440874827_252403",
"applicationid" => "applicatioxxxxxxxxxx827_252403",
"tech_datestamp" => "7/29/2020",
"queue" => "tez1"
}
Section 1 Start

[2020-12-04T11:35:07,323][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://localhost:9600/"}
[2020-12-04T11:35:07,494][WARN ][logstash.outputs.elasticsearch][main] Error while performing resurrection {:error_message=>"undefined method 'split' for nil:NilClass", :class=>"NoMethodError", :backtrace=>["C:/ELK/logstash-7.9.1/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.6.2-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:194:in `major_version'", "C:/ELK/logstash-7.9.1/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.6.2-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:277:in `block in healthcheck!'", "org/jruby/ext/thread/Mutex.java:164:in `synchronize'", "C:/ELK/logstash-7.9.1/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.6.2-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:275:in `block in healthcheck!'", "org/jruby/RubyHash.java:1415:in `each'", "C:/ELK/logstash-7.9.1/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.6.2-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:266:in `healthcheck!'", "C:/ELK/logstash-7.9.1/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.6.2-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:243:in `block in start_resurrectionist'", "C:/ELK/logstash-7.9.1/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.6.2-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:137:in `until_stopped'", "C:/ELK/logstash-7.9.1/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.6.2-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:242:in `block in start_resurrectionist'"]}

Section 1 End

Section 1 keeps repeating over and over again; by now I have a 1 MB log file containing only Section 1, and I finally had to force-terminate with Ctrl+C.

Thanks

Any assistance will be appreciated

Again, it sounds like Logstash has collected the table and is waiting for more data in the pipeline. You may want to look at the schedule.

You have pointed the elasticsearch output at the logstash API (i.e. the monitoring port). You probably want to use port 9200 rather than 9600 in the elasticsearch output.
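Something along these lines (the same output as the original config, with only the port changed; 9200 is Elasticsearch's default HTTP port, while 9600 is Logstash's own monitoring API):

```
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "unhealthy"
  }
}
```

Note that Elasticsearch index names must be lowercase, so the original "Unhealthy" would be rejected as well.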

Good catch!

Thanks... I will look at it. However, the port the IT team gave me was 9600, not 9200. I will check on that once again.

This is expected, because your query has no tracking column and does not use `sql_last_value`. In your query

statement => "SELECT * FROM elk.unhealthydata where queue ='tez1' LIMIT 1"

there is no tracking column to limit the result, so Logstash will fetch the same row over and over again until you have new values.
I suggest modifying the query to use a tracking column, either a timestamp or an id, to limit the query.
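For instance, the statement could be adapted along these lines (a sketch only — it assumes the table has an auto-increment `id` column to track; adjust to whatever unique or timestamp column actually exists):

```
jdbc {
  # ... connection settings as before ...
  use_column_value => true
  tracking_column => "id"             # assumed auto-increment column
  tracking_column_type => "numeric"
  schedule => "* * * * *"
  statement => "SELECT * FROM elk.unhealthydata WHERE queue = 'tez1' AND id > :sql_last_value ORDER BY id"
}
```

Logstash persists the last seen value of the tracking column between runs, so only new rows are fetched on each schedule tick.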

Thanks to Mark Walkom and Badger for the assistance. It was port 9200 that was causing the issue, and I had also not started Elasticsearch. I used TCPView to check that port 9200 was open and able to receive data.

Also, thanks Fadjar Tandabawana for the suggestion to add an ID or a timestamp. Can I add timestamp values at runtime?

Thanks

Yes...

Below is my sample input for Logstash:

input {
  jdbc {
    jdbc_driver_library => "/etc/logstash/drivers/postgresql-42.2.16.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://<IP>:5432/<DB>?sslmode=disable"
    jdbc_user => "user"
    jdbc_password => "password"
    jdbc_default_timezone => "Asia/Jakarta"
    jdbc_paging_enabled => "true"
    use_column_value => "true"
    tracking_column => "some_date"
    tracking_column_type => "timestamp"
    jdbc_page_size => 100000
    schedule => "* * * * *"
    last_run_metadata_path => "/etc/logstash/metadata/timestamp_last_run.txt"
    statement_filepath => "/etc/logstash/scripts/some.sql"
    type => "whatever"
  }
}

filter {
  mutate {
   convert => { "amount" => "float" }
  }
}

output {
  if [type] == "whatever" {
    elasticsearch {
      hosts => ["http://elastic01:9200"]
      index => "logstash-%{+YYYY.MM}"
      user => "elastic"
      password => "changeme"
    }
#    stdout {
#      codec => rubydebug
#    }
  }
}

Below is the sample SQL statement, some.sql:

SELECT
  created_on::TIMESTAMPTZ as "@timestamp",
  to_char(some_date,'Mon') as "Month",
  EXTRACT(ISODOW FROM some_date) as "dow",
  EXTRACT(DOY FROM some_date) as "doy",
  EXTRACT(WEEK FROM some_date) as "week",
  EXTRACT(MONTH FROM some_date) as "Mon",
  EXTRACT(QUARTER FROM some_date) as "Quarter",
  EXTRACT(YEAR FROM some_date) as "Year",
  amount AS "amount"
FROM app.t_some_table
WHERE 
created_on > :sql_last_value AND created_on <= NOW()

Sorry, my example uses PostgreSQL as the data source.
You can then restart Logstash, check the index, and refresh the index pattern for your index. Verify in Discover that the data was ingested into the index, and finally visualize it in Kibana.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.