Compare two datasets (failed)

I am configuring Logstash to compare two datasets but something is wrong.

tail /var/log/logstash/logstash-plain.log

  [2019-12-16T10:12:14,264][ERROR][logstash.javapipeline    ][main] Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<LogStash::Filters::Dictionary::DictionaryFileError: Translate: (<unknown>): expected '<document start>', but found '<scalar>' at line 1 column 17 when loading dictionary file at /opt/talos/talos.yaml>, :backtrace=>["org/jruby/ext/psych/PsychParser.java:238:in `parse'", "uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/psych.rb:459:in `parse_stream'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-translate-3.2.3/lib/logstash/filters/dictionary/yaml_file.rb:19:in `read_file_into_dictionary'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-translate-3.2.3/lib/logstash/filters/dictionary/file.rb:101:in `merge_dictionary'", "org/jruby/RubyMethod.java:132:in `call'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-translate-3.2.3/lib/logstash/filters/dictionary/file.rb:66:in `load_dictionary'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-translate-3.2.3/lib/logstash/filters/dictionary/file.rb:53:in `initialize'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-translate-3.2.3/lib/logstash/filters/dictionary/file.rb:15:in `create'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-translate-3.2.3/lib/logstash/filters/translate.rb:166:in `register'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:56:in `register'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:199:in `block in register_plugins'", "org/jruby/RubyArray.java:1800:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:198:in `register_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:475:in `maybe_setup_out_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:211:in `start_workers'", 
"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:153:in `run'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:108:in `block in start'"], "pipeline.sources"=>["/etc/logstash/conf.d/01-input-beats.conf", "/etc/logstash/conf.d/11-filter-beats.conf", "/etc/logstash/conf.d/21-elasticsearch-output.conf", "/etc/logstash/conf.d/intelmq.conf"], :thread=>"#<Thread:0x2e8efcfa run>"}

My configuration of TALOS.YAML

  cat /opt/talos/talos.yaml 
  "199.249.230.73":"true"
  "199.249.230.74":"true"  
  "199.249.230.75":"true"

My configuration of the .conf files

cat /etc/logstash/conf.d/01-input-beats.conf

  # Inputs
  input {
    # Ingest logs that match the Beat template
    beats {
      # Accept connections on port 5044
      port => 5044
    }
  }

cat /etc/logstash/conf.d/11-filter-beats.conf

  # Filters
  filter {
    if "zeek" in [tags] {
      # Extract the json into Key value pairs
      json {
        source => "message"
      }
      mutate {
        remove_field => ["message"]
      }
      translate {
        field => "[id][resp_h]"
        destination => "malicious_IP"
        dictionary_path => '/opt/talos/talos.yaml'
        override => true
      }
    }
  }

cat /etc/logstash/conf.d/21-elasticsearch-output.conf

  # Outputs
  output {
    # Send logs that contain the zeek tag too
    if "zeek" in [tags] {
      # Outputting logs to elasticsearch
      elasticsearch {
        # ES host to send logs to
        hosts => ["http://localhost:9200"]
        # Index to store data in
        index => "filebeat-zeek-%{+YYYY.MM.dd}"
      }
      stdout {
        codec => rubydebug
      }
    }
  }

What is wrong?

  • I need to compare and add a new field if it is true.

I use the reference: Compare two datasets (Logstash)

Thanks

I resolved the issue now.

Host: Host ELK.
File: /opt/talos/talos.csv
Content of the file:

    # Malicious IP - https://iplists.firehol.org/?ipset=talosintel_ipfilter
    8.8.8.4,malicious_IP
    89.248.172.196,malicious_IP
    211.57.200.56,malicious_IP
    23.102.61.2,malicious_IP

File_config: /etc/logstash/conf.d/01-input-beats.conf

      # Inputs are used to ingest logs from remote logging clients
      input {
        # Ingest logs that match the Beat template
        beats {
          # Accept connections on port 5044
          port => 5044
          codec => "json"
        }
      }

File_config: /etc/logstash/conf.d/11-filter-beats.conf

      # Filters
      filter {
        # Only apply these transformations to logs that contain the "zeek" tag
        if "zeek" in [tags] {
          translate {
            field => "[id.resp_h]"
            destination => "malicious_IP"
            dictionary_path => "/opt/talos/talos.csv"
            override => true
          }
        }
      }

File_config: /etc/logstash/conf.d/21-elasticsearch-output.conf

      # Outputs
      output {
        # Send logs that contain the zeek tag too
        if "zeek" in [tags] {
          # Outputting logs to elasticsearch
          elasticsearch {
            # ES host to send logs to
            hosts => ["http://localhost:9200"]
            # Index to store data in
            index => "filebeat-zeek-%{+YYYY.MM.dd}"
          }
          stdout {
            codec => rubydebug
          }
        }
      }

On the other hand, the configuration of filebeat.yml:

Host: Host Zeek.
File: /etc/filebeat/filebeat.yml
Content of the file:

          filebeat.inputs:
          - type: log
            # Change to true to enable this input configuration.
            enabled: true
            # Paths that should be crawled and fetched. Glob based paths.
            paths:
              ## Logs from Zeek
              - /opt/zeek/logs/current/*.log
            tags: ["zeek"]
          processors:
            - add_host_metadata: ~
            - add_cloud_metadata: ~
            - add_docker_metadata: ~
            - add_kubernetes_metadata: ~

How I test this configuration:

(1) I send a connection to an external host that is not in the "malicious list" (talos.csv)

(2) Check that Zeek shows the connection

(3) Add the new IP to the "malicious list" file (talos.csv)

(4) Check (a few minutes later) that Kibana shows the new information

That's OK for me. :white_check_mark:
