Use of variables in a query in logstash

Hi,
I'm using ELK 7.4.0.
I want to use dynamic variables in a Logstash configuration file.
Plan A: There is an input plugin called elasticsearch for querying Elasticsearch. Can I read input from a file and then feed its output into the elasticsearch input plugin? If so, how?
OR
Plan B: Can I read input from a file and then query Elasticsearch in a filter? The only problem with this plan is: how do I use a date range along with a term filter in that query?

Please help me execute either of those plans or suggest a new one. Thanks!

Plan B would work well. The elasticsearch filter has a query parameter that lets you define a query string, and range filtering is supported in query strings.

The other option is to use the query_template parameter instead, which allows full use of Elasticsearch's query DSL.

Plan A won't work. You use the elasticsearch input plugin to search for events in an Elasticsearch instance. If you want to read from a file, use the file input plugin and the elasticsearch filter, as in plan B.

Hi
Can you show me an example executing a query with a query template?
This document from Elastic shows them independently
https://www.elastic.co/guide/en/logstash/current/plugins-filters-elasticsearch.html#plugins-filters-elasticsearch-query
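
As a rough sketch of what that could look like for plan B (not a tested config): query_template points the elasticsearch filter at a JSON file that holds a full query DSL body, and event fields can be referenced inside it with %{[field]}. The template path, the index pattern, the ip field name and the log target field below are assumptions carried over from this thread, so adjust them to your own mapping.

```
filter {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "index*"
    # Hypothetical path; the file holds the search body shown below.
    query_template => "/path/to/es_query.json"
    # Copy the matched document's message into a "log" field on the event.
    fields => { "message" => "log" }
  }
}
```

And the template file, combining a term filter with a date range:

```
{
  "size": 1,
  "sort": [ { "@timestamp": "desc" } ],
  "query": {
    "bool": {
      "filter": [
        { "term": { "ip": "%{[ip]}" } },
        { "range": { "@timestamp": { "gt": "now-1d/d", "lt": "now" } } }
      ]
    }
  }
}
```

The term clause assumes ip is mapped as an ip or keyword field; for an analyzed text field a match clause may be the better fit. In this sketch the number of hits comes from size in the template.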

Hi all,

I tried plan B and the elasticsearch filter does not quite work for me.
I'm thinking of a plan C:
What if I use a file input along with an Elasticsearch query input, and then in the filter I drop the Elasticsearch query results that don't match my file input?

Now here is my problem:
I can't seem to access and compare fields from both inputs simultaneously.
Here is my conf:

input {
    file {
        path => "/path/to/input"
        start_position => "beginning"
        type => "file_input"
    }
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "index*"
        query => '{
            "query": {
                "range": {
                    "@timestamp": {
                        "gt": "now-1d/d",
                        "lt": "now"
                    }
                }
            }
        }'
        type => "es"
    }
    #stdin{}
}
filter {
    if [type] == "file_input" {
        grok {
            match => ["message", "(?<ip>.*)"]
        }
    }
    if [type] == "es" {
        if [Clientip] != [ip] {
            drop {}
        }
    }
}
output {
    csv {
        fields => ["@timestamp", "server", "Solution"]
        path => "/path/to/output.csv"
    }
    #stdout{}
}

I am not able to drop logs because I can't access fields from two different inputs.

Can someone help, please! Thanks!

Plan C is not going to work because two inputs can never share events with each other. You have to use one input plugin and then access its events in the filter plugin. What is not working in the Elasticsearch filter when you try plan B?

No matter what input plugin I use, or even if I don't use an input plugin, I don't get any output from the elasticsearch filter. That's why I was asking about its compatibility with an input plugin.

Here's my conf:

input {}
filter {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => ["index*"]
        query => "ip:1.2.3.4"
    }
}
output {
    stdout {}
}

For the above conf, I get the following output:

[logstash.agent           ] Successfully started Logstash API endpoint {:port=>9601}
[logstash.runner          ] Logstash shut down.

If I use an stdin{} plugin I get:

And I get the desired output for:

GET index*/_search?q=ip:1.2.3.4

The output for above query is company sensitive, so please don't ask for it.

Once I can use elasticsearch filter, I think I can achieve my objective.

Please help me out. Thanks!
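
For what it's worth, the first log is expected: filters only run on events, and with an empty input{} block the pipeline has nothing to process, which matches the start-then-shut-down lines above. One way to exercise the filter exactly once without typing into stdin is the generator input. The sketch below assumes the same host, index and ip field as in the conf above; the fields mapping into a hypothetical log field is an addition, there only so that a hit becomes visible on the event.

```
input {
  # Emit a single test event whose message is the value to look up.
  generator {
    message => "1.2.3.4"
    count => 1
  }
}
filter {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "index*"
    query => "ip:%{message}"
    # Assumption: with no fields mapping nothing is copied from the hit,
    # so the event can look unchanged even when the lookup succeeds.
    fields => { "message" => "log" }
  }
}
output {
  stdout {}
}
```

Because count is 1, Logstash should process the single event, print it, and then exit, which makes it easy to compare runs.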

Please try this:

  1. The file goes in the input section. Update the path to whatever is valid.
  2. Since I do not know what the messages in your file look like, I used a basic grok pattern to pull out the ip field. Update this grok pattern to whatever is applicable; otherwise you will get a _grokparsefailure tag and the Elasticsearch filter in the next step will again return no results.
  3. Reference the ip field extracted in step 2 in the query of the Elasticsearch filter. If you change the name of the field from ip to something else, please change the reference too. Update the host if your Elasticsearch is not running on localhost.
  4. See what it outputs in stdout before using the elasticsearch output plugin.
input {
    file{
        path => "/path/to/input" 
        start_position => "beginning"
    }
}
filter{
    grok{
        match => {
            "message" => "(?<ip>%{IP})%{GREEDYDATA}"
        }
    }
    elasticsearch {
        hosts => ["localhost:9200"]
        index => ["index*"]
        query => "ip:%{ip}"
    }
}
output {
    stdout{}
}

Um, the value (1.2.3.4) is in the file, not the field (ip). I'm filtering the field on the value in the file. So I believe the query should be something like:

query=> "ip:%{ip}"

You see, here I've tried to filter it on a value. I think if it works with a static value then it'll definitely work with a dynamic value (the value extracted from the file).

Thanks for bearing with me.

Good catch. Yes, the ip filter should be how you set it. I corrected it in my response.

@Rahul_Kumar4 Your way also does not work.
Here's my config:

input {
    stdin {}
}
filter {
    grok {
        match => {
            "message" => "(?<ip>%{IP})%{GREEDYDATA}"
        }
    }
    elasticsearch {
        hosts => ["localhost:9200"]
        index => ["index*"]
        query => "xforwarded1:%{ip}"
    }
}
output {
    stdout {}
}

Here's my output:

I'd get the same output if my config didn't have the elasticsearch filter at all, so I don't think this is the output I should expect from it.

Conclusion: the elasticsearch filter does not work, at least not for me.

If anyone has got a working elasticsearch filter conf then please help me out. Thanks.

I'm getting some output now, but it's still not right.
Here's my config:

input {
    stdin {}
}
filter {
    grok {
        match => {
            "message" => "(?<ip>%{IP})%{GREEDYDATA}"
        }
    }
    elasticsearch {
        hosts => ["localhost:9200"]
        index => ["index*"]
        query => "xforwarded1:%{ip} AND @timestamp:[now-1d/d TO now]"
        fields => {
            "message" => "log"
        }
    }
}
output {
    stdout {}
}

The fields option was the catch!

But I can only see one hit, while my console shows 16 hits. Any remedy?
Thanks

Here's the solution:

input {
    stdin {}
}
filter {
    grok {
        match => {
            "message" => "(?<ip>%{IP})%{GREEDYDATA}"
        }
    }
    elasticsearch {
        hosts => ["localhost:9200"]
        index => ["index*"]
        result_size => 100
        query => "xforwarded1:%{ip} AND @timestamp:[now-1d/d TO now]"
        fields => {
            "message" => "log"
        }
    }
    split {
        field => "log"
    }
}
output {
    stdout {}
}

Use result_size and split to make the output complete and presentable. Target achieved. Thanks!