Hey guys,
I want to analyze the traffic of my application. I therefore log every HTTP request and push it to a RabbitMQ pipeline in JSON format.
A message could look like the following:
{
"request_method":"GET",
"request_url":"/api/resource/",
"request_query_params":{
"a":"123"
},
"request_header":{
"Accept":"application/json, text/plain, */*",
...
},
"request_body":{
},
"response_code":200,
"response_header":{
"Date":"Sun, 15 Dec 2019 18:41:05 GMT"
...
},
"response_body":{
"success":true
}
}
Logstash should then get the message and output it to Elasticsearch
I have Logstash configured like this.
input {
rabbitmq {
host => "rabbitmq"
port => 15672
queue => "requests"
}
}
output {
elasticsearch {
hosts => "elasticsearch:9200"
index => "requests"
}
stdout {}
}
Now the thing is that I get an error telling me "request_header" could not be parsed.
After experimenting some time (adding json filter, reducing size of the json) I realized that
after changing the property name to something else like "request_headerx" or "req_header" it suddenly works.
I've got the same issue with "request_body", "response_header" and "response_body".
The console output is the following:
elasticsearch | {"type": "server", "timestamp": "2019-12-15T23:19:17,538Z", "level": "DEBUG", "component": "o.e.a.b.TransportShardBulkAction", "cluster.name": "docker-cluster", "node.name": "fa78780267f2", "message": "[requests][0] failed to execute bulk item (index) index {[requests][_doc][b6zbC28B63RwtnBL49sh], source[{\"@timestamp\":\"2019-12-15T23:19:17.433Z\",\"request_header\":{\"x\":\"123\"},\"@version\":\"1\"}]}", "cluster.uuid": "XEQIsyiyRsWcvdZyv-ZFLA", "node.id": "SZgmo-gfT2-myRsevraH3A" ,
elasticsearch | "stacktrace": ["org.elasticsearch.index.mapper.MapperParsingException: failed to parse field [request_header] of type [text] in document with id 'b6zbC28B63RwtnBL49sh'. Preview of field's value: '{x=123}'",
elasticsearch | "at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:299) ~[elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.index.mapper.DocumentParser.parseObjectOrField(DocumentParser.java:488) ~[elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.index.mapper.DocumentParser.parseObject(DocumentParser.java:505) ~[elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.index.mapper.DocumentParser.innerParseObject(DocumentParser.java:418) ~[elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.index.mapper.DocumentParser.parseObjectOrNested(DocumentParser.java:395) ~[elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.index.mapper.DocumentParser.internalParseDocument(DocumentParser.java:112) ~[elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:71) ~[elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:267) ~[elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.index.shard.IndexShard.prepareIndex(IndexShard.java:776) ~[elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:753) ~[elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.index.shard.IndexShard.applyIndexOperationOnPrimary(IndexShard.java:725) ~[elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.action.bulk.TransportShardBulkAction.executeBulkItemRequest(TransportShardBulkAction.java:258) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.action.bulk.TransportShardBulkAction$2.doRun(TransportShardBulkAction.java:161) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.action.bulk.TransportShardBulkAction.performOnPrimary(TransportShardBulkAction.java:193) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:118) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:79) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryShardReference.perform(TransportReplicationAction.java:917) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.action.support.replication.ReplicationOperation.execute(ReplicationOperation.java:108) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.runWithPrimaryShardReference(TransportReplicationAction.java:394) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.lambda$doRun$0(TransportReplicationAction.java:316) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.index.shard.IndexShard.lambda$wrapPrimaryOperationPermitListener$21(IndexShard.java:2752) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.action.ActionListener$3.onResponse(ActionListener.java:113) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:285) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:237) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.index.shard.IndexShard.acquirePrimaryOperationPermit(IndexShard.java:2726) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.action.support.replication.TransportReplicationAction.acquirePrimaryOperationPermit(TransportReplicationAction.java:858) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.doRun(TransportReplicationAction.java:312) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.action.support.replication.TransportReplicationAction.handlePrimaryRequest(TransportReplicationAction.java:275) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler$1.doRun(SecurityServerTransportInterceptor.java:257) [x-pack-security-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler.messageReceived(SecurityServerTransportInterceptor.java:315) [x-pack-security-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:63) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.transport.TransportService$7.doRun(TransportService.java:752) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:773) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]",
elasticsearch | "at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]",
elasticsearch | "at java.lang.Thread.run(Thread.java:830) [?:?]",
elasticsearch | "Caused by: java.lang.IllegalStateException: Can't get text on a START_OBJECT at 1:59",
elasticsearch | "at org.elasticsearch.common.xcontent.json.JsonXContentParser.text(JsonXContentParser.java:85) ~[elasticsearch-x-content-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.common.xcontent.support.AbstractXContentParser.textOrNull(AbstractXContentParser.java:253) ~[elasticsearch-x-content-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.index.mapper.TextFieldMapper.parseCreateField(TextFieldMapper.java:822) ~[elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:277) ~[elasticsearch-7.5.0.jar:7.5.0]",
elasticsearch | "... 40 more"] }
logstash | [2019-12-15T23:19:17,544][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"requests", :_type=>"_doc", :routing=>nil}, #<LogStash::Event:0x63619282>], :response=>{"index"=>{"_index"=>"requests", "_type"=>"_doc", "_id"=>"b6zbC28B63RwtnBL49sh", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [request_header] of type [text] in document with id 'b6zbC28B63RwtnBL49sh'. Preview of field's value: '{x=123}'", "caused_by"=>{"type"=>"illegal_state_exception", "reason"=>"Can't get text on a START_OBJECT at 1:59"}}}}}
logstash | {
logstash | "@timestamp" => 2019-12-15T23:19:17.433Z,
logstash | "request_header" => {
logstash | "x" => "123"
logstash | },
logstash | "@version" => "1"
logstash | }
As I said, if i change it to "request_headerX" everything works fine
elasticsearch | {"type": "server", "timestamp": "2019-12-15T23:56:03,636Z", "level": "INFO", "component": "o.e.c.m.MetaDataMappingService", "cluster.name": "docker-cluster", "node.name": "fa78780267f2", "message": "[requests/tklbThp4RwCXkbpR2Ptcqw] update_mapping [_doc]", "cluster.uuid": "XEQIsyiyRsWcvdZyv-ZFLA", "node.id": "SZgmo-gfT2-myRsevraH3A" }
logstash | {
logstash | "request_headerX" => {
logstash | "x" => "123"
logstash | },
logstash | "@timestamp" => 2019-12-15T23:56:03.515Z,
logstash | "@version" => "1"
logstash | }
Does anyone have an idea why logstash/elasticsearch behaves like this and how to fix this?
P.S.: I set up ELK with docker (image versions 7.5.0)