Tags do not work with aggregate filter and elasticsearch output plugins

(Sachin) #1

I am trying to get tags to work with the filter and output sections, so far with no luck. The tags are needed to import data from multiple tables conditionally. When conditionals on the tags are applied to the aggregate filter and the elasticsearch output, no data is imported into the index. Below is a sample Logstash config file. Any help is appreciated.

input {
    jdbc {
        jdbc_connection_string => "jdbc:sqlserver://DEV.localhost.com:3181;databaseName=SQL;user=DBA;password=DBA"
        jdbc_user => "DBA"
        jdbc_password => "DBA"
        jdbc_driver_library => "C:/Apps/mssqljdbc/sqljdbc_6.2/enu/mssql-jdbc-6.2.2.jre8.jar"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        statement => "SELECT * from category"
        use_column_value => true
        tracking_column => catid
        tracking_column_type => "numeric"
        clean_run => true
        record_last_run => true
        tags => ["category"]
    }
    jdbc {
        jdbc_connection_string => "jdbc:sqlserver://DEV.localhost.com:3181;databaseName=SQL;user=DBA;password=DBA"
        jdbc_user => "DBA"
        jdbc_password => "DBA"
        jdbc_driver_library => "C:/Apps/mssqljdbc/sqljdbc_6.2/enu/mssql-jdbc-6.2.2.jre8.jar"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        statement => "SELECT * from subcategory"
        use_column_value => true
        tracking_column => subcatid
        tracking_column_type => "numeric"
        clean_run => true
        record_last_run => true
        tags => ["subcategory"]
    }
}

filter {
    if "category" in [tags] {
        aggregate {
            task_id => "%{[catid]}"
            code => "
                map['catid'] = event.get('catid')
                map['category'] ||= []
                map['category'] << {'catid' => event.get('catid'), 'col2' => event.get('col2'), 'cat' => event.get('cat')}
                map['category_subcategory'] ||= []
                map['category_subcategory'] << {'name' => 'category'}
                event.cancel()
            "
            push_previous_map_as_event => true
            timeout => 3
        }
    } else if "subcategory" in [tags] {
        aggregate {
            task_id => "%{[subcatid]}"
            code => "
                map['catid'] = event.get('catid')
                map['category'] ||= []
                map['category'] << {'subcatid' => event.get('subcatid'), 'col3' => event.get('col3'), 'cat' => event.get('cat')}
                map['category_subcategory'] ||= []
                map['category_subcategory'] << {'name' => 'category', 'parent' => event.get('catid')}
                event.cancel()
            "
            push_previous_map_as_event => true
            timeout => 3
        }
    }
}
   

output {
    if "category" in [tags] {
        elasticsearch {
            hosts => ["http://localhost:9200"]
            index => "category"
            document_id => "category-%{[catid]}"
            ssl_certificate_verification => false
            workers => 1
        }
    } else if "subcategory" in [tags] {
        elasticsearch {
            hosts => ["http://localhost:9200"]
            index => "category"
            document_id => "subcategory-%{[subcatid]}"
            ssl_certificate_verification => false
            workers => 1
        }
    }
}

#2

If you remove the elasticsearch outputs and just use

output { stdout { codec => rubydebug } }

do the events have the category and subcategory tags? I am wondering if the problem is in your aggregate filters.

(Sachin) #3

Using stdout makes no difference. There are still no records in the output. The problem is certainly with the aggregate filters. The events do not have the category or subcategory tags. I have tried adding them with static values and still had no luck.

#4

OK, so if you get rid of the filter section as well, what does an event look like in the rubydebug output?

(Sachin) #5

Getting rid of the filters results in the desired output. It's just that I am unable to figure out why the same does not work with the filters in place.

#6

If you will not tell us what data you are trying to manipulate, we cannot tell you why your manipulations are failing.

(Sachin) #7

The plan is to import data from multiple tables using the join data type. These tables contain parent-child relationships. Based on the documentation, I have decided to use the join data type and organize the data based on the table names.

Below is an example.

Table **category** contains the columns
   - catid
   - catname
   - col2
   - col3

Table **subcategory** contains the columns
   - subcatid
   - subcatname
   - catid (foreign key to category catid)
   - col5

The aggregate plugin should organize the data into the structure below:

document
   - category
      - catid
      - catname
      - col2
      - col3
   - subcategory
      - subcatid
      - subcatname
      - catid
      - col5
   - category_subcategory
      - name
      - parent (added only for subcategory records)

The tags are required to conditionally add category and subcategory fields so that searching is easier.
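
To make the nested list above easier to picture, here is a rough Ruby sketch of the two document shapes. The column names come from the tables above. Everything else is an assumption for illustration: the sample values are invented, `build_category_doc` and `build_subcategory_doc` are hypothetical helper names that are not part of Logstash, and the relation names `category` and `subcategory` are a guess at how the join field would be populated.

```ruby
# Hypothetical helpers: they only illustrate the target document shape.
def build_category_doc(row)
  {
    "category" => row.slice("catid", "catname", "col2", "col3"),
    # Parent records carry only the relation name (assumed to be "category").
    "category_subcategory" => { "name" => "category" }
  }
end

def build_subcategory_doc(row)
  {
    "subcategory" => row.slice("subcatid", "subcatname", "catid", "col5"),
    # Child records also point at their parent via the foreign key.
    "category_subcategory" => { "name" => "subcategory", "parent" => row["catid"] }
  }
end

# Invented sample rows, one per table.
category_doc = build_category_doc(
  { "catid" => 1, "catname" => "Books", "col2" => "a", "col3" => "b" }
)
subcategory_doc = build_subcategory_doc(
  { "subcatid" => 10, "subcatname" => "Fiction", "catid" => 1, "col5" => "c" }
)

puts category_doc.inspect
puts subcategory_doc.inspect
```

Only the subcategory document gets a `parent` entry, which matches the note in the list above.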

#8

The reason I am asking to see your data is that your filters work just fine, which suggests that your data does not look the way you think it looks.

Of course I am inferring the data structure from the filter configuration, but this

input { generator { count => 1 lines => [ 'foo,1,a', 'foo,2,b', 'bar,1,a' ] } }
filter {
    mutate { add_tag => [ "category"] }
    csv { columns => [ "catid", "col2", "cat" ] }
    if "category" in [tags] {
        aggregate {
            task_id => "%{[catid]}"
            code => '
                map["catid"] = event.get("catid")
                map["category"] ||= []
                map["category"] << {"catid" => event.get("catid"), "col2" => event.get("col2"), "cat" => event.get("cat")}
                map["category_subcategory"] ||= []
                map["category_subcategory"] << {"name" => "category"}
                event.cancel()
            '
            push_previous_map_as_event => true
            timeout => 3
        }
    }
}

gets me

{
               "catid" => "foo",
          "@timestamp" => 2019-05-17T17:21:15.264Z,
            "@version" => "1",
"category_subcategory" => [
    [0] {
        "name" => "category"
    },
    [1] {
        "name" => "category"
    }
],
            "category" => [
    [0] {
        "catid" => "foo",
          "cat" => "a",
         "col2" => "1"
    },
    [1] {
        "catid" => "foo",
          "cat" => "b",
         "col2" => "2"
    }
]
}
{
               "catid" => "bar",
          "@timestamp" => 2019-05-17T17:21:15.478Z,
            "@version" => "1",
"category_subcategory" => [
    [0] {
        "name" => "category"
    }
],
            "category" => [
    [0] {
        "catid" => "bar",
          "cat" => "a",
         "col2" => "1"
    }
],
                "tags" => [
    [0] "_aggregatefinalflush"
]
}
(Sachin) #9

I still see no output once the if condition is added to the output section.

Code sample below:

input { generator { count => 1 lines => [ 'foo,1,a', 'foo,2,b', 'bar,1,a' ] } }
filter {
    mutate { add_tag => [ "category"] }
    csv { columns => [ "catid", "col2", "cat" ] }
    if "category" in [tags] {
        aggregate {
            task_id => "%{[catid]}"
            code => '
                map["catid"] = event.get("catid")
                map["category"] ||= []
                map["category"] << {"catid" => event.get("catid"), "col2" => event.get("col2"), "cat" => event.get("cat")}
                map["category_subcategory"] ||= []
                map["category_subcategory"] << {"name" => "category"}
                event.cancel()
            '
            push_previous_map_as_event => true
            timeout => 3
        }
    }
}
output {
    if "category" in [tags] {
        stdout { codec => rubydebug }
    }
}
#10

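Doh! Just spotted the issue. The events that reach the output are built from the map (the original, tagged events are cancelled), so they carry no tags unless the map has them. Add this to the code option of your aggregate filter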

 map["tags"] = event.get("tags")

Do you need to merge the tags arrays of every event that you aggregate?
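
On the merge question: if the aggregated events can carry different tags, plain assignment keeps only the tags of the last event. Here is a rough sketch of the difference, using plain Ruby hashes as stand-ins for `event` and `map`. The `events` array and the `"urgent"` tag are invented for illustration, and inside the real `code` option you would read the tags with `event.get("tags")` rather than `event["tags"]`.

```ruby
# Stand-ins for three events that share one task_id; not real Logstash events.
events = [
  { "catid" => "foo", "tags" => ["category"] },
  { "catid" => "foo", "tags" => ["category", "urgent"] },
  { "catid" => "foo", "tags" => ["category"] }
]

overwrite_map = {}
merge_map = {}

events.each do |event|
  # Plain assignment: the tags of the last event win.
  overwrite_map["tags"] = event["tags"]

  # Array union: keep every distinct tag seen so far,
  # and tolerate events that have no tags at all.
  merge_map["tags"] ||= []
  merge_map["tags"] |= (event["tags"] || [])
end

puts overwrite_map["tags"].inspect  # ["category"]
puts merge_map["tags"].inspect      # ["category", "urgent"]
```

Inside the aggregate `code` the equivalent would be `map["tags"] ||= []` followed by `map["tags"] |= (event.get("tags") || [])`. If every event has the same single tag, as in this thread, the plain assignment is enough.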

(Sachin) #11

Thanks, Badger. Adding the tags to the map did the trick. I was trying to use the configuration below, which I expected to result in the same behavior.

aggregate {
      add_tag => [ "category"]
      task_id => "%{[catid]}"
}

I have also noticed that tags is now also a field in the document, which should be OK in this case.

Below is a working sample for everybody who has similar issues.

input { generator { count => 1 lines => [ 'foo,1,a', 'foo1,2,b', 'bar,1,a' ] } }
filter {
    mutate { add_tag => [ "category"] }
    csv { columns => [ "catid", "col2", "cat" ] }
    if "category" in [tags] {
        aggregate {
            task_id => "%{[catid]}"
            code => '
                map["catid"] = event.get("catid")
                map["tags"] = event.get("tags")
                map["category"] ||= []
                map["category"] << {"catid" => event.get("catid"), "col2" => event.get("col2"), "cat" => event.get("cat")}
                map["category_subcategory"] ||= []
                map["category_subcategory"] << {"name" => "category"}
                event.cancel()
            '
            push_previous_map_as_event => true
            timeout => 3
        }
    }
}
output {
    if "category" in [tags] {
        stdout { codec => rubydebug }
    }
}
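
A note on why `add_tag` on the aggregate filter did not have the same effect. As far as I understand it, `add_tag` decorates the incoming event, which `event.cancel()` then drops, while the event pushed by `push_previous_map_as_event` is built only from the map. The toy model below illustrates that flow with plain Ruby hashes. It is not the plugin's source, and `simulate_aggregate`, `sample_events` and `passes_output_conditional?` are hypothetical names made up for this sketch.

```ruby
# Toy model of the cancel-and-push flow; NOT the aggregate plugin's code.
def simulate_aggregate(events, copy_tags:, add_tag: nil)
  maps = {}     # task_id => map
  emitted = []  # events that would reach the output section

  events.each do |event|
    task_id = event["catid"]

    # push_previous_map_as_event: a new task_id flushes the previous map.
    emitted << maps.shift[1] if !maps.key?(task_id) && !maps.empty?

    map = (maps[task_id] ||= {})
    map["catid"] = event["catid"]
    map["tags"] = event["tags"] if copy_tags
    (map["category"] ||= []) << { "col2" => event["col2"], "cat" => event["cat"] }

    # add_tag lands on the incoming event, which is cancelled right after,
    # so the tag never reaches the output.
    event["tags"] = (event["tags"] || []) | [add_tag] if add_tag
  end

  maps.each_value { |map| emitted << map }  # final flush of the last map
  emitted
end

# Fresh copies of the generator lines from the thread, already tagged.
def sample_events
  [
    { "catid" => "foo", "col2" => "1", "cat" => "a", "tags" => ["category"] },
    { "catid" => "foo", "col2" => "2", "cat" => "b", "tags" => ["category"] },
    { "catid" => "bar", "col2" => "1", "cat" => "a", "tags" => ["category"] }
  ]
end

# Mirrors the output conditional: if "category" in [tags]
def passes_output_conditional?(event)
  (event["tags"] || []).include?("category")
end

without_copy = simulate_aggregate(sample_events, copy_tags: false, add_tag: "category")
with_copy = simulate_aggregate(sample_events, copy_tags: true)

puts without_copy.count { |e| passes_output_conditional?(e) }  # 0
puts with_copy.count { |e| passes_output_conditional?(e) }     # 2
```

In this model only the run that copies the tags into the map produces events that pass the output conditional, which matches what the thread observed.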