Single data source split to different indexes

Morning all!

I have a question:

If I have a JSON data source like:

{
   "log_url" => "http://127.0.0.1/log.txt",
       "key" => "22op3dfe",
   "raw_msg" => "404.19 – Denied by filtering rule",
       "MD5" => "2c5cddf13ab55a1d4eca955dfa32d245",
    "syntax" => "text",
  "@version" => "1",
    "SHA256" => "766be5c99ba674f985ce844add4bc5ec423e90811fbceer5ec84efa3cf1624f4",
      "user" => "user",
       "URL" => "http://127.0.0.1",
  "YaraRule" => [
    [0] "no_match"
],
    "expire" => "0",
      "size" => 107,
    "source" => "localhost",
       "Msg" => "404 OK",
  "filename" => "log.txt",
"@timestamp" => 2020-01-07T13:59:04.000Z
}

and all of this is processed and sent to index1, but I want to split off:
"MD5" => "2c5cddf13ab55a1d4eca955dfa32d245"
and
"@timestamp" => 2020-01-07T13:59:04.000Z
into index2.

Does this mean I have to:

A) run the ingest twice on the same data source: in pass one, drop MD5 and @timestamp and push the remainder to index1, then rerun the ingest and drop everything except MD5 and @timestamp, pushing those to index2

B) use if statements on the output, keyed on fields: if the field name is MD5, route to index2 (see the sketch after this list)

C) configure a complex conf that processes the JSON source, pushes MD5 and @timestamp into @metadata, and then, at the end, reads the metadata back and pushes it to index2

D) do some other uber process that I'm not aware of yet that accomplishes this much more easily, leaving time to drink my warm coffee before it goes stone cold.
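
To make (B) concrete, I imagine something like this: clone each event, strip the clone down to the two fields, and route on the clone's type in the output. A rough sketch only (the index names and the "hashes" clone type are made up, and I haven't run this):

filter {
  # Duplicate every event; the copy gets its "type" field set to "hashes"
  # (classic clone-filter behaviour; the original keeps its own type).
  clone { clones => ["hashes"] }
  if [type] == "hashes" {
    # On the copy, keep only what index2 should receive.
    ruby {
      code => "
        keep = ['MD5', '@timestamp', '@version', 'type']
        event.to_hash.keys.each { |k| event.remove(k) unless keep.include?(k) }
      "
    }
  }
}
output {
  if [type] == "hashes" {
    elasticsearch { hosts => ["http://127.0.0.1:9200"] index => "index2" }
  } else {
    elasticsearch { hosts => ["http://127.0.0.1:9200"] index => "index1" }
  }
}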

If this is something that can or needs to be done with Filebeat, for example, please let me know; I'm hoping it can all be done in Logstash.

Thanks

You may find the forked-path pattern for pipeline-to-pipeline communications useful.
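
The shape of it, roughly: one upstream pipeline fans events out to virtual addresses, and each downstream pipeline listens on an address named in send_to. A bare-bones sketch (the pipeline names, the beats input, and the index names are placeholders):

- pipeline.id: intake
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => ["index1-pipe", "index2-pipe"] } }
- pipeline.id: index1-pipe
  config.string: |
    input { pipeline { address => "index1-pipe" } }
    output { elasticsearch { hosts => ["http://127.0.0.1:9200"] index => "index1" } }
- pipeline.id: index2-pipe
  config.string: |
    input { pipeline { address => "index2-pipe" } }
    output { elasticsearch { hosts => ["http://127.0.0.1:9200"] index => "index2" } }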

Thanks Badger, I do think we're on to something here :slight_smile:

So I did some research, found some code @magnusbaeck posted on Stack Overflow, and adapted it.

So the logic behind this: the Logstash conf is executed under main-intake, which then splits the stream into main-index and 1st-index.

main-index drops everything relating to the HTTP.Data field.
1st-index drops everything except user.data* (* here meaning user.data.1, user.data.2, user.data.3, etc.; is this the correct way to express a wildcard in pipelines?)

- pipeline.id: main-intake
  queue.type: persisted
  path.config: "/etc/logstash/conf.d/main.conf"
  config.string: |
    output { pipeline { send_to => ["main-index", "1st-index"] } }
- pipeline.id: main-index
  queue.type: persisted
  config.string: |
    input { pipeline { address => "main-intake" } }
    filter {
       ruby {
         code => "
           event.to_hash.keys.each { |k|
             if k.start_with?('HTTP.Data')
               event.remove(k)
             end
            }
          "
    }
    }
  output { elasticsearch { } }
- pipeline.id: 1st-index
  queue.type: persisted
  config.string: |
    input { pipeline { address => "main-intake" } }
    filter {
      ruby {
        code => "
          wanted_fields = ['user-data*']
          event.to_hash.keys.each { |k|
            event.remove(k) unless wanted_fields.include? k
          }
        "
     }
    }
    output { elasticsearch { } }  
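
One aside on my own wildcard question above: Ruby's include? is an exact string match, so 'user-data*' in wanted_fields won't glob anything (and I notice I typed user-data there rather than user.data). A prefix check seems closer to what I actually want; a sketch, untested:

filter {
  ruby {
    code => "
      event.to_hash.keys.each { |k|
        # keep user.data* fields plus the @-prefixed housekeeping fields
        # (@timestamp, @version); drop everything else
        event.remove(k) unless k.start_with?('user.data') || k.start_with?('@')
      }
    "
  }
}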

Now, reading through this pipeline, I'm hit by some thoughts about the queue types, persisted and memory: which would be better? I see this as a choice of performance vs. disk space/resources/IOPS.

Also, I'm cascading data through three pipelines in order to filter data one way and inverse-filter it the other, which is a little crazy but would be excellent... if... it worked. Which it doesn't.

Logstash reports it can't read the pipeline and exits:

[2020-01-20T17:16:39,185][DEBUG][logstash.config.source.multilocal] Reading pipeline configurations from YAML {:location=>"/etc/logstash/pipelines.yml"}
[2020-01-20T17:16:39,366][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
[2020-01-20T17:17:09,383][DEBUG][logstash.modules.scaffold] Found module {:module_name=>"netflow", :directory=>"/usr/share/logstash/modules/netflow/configuration"}

This is the excerpt from my Logstash log. (Is there any other place pipeline debug events are stored?)

Still getting the same issues.
Tried swapping out the pipelines.yml file whilst Logstash was still running, and it's reporting:

[2020-01-22T09:33:11,758][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2020-01-22T09:33:11,759][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2020-01-22T09:33:12,816][DEBUG][logstash.config.source.multilocal] Reading pipeline configurations from YAML {:location=>"/etc/logstash/pipelines.yml"}
[2020-01-22T09:33:12,819][ERROR][logstash.config.sourceloader] Could not fetch all the sources {:exception=>LogStash::ConfigurationError, :message=>"Failed to read pipelines yaml file. Location: /etc/logstash/pipelines.yml, Exception: #<Psych::SyntaxError: (<unknown>): could not find expected ':' while scanning a simple key at line 29 column 1>", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/config/source/multi_local.rb:82:in `read_pipelines_from_yaml'", "/usr/share/logstash/logstash-core/lib/logstash/config/source/multi_local.rb:67:in `retrieve_yaml_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/config/source/multi_local.rb:17:in `pipeline_configs'", "/usr/share/logstash/logstash-core/lib/logstash/config/source_loader.rb:61:in `block in fetch'", "org/jruby/RubyArray.java:2579:in `collect'", "/usr/share/logstash/logstash-core/lib/logstash/config/source_loader.rb:60:in `fetch'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:149:in `converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:114:in `block in execute'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/stud-0.0.23/lib/stud/interval.rb:18:in `interval'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:103:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/runner.rb:369:in `block in execute'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'"]}
[2020-01-22T09:33:12,822][DEBUG][logstash.agent           ] Could not fetch the configuration to converge, will retry {:message=>"Failed to read pipelines yaml file. Location: /etc/logstash/pipelines.yml, 

Seems it's getting stuck on this:

(<unknown>): could not find expected ':'

Yet

- pipeline.id: 1st-index

is perfectly fine. If I alter the variables, comment stuff out, and make it read a flat file, it parses, so the pipeline object here is workable, which suggests to me that it's something before this:

output { pipeline { send_to => ["main-index", "1st-index"] } }

Is it maybe that I can't fork this correctly, or that it parses to main-index but can't pipe to 1st-index? (I've also tried altering the pipeline names, removing the - just in case, and that doesn't work.)
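
One thing I notice re-reading the pipeline-to-pipeline docs (separate from the YAML complaint): as I understand it, each downstream input address has to match a name listed in send_to, and only one pipeline may listen on a given address, whereas my config has both downstream pipelines listening on "main-intake". So presumably the inputs need to be:

input { pipeline { address => "main-index" } }   # in the main-index pipeline
input { pipeline { address => "1st-index" } }    # in the 1st-index pipeline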

What does the file look like at and just before line 29?

- pipeline.id: intake
  queue.type: persisted
  path.config: "/etc/logstash/conf.d/intake.conf"
  config.string: |
    output { pipeline { send_to => ["mainindex","test"] } }
- pipeline.id: mainindex
  queue.type: persisted
  config.string: |
    input { pipeline { address => "intake" } }
    filter {
       ruby {
         code => "
           event.to_hash.keys.each { |k|
             if k.start_with?('HTTP.Data')
               event.remove(k)
             end
            }
          "
    }
  output { elasticsearch { } }
- pipeline.id: test
  queue.type: persisted
  config.string: |
    output { elasticsearch { } }
- pipeline.id: finalindex
  queue.type: persisted
  config.string: |
    input { pipeline { address => "intake" } }
    filter {
      ruby {
        code => "
          wanted_fields = ['user.data']
          event.to_hash.keys.each { |k|
            event.remove(k) unless wanted_fields.include? k
          }
        "
     }
    }
    output { elasticsearch { } }

The section in question is:

  output { elasticsearch { } }
- pipeline.id: test
  queue.type: persisted
  config.string: |
    output { elasticsearch { } }

I can't see anything wrong with it. Each pipeline works separately if I remove the Ruby code and ingest straight into ES, so I'm thinking it's something to do with how the data is being split and ingested, or possibly that, once processed, the script takes too long to complete and the event can't move straight to the next pipeline? A timeout, maybe?

I'm going to look into logstash.yml and see if there's anything there that might need to be added/amended to cater for this particular pipeline process, but I'm not hopeful.

So I looked through the logstash.yml documentation and there's nothing there apart from pipeline logging... yay!!!

log.level: debug
path.logs: /var/log/logstash
pipeline.seperate_logs: true


[2020-01-23T09:09:43,043][FATAL][logstash.runner          ] An unexpected error occurred! {:error=>#<ArgumentError: Setting "pipeline.seperate_logs" hasn't been registered>, :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/settings.rb:69:in `get_setting'", "/usr/share/logstash/logstash-core/lib/logstash/settings.rb:102:in `set_value'", "/usr/share/logstash/logstash-core/lib/logstash/settings.rb:121:in `block in merge'", "org/jruby/RubyHash.java:1417:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/settings.rb:121:in `merge'", "/usr/share/logstash/logstash-core/lib/logstash/settings.rb:179:in `validate_all'", "/usr/share/logstash/logstash-core/lib/logstash/runner.rb:284:in `execute'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/clamp-0.6.5/lib/clamp/command.rb:67:in `run'", "/usr/share/logstash/logstash-core/lib/logstash/runner.rb:242:in `run'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/clamp-0.6.5/lib/clamp/command.rb:132:in `run'", "/usr/share/logstash/lib/bootstrap/environment.rb:73:in `<main>'"]}
[2020-01-23T09:09:43,072][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit

Booooh!! :frowning:
<ArgumentError: Setting "pipeline.seperate_logs" hasn't been registered>
What the hell does that mean? :frowning:

doh!!

pipeline.separate_logs

not

pipeline.seperate_logs

OK, so it's not logging the separate pipelines into separate logs... maybe this is due to no data being collected, which might mean one of two things:

Either it reads the entire pipelines.yml and checks it can execute everything, and only then starts the process and logs activity on each pipeline; or it isn't executing the first pipeline and never starts the whole process at all... which brings me back around to the whole won't-fork, can't-fork angle.

OK, got past the errors...

It was:

output { elasticsearch { } }

which needs to be:

output { elasticsearch { hosts => ["http://127.0.0.1:9200"] } }

Now I have a different issue, which I think is Ruby-related...

Jan 23 09:47:39 buntu1804 logstash[100788]: [2020-01-23T09:47:39,593][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:mainindex, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"=>\" at line 12, column 24 (byte 237) after filter {\n ruby {\n code => \"\n event.to_hash.keys.each { |k|\n if k.start_with?('HTTP.Data')\n event.remove(k)\n end\n }\n \"\n}\noutput { elasticsearch ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2584:in `map'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:156:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:47:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:27:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:36:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:326:in `block in converge_state'"]}

  config.string: |
    input { pipeline { address => "intake" } }
    filter {
       ruby {
         code => "
           event.to_hash.keys.each { |k|
             if k.start_with?('HTTP.Data')
               event.remove(k)
             end
            }
          "
    }

Line 12 is event.to_hash.keys.each { |k|

For the mainindex pipeline you are missing a } to close out the ruby filter.

Yup, noticed that, but it didn't make any difference when I put it in :stuck_out_tongue:
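
For anyone landing on this thread later, here is how the mainindex stanza looks with the fixes discussed above folded in: the extra } that closes the ruby filter, the output line indented so it stays inside the config.string literal block (at a 2-space indent it drops out of the block, which I suspect is what the Psych "could not find expected ':'" error was about), quoted hosts, and the input address matching the name used in send_to upstream. A sketch of where this ended up rather than a guaranteed-working config:

- pipeline.id: mainindex
  queue.type: persisted
  config.string: |
    input { pipeline { address => "mainindex" } }
    filter {
      ruby {
        code => "
          event.to_hash.keys.each { |k|
            if k.start_with?('HTTP.Data')
              event.remove(k)
            end
          }
        "
      }
    }
    output { elasticsearch { hosts => ["http://127.0.0.1:9200"] } }

(The upstream output { pipeline { send_to => ["mainindex","test"] } } then lines up with this address.)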

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.