HTTP input and output to Google BigQuery

input {
    http {
        host => "0.0.0.0" # default: 0.0.0.0
        port => 0000      # default: 8080
    }
}

filter {
    urldecode {
        all_fields => true
    }
}

filter {
    mutate { gsub => [ "message", "\]", "}", "message", "\[", "{" ] }
    kv { field_split => "&" allow_empty_values => true }
}

filter {
    mutate {
        split => {
            "contact{fields}{39}" => "+"
            "unsubscribe{reason}" => "+"
            "date_time" => "+"
            "share{content}" => "+"
            "message" => "+"
            "bounce{description}" => "+"
        }
    }
}

filter {
    mutate {
        join => {
            "contact{fields}{39}" => " "
            "unsubscribe{reason}" => " "
            "date_time" => " "
            "share{content}" => " "
            "message" => " "
            "bounce{description}" => " "
        }
    }
}

output {
    elasticsearch {
        hosts => [ "es:9200" ]
        index => "test"
    }
    stdout {
        codec => "json"
    }
}

output {
    google_bigquery {
        project_id => "project-273709"
        dataset => "test"
        csv_schema => "campaign{recipients}:INTEGER,contact{ip}:INTEGER,contact{id}:FLOAT"
        json_key_file => "/key/test.json"
        error_directory => "/tmp/bigquery-errors"
        date_pattern => "%Y-%m-%dT%H:00"
        flush_interval_secs => 30
    }
}

I am trying to get HTTP input data into Google BigQuery, but first I am facing a schema error. With the above config I get the following error. Can anyone please help me?

"campaign{recipients}". Fields must contain only letters, numbers, and underscores, start with a letter or underscore, and be at most 300 characters long.}

The message is saying that { and } are not allowed in field names in the schema.
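
For example, after stripping the braces the same three columns would have to look something like this in csv_schema (types carried over unchanged from the config above):

    csv_schema => "campaign_recipients:INTEGER,contact_ip:INTEGER,contact_id:FLOAT"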

Hello @Badger,

Thank you for the prompt reply. My log looks like this:

{"initiated_by":"admin","bounce{code}":"5.1.1","contact_fields_39":"custom field value","campaign{id}":"98","contact{last_name}":"Last","date_time":"2013-01-01 12:00:00","bounce{type}":"hard","initiated_from":"admin","version":"1","contact{email}":"test@test.com","contact{id}":"42","bounce{description}":"Address+not+found.","host":"10.10.4.151","type":"bounce","list":"1","contact_first_name":"First","url":"","timestamp":"2022-05-10T13:26:57.392924Z","headers":{"content_length":"390","content_type":"application/x-www-form-urlencoded","accept_encoding":"gzip","http_user_agent":null,"http_accept":"/","request_method":"POST","http_version":"HTTP/1.1","http_host":"9947-62-255-138-230.eu.ngrok.io","x_forwarded_proto":"https","request_path":"/","x_forwarded_for":"34.233.145.248"},"contacti_ip":"127.0.0.1","message":["url=&type=bounce&date_time=2013-01-01","12:00:00&initiated_by=admin&initiated_from=admin&list=1&campaign{id}=98&contact{id}=42&contact{email}=test@test.com&contact{first_name}=First&contact{last_name}=Last&contact{ip}=127.0.0.1&contact{fields}{39}=custom","field","value&bounce{type}=hard&bounce{code}=5.1.1&bounce{description}=Address","not","found."]}

**This is what is coming into the HTTP input. Now I want to output this data into BigQuery as well.**

So how can I achieve that? I have read that {} is not allowed in the schema, but I only want to store selective fields in BigQuery and drop the other fields. How can I do that?

I would do things rather differently:

    mutate { gsub => [ "message", "(\w+){(\w+)}", "\1_\2" ] }
    json { source => "message" remove_field => [ "message" ] }
    urldecode { all_fields => true }
    mutate {
        gsub => [
            "contact_fields_39", "\+", " ",
            "unsubscribe_reason", "\+", " ",
            "date_time", "\+", " ",
            "share_content", "\+", " ",
            "message", "\+", " ",
            "bounce_description", "\+", " "
        ]
    }
    # If you want headers at the top-level
    ruby {
        code => '
            headers = event.remove("headers")
            headers.each { |k, v| event.set(k, v) } if headers
        '
    }
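
The point of the first gsub is that it rewrites the raw body while it is still a single string in [message], so a brace-style key such as campaign{id} becomes campaign_id before any event fields are created:

    # before: ...&campaign{id}=98&contact{email}=test@test.com&...
    # after:  ...&campaign_id=98&contact_email=test@test.com&...
    mutate { gsub => [ "message", "(\w+){(\w+)}", "\1_\2" ] }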

input {
    http {
        host => "0.0.0.0" # default: 0.0.0.0
        port => 0000      # default: 8080
    }
}

filter {
    mutate { gsub => [ "message", "(\w+){(\w+)}", "\1_\2" ] }
    json { source => "message" remove_field => [ "message" ] }
    urldecode { all_fields => true }
    mutate {
        gsub => [
            "contact_fields_39", "\+", " ",
            "unsubscribe_reason", "\+", " ",
            "date_time", "\+", " ",
            "share_content", "\+", " ",
            "message", "\+", " ",
            "bounce_description", "\+", " "
        ]
    }
}
# If you want headers at the top-level
ruby {
    code => '
        headers = event.remove("headers")
        headers.each { |k, v| event.set(k, v) } if headers
    '
}

output {
    elasticsearch {
        hosts => [ "es:9200" ]
        index => "ac_test"
    }
    stdout {
        codec => "json"
    }
}

output {
    google_bigquery {
        project_id => "id-709"
        dataset => "_test"
        table_separator => ""
        batch_size => 1000
        table_prefix => "_test"
        csv_schema => "campaign_name:STRING,timestamp:TIMESTAMP,date_time:DATETIME,campaign_id:STRING,account_id:STRING,type:STRING,campaign_recipients:STRING"
        json_key_file => "/key/key.json"
        error_directory => "/tmp/bigquery-errors"
        date_pattern => ""
        ignore_unknown_values => true
        flush_interval_secs => 30
    }
}

Hello @Badger, thank you for the reply. I tried the above code in Logstash, but I am getting an error on Logstash startup. I need all fields in Elasticsearch and only selective fields in BigQuery.

See the following error:

[2022-05-12T09:46:25,110][INFO ][logstash.runner ] Log4j configuration path used is: /etc/logstash/log4j2.properties
[2022-05-12T09:46:25,136][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"8.1.2", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 11.0.14.1+1 on 11.0.14.1+1 +indy +jit [linux-x86_64]"}
[2022-05-12T09:46:25,137][INFO ][logstash.runner ] JVM bootstrap flags: [-Xms1g, -Xmx1g, -XX:+UseConcMarkSweepGC, -XX:CMSInitiatingOccupancyFraction=75, -XX:+UseCMSInitiatingOccupancyOnly, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djruby.compile.invokedynamic=true, -Djruby.jit.threshold=0, -Djruby.regexp.interruptible=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true, --add-opens=java.base/java.security=ALL-UNNAMED, --add-opens=java.base/java.io=ALL-UNNAMED, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED]
[2022-05-12T09:46:27,661][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9601, :ssl_enabled=>false}
[2022-05-12T09:46:28,526][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \t\r\n], "#", "input", "filter", "output" at line 25, column 5 (byte 607) after ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:189:in initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:50:in execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:376:in block in converge_state'"]}
[2022-05-12T09:46:28,626][INFO ][logstash.runner ] Logstash shut down.
[2022-05-12T09:46:28,652][FATAL][org.logstash.Logstash ] Logstash stopped processing because of an error: (SystemExit) exit
org.jruby.exceptions.SystemExit: (SystemExit) exit
at org.jruby.RubyKernel.exit(org/jruby/RubyKernel.java:747) ~[jruby.jar:?]
at org.jruby.RubyKernel.exit(org/jruby/RubyKernel.java:710) ~[jruby.jar:?]
at usr.share.logstash.lib.bootstrap.environment.(/usr/share/logstash/lib/bootstrap/environment.rb:94) ~[?:?]
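
The startup failure is a config parse error, not a BigQuery problem: "Expected one of ... 'input', 'filter', 'output' at line 25, column 5" lines up with the ruby block above, which sits outside any filter section, and only input, filter, and output are allowed at the top level of a pipeline file. A minimal sketch of the corrected placement, keeping the same filters:

    filter {
        mutate { gsub => [ "message", "(\w+){(\w+)}", "\1_\2" ] }
        json { source => "message" remove_field => [ "message" ] }
        urldecode { all_fields => true }
        # the "\+"-to-space gsub mutate from above goes in here as well
        # If you want headers at the top-level
        ruby {
            code => '
                headers = event.remove("headers")
                headers.each { |k, v| event.set(k, v) } if headers
            '
        }
    }

As for keeping all fields in Elasticsearch but only selective fields in BigQuery: with ignore_unknown_values => true already set, BigQuery should simply drop any field that is not listed in csv_schema, so the same event can be sent to both outputs unchanged.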
