New config adds mappings to other indices

Hi everybody,

I've got a problem with a new config file, which updates the mappings of existing indices.

Environment:
one Debian Server with ES 7.1.1

Configs:

system_action_job.conf

input {
  jdbc {
    jdbc_driver_library => "/opt/jdbc/mysql-latest.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://server.sae.intra:3306/application"
    jdbc_user => "user"
    jdbc_password => "password"
    statement => "SELECT * from system_action_job WHERE id > :sql_last_value order by id asc LIMIT 1000000"
    use_column_value => true
    last_run_metadata_path => "/var/lib/logstash/.system_action_job_id"
    tracking_column => "id"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "system_action_log"
  }
}

waf.conf

input {
  udp {
    #host => "localhost"
    port => 514
    type => barracuda

  }
}
filter {
  if [type] == "barracuda" {
  # Extract CEF log fields
  ruby {
    init => "
      HEADER_FIELDS = ['cef_version','Vendor','Product','DeviceVersion','SignatureId','EventName','Severity']
      #event_new = LogStash::Event.new
      # Unescape pipes and backslashes in header fields
      def store_header_field(event,field_name,field_data)
        event.set(field_name,field_data.gsub(/\\\|/, '|').gsub(/\\\\/, '\\')) unless field_data.nil?
      end
    "
    code => "
      # Strip surrounding quotes from the raw message, if present
      if event.get('[message][0]') == '\"'
        event.set('[message]', event.get('[message]')[1..-2])
      end
      # Split the CEF header on unescaped pipes
      split_data = event.get('[message]').split /(?<=[^\\]\\\\)[\|]|(?<!\\)[\|]/

      HEADER_FIELDS.each_with_index do |field_name, index|
        store_header_field(event,field_name,split_data[index])
      end
      # Everything after the header fields is the extension (key=value pairs)
      msg = split_data[HEADER_FIELDS.size..-1].join('|')
      # The first header field may carry a syslog prefix in front of 'CEF:'
      if event.get('cef_version').include? ' '
        split_cef_version = event.get('cef_version').rpartition(' ')
        event.set('syslog', split_cef_version[0])
        event.set('cef_version', split_cef_version[2])
      end
      event.set('cef_version', event.get('cef_version').sub(/^CEF:/, ''))

      if not msg.nil? and msg.include? '='
        msg = msg.strip
        # If the last KVP has no value, add an empty string, this prevents hash errors below
        if msg.end_with?('=')
          msg = msg + ' ' unless msg.end_with?('\=')
        end
        # Now parse the key value pairs into it
        msg = msg.split(/ ([\w\.]+)=/)
        key, value = msg.shift.split('=', 2)
        event.set(key, value.gsub(/\\=/, '=').gsub(/\\\\/, '\\'))
        Hash[*msg].each{ |k, v| event.set(k, v.gsub(/\\=/, '=').gsub(/\\\\/, '\\')) unless v.nil? }
      end
    "
    remove_field => ['message']
  }
# Filtering LogFields which are common to all Log Types
  mutate {
    convert => {"Severity" => "integer" }
  }
  grok {
    match => {"cef_version" => ".+\:%{INT:cef_version:int}" }
    overwrite => ["cef_version"]
  }
  grok {
    match => {"DeviceReceiptTime" => "^\s*%{DATA:DeviceReceiptTime}\s*$" }
    overwrite => ["DeviceReceiptTime"]
  }
  mutate {
    gsub => ["StartTime", '\"', ""]

  }
  grok {
    match => {"StartTime" => "^\s*%{DATA:StartTime}\s*$" }
    overwrite => ["StartTime"]
  }
  date {
    match => ["StartTime","MMM dd YYYY HH:mm:ss"]
    target => "StartTime"
  }
  date {
    match => ["DeviceReceiptTime","UNIX_MS"]
    target => "DeviceReceiptTime"
  }
# LogType Specific Filtering
  if [LogType] == "SYS" {
# No filtering needed


      } else if [LogType] == "AUDIT" {
        mutate {
          convert => {"TransactionID" => "integer"}
        }
        grok {
          match => {"LoginPort" => "%{INT:LoginPort:int}" }
          overwrite => ["LoginPort"]
        }
      } else if [LogType] == "NF" {

        grok {
          match => {"DestinationPort" => "%{INT:DestinationPort:int}" }
          overwrite => ["DestinationPort"]
        }
        grok {
          match => {"SourcePort" => "%{INT:SourcePort:int}" }
          overwrite => ["SourcePort"]
        }
        geoip {
          source => "SourceIP"
          target => "geoip"
          #database => "/etc/logstash/GeoLiteCity.dat"
          add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
          add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
        }
        mutate {
          convert => [ "[geoip][coordinates]", "float"]
        }
      } else if [LogType] == "WF" {
        grok {
          match => {"ProxyPort" => "%{INT:ProxyPort:int}" }
          overwrite => ["ProxyPort"]
        }

        grok {
          match => {"ServicePort" => "%{INT:ServicePort:int}" }
          overwrite => ["ServicePort"]
        }
        grok {
          match => {"ClientPort" => "%{INT:ClientPort:int}" }
          overwrite => ["ClientPort"]
        }
        geoip {
          source => "ClientIP"
          target => "geoip"
          #database => "/etc/logstash/GeoLiteCity.dat"
          add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
          add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
        }
        mutate {
          convert => [ "[geoip][coordinates]", "float"]
        }
      } else if [LogType] == "TR" {
        grok {
          match => {"ServicePort" => "%{INT:ServicePort:int}" }
          overwrite => ["ServicePort"]
        }
        grok {
          match => {"CacheHit" => "%{INT:CacheHit:int}" }
          overwrite => ["CacheHit"]
        }
        grok {
          match => {"ProxyPort" => "%{INT:ProxyPort:int}" }
          overwrite => ["ProxyPort"]
        }
        grok {
          match => {"ServerTime" => "%{INT:ServerTime:int}" }
          overwrite => ["ServerTime"]
        }
        grok {
          match => {"TimeTaken" => "%{INT:TimeTaken:int}" }
          overwrite => ["TimeTaken"]
        }


        grok {
          match => {"ServerPort" => "%{INT:ServerPort:int}" }
          overwrite => ["ServerPort"]
        }
        grok {
          match => {"BytesReceived" => "%{INT:BytesReceived:int}" }
          overwrite => ["BytesReceived"]
        }
        grok {
          match => {"BytesSent" => "%{INT:BytesSent:int}" }
          overwrite => ["BytesSent"]
        }
        grok {
          match => {"ClientPort" => "%{INT:ClientPort:int}" }
          overwrite => ["ClientPort"]
        }
        geoip {
          source => "ClientIP"
          target => "geoip"
          #database => "/etc/logstash/GeoLiteCity.dat"
          add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
          add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
        }
        mutate {
          convert => [ "[geoip][coordinates]", "float"]
        }

      }
  }
}

output {
  # file {
  #   path => "/home/logstash/output.txt"
  # }
  if [type] == "barracuda" {
    if [LogType] == "SYS" {
      elasticsearch {
        hosts => [ "localhost:9200" ]
        index => "system_logs"
      }
    } else if [LogType] == "TR" {
      elasticsearch {
        hosts => [ "localhost:9200" ]
        index => "access_logs"
      }
    } else if [LogType] == "WF" {
      elasticsearch {
        hosts => [ "localhost:9200" ]
        index => "web_firewall_logs"
      }
    } else if [LogType] == "NF" {
      elasticsearch {
        hosts => [ "localhost:9200" ]
        index => "network_firewall_logs"
      }
    } else if [LogType] == "AUDIT" {
      elasticsearch {
        hosts => [ "localhost:9200" ]
        index => "audit_logs"
      }
    }
  }
}

If I run "system_action_job.conf" without "waf.conf", it works fine.
As soon as I activate "waf.conf", it updates the mappings of the index "system_action_log" with some properties of the index "access_logs".
The error I'm getting is, for example:

[2019-07-02T11:31:27,057][ERROR][logstash.filters.ruby    ] Ruby exception occurred: undefined method `join' for nil:NilClass
[2019-07-02T11:31:27,090][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"system_task_started", :_type=>"_doc", :routing=>nil}, #<LogStash::Event:0x2ed00c23>], :response=>{"index"=>{"_index"=>"system_task_started", "_type"=>"_doc", "_id"=>"ZtYGsmsBk4QW43dnUSXk", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [cef_version] of type [long] in document with id 'ZtYGsmsBk4QW43dnUSXk'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"For input string: \"<46>-- MARK --\n\""}}}}}
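In case it helps, the unexpected fields can be seen by requesting the mapping of the affected index (a plain mapping request, nothing special):

GET system_action_log/_mapping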

If you need any more details, I'll of course provide them.
Hope you guys can help me.

You need to wrap the output of the other config (the one besides waf.conf) in a conditional as well, as Logstash merges all the configs it loads into a single pipeline at run time.

It might be easier if you move to pipelines.
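For example, here is a minimal sketch of what that could look like in system_action_job.conf; the type value "jdbc" is just an illustrative name, not something your config already sets:

input {
  jdbc {
    # ... your existing jdbc settings ...
    type => "jdbc"
  }
}

output {
  if [type] == "jdbc" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "system_action_log"
    }
  }
}

With that conditional in place, the barracuda events from the udp input no longer fall through into the system_action_log output.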

Sorry for my dumb question, but I'm relatively new to ES.
Could you please explain how to do that?

Here is a debug log from elasticsearch.log, if it helps...

[2019-07-03T14:15:03,005][DEBUG][o.e.a.b.TransportShardBulkAction] [kibana] [system_action_log][0] failed to execute bulk item (index) index {[system_action_log][_doc][Ci7Ct2sBk4QW43dndRJa], source[{"@timestamp":"2019-07-03T12:15:02.857Z","host":"XXX.XXX.XXX.XXX","type":"barracuda","message":"<46>-- MARK --\n","tags":["_rubyexception","_grokparsefailure"],"@version":"1","cef_version":"<46>-- MARK --\n"}]}
org.elasticsearch.index.mapper.MapperParsingException: failed to parse field [cef_version] of type [long] in document with id 'Ci7Ct2sBk4QW43dndRJa'
        at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:280) ~[elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.index.mapper.DocumentParser.parseObjectOrField(DocumentParser.java:468) ~[elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.index.mapper.DocumentParser.parseValue(DocumentParser.java:596) ~[elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.index.mapper.DocumentParser.innerParseObject(DocumentParser.java:407) ~[elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.index.mapper.DocumentParser.parseObjectOrNested(DocumentParser.java:381) ~[elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.index.mapper.DocumentParser.internalParseDocument(DocumentParser.java:98) ~[elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:71) ~[elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:267) ~[elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.index.shard.IndexShard.prepareIndex(IndexShard.java:770) ~[elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:747) ~[elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.index.shard.IndexShard.applyIndexOperationOnPrimary(IndexShard.java:719) ~[elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.lambda$executeIndexRequestOnPrimary$3(TransportShardBulkAction.java:452) ~[elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.executeOnPrimaryWhileHandlingMappingUpdates(TransportShardBulkAction.java:475) ~[elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.executeIndexRequestOnPrimary(TransportShardBulkAction.java:450) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.executeBulkItemRequest(TransportShardBulkAction.java:218) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.performOnPrimary(TransportShardBulkAction.java:161) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.performOnPrimary(TransportShardBulkAction.java:153) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:141) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:79) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryShardReference.perform(TransportReplicationAction.java:1042) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryShardReference.perform(TransportReplicationAction.java:1020) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.support.replication.ReplicationOperation.execute(ReplicationOperation.java:104) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.runWithPrimaryShardReference(TransportReplicationAction.java:422) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.lambda$doRun$0(TransportReplicationAction.java:363) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:61) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.index.shard.IndexShard.lambda$wrapPrimaryOperationPermitListener$14(IndexShard.java:2538) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:61) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:269) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:236) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.index.shard.IndexShard.acquirePrimaryOperationPermit(IndexShard.java:2513) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.support.replication.TransportReplicationAction.acquirePrimaryOperationPermit(TransportReplicationAction.java:979) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.doRun(TransportReplicationAction.java:359) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.1.1.jar:7.1.1]
        at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:314) [elasticsearch-7.1.1.jar:7.1.1]
        .......
Caused by: java.lang.IllegalArgumentException: For input string: "<46>-- MARK --

I created a pipeline like this:

PUT _ingest/pipeline/access_logs
{
  "description": "access_logs",
  "processors": [
    {}
  ]
}

and edited the config in waf.conf:

output {
  if [type] == "barracuda" {
    if [LogType] == "SYS" {
      elasticsearch {
        hosts => [ "localhost:9200" ]
        index => "system_logs"
      }
    } else if [LogType] == "TR" {
      elasticsearch {
        hosts => [ "localhost:9200" ]
        pipeline => "access_logs"
      }
    } else if [LogType] == "WF" {
      elasticsearch {
        hosts => [ "localhost:9200" ]
        index => "web_firewall_logs"
      }
    } else if [LogType] == "NF" {
      elasticsearch {
        hosts => [ "localhost:9200" ]
        index => "network_firewall_logs"
      }
    } else if [LogType] == "AUDIT" {
      elasticsearch {
        hosts => [ "localhost:9200" ]
        index => "audit_logs"
      }
    }
  }
}

Is this correct so far?

In the log I'm now getting these entries, where the system_action_job index is also being updated:

[2019-07-03T14:49:11,473][INFO ][o.e.c.m.MetaDataCreateIndexService] [kibana] [logstash-2019.07.03-000001] creating index, cause [api], templates [logstash], shards [1]/[1], mappings [_doc]
[2019-07-03T14:49:11,764][INFO ][o.e.x.i.a.TransportPutLifecycleAction] [kibana] adding index lifecycle policy [logstash-policy]
[2019-07-03T14:50:13,793][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:50:13,899][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:50:13,927][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:50:13,931][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:50:13,934][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:50:13,962][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:50:14,060][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:50:14,065][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:50:14,124][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:50:14,195][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:50:36,057][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:50:51,121][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:51:43,019][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:52:16,894][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:52:17,274][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:52:26,213][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:52:53,881][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T14:53:47,197][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T15:04:35,898][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T15:16:13,801][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T15:19:46,486][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]
[2019-07-03T15:20:20,411][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [system_action_job/ODD-FJVOTVmTSgPGdKAa5Q] update_mapping [_doc]
[2019-07-03T15:20:20,444][INFO ][o.e.c.m.MetaDataMappingService] [kibana] [logstash-2019.07.03-000001/kvrPcfa-Q4-CSkBnfPZJcw] update_mapping [_doc]

I meant specifically Logstash pipelines, as there's less work needed to move to those 🙂
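
Roughly, that means giving each config file its own entry in pipelines.yml instead of letting Logstash merge them into one pipeline. A minimal sketch, assuming the configs live under /etc/logstash/conf.d/ (adjust the paths to wherever yours actually are):

# /etc/logstash/pipelines.yml
- pipeline.id: system_action_job
  path.config: "/etc/logstash/conf.d/system_action_job.conf"
- pipeline.id: waf
  path.config: "/etc/logstash/conf.d/waf.conf"

Each pipeline then has its own inputs, filters, and outputs, so the jdbc events can never end up in the waf outputs (or vice versa), and you don't need the type conditionals to keep them apart.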

It's solved now.
I don't know how, but after playing around and restarting the server it started working again.
One thing I noticed is that after every jdbc import the Logstash service restarted.
After adding a schedule it's running without any errors.
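For reference, the schedule is just an extra option on the jdbc input; the cron expression here is only an example (run the query every five minutes):

input {
  jdbc {
    # ... existing jdbc settings ...
    schedule => "*/5 * * * *"
  }
}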
