Accessing/comparing fields from two different input plugins in a filter simultaneously

I want to use a file input along with an Elasticsearch query input, and then in a filter I want to drop the Elasticsearch query results that don't match my file input.

Here is my problem: I can't seem to access and compare fields from both inputs at the same time.

There are no errors, but there is no output either.

Logstash stops at:

[INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[INFO ][filewatch.observingtail  ][main] START, creating Discoverer, Watch with file and sincedb collections
[INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9601}

Here is my conf:

input {
  file {
    path => "/path/to/input"
    start_position => "beginning"
    type => "file_input"
  }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "index*"
    query => '{ "query": { "range": { "@timestamp": { "gt": "now-1d/d", "lt": "now" } } } }'
    type => "es"
  }
  #stdin{}
}
filter {
  if [type] == "file_input" {
    grok {
      match => ["message", "(?<ip>.*)"]
    }
  }
  if [type] == "es" {
    if [Clientip] != [ip] {
      drop {}
    }
  }
}
output {
  csv {
    fields => ["@timestamp", "server", "Solution"]
    path => "/path/to/output.csv"
  }
  stdout {}
}

My input file has an IP like this:

"1.2.3.4"

Can someone help, please? Thanks!

Do both inputs produce events? What kind of events does each input produce, and which fields do you want to compare?

That is correct. The events from the two inputs are processed independently. You cannot reference fields from an event produced by the file input when processing an event produced by the elasticsearch input.
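One way around that is to restructure the pipeline so the comparison happens inside a single event, e.g. by querying Elasticsearch per file event with the elasticsearch filter plugin instead of running a second input. A rough sketch of that idea (the field names Clientip and ip are taken from the config above and may need adjusting to your mapping):

```
input {
  file {
    path => "/path/to/input"
    start_position => "beginning"
  }
}
filter {
  grok {
    match => { "message" => "(?<ip>%{IP})" }
  }
  # Each file event triggers its own Elasticsearch query; matching hits
  # are copied onto that same event, so no cross-event comparison is needed.
  elasticsearch {
    hosts => ["localhost:9200"]
    index => ["index*"]
    query => "Clientip:%{ip} AND @timestamp:[now-1d/d TO now]"
    fields => { "message" => "log" }
  }
}
output {
  stdout {}
}
```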


Can both inputs be combined into a common output pipeline using the collector pattern in pipeline-to-pipeline communication? If so, will fields from each input be accessible to a filter in the common output pipeline?

Thanks for the reply!
How do you suggest I accomplish my task? I have to build a dynamic Elasticsearch query in Logstash that filters on the values of a field I provide (hence the file input), and then write the results to an output file.

Example:

query => "ip:<dynamic_value> AND @timestamp:[now-1d/d TO now]"

Please help.

Using an elasticsearch filter sounds like a promising approach, but I do not run elasticsearch, so I cannot advise you on configuring it.

Thanks for replying @Badger
I tried using the elasticsearch filter, but I didn't get any output. I'll try once again and post about it.

But is there a prerequisite for using the elasticsearch input plugin along with the elasticsearch filter plugin? I mean, does the filter operate on the output of the input plugin?

The Logstash documentation for the elasticsearch filter plugin does not show the input part at all. It'd be great if you could shed some light on it.

No, they are independent.

No matter what input plugin I use, or even if I don't use an input plugin at all, I don't get any output from the elasticsearch filter. That's why I was asking about its compatibility with an input plugin.

Here's my conf:

input {}
filter {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => ["index*"]
    query => "ip:1.2.3.4"
  }
}
output {
  stdout {}
}

For the above conf, I get the following output:

[logstash.agent           ] Successfully started Logstash API endpoint {:port=>9601}
[logstash.runner          ] Logstash shut down.

If I use a stdin{} plugin I get:

And I get the desired output for:

GET index*/_search?q=ip:1.2.3.4

The output of the above query is company-sensitive, so please don't ask for it.
Please help me out. Thanks!
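The shutdown with an empty input {} block is expected behaviour: filters only run when an event passes through the pipeline, and with no inputs there are no events, so the pipeline finishes immediately and Logstash exits before the elasticsearch filter ever fires. A minimal sketch that triggers the filter exactly once, using the generator input (the query and field names are illustrative):

```
input {
  # generator emits a fixed number of synthetic events, keeping the
  # pipeline alive long enough for the filter to run
  generator {
    count => 1
    message => "1.2.3.4"
  }
}
filter {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => ["index*"]
    query => "ip:%{message}"
    fields => { "message" => "log" }
  }
}
output {
  stdout {}
}
```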

@Badger
I'm getting some output, but it's still not right.
Here's my config:

input {
  stdin {}
}
filter {
  grok {
    match => {
      "message" => "(?<ip>%{IP})%{GREEDYDATA}"
    }
  }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => ["index*"]
    query => "xforwarded1:%{ip} AND @timestamp:[now-1d/d TO now]"
    fields => {
      "message" => "log"
    }
  }
}
output {
  stdout {}
}

The fields option was the catch!

But I can only see one hit, while my console shows 16 hits. Any remedy?

Thanks

Here's the solution:

input {
  stdin {}
}
filter {
  grok {
    match => {
      "message" => "(?<ip>%{IP})%{GREEDYDATA}"
    }
  }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => ["index*"]
    result_size => 100
    query => "xforwarded1:%{ip} AND @timestamp:[now-1d/d TO now]"
    fields => {
      "message" => "log"
    }
  }
  split {
    field => "log"
  }
}
output {
  stdout {}
}

Use result_size and split to make the output complete and presentable. Target achieved. Thanks.
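For anyone landing here later: with fields => { "message" => "log" }, the elasticsearch filter copies the message field of every hit into a [log] array on the single incoming event, so 16 hits become one event carrying a 16-element array. result_size raises the cap on how many hits are fetched (the default is 1, which is why only one hit appeared), and split then clones the event once per array element. Illustrative event shapes (field values are made up):

```
# One event after the elasticsearch filter, before split:
#   { "ip" => "1.2.3.4", "log" => ["hit-1 message", "hit-2 message", ...] }
#
# After split { field => "log" }, one event per hit:
#   { "ip" => "1.2.3.4", "log" => "hit-1 message" }
#   { "ip" => "1.2.3.4", "log" => "hit-2 message" }
```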