MySQL to Elasticsearch

Please suggest a better way to import selected MySQL tables into Elasticsearch, using Logstash or a similarly current approach, with examples.

I have already done this successfully with my own PHP script, without major issues. My sample methods are as follows:

public function createIndex($index) {
    $params = [
        'index' => $index,
        'body' => [
            'settings' => [
                // Effectively removes the default 10,000-document result window cap.
                'max_result_window' => 2147483647
            ]
        ]
    ];

    $client = $this->getClient();
    if (!$this->exists($index)) {
        $client->indices()->create($params);
    }
}

function getDataType($type) {
    $temp = array();
    switch ($type) {
        case 'int':
            $temp['type'] = 'integer';
            break;
        case 'bigint':
            $temp['type'] = 'long';
            break;
        case 'varchar':
            $temp['type'] = 'string';
            $temp['analyzer'] = 'standard';
            break;
        case 'double':
            $temp['type'] = 'double';
            break;
        case 'datetime':
        case 'timestamp':
            $temp['type'] = 'date';
            $temp['format'] = 'yyyy-MM-dd HH:mm:ss';
            break;
        case 'time':
            $temp['type'] = 'date';
            $temp['format'] = 'HH:mm:ss';
            break;
        case 'date':
            $temp['type'] = 'date';
            $temp['format'] = 'yyyy-MM-dd';
            break;
        default:
            $temp['type'] = 'string';
            $temp['analyzer'] = 'standard';
            break;
    }
    return $temp;
}
  
function sinzTable($index, $table) {
    // Map each MySQL column type (e.g. "varchar(255)" -> "varchar")
    // to an Elasticsearch field mapping.
    $columns = $this->moData->descTable($table);
    $fields = array();
    foreach ($columns as $column) {
        $fields[$column['Field']] = $this->getDataType(strtok($column['Type'], '('));
    }
    $this->createType($index, $table, $fields);
}
  
function sinzData($index, $table) {
    $client = $this->getClient();
    $data = $this->moData->selectAll($table);

    // Index one document per row; see the bulk API suggestion below
    // for a faster alternative.
    foreach ($data as $row) {
        $client->index([
            'index' => $index,
            'type' => $table,
            'body' => $row
        ]);
    }
}

You could use the bulk API instead to insert the data.
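
For example, a minimal sketch of how such a sync method could batch rows with the elasticsearch-php client's bulk() method. The sinzDataBulk name and the 500-document chunk size are illustrative choices, not part of the original code:

function sinzDataBulk($index, $table) {
    $client = $this->getClient();
    $data = $this->moData->selectAll($table);

    $params = ['body' => []];
    $count = 0;
    foreach ($data as $row) {
        // Each document is an action/metadata line followed by its source.
        $params['body'][] = ['index' => ['_index' => $index, '_type' => $table]];
        $params['body'][] = $row;

        // Flush every 500 documents to keep request sizes reasonable.
        if (++$count % 500 == 0) {
            $client->bulk($params);
            $params = ['body' => []];
        }
    }
    // Send any remaining documents.
    if (!empty($params['body'])) {
        $client->bulk($params);
    }
}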

Why not use the jdbc input plugin?

@ebuildy, I have already used a REST client, Postman from Google. Is the bulk API like that, or does it have more features than Postman? I have to update the server every day automatically. Is that possible with the bulk API?

@ebuildy, I have already used a REST client, Postman from Google. Is the bulk API like that, or does it have more features than Postman?

Postman is a tool to make HTTP requests, including REST APIs. It's not an API in itself. You are comparing apples and oranges.

I have to update the server every day automatically. Is that possible with the bulk API?

You can call the bulk API any time you like.

Again, I do suggest you look into the jdbc input.

As @magnusbaeck said, you are essentially re-implementing Logstash, a tool perfectly suited to ingesting data from MySQL (via the JDBC input plugin, https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html), processing it (mutating fields, etc.), and outputting it to Elasticsearch (https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html).
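
For example, a minimal pipeline sketch (the connection details, credentials, and table/index names are placeholders to replace with your own):

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.38-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "user"
    jdbc_password => "secret"
    statement => "SELECT * FROM mytable"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "mydb"
  }
}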

The bulk API I am talking about is an Elasticsearch API that lets you insert several documents in a single call:

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
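
On the wire, a bulk request body is newline-delimited JSON, alternating an action/metadata line with the document source. A rough sketch (the index and type names are made up):

POST /myindex/_bulk
{ "index": { "_type": "mytype" } }
{ "field1": "value1" }
{ "index": { "_type": "mytype" } }
{ "field1": "value2" }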

Give Logstash a try!

Actually, I'm new to Logstash. As you suggested, I'm reading up on the jdbc input plugin. Could you please point me to an example or demo of this?

Regards,
Croos.

The jdbc input documentation contains an example that should get you started. If it's unclear please ask a specific question, including your current configuration, what happens, and what you expected to happen.

Ok @magnusbaeck, I will try and let you know.

When I start logstash.bat, I get the following error:

io/console not supported; tty will not be manipulated

I'm using:
elasticsearch-2.1.1 and logstash-2.1.1

My system info:

C:\Users\Croos>systeminfo
OS Name:                   Microsoft Windows 8.1 Pro
OS Version:                6.3.9600 N/A Build 9600
OS Manufacturer:           Microsoft Corporation
OS Configuration:          Standalone Workstation 

My Java version:

C:\Users\Croos>java -version
java version "1.8.0_66"
Java(TM) SE Runtime Environment (build 1.8.0_66-b18)
Java HotSpot(TM) 64-Bit Server VM (build 25.66-b18, mixed mode)

io/console not supported; tty will not be manipulated

You can ignore that message.

After that message the cmd window suddenly closes. What can I do?

OK, now I have started it successfully!

C:\ES_PACK\logstash-2.1.1\bin>logstash -e 'input { stdin { } } output { stdout {} }'
io/console not supported; tty will not be manipulated
Settings: Default filter workers: 1
Logstash startup completed
hello world
2016-02-12T07:20:32.414Z Lenovo-PC hello world

@magnusbaeck,
I have successfully exported the MySQL table to Elasticsearch. Now my problem is that the number of documents keeps increasing after every scheduled jdbc run, even though the database has not changed. How can I control this: overwrite the previous record with the new one, or delete the previous documents from Elasticsearch, to avoid the duplication? Please give me some idea.

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.38-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.10.6/asterisk?user=cron&password=1234"
    jdbc_user => "Croos"
    schedule => "* * * * *"
    statement => "SELECT * from vicidial_log"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}

Have a look at the use_column_value and tracking_column options and the sql_last_value query parameter.

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html#_predefined_parameters
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html#_state
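
Roughly, a sketch assuming the table has an auto-incrementing id column. Setting document_id in the elasticsearch output (a standard option, though not strictly part of the suggestion above) also makes re-imported rows overwrite their earlier copies instead of duplicating them:

input {
  jdbc {
    # connection settings as before
    statement => "SELECT * FROM vicidial_log WHERE id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    document_id => "%{id}"
  }
}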

I added the following to the jdbc input:

statement => "SELECT * from vicidial_log WHERE start_epoch > :sql_last_value"
use_column_value => true
tracking_column => "start_epoch"

Actually, our main DB is MariaDB, not MySQL. When I run Logstash with the above config, the following error occurs:

io/console not supported; tty will not be manipulated
Unknown setting 'use_column_value' for jdbc {:level=>:error}
Unknown setting 'tracking_column' for jdbc {:level=>:error}
Error: Something is wrong with your configuration.
You may be interested in the '--configtest' flag which you can
use to validate logstash's configuration before you choose
to restart a running system.

If I run with only the first line,

statement => "SELECT * from vicidial_log WHERE start_epoch > :sql_last_value"

I get the following exception:

C:\xampp\htdocs\ES_PACK\logstash-2.1.1\bin>logstash -f logstash.conf
io/console not supported; tty will not be manipulated
Settings: Default filter workers: 1
Logstash startup completed
Exception when executing JDBC query {:exception=>#<Sequel::DatabaseError: Java::ComMysqlJdbcExceptionsJdbc4::MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near ':sql_last_value' at line 1>, :level=>:warn}

How can I rewrite the config?

Those parameters are apparently only available in recent releases of the jdbc plugin. Upgrade the plugin (preferred) or consult the documentation of the plugin you have.

Where can I download it? Or do I have to download the latest version of Logstash?

Please consult the documentation: https://www.elastic.co/guide/en/logstash/current/working-with-plugins.html#updating-plugins
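
For Logstash 2.x the update command should look something like this, run from the Logstash installation directory (on Windows the script is bin\plugin.bat):

bin/plugin update logstash-input-jdbc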

(But you might as well upgrade to Logstash 2.2.1. There's no reason to run anything older.)

As you said, I updated Logstash. Now it runs without any error, but nothing is imported into Elasticsearch. Why?

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.38-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost/student?user=root&password="
    jdbc_user => "Croos"
    schedule => "* * * * *"
    statement => "SELECT * from subject WHERE id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}