Hi there,
I have a question about a Ruby filter (or any other approach, if there is one).
@Badger posted a nice Ruby script a few years ago, but it doesn't work for me and I am not familiar with Ruby.
ruby {
  init => ' Exceptions = [ "@timestamp", "@version", "host", "message", "sequence", "version" ] '
  code => "
    event.to_hash.each { |k, v|
      unless Exceptions.include? k
        event.set('[message][#{k}]', v)
        event.remove(k)
      end
    }
  "
}
I get an error:
object mapping for [message.\#{k}] tried to parse field [\#{k}] as object, but found a concrete value"}}}}
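One thing that stands out is the literal `#{k}` in the error: it looks as if the key is never substituted into the field name. As far as I understand Ruby, only double-quoted strings are interpolated, while single-quoted ones are kept as-is. A minimal plain-Ruby check of that assumption (run outside Logstash; the key name is just an example value):

```ruby
# Plain Ruby, no Logstash involved; "eventName" is only an example key.
k = "eventName"

single = '[message][#{k}]'   # single quotes: no interpolation
double = "[message][#{k}]"   # double quotes: #{k} is replaced by the value of k

puts single   # prints [message][#{k}]
puts double   # prints [message][eventName]
```

If that is the cause, the filter would presumably need its quoting swapped (single quotes around the whole code => block, double quotes around the field reference), but I have not verified this in Logstash itself.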
Some background: for now we have to use Logstash to collect our CloudTrail logs. We will switch to Filebeat in the future, but until then we are stuck with this setup.
Our config:
input {
  s3snssqs {
    region => "eu-central-1"
    s3_default_options => { "endpoint_discovery" => true }
    queue => "QUEUENAME"
    queue_owner_aws_account_id => "NUMBER"
    tags => ["cloudtrail"]
    sqs_skip_delete => false # uncomment for debugging
    codec => cloudtrail
    from_sns => false
    consumer_threads => 2
    s3_options_by_bucket => [
      {
        bucket_name => "BUCKETNAME"
        credentials => { role => "ROLEARN" }
      }
    ]
  }
}
filter {
  if [@metadata][s3][object_key] {
    mutate { ### keep key (path/file.gz) information for audits
      copy => {
        "[@metadata][s3][object_key]" => "[s3][object_key]"
      }
    }
  }
  clone { clones => ["clone"] }
  if "clone" == [type] {
    ruby {
      init => ' Exceptions = [ "@timestamp", "@version", "host", "message", "sequence", "version" ] '
      code => "
        event.to_hash.each { |k, v|
          unless Exceptions.include? k
            event.set('[message][#{k}]', v)
            event.remove(k)
          end
        }
      "
    }
  }
  ruby {
    code => "
      src_names = [
        '[requestParameters][DescribeHostsRequest]',
        '[requestParameters][DescribeLaunchTemplatesRequest]',
        '[requestParameters][DescribeVpcEndpointConnectionNotificationsRequest]',
        '[requestParameters][DescribeVpcEndpointServiceConfigurationsRequest]',
        '[requestParameters][DescribeVpcEndpointServicesRequest]',
        '[requestParameters][MaxResults]',
        '[requestParameters][TimePeriod][End]',
        '[requestParameters][TimePeriod][Start]',
        '[requestParameters][attribute]',
        '[requestParameters][bucketPolicy][Statement][Principal]',
        '[requestParameters][domainName]',
        '[requestParameters][maxItems]',
        '[requestParameters][maxResults]',
        '[requestParameters][startTime]',
        '[responseElements][CreateVpcEndpointResponse][vpcEndpoint][dnsEntrySet]',
        '[responseElements][CreateVpcEndpointResponse][vpcEndpoint][groupSet]',
        '[responseElements][CreateVpcEndpointResponse][vpcEndpoint][networkInterfaceIdSet]',
        '[responseElements][CreateVpcEndpointResponse][vpcEndpoint][ownerId]',
        '[responseElements][CreateVpcEndpointResponse][vpcEndpoint][routeTableIdSet]',
        '[responseElements][CreateVpcEndpointResponse][vpcEndpoint][subnetIdSet]',
        '[responseElements][dBSubnetGroup]',
        '[responseElements][policy]',
        '[responseElements][status]',
        '[requestParameters][filter]',
        '[requestParameters][DescribeFlowLogsRequest]',#requestParameters.DescribeFlowLogsRequest
        '[requestParameters][DescribeHostsRequest]',
        '[requestParameters][operations]',
        '[responseElements][role]',
        '[requestParameters][DescribeVpcEndpointsRequest]',
        '[requestParameters][DescribeNatGatewaysRequest]',
        '[requestParameters][DescribeEgressOnlyInternetGatewaysRequest]',
        '[responseElements][endpoint]'
      ]
      src_names.each do |src_name|
        if !event.get(src_name).nil? then
          src_data = event.get(src_name)
          src_class = src_data.class.name.downcase
          dst_name = nil
          if src_name[-1] == ']' then
            dst_name = src_name[0..-2] + '_' + src_class + ']'
          else
            dst_name = src_name + '_' + src_class
          end
          event.set(dst_name, src_data)
          event.remove(src_name)
        end
      end
    "
  }
}
output {
  # PoC SERVER SIEM
  if "clone" == [type] {
    elasticsearch {
      hosts => ["https://some.elastic.cluster:9243"]
      ilm_rollover_alias => "aws-lz-logs"
      ilm_pattern => "000001"
      ilm_policy => "ilm-policy"
      pipeline => "filebeat-7.8.0-aws-cloudtrail-pipeline"
      manage_template => false
      user => "USER"
      password => "PW"
    }
  }
}
As you can see in the output, I want to use the Filebeat ingest pipeline.
For that pipeline to work, I need to move all fields under "message: {...}".
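To make the target shape concrete, here is a rough plain-Ruby sketch of what I am trying to achieve, using an ordinary Hash in place of the Logstash event. The helper name `nest_under_message` and the sample field values are made up for illustration; only the exception list comes from the filter above. Whether the pipeline wants an object or a JSON string in "message" is something I have not confirmed.

```ruby
# Plain-Ruby illustration only: a Hash stands in for the Logstash event.
# nest_under_message and the sample values are hypothetical.
EXCEPTIONS = ["@timestamp", "@version", "host", "message", "sequence", "version"].freeze

def nest_under_message(event)
  nested = {}
  # Iterate over a copy of the keys so the hash can be modified safely.
  event.keys.each do |k|
    next if EXCEPTIONS.include?(k)
    nested[k] = event.delete(k)
  end
  event["message"] = nested
  event
end

event = {
  "@timestamp" => "2020-07-01T12:00:00Z",
  "eventName"  => "DescribeHosts",
  "awsRegion"  => "eu-central-1"
}

result = nest_under_message(event)
puts result.inspect
```

Here "@timestamp" stays at the top level, and "eventName" and "awsRegion" end up inside "message".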
Can someone help me, or does anyone have an idea how to change the ruby filter so that it works?
Thanks in advance
Malte