Logstash fails with 409 conflict: unknown column 'type'

Hello Everyone,

I'm new to Logstash and I'm trying to configure it with Manticore, which is compatible with the Elasticsearch 6.3.2 API. I have the following config:

input {
  # Input for the first log file
  file {
    path => "${LOG_BUNDLE}/*/cvm_logs/stargate.*"  # Path to the first log file
    start_position => "beginning"
    exit_after_read => true
    sincedb_clean_after => "1 day"
    sincedb_path => "/dev/null"
    mode => "read"
    file_completed_action => "log"
    file_completed_log_path => "/dev/null"
    type => "cpp_stargate"
  }
}

filter {
  # Filter for the first log type
  if [type] == "cpp_stargate" {
    if [message] =~ /^Log|^Running/ {
      drop { }
    }
    grok {
      match => {
        "message" => "^(?<log_level>(I|W|E|F)+)(?<time>\d{4}\d{2}\d{2} \d{2}:\d{2}:\d{2}.\d{3})\d{3}Z\s+(?<pid>[^ ]+)\s+(?<source_log_filename>[^\:]+):[^\]]+]*%{GREEDYDATA:message}"
      }
      overwrite => [ "message" ]
      break_on_match => true
    }
    date {
      match => [ "time", "yyyyMMdd HH:mm:ss.SSS" ]
      target => "@timestamp"
      timezone => "UTC"
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9308"]
    index => "nu_logs1"
    data_stream => false
    document_id => "@timestamp"
    retry_on_conflict => 0
    ecs_compatibility => disabled
    manage_template => false
    ilm_enabled => false
    http_compression => false
    #action => "update"
    #doc_as_upsert => true
  }
  stdout {
    codec => rubydebug
  }
}

However, when I run this config on my log bundle I get the following error:

[ERROR] 2024-08-14 02:30:02.538 [[main]>worker10] elasticsearch - Encountered a retryable error (will retry with exponential backoff) {:code=>409, :url=>"http://localhost:9308/_bulk?filter_path=errors,items.*.error,items.*.status", :content_length=>11484, :body=>"{\"error\":\"unknown column: 'type'\"}"}

I'm not sure why this error occurs. I don't see the usual failure tags like _dateparsefailure or _grokparsefailure. What could be going wrong here? Please help.


Have you manually created the template? It seems the "type" field is missing from it.

Hi Rios,

Thanks for responding. Manticore doesn't support templates yet, so their documentation explicitly says to disable manage_template, which I have done.

https://manticoresearch.com/blog/integration-of-manticore-with-logstash-filebeat/

Am I missing anything? I appreciate your help.
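
For context, the target table already exists on the Manticore side (the full schema is shown further down in this thread); as far as I understand, without templates it has to be created up front. A rough sketch of how such a table can be defined over SQL (column list shortened, column types assumed from the schema shown later):

-- sketch: subset of columns, types assumed; id is implicit in Manticore
CREATE TABLE nu_logs1 (message text, filepath string, filename string, time timestamp, log_level string, source_log_filename string, pid bigint, anomaly bool);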

Check whether the log lines start with Log or Running.
Also check on the Elasticsearch side whether the template dpkg_log exists and whether it has the type field.
You can also add the mutate below to remove type from the JSON structure sent to ES:
mutate { remove_field => ["type"] }

Hi Rios,

Removing the type field still gave the same error, so I did some research on Manticore and changed the config as follows:

input {
  # Input for the first log file
  file {
    path => "${LOG_BUNDLE}/*/cvm_logs/stargate.*"  # Path to the first log file
    start_position => "beginning"
    exit_after_read => true
    sincedb_clean_after => "1 day"
    sincedb_path => "/dev/null"
    mode => "read"
    file_completed_action => "log"
    file_completed_log_path => "/dev/null"
    type => "cpp_stargate"
  }
}

filter {
  # Filter for the first log type
  if [type] == "cpp_stargate" {
    if [message] =~ /^Log|^Running/ {
      drop { }
    }
    grok {
      match => {
        "message" => "^(?<log_level>(I|W|E|F)+)(?<time>\d{4}\d{2}\d{2} \d{2}:\d{2}:\d{2}.\d{3})\d{3}Z\s+(?<pid>[^ ]+)\s+(?<source_log_filename>[^\:]+):[^\]]+]*%{GREEDYDATA:message}"
      }
      overwrite => [ "message" ]
      break_on_match => true
    }
    date {
      match => [ "time", "yyyyMMdd HH:mm:ss.SSS" ]
      target => "@timestamp"
      timezone => "UTC"
    }
  }

  # Extract the source file name from the path
  mutate {
    add_field => {
      "filepath" => "%{path}"
      "filename_without_ext" => "%{path}"
      "filename" => "%{path}"
      "log_bundle_path" => "${LOG_BUNDLE}"
    }
  }
  # Extract the base filename from the full path
  mutate {
    gsub => ["filepath", "^.*/", ""]
  }

  mutate {
    update => { "time" => "@timestamp" }
  }

  mutate { remove_field => ["type", "tags", "@timestamp", "@version", "log", "event" ] }

  # Calculate a hash value from the source filename
  #ruby {
  #  code => "
  #    require 'digest'
  #    filename = event.get('filepath')
  #    hash = Digest::MD5.hexdigest(filename)
  #    index_number = hash.to_i(16) % 32 + 1
  #    event.set('index_number', index_number)
  #  "
  #}
}

output {
  elasticsearch {
    hosts => ["http://localhost:9308"]
    #index => " nu_logs"
    index => " nu_logs1"
    ilm_enabled => false
    manage_template => false
    http_compression => false
    #document_id => "@timestamp"
  }
}

After that, the unknown column errors are gone, but now I get the following error saying the nu_logs1 index is absent:

[WARN ] 2024-08-15 00:21:01.145 [[main]>worker30] elasticsearch - Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>" nu_logs1", :routing=>nil}, {"message"=>" ParseReturnCodes: Backend returns error kCASFailure for <vdisk, block>: <26472428930, 26624>", "filename"=>"%{path}", "log_bundle_path"=>"/home/ml/log_bundles/NCC-logs-06-14-2024-1718330601674411443-0005c928-4262-a90d-4a5b-d4f5ef3c9d60", "time"=>"@timestamp", "filepath"=>"%{path}", "pid"=>"5203", "log_level"=>"E", "filename_without_ext"=>"%{path}", "source_log_filename"=>"vdiskmap_ops.cc", "host"=>{"name"=>"ml-worker"}}], :response=>{"index"=>{"_index"=>" nu_logs1", "_type"=>"doc", "_id"=>"0", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"table ' nu_logs1' absent, or does not support INSERT"}}}}

What could be the reason?

This table is defined in the Manticore DB:

mysql> DESCRIBE nu_logs1;
+----------------------+-----------+----------------+
| Field                | Type      | Properties     |
+----------------------+-----------+----------------+
| id                   | bigint    |                |
| message              | text      | indexed stored |
| filename_without_ext | string    |                |
| filename             | string    |                |
| filepath             | string    |                |
| time                 | timestamp |                |
| log_level            | string    |                |
| source_log_filename  | string    |                |
| pid                  | bigint    |                |
| cvm_ip               | string    |                |
| log_bundle_path      | string    |                |
| anomaly              | bool      |                |
| anomaly_category     | string    |                |
+----------------------+-----------+----------------+
13 rows in set (0.00 sec)

These are the shards that belong to the table nu_logs:

mysql> DESCRIBE nu_logs;
+-----------+-------+
| Agent     | Type  |
+-----------+-------+
| nu_logs1  | local |
| nu_logs2  | local |
| nu_logs3  | local |
| nu_logs4  | local |
| nu_logs5  | local |
| nu_logs6  | local |
| nu_logs7  | local |
| nu_logs8  | local |
| nu_logs9  | local |
| nu_logs10 | local |
| nu_logs11 | local |
| nu_logs12 | local |
| nu_logs13 | local |
| nu_logs14 | local |
| nu_logs15 | local |
| nu_logs16 | local |
| nu_logs17 | local |
| nu_logs18 | local |
| nu_logs19 | local |
| nu_logs20 | local |
| nu_logs21 | local |
| nu_logs22 | local |
| nu_logs23 | local |
| nu_logs24 | local |
| nu_logs25 | local |
| nu_logs26 | local |
| nu_logs27 | local |
| nu_logs28 | local |
| nu_logs29 | local |
| nu_logs30 | local |
| nu_logs31 | local |
| nu_logs32 | local |
+-----------+-------+
32 rows in set (0.00 sec)

Kindly help. Any ideas?

Please close this ticket. I was able to resolve this using the config below. Compared to my earlier attempt, the main changes are: no stray leading space in the index name, a ruby filter that fills the time column with an epoch value derived from @timestamp, routing to the nu_logsN local tables via a computed shard number in [@metadata][shard_id], and removing the fields the table has no columns for (@timestamp, @version, host, tags, etc.) before output:

input {
  # Input for the first log file
  file {
    path => "${LOG_BUNDLE}/*/cvm_logs/stargate.*"  # Path to the first log file
    start_position => "beginning"
    exit_after_read => true
    sincedb_clean_after => "1 day"
    sincedb_path => "/dev/null"
    mode => "read"
    file_completed_action => "log"
    file_completed_log_path => "/dev/null"
    type => "cpp_stargate"
  }
}

filter {
  # Filter for the first log type
  if [type] == "cpp_stargate" {
    #if [message] =~ /^Log|^Running/ {
    #  drop { }
    #}
    grok {
      match => {
        "message" => "^(?<log_level>(I|W|E|F)+)(?<log_time>\d{4}\d{2}\d{2} \d{2}:\d{2}:\d{2}.\d{3})\d{3}Z\s+(?<pid>[^ ]+)\s+(?<source_log_filename>[^\:]+):[^\]]+]*%{GREEDYDATA:message}"
      }
      overwrite => [ "message" ]
      break_on_match => true
    }
    date {
      match => [ "log_time", "yyyyMMdd HH:mm:ss.SSS" ]
      target => "@timestamp"
      timezone => "UTC"
    }
  }

  # Derive an epoch value for the time column and a shard number (1-32) from @timestamp
  ruby {
    code => "
      require 'digest'
      timestamp = event.get('@timestamp')
      epoch_timestamp = timestamp.to_i
      event.set('time', epoch_timestamp)
      index_number = epoch_timestamp % 32 + 1
      event.set('index_number', index_number)
    "
  }
  # Populate the path-related fields and routing metadata, and drop fields the table doesn't have
  mutate {
    add_field => {
      "filepath" => "%{[@metadata][path]}"
      "filename_without_ext" => "%{[@metadata][path]}"
      "filename" => "%{[@metadata][path]}"
      "log_bundle_path" => "${LOG_BUNDLE}"
      "[@metadata][shard_id]" => "%{index_number}"
      "[@metadata][_id]" => "@timestamp"
    }
    remove_field => [
      "type", "tags", "@timestamp", "@version",
      "log", "event", "index_number", "host", "log_time"
    ]
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9308"]
    index => "nu_logs%{[@metadata][shard_id]}"
    ilm_enabled => false
    manage_template => false
    http_compression => false
    #document_id => "%{[@metadata][_id]}"
  }
  #stdout {
  #  codec => rubydebug
  #}
}
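
With events routed to the nu_logsN local tables, searches still go through the distributed nu_logs table. A quick check (a sketch; the search term is just one taken from the error output earlier in this thread):

-- full-text search across all 32 shards via the distributed table
SELECT time, log_level, source_log_filename, message FROM nu_logs WHERE MATCH('kCASFailure') ORDER BY time DESC LIMIT 5;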

Thanks for all the help!
