(Updated 2019/06/19) Seeking suggestions for uploading data via Logstash

Hi all,

I have some data that I want to upload via logstash.

The data is about 100MB a day and stored locally.

I want to do the following:

  1. Upload the old data from the past 3 years (about 100GB).

  2. Upload the new data to Elasticsearch daily (one index per day).

Any suggestions on how to tackle these?

Thanks.

----------------updated: 2019/06/19----------------

Thanks to Kiran, I think I got the daily update part.

Let me be a bit more specific about the old data.

Here is the Logstash config file we have right now.

(the repetitive parts of the filter have been removed)

input {
  file {
    path => "/data/threat_event/201801/*/*.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    max_open_files => 65535
  }
}

filter {

  # parse attack, month and type out of the file path and tag the event with its type
  dissect {
    mapping => {
      "path" => "/%{}/%{attack}/%{month}/%{}/%{type}_%{}"
    }
    add_tag => ["%{type}"]
  }

  if "all" in [tags] {
    csv {
      autodetect_column_names => "true"
      autogenerate_column_names => "true"
      skip_header => "true"
      separator => ","
    }

    # look up defender_id in the mapping table and store the result in defender_id_name
    translate {
      iterate_on => "defender_id"
      field => "defender_id"
      destination => "defender_id_name"
      override => false
      dictionary_path => ["/data/mapping-table/defender-mapping.csv"]
    }
  }

  date {
    locale => "en"
    match => [ "timestamp", "EEE dd MMM yyyy HH:mm:ss z" ]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "cv_%{[attack]}_%{[type]}_%{[month]}"
  }
}

The index template is like this.

PUT /_template/template_all
{
  "index_patterns": ["cv_threat_event_all_*"],
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "_source": {
      "enabled": true
    },
   "properties": {
    "event_count": {"type": "long"}
      },
    "date_detection": true,
    "dynamic_date_formats": ["EEE dd MMM yyyy HH:mm:ss z"]
  }
}

Since creating one index per day would result in too many shards, we decided to go with one index per month.
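
For what it's worth, here is a minimal sketch of an alternative way to build the monthly index name from the parsed event timestamp instead of the path (the %{+YYYY.MM} pattern is an assumption about the naming we want, not something we have tested):

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    # assumes the date filter has parsed the event time into @timestamp,
    # so %{+YYYY.MM} expands to the event's year and month
    index => "cv_%{[attack]}_%{[type]}_%{+YYYY.MM}"
  }
}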

I know the config might be rough, but we are still figuring things out.

Feel free to make any comment or suggestion.

Thank you.

Can you please describe your requirement briefly?

Hi @leeyu

If you want to upload the data on a daily basis, you can use the cron scheduler service.

Managing the cron service is easy because it doesn't need to be reloaded or restarted to pick up configuration changes. The cron daemon wakes up every minute and checks its configuration to see whether anything needs to be started.

A cron table entry has six fields separated by spaces or tabs. The first five fields specify when to run the command, and the sixth field is the absolute pathname of the command to execute. You can also place the commands in a shell script and schedule that script to run repeatedly, in which case the sixth field is the absolute pathname of the script. The initial section of the cron table sets the environment used while executing commands, and the rest of the file contains comments that document the format of a cron table entry.

# cat /etc/crontab 
SHELL=/bin/bash
PATH=/sbin:/bin:/usr/sbin:/usr/bin
MAILTO=root

# For details see man 4 crontabs

# Example of job definition:
# .---------------- minute (0 - 59)
# |  .------------- hour (0 - 23)
# |  |  .---------- day of month (1 - 31)
# |  |  |  .------- month (1 - 12) OR jan,feb,mar,apr ...
# |  |  |  |  .---- day of week (0 - 6) (Sunday=0 or 7) OR sun,mon,tue,wed,thu,fri,sat
# |  |  |  |  |
# *  *  *  *  * user-name  command to be executed

So you can schedule the daily Logstash upload as below.

# the schedule 0 0 * * * (for example 0 0 * * * /usr/local/bin/backup.sh) can also be written with the @daily shortcut
@daily bin/logstash -f /path/to/config/logstash.conf

You have to set the path to the Logstash config file correctly.
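
For example, a crontab entry with everything spelled out might look like this (the Logstash install path and the log file location are just assumptions for a typical setup, adjust them to your environment):

# run the daily upload at midnight and append the output to a log file
@daily /usr/share/logstash/bin/logstash -f /path/to/config/logstash.conf >> /var/log/logstash-daily-upload.log 2>&1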

Put this in a file e.g. /path/to/config/logstash.conf

input {
  file {
    path => "/home/system/Documents/LOG Data File/app2.log"
    start_position => "beginning"
  }

}

filter {}

output {
  elasticsearch {
    hosts => "["127.0.0.1:9200"]"
  }
}

Thanks
Kiran

Hi John,

I already updated the post.

Please take a look.

Thanks.

Hi Kiran,

What a well-detailed response! It is very helpful, and I really appreciate it.

I also updated my post with my current approach to the historical data.

Feel free to make any comment about it.

Thank you very much.

Note that with your configuration Logstash will not exit; it will just keep waiting for new data, so you will need to kill the process when it is finished, or before starting the next one.

For the load of the historical data I suggest you look at using the file input in read mode rather than tail mode, but make sure you are aware of the default for file_completed_action. I don't think it makes any sense to have max_open_files much larger than the number of CPUs on the server. There is no reason to open files that the server cannot process -- better to just keep them in the queue. Also, for the historical data you probably do want to persist the sincedb.
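
For example, a rough sketch of a read-mode input for the historical load could look like the following (the paths and values are placeholders to illustrate the options, not a tested configuration):

input {
  file {
    path => "/data/threat_event/*/*/*.csv"
    # read mode processes each file from start to finish instead of tailing it
    mode => "read"
    # the default file_completed_action is "delete"; log the file name instead
    file_completed_action => "log"
    file_completed_log_path => "/data/threat_event/completed.log"
    # persist the sincedb so a restart does not re-read files that were already loaded
    sincedb_path => "/data/threat_event/sincedb"
    # no need for this to be much larger than the number of CPUs
    max_open_files => 16
  }
}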

Hi Badger,

Thank you for another great response.

I still lack experience, so any suggestions are welcome.

I will look into it.

Hi @leeyu

You need to use two Logstash processes, one for each operation.

First operation (Logstash 1): upload the old data from the past 3 years (about 100GB).

Second operation (Logstash 2): upload the new data to Elasticsearch daily (one index per day).

For the first operation, you should use the Logstash persistent queue to avoid losing data, because the data size is about 100GB.

In order to protect against data loss during abnormal termination, Logstash has a persistent queue feature which will store the message queue on disk. Persistent queues provide durability of data within Logstash.

The queue sits between the input and filter stages in the same process:

input → queue → filter + output

When an input has events ready to process, it writes them to the queue. When the write to the queue is successful, the input can send an acknowledgement to its data source.
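
As a rough sketch, enabling it in logstash.yml could look like this (the queue path and the size limit are assumptions you would tune for roughly 100GB of input):

# logstash.yml
queue.type: persisted
# directory where the queue pages are written; it needs enough free disk space
path.queue: /var/lib/logstash/queue
# cap on the on-disk queue size; inputs are paused (back pressure) once the cap is reached
queue.max_bytes: 8gb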

Hi Kiran,

Thank you very much for the information about the queue.

I did not know about this feature before.

I will look into it today.

Hi Kiran,

After changing queue.type to persisted and keeping the default settings, Logstash doesn't start uploading.

The log looks like this:

[2019-06-21T09:50:28,533][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.1.1"}
[2019-06-21T09:50:33,196][WARN ][logstash.runner          ] SIGTERM received. Shutting down.
[2019-06-21T09:50:34,293][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2019-06-21T09:50:34,430][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2019-06-21T09:50:34,469][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>7}
[2019-06-21T09:50:34,472][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2019-06-21T09:50:34,494][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2019-06-21T09:50:34,506][INFO ][logstash.outputs.elasticsearch] Using default mapping template
[2019-06-21T09:50:34,611][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2019-06-21T09:50:34,873][INFO ][logstash.javapipeline    ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>125, :thread=>"#<Thread:0x5ac8e89 run>"}
[2019-06-21T09:50:35,137][INFO ][logstash.javapipeline    ] Pipeline started {"pipeline.id"=>"main"}
[2019-06-21T09:50:35,200][INFO ][filewatch.observingtail  ] START, creating Discoverer, Watch with file and sincedb collections
[2019-06-21T09:50:35,205][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-06-21T09:50:35,492][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2019-06-21T09:50:40,419][INFO ][filewatch.observingtail  ] QUIT - closing all files and shutting down.

Am I missing any setting?

It hasn't even finished starting before it gets a SIGTERM telling it to shut down.

Hi Badger,

Sorry, I did not notice that, but I don't think I sent any SIGTERM to it.

I'll look into it. Thanks.

Hi @leeyu,

Have you resolved this issue?

If you haven't resolved it yet, please provide the Logstash settings files, i.e. "logstash.yml" and "logstash.conf".

Thanks,
Kiran

Hi Kiran,

I apologize for the delay.

I have been working with ELK for the past 6 weeks and new tasks keep coming, so I totally forgot about this post.

I have figured it out with the suggestions provided in this post.

Thanks for asking.
