How to collect Docker logs cleanly with Logstash

Docker's default json-file logging driver wraps each line of output in JSON, and the wrapped text contains many escaped special characters (such as \n and \r).

In the log sample below, I need to merge the multi-line entries and remove all of the \n and \r. For the \r\n sequences, should the escapes be decoded first and the characters then deleted?

I would like to know how everyone else collects Docker logs. Is there a best-practice solution?
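Stripping just the literal \r and \n would only take a mutate filter (a sketch; the `log` field name assumes Docker's json-file format shown below), but that alone does not merge the multi-line entries:

```
filter {
  json   { source => "message" }                # parse the Docker json-file wrapper
  mutate { gsub => [ "log", "[\r\n]", "" ] }    # strip CR/LF from the inner log line
}
```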

{"log":"{\"type\": \"server\", \"timestamp\": \"2021-03-27T00:33:09,519Z\", \"level\": \"WARN\", \"component\": \"r.suppressed\", \"cluster.name\": \"es-single-node\", \"node.name\": \"es-docker\", \"message\": \"path: /elastalert_status_test/_search, params: {size=1000, index=elastalert_status_test}\", \"cluster.uuid\": \"V-K8-DD7RValiCtkesXqMg\", \"node.id\": \"9_5rNmciQ1CQniRQ1rvFRw\" , \n","stream":"stdout","time":"2021-03-27T00:33:09.526433505Z"}
{"log":"\"stacktrace\": [\"org.elasticsearch.action.search.SearchPhaseExecutionException: all shards failed\",\n","stream":"stdout","time":"2021-03-27T00:33:09.526497671Z"}
{"log":"\"at org.elasticsearch.action.search.AbstractSearchAsyncAction.onPhaseFailure(AbstractSearchAsyncAction.java:636) [elasticsearch-7.12.0.jar:7.12.0]\",\n","stream":"stdout","time":"2021-03-27T00:33:09.526505739Z"}
{"log":"\"at org.elasticsearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:357) [elasticsearch-7.12.0.jar:7.12.0]\",\n","stream":"stdout","time":"2021-03-27T00:33:09.52651011Z"}
{"log":"\"at org.elasticsearch.action.search.AbstractSearchAsyncAction.onPhaseDone(AbstractSearchAsyncAction.java:669) [elasticsearch-7.12.0.jar:7.12.0]\",\n","stream":"stdout","time":"2021-03-27T00:33:09.526514441Z"}
{"log":"\"at org.elasticsearch.action.search.AbstractSearchAsyncAction.onShardFailure(AbstractSearchAsyncAction.java:440) [elasticsearch-7.12.0.jar:7.12.0]\",\n","stream":"stdout","time":"2021-03-27T00:33:09.526518428Z"}
{"log":"\"at org.elasticsearch.action.search.AbstractSearchAsyncAction.lambda$performPhaseOnShard$1(AbstractSearchAsyncAction.java:265) [elasticsearch-7.12.0.jar:7.12.0]\",\n","stream":"stdout","time":"2021-03-27T00:33:09.526522497Z"}
{"log":"\"at org.elasticsearch.action.search.AbstractSearchAsyncAction$2.doRun(AbstractSearchAsyncAction.java:333) [elasticsearch-7.12.0.jar:7.12.0]\",\n","stream":"stdout","time":"2021-03-27T00:33:09.526526505Z"}
{"log":"\"at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26) [elasticsearch-7.12.0.jar:7.12.0]\",\n","stream":"stdout","time":"2021-03-27T00:33:09.52653038Z"}
{"log":"\"at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:33) [elasticsearch-7.12.0.jar:7.12.0]\",\n","stream":"stdout","time":"2021-03-27T00:33:09.526534264Z"}
{"log":"\"at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:732) [elasticsearch-7.12.0.jar:7.12.0]\",\n","stream":"stdout","time":"2021-03-27T00:33:09.526538217Z"}
{"log":"\"at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26) [elasticsearch-7.12.0.jar:7.12.0]\",\n","stream":"stdout","time":"2021-03-27T00:33:09.526542121Z"}
{"log":"\"at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]\",\n","stream":"stdout","time":"2021-03-27T00:33:09.526546005Z"}
{"log":"\"at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]\",\n","stream":"stdout","time":"2021-03-27T00:33:09.526550974Z"}
{"log":"\"at java.lang.Thread.run(Thread.java:832) [?:?]\",\n","stream":"stdout","time":"2021-03-27T00:33:09.526555037Z"}
{"log":"\"Caused by: org.elasticsearch.action.NoShardAvailableActionException\",\n","stream":"stdout","time":"2021-03-27T00:33:09.526558816Z"}
{"log":"\"at org.elasticsearch.action.search.AbstractSearchAsyncAction.onShardFailure(AbstractSearchAsyncAction.java:473) ~[elasticsearch-7.12.0.jar:7.12.0]\",\n","stream":"stdout","time":"2021-03-27T00:33:09.526562725Z"}
{"log":"\"at org.elasticsearch.action.search.AbstractSearchAsyncAction.onShardFailure(AbstractSearchAsyncAction.java:422) [elasticsearch-7.12.0.jar:7.12.0]\",\n","stream":"stdout","time":"2021-03-27T00:33:09.526566674Z"}
{"log":"\"... 9 more\"] }\n","stream":"stdout","time":"2021-03-27T00:33:09.526570813Z"}
{"log":"\r\n","stream":"stdout","time":"2019-02-01T03:43:43.648933649Z"}
{"log":"Hello from Docker!\r\n","stream":"stdout","time":"2019-02-01T03:43:43.648976724Z"}
{"log":"This message shows that your installation appears to be working correctly.\r\n","stream":"stdout","time":"2019-02-01T03:43:43.64897985Z"}
{"log":"\r\n","stream":"stdout","time":"2019-02-01T03:43:43.648981896Z"}
{"log":"To generate this message, Docker took the following steps:\r\n","stream":"stdout","time":"2019-02-01T03:43:43.648983741Z"}
{"log":" 1. The Docker client contacted the Docker daemon.\r\n","stream":"stdout","time":"2019-02-01T03:43:43.648985618Z"}
{"log":" 2. The Docker daemon pulled the \"hello-world\" image from the Docker Hub.\r\n","stream":"stdout","time":"2019-02-01T03:43:43.648987499Z"}
{"log":"    (amd64)\r\n","stream":"stdout","time":"2019-02-01T03:43:43.648989529Z"}
{"log":" 3. The Docker daemon created a new container from that image which runs the\r\n","stream":"stdout","time":"2019-02-01T03:43:43.64899137Z"}
{"log":"    executable that produces the output you are currently reading.\r\n","stream":"stdout","time":"2019-02-01T03:43:43.6489933Z"}
{"log":" 4. The Docker daemon streamed that output to the Docker client, which sent it\r\n","stream":"stdout","time":"2019-02-01T03:43:43.648995264Z"}
{"log":"    to your terminal.\r\n","stream":"stdout","time":"2019-02-01T03:43:43.64899717Z"}
{"log":"\r\n","stream":"stdout","time":"2019-02-01T03:43:43.648998977Z"}
{"log":"To try something more ambitious, you can run an Ubuntu container with:\r\n","stream":"stdout","time":"2019-02-01T03:43:43.649000749Z"}
{"log":" $ docker run -it ubuntu bash\r\n","stream":"stdout","time":"2019-02-01T03:43:43.649002642Z"}
{"log":"\r\n","stream":"stdout","time":"2019-02-01T03:43:43.6490045Z"}
{"log":"Share images, automate workflows, and more with a free Docker ID:\r\n","stream":"stdout","time":"2019-02-01T03:43:43.64900631Z"}
{"log":" https://hub.docker.com/\r\n","stream":"stdout","time":"2019-02-01T03:43:43.649009798Z"}
{"log":"\r\n","stream":"stdout","time":"2019-02-01T03:43:43.649011696Z"}
{"log":"For more examples and ideas, visit:\r\n","stream":"stdout","time":"2019-02-01T03:43:43.649013472Z"}
{"log":" https://docs.docker.com/get-started/\r\n","stream":"stdout","time":"2019-02-01T03:43:43.649015347Z"}
{"log":"\r\n","stream":"stdout","time":"2019-02-01T03:43:43.649017234Z"}

The Docker logs can even contain ANSI color escape codes.

Wow, what a horror show! I can see why someone might have thought that was a good idea, but it is not a good fit for Logstash.

I will take a look over the weekend. My first impression is to parse each line as JSON, then route anything from the stdout stream to another pipeline with the [log] field prepended with the date, and in the other pipeline use a multiline codec. The pattern is going to be complex, that is for sure. I do not expect an aggregate filter will be a better approach.
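A rough sketch of that routing, using Logstash's pipeline-to-pipeline communication (the pipeline name `docker-multiline` is made up and would need a matching entry in pipelines.yml):

```
# main pipeline
filter {
  json { source => "message" }    # each Docker line is itself JSON
}
output {
  if [stream] == "stdout" {
    pipeline { send_to => ["docker-multiline"] }
  }
}

# docker-multiline pipeline
input {
  pipeline { address => "docker-multiline" }
}
```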

Something ate the tabs, which does not help. Removing the \r\n is definitely the least of your problems.

@Badger
Thank you for your reply. I had also considered similar solutions, but they feel too complicated, so I want to see how everyone else handles Docker logs.

A multiline codec will not work. You need to maintain state: if you are inside a JSON object you need to join lines, and if you are not, you do not. I managed to get this working with a ruby filter. I do not think it is a good fit for aggregate, but since an aggregate filter runs arbitrary Ruby code I expect it could be shoe-horned in.

    # Stateful join of multi-line JSON objects. Because the filter keeps
    # state across events, run the pipeline with a single worker
    # (pipeline.workers: 1) so that events arrive in order.
    ruby {
        init => '@joining = false; @joined = ""'
        code => '
            log = event.get("log")
            if @joining
                # Inside a JSON object: keep appending lines.
                @joined += log
                if log =~ /}$/
                    # The object is closed; emit it on this event.
                    @joining = false
                    event.set("log", @joined)
                else
                    # Not closed yet; drop this partial event.
                    event.cancel
                end
            else
                if log =~ /^{/ and log !~ /}$/
                    # Line opens a JSON object but does not close it:
                    # start joining. (A line that both opens and closes
                    # an object passes through untouched.)
                    @joining = true
                    @joined = log
                    # Drop the partial event; its content survives in @joined.
                    event.cancel
                end
            end
        '
    }
    if [log] =~ /^{/ and [log] =~ /}$/ {
        json { source => "log" }
    }

Edited to add: The `if log =~ /^{/ and log !~ /}$/` condition takes care of lines where an entire JSON object fits in a single log entry. But it also breaks if such an object is nested inside another object. You really need a JSON parser to get this right, but that is a lot of code.
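A sketch of that parser-based approach in plain Ruby, using the JSON library itself to decide when the buffered lines form a complete object instead of the fragile `/^{/` and `/}$/` regexes (the `JsonJoiner` class and its `feed` method are made up for illustration, not part of Logstash):

```ruby
require "json"

# Accumulates log lines and returns a record only once the buffer parses
# as complete JSON. Plain-text lines pass through unchanged.
class JsonJoiner
  def initialize
    @buffer = nil
  end

  # Returns the completed text, or nil while still accumulating.
  def feed(line)
    if @buffer
      @buffer += line
    elsif line.lstrip.start_with?("{")
      @buffer = line.dup
    else
      return line   # not JSON at all, pass through
    end
    begin
      JSON.parse(@buffer)   # raises until the object is complete
      done = @buffer
      @buffer = nil
      done
    rescue JSON::ParserError
      nil                   # incomplete, keep joining
    end
  end
end
```

Inside Logstash, the same logic could live in the ruby filter's `code` block, with `event.cancel` replacing the `nil` returns.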

Have you tried changing the logging driver to see whether the messages come out in a better format? You could try syslog with a syslog server (or even Logstash), or GELF sent directly to Logstash.

This JSON output from Docker is pretty painful to parse.
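For example, switching a container to the gelf driver and pointing it at a Logstash gelf input might look like this (the host and port are placeholders):

```
# docker run --log-driver gelf \
#            --log-opt gelf-address=udp://logstash-host:12201 my-image

input {
  gelf {
    port => 12201   # the gelf input listens for GELF over UDP by default
  }
}
```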

If I can't find a better solution, I'll replace the logging driver.

Thank you very much. I'll try it.