Logstash and database

First of all, thanks to the team for providing us with the JDBC plugin.
I really needed it.
Now I am stuck again.
I connected this plugin, ran the config file, and yippee, I could see the data reflected in Kibana.
But then I wanted to add a new row to my database, so I did that and ran Logstash again, and now all of my rows got duplicated. What should I do now? Elasticsearch shows me double the data plus the one row I added. Isn't there a way for Logstash to run continuously, poll the database at intervals, and bring in only the newly added rows?

Help needed.
Reply soon, team.

Thank you

This is unique to your setup.

If I am understanding your data correctly, you have some sort of primary key on
your table. If this is the case, you can maybe try something like this:

input {
  jdbc {
    statement => "select my_id, another_col from table"
  }
}

output {
  elasticsearch {
    protocol => "http"
    document_id => "%{my_id}"
    action => "index"
  }
}

This way, Elasticsearch will use your data's primary key as the document id (or some mapping of it) and will update the document with the new content. Unfortunately, this is not aware of whether the document already exists, but it does ensure that you will not end up with duplicates.

Would you mind sharing the nature of the IDs in your data? I am playing around with an idea of watermarking that would automate this for ordered fields: for example, remembering that all IDs less than 132 have already been processed, so only data with greater ID values is fetched.
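
Until something like that exists, a rough sketch of the manual version of the same idea might look like this (the 132 cutoff is only an example; you would have to carry the last processed id forward yourself between runs):

input {
  jdbc {
    # hypothetical manual watermark: only fetch rows with ids above the last one processed
    statement => "select my_id, another_col from table where my_id > 132 order by my_id asc"
  }
}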

does that make sense?


See, I have tables which track CPU utilization for particular work, so they have fields like timestamp, hostname, os_type, data sent, data received, etc.
Now I want to work on live data, and my database gets updated automatically; using Kibana I will then plot graphs. But I want to work in real time, and if the JDBC plugin ingests data just once, I don't think that will work, as Logstash shuts down immediately, unlike with a CSV file, where Logstash doesn't shut down until we want it to.
Please help me out: which plugin should I use to fetch data from a PostgreSQL database?

For now, you may want to take a look at the :sql_last_start computed parameter. This parameter holds the (system clock) time at which the SQL statement was last run. So if your timestamps are based on real time as well, this may work for you.

you can set it up like this:

input {
  jdbc {
    ...
    statement => "select * from table where timestamp > :sql_last_start"
    schedule => "* * * * *"
  }
}

This input configuration will only fetch the latest records, and it runs according to the cron-like schedule "* * * * *", which means "run every minute, on the minute". By adding a schedule, the input plugin stays alive and re-runs the query every minute.

Regarding PostgreSQL: this plugin supports it. You just have to download the driver jar yourself from https://jdbc.postgresql.org/download.html, configure jdbc_driver_library with the path of the downloaded driver, and set the jdbc_driver_class option to org.postgresql.Driver.
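
Put together, a minimal sketch might look like this (the driver path, connection string, and credentials below are placeholders for illustration only):

input {
  jdbc {
    # placeholder path and credentials -- substitute your own
    jdbc_driver_library => "/path/to/postgresql-9.4-1201.jdbc4.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/mydb"
    jdbc_user => "myuser"
    jdbc_password => "mypassword"
    statement => "select * from table where timestamp > :sql_last_start"
    schedule => "* * * * *"
  }
}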

does that make sense? for a number of reasons, we chose not to include these jars within the plugin itself.

Thanks, it's working. It solved 90% of my problem.

Cool. What is the 10% of the problem that still needs to be solved?

I also have a timestamp field, and Logstash keeps a timestamp of its own, so if I use "select * from table where timestamp > :sql_last_start", :sql_last_start takes Logstash's time, which conflicts with my timestamp field.
Suppose my timestamp field has the value 30-07-2015 09:04:08 while Logstash has the current time; then the next data it takes from the database must have a timestamp greater than the current Logstash timestamp, so the two conflict.

I tried using date {} with match and target, but I am getting an error.

I used this:
date {
  match => [ "timestamp", "yyyy-MM-dd HH:mm:ss", "ISO8601" ]
  target => "@timestamp"
}

and I am getting this error:

"cannot convert instance of class org.jruby.RubyTime to class java.lang.String "

Can we replace the Logstash timestamp field with my timestamp? I guess this problem is due to different time zones...

The default target is already "@timestamp" for the date filter.

Assuming your timestamp field is a string with a value like 30-07-2015 09:04:08, this date filter should do the trick:

date {
  match => ["timestamp", "dd-MM-yyyy HH:mm:ss"] # MM is months; mm would be minutes
  remove_field => "timestamp" # remove the now-redundant field
}

You seem to have mixed up the order of the date pattern, and the filter may have complained about that.

If this is not the problem, run Logstash with --debug for more information and a stack trace.
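
For example, using the same invocation that appears later in this thread:

C:\Program Files\logstash-1.5.2\bin>logstash agent -f example.conf --debug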

My timestamp field is of type timestamp without time zone. I am using a PostgreSQL database.

I do not assume a timezone is provided with this match

The default timezone that Logstash will assume for it is UTC, I believe. If this is an issue, it is configurable.
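
For example, the date filter accepts a timezone option. A sketch, where "Asia/Kolkata" is only an illustrative zone with a +05:30 offset; use whichever zone your database times are local to:

date {
  match => ["timestamp", "dd-MM-yyyy HH:mm:ss"]
  # example zone only; pick the zone your database timestamps are recorded in
  timezone => "Asia/Kolkata"
}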

What I want to do is use my timestamp field from the database instead of the Logstash timestamp (@timestamp). What should I do? Because when I use > :sql_last_start, it uses Logstash's time by default, and I get duplicates again since my timestamps differ.

oh, that is slightly different. :sql_last_start uses the system current time.
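
If re-fetching some overlapping rows is unavoidable, one possible workaround (a sketch only) is to combine the schedule with the document_id idea from earlier in this thread, so that re-fetched rows overwrite themselves instead of duplicating. Here my_id stands in for whatever your table's primary key column is:

output {
  elasticsearch {
    # indexing with the same document_id overwrites the existing document instead of creating a duplicate
    action => "index"
    document_id => "%{my_id}"
  }
}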

input {
  jdbc {
    jdbc_driver_library => "C:\anmol\postgresql-9.4-1201.jdbc4.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5434/mm_db_Reporting"
    jdbc_user => "mmsuper"
    jdbc_password => "mmsuper"
    statement => "select * from om_report where timestamp > :sql_last_start"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
  }
}

filter {
  date { match => ["timestamp", "YYYY-mm-dd HH:mm:ss"] remove_field => "timestamp" }
}

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    action => "index"
    host => "localhost"
    index => "mmsuper_om_report"
  }
}

This is my config file.

This is the error:


C:\Program Files\logstash-1.5.2\bin>logstash agent -f example.conf
io/console not supported; tty will not be manipulated
Failed parsing date from field {:field=>"timestamp", :value=>2015-07-30 09:04:54 +0530, :exception=>"cannot convert instance of class org.jruby.RubyTime to class java.lang.String", :config_parsers=>"YYYY-mm-dd HH:mm:ss", :config_locale=>"default=en_US", :level=>:warn}
Failed parsing date from field {:field=>"timestamp", :value=>2015-07-30 13:04:54 +0530, :exception=>"cannot convert instance of class org.jruby.RubyTime to class java.lang.String", :config_parsers=>"YYYY-mm-dd HH:mm:ss", :config_locale=>"default=en_US", :level=>:warn}
Failed parsing date from field {:field=>"timestamp", :value=>2015-07-30 15:04:54 +0530, :exception=>"cannot convert instance of class org.jruby.RubyTime to class java.lang.String", :config_parsers=>"YYYY-mm-dd HH:mm:ss", :config_locale=>"default=en_US", :level=>:warn}
Jul 30, 2015 10:51:44 AM org.elasticsearch.node.internal.InternalNode
INFO: [logstash-HFX2WS1-4800-11768] version[1.5.1], pid[4800], build[5e38401/2015-04-09T13:41:35Z]
Jul 30, 2015 10:51:44 AM org.elasticsearch.node.internal.InternalNode
INFO: [logstash-HFX2WS1-4800-11768] initializing ...
Jul 30, 2015 10:51:44 AM org.elasticsearch.plugins.PluginsService
INFO: [logstash-HFX2WS1-4800-11768] loaded [], sites []
Jul 30, 2015 10:51:46 AM org.elasticsearch.node.internal.InternalNode
INFO: [logstash-HFX2WS1-4800-11768] initialized
Jul 30, 2015 10:51:46 AM org.elasticsearch.node.internal.InternalNode start
INFO: [logstash-HFX2WS1-4800-11768] starting ...
Jul 30, 2015 10:51:46 AM org.elasticsearch.transport.TransportService doStart
INFO: [logstash-HFX2WS1-4800-11768] bound_address {inet[/0:0:0:0:0:0:0:0:9301]}, publish_address {inet[/100.96.85.51:9301]}
Jul 30, 2015 10:51:46 AM org.elasticsearch.discovery.DiscoveryService doStart
INFO: [logstash-HFX2WS1-4800-11768] elasticsearch/gFEXMhWBTEOfBW4nICa-Sg
Jul 30, 2015 10:51:51 AM org.elasticsearch.cluster.service.InternalClusterService$UpdateTask run
INFO: [logstash-HFX2WS1-4800-11768] detected_master [Crossbones][WxFjSVf1QO6EfT6AXA68vg][7LH4ZR1][inet[/100.96.85.52:9300]], added {[Doctor Spectrum][9BlOCK4KRm6KOruMhVoI3g][HFX2WS1][inet[/100.96.85.51:9300]],[Crossbones][WxFjSVf1QO6EfT6AXA68vg][7LH4ZR1][inet[/100.96.85.52:9300]],}, reason: zen-disco-receive(from master [[Crossbones][WxFjSVf1QO6EfT6AXA68vg][7LH4ZR1][inet[/100.96.85.52:9300]]])
Jul 30, 2015 10:51:51 AM org.elasticsearch.node.internal.InternalNode start
INFO: [logstash-HFX2WS1-4800-11768] started
Logstash startup completed
{
"timestamp" => 2015-07-30 09:04:54 +0530,
"physicalserver" => "103.84.68.861",
"logicalserver" => "190.57.590.87",
"configuration" => "eric5",
"collection" => "bus5",
"servicename" => "gm",
"configposition" => "middle",
"activitytype" => "back-end",
"activityname" => "log4",
"countertype" => "request",
"networkelement" => "node4",
"additionalinfo" => "",
"request" => 444,
"success" => 407,
"rejected" => 8,
"timeout" => 4,
"discarded" => 10,
"failed" => 15,
"@version" => "1",
"@timestamp" => "2015-07-30T05:21:43.864Z",
"tags" => [
[0] "_dateparsefailure"
]
}
{
"timestamp" => 2015-07-30 13:04:54 +0530,
"physicalserver" => "109.84.68.861",
"logicalserver" => "198.57.590.87",
"configuration" => "eric3",
"collection" => "buss",
"servicename" => "mm",
"configposition" => "front",
"activitytype" => "back-end",
"activityname" => "log",
"countertype" => "request",
"networkelement" => "node",
"additionalinfo" => "rj65",
"request" => 467,
"success" => 408,
"rejected" => 17,
"timeout" => 13,
"discarded" => 12,
"failed" => 17,
"@version" => "1",
"@timestamp" => "2015-07-30T05:21:43.864Z",
"tags" => [
[0] "_dateparsefailure"
]
}
{
"timestamp" => 2015-07-30 15:04:54 +0530,
"physicalserver" => "10.84.68.861",
"logicalserver" => "198.58.590.87",
"configuration" => "eric3",
"collection" => "bus5",
"servicename" => "gm",
"configposition" => "middle",
"activitytype" => "back-end",
"activityname" => "log",
"countertype" => "response",
"networkelement" => "node",
"additionalinfo" => "rj65",
"request" => 468,
"success" => 409,
"rejected" => 13,
"timeout" => 17,
"discarded" => 17,
"failed" => 12,
"@version" => "1",
"@timestamp" => "2015-07-30T05:21:43.864Z",
"tags" => [
[0] "_dateparsefailure"
]
}
Logstash shutdown complete



If that is so, then :sql_last_start won't solve my problem. :cry:
Do I need to find something else?