Using tag found in array as index name

We are trying to automate Logstash configuration template creation for about 200 customers. Each customer will have its own index based on its company name.

Is it possible to use the tag found in array as a variable for the index name?

output {
  if ['company1', 'company2'] in tags {
    elasticsearch {
      hosts => '10.35.1.38'
      index => "{TAGFOUND}-%{+YYYY.MM.dd}"
    }
  }
}

You can definitely do this, but there's nothing like the syntax in your example. You need something like this:

filter {
  if ['company1', 'company2'] in [tags] {
    mutate {
      add_field => { "[@metadata][indexname]" => "companyX" }
    }
  }
}

output {
  elasticsearch {
    ...
    index => "%{[@metadata][indexname]}-%{+YYYY.MM.dd}"
  }
}

The filter section can of course be generated by a script based on a data representation that's easy to maintain for a human.
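
As a rough sketch of that idea, the Python below renders one conditional per company from a plain list. The function name `render_filter` and the company list are hypothetical. Emitting a separate `if "<name>" in [tags]` test per company is an assumption about how the tags would be checked; the field name `[@metadata][indexname]` is taken from the example above.

```python
def render_filter(companies):
    """Return a Logstash filter section with one tag check per company."""
    lines = ["filter {"]
    for name in companies:
        lines += [
            f'  if "{name}" in [tags] {{',
            "    mutate {",
            f'      add_field => {{ "[@metadata][indexname]" => "{name}" }}',
            "    }",
            "  }",
        ]
    lines.append("}")
    return "\n".join(lines)


if __name__ == "__main__":
    # Hypothetical list; in practice it could come from any maintained source.
    print(render_filter(["company1", "company2"]))
```

The generated text still contains one conditional per company, but nobody has to write or maintain those conditionals by hand.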

Thanks for the reply. Glad this is actually possible.

How does the CompanyX work exactly? Won't it add the literal "CompanyX" instead of the Company1 or Company2 found in the tags?

My goal is to have CompanyX be Company1 if that is in the tags and use that as the index name only. If Company2 is in the tags then use only Company2 as the main index name.

Is that what this will accomplish?

How does the CompanyX work exactly? Won't it add the literal "CompanyX" instead of the Company1 or Company2 found in the tags?

Yes.

My goal is to have CompanyX be Company1 if that is in the tags and use that as the index name only. If Company2 is in the tags then use only Company2 as the main index name.

Okay. I misunderstood your example.

Are you sure tags is the right data structure to use here? Since an event apparently belongs to exactly one company, wouldn't it make better sense to store the company name in a separate field instead of having company1, company2, ..., companyX tags? If you have a company name field you can use it directly in the elasticsearch output's index option.

I was concerned tags wasn't the right data structure to use. Let me explain a bit more in depth what I'm trying to accomplish.

We want to automate Logstash configuration creation (via a Python script).

We have an API server that the Python script can query for a list of company names. These company names will be used to search the message data.

Here is an example of what we have currently. As you can imagine, we do not want to write 200 if statements by hand.

input {
  exec {
    command => "cd /root/san/svc; python latency"
    interval => 300
    tags => "SANSVC"
  }
}

filter {
  if "SANSVC" in [tags] {
    split {
      field => ["message"]
    }

    kv {
      add_tag => ["SANSVC"]
    }

    date {
      locale => "en"
      match => ["DATE", "YYYY-MM-dd;HH:mm:ss"]
      timezone => "Europe/Brussels"
      target => "@timestamp"
      add_field => { "debug" => "timestampMatched" }
    }

    mutate {
      convert => ["ID", "integer"]
      convert => ["READOPS", "float"]
      convert => ["WRITEOPS", "float"]
      convert => ["WRITELATENCY", "float"]
      convert => ["READLATENCY", "float"]
      replace => { "host" => "%{SVCHOST}" }
      remove_field => ["SVCHOST", "DATE"]
    }

    grok {
      match => ["message", ".*(?i)(unik).*"]
      add_tag => ["unik"]
      tag_on_failure => []
    }

    grok {
      match => ["message", ".*(?i)(micro)(matic)?.*"]
      add_tag => ["micro"]
      tag_on_failure => []
    }
  } # End big if
}

output {
  if "SANSVC" in [tags] {
    elasticsearch {
      hosts => '10.35.1.38'
      index => "san-svc-%{+YYYY.MM.dd}"
    }
  }

  if "unik" in [tags] {
    elasticsearch {
      hosts => '10.35.1.38'
      index => "unik-svc-%{+YYYY.MM}"
    }
  }

  if "micro" in [tags] {
    elasticsearch {
      hosts => '10.35.1.38'
      index => "micro-svc-%{+YYYY.MM}"
    }
  }
}

Hopefully this provides a better explanation, I'm happy to clarify further.

Are you suggesting we add a field based on the tag name?

Why not just turn

grok {
  match => [ "message", ".*(?i)(unik).*"]
  add_tag => ["unik"]
  tag_on_failure => []
}

into

grok {
  match => [ "message", ".*(?i)(unik).*"]
  add_field => {
    "company" => "unik"
  }
  tag_on_failure => []
}

?

I understand I can use a field instead but I am still going to have 200 if statements then aren't I?

Is it clear what I'm trying to do from the previous post? I can't see how changing from tag to fields is going to prevent having hundreds of if statements? My main goal here is to streamline the configuration so it is easily readable.

Something like this would be more ideal if it is possible:

if ['company1', 'company2'] in [tags] {
  grok {
    match => [ "message", ".*(?i){FOUNDTAG}.*"]
    add_field => {
      "company" => "{FOUNDTAG}"
    }
    tag_on_failure => []
  }
}

I understand I can use a field instead but I am still going to have 200 if statements then aren't I?

You don't need any additional if conditionals. The existing grok filters are enough.

There has got to be a better way of figuring out which company each log entry pertains to than having tons of consecutive grok filters.

I will need to have an additional if for each company name, we have 200 of them so that means 200 ifs.

I can't see a better way which is why I came here, the @message contains the company name so it made sense to search the message for a company name using an array of company names and then use the found name as the tag. Later in the output then use the same technique to search the tags using an array of company names and use that found tag as a variable for the index name.

If this isn't possible then I will request it as a feature on the logstash github.

I will need to have an additional if for each company name, we have 200 of them so that means 200 ifs.

No. The grok filter will set a field to the company name and that field can be referenced in the elasticsearch output's index option.

I can't see a better way which is why I came here, the @message contains the company name so it made sense to search the message for a company name

Sure, but isn't the company name in the same place in every message? Or will it appear randomly somewhere in the message?

Your suggestion to transform this is fine but I have 200 company names, therefore I would need 200 if statements.

grok {
  match => [ "message", ".*(?i)(company1).*"]
  add_field => { "company" => "company1" }
  tag_on_failure => []
}

grok {
  match => [ "message", ".*(?i)(company2).*"]
  add_field => { "company" => "company2" }
  tag_on_failure => []
}

...
grok {
  match => [ "message", ".*(?i)(company200).*"]
  add_field => { "company" => "company200" }
  tag_on_failure => []
}

Then for output I would have to do the same thing, 200 if statements :confused: searching the field one by one instead of using an array.

I've started a feature/issue on the logstash github

Your suggestion to transform this is fine but I have 200 company names, therefore I would need 200 if statements.

Why do you keep repeating this even though I'm saying "no" and explaining why? You only need this:

filter {
  grok {
    match => [ "message", ".*(?i)(company1).*"]
    add_field => {
      "[@metadata][company]" => "company1"
    }
    tag_on_failure => []
  }

  grok {
    match => [ "message", ".*(?i)(company2).*"]
    add_field => {
      "[@metadata][company]" => "company2"
    }
    tag_on_failure => []
  }

  ...
  grok {
    match => [ "message", ".*(?i)(company200).*"]
    add_field => {
      "[@metadata][company]" => "company200"
    }
    tag_on_failure => []
  }
}

output {
  elasticsearch {
    ...
    index => "%{[@metadata][company]}-%{+YYYY.MM.dd}"
  }
}

This works and doesn't require 200 if statements. I'm quite sure there's a better solution that doesn't require the 200 grok filters either but you don't seem to be interested. Over and out.

I've started a feature/issue on the logstash github

Forking the discussion at this point isn't going to help.

How is the company name discovered from the event? Does each event have a field containing the vdisk name?

@magnusbaeck thank you for your help, but you seem to have missed my point (hence I kept repeating it). I do not want to write 200 grok statements, as that seems like a silly, inefficient way to do it :stuck_out_tongue:

@jsvd We have an API server that gives us a list of company names, and we want to use this array of company names in the Logstash configuration. We want to search the message field for every item in the company array and use any company name found as a tag for that event. Does that make sense?

@magnusbaeck
Honestly, if he is looking for an automated solution so that he doesn't end up with a config containing 200 if statements, doesn't creating 200 grok filters instead defeat the purpose of automation? Since he keeps repeating his issue, he is obviously interested in a better solution.

Can you provide an example of what an event looks like and where the company name can occur?

The clean company name does not occur in the message at all, which is why I want to use the custom array of company name tags and use the matched tag as a variable.

Here is an example message, though, for the company Unik:

DATE=2016-04-04;11:52:52 SVCHOST=ATH_SVC_CLUSTER01 ID=32 VDISK=UNIK_Performance_VOL2 READOPS=72.95 WRITEOPS=155.817 READLATENCY=4.942 WRITELATENCY=0.674

Similarly, for another log entry I could use the devname field, but the company name is Borg, which again shows why I want to use grok to search and auto-tag. Some companies have multiple firewalls, so it is not possible to simply rename the firewall to the company name.

<190>date=2016-04-04 time=12:05:55 devname=BorgAutoFW03 devid=FGVM000000039696 logid=1059028704 type=utm subtype=app-ctrl eventtype=app-ctrl-all level=information vd="root" appid=16177 user="" srcip=x.x.x.x srcport=35205 srcintf="Vlanxxx" dstip=x.x.1x.x dstport=443 dstintf="port1" proto=6 service="HTTPS" sessionid=293518224 applist="default" appcat="Collaboration" app="Microsoft.Office.Online" action=pass hostname="nexusrules.officeapps.live.com" url="/" msg="Collaboration: Microsoft.Office.Online," apprisk=medium

If each event can only ever match a single company, you may be able to use the translate filter to achieve a much more compact configuration, as it supports regular expression keys and can be driven by a dictionary file.
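
A minimal sketch of how such a dictionary file might be produced from the company list, in Python since the configuration is already meant to be script-generated. The helper names (`build_dictionary`, `to_yaml`, `lookup`) are hypothetical, as is the choice of one case-insensitive pattern per company. `lookup` only approximates first-match-wins regex lookup locally with Python's `re`, so patterns can be sanity-checked against sample messages; it is not the plugin's own code, and Ruby and Python regex syntax differ in places. Names are lowercased because Elasticsearch index names must be lowercase.

```python
import json
import re


def build_dictionary(companies):
    """Map a case-insensitive regex for each company to its index name."""
    return {f"(?i){re.escape(name)}": name.lower() for name in companies}


def to_yaml(dictionary):
    """Serialize the mapping as simple YAML (JSON strings are valid YAML scalars)."""
    return "\n".join(
        f"{json.dumps(pattern)}: {json.dumps(company)}"
        for pattern, company in dictionary.items()
    )


def lookup(message, dictionary):
    """Return the value of the first key whose regex matches, else None."""
    for pattern, company in dictionary.items():
        if re.search(pattern, message):
            return company
    return None


if __name__ == "__main__":
    # Hypothetical list; the thread's API server would supply the real names.
    companies = build_dictionary(["Unik", "Borg"])
    print(to_yaml(companies))
    print(lookup("ID=32 VDISK=UNIK_Performance_VOL2 READOPS=72.95", companies))
```

The resulting file would then be referenced from a single translate filter through `dictionary_path`, with `regex => true` and `exact => true`, writing the match into a field such as `[@metadata][company]` that the elasticsearch output's index option can reference. The names of the source and destination options differ between plugin versions, so check the documentation for the installed version.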
