Parsing Dynatrace dashboard reports in XML format

I'm trying to use Logstash 7.4.1 to parse XML files that are generated each minute by requests to the Dynatrace server via the REST API. The output will be sent to Elasticsearch.

Dynatrace Dashboard Report

My goal is to be able to generate a visualisation in Kibana similar to the Dynatrace Dashboard.
x-axis: timestamps
y-axis: values (request_count or average_response_time)

My approach is to aggregate all measurements belonging to the same measure according to their aggregation attribute. If the aggregation type is Count, I'll sum all count values and write the total to ES as a single record; if the aggregation type is Average, I'll compute the average from sum and count and write that to ES as another record.

I expect to have one document for each measure and the output should be similar to the following:
doc/1
chartdashlet: Operasyon Adet
measure: SGT_1
measure_type: operation_count
measure_time: 01.11.2019 14:01
value: somenumber

doc/2
chartdashlet: Operasyon Adet
measure: SGT_2
measure_type: operation_count
measure_time: 01.11.2019 14:01
value: somenumber

doc/3
chartdashlet: Operasyon Sure
measure: SGT_3
measure_type: operation_responsetime
measure_time: 01.11.2019 14:01
value: somenumber

How can I do this with Logstash? To join parent and child nodes I would need the XPath 2.0 string-join function, but according to the documentation the library used only supports XPath 1.0. Do I need to write Ruby scripts to loop over the measures?
For now I'm able to read the whole file as a single event, and I can extract the measures of Count type with the following configuration in a test environment (with a single input file):

$ /home/someuser/logstash-7.4.1/bin/logstash -f dynatrace-dashboard.conf
$ cat dynatrace-dashboard.conf
input {
   file {
     id => "dynatrace_dashboard_values"
     mode => "read"
     path => "/home/someuser/dynatrace-input.xml"
     codec => multiline {
       pattern => "^<\?xml"
       negate => true
       what => "previous"
     }
   }
}
filter {
  xml {
    source => "message"
    xpath => [ "/dashboardreport/data/chartdashlet/measures/measure[@aggregation='Count']", "measure_count" ]
    store_xml => false
  }
}
output {
  file {
    path => "/home/someuser/logst.out"
    codec => "rubydebug"
  }
}

I'm stuck at this point.

Another alternative is to write a Python script that does everything described above and sends the results directly to ES, but since we have so many pipelines centralized on our Logstash containers, I would prefer to do it with Logstash rather than a custom solution.

Any alternative recommendation that satisfies my goal would be appreciated. Maybe it is possible to write the whole XML file to ES and do some higher-level querying to achieve the same result.

Create a Ruby script file that contains:

def register(params)
end

def filter(event)
    theEvents = []
    a = event.get("[@metadata][theXML][data][chartdashlet]")
    a.each { |x|
        x["measures"]["measure"].each { |y|
            measurement = y["measurement"]
            anEvent = Hash[ "name", x["name"], "measure", y["measure"], "type", y["aggregation"] ]
            if y["aggregation"] == "Average"
                sum = 0.0
                count = 0
                measurement.each { |z|
                    sum += z["sum"].to_f
                    count += z["count"].to_i
                }
                anEvent["value"] = sum/count
                theEvents << LogStash::Event.new(anEvent)
            elsif y["aggregation"] == "Count"
                count = 0
                measurement.each { |z|
                    count += z["count"].to_i
                }
                anEvent["value"] = count
                theEvents << LogStash::Event.new(anEvent)
            end
        }
    }
    theEvents
end

Then configure logstash using

    xml {
        source => "message"
        target => "[@metadata][theXML]"
        force_array => false
        remove_field => [ "message" ]
    }
    ruby { path => "/home/user/Ruby/dynatrace.rb" }

and it will return a set of events like

{
"@timestamp" => 2019-11-01T15:54:23.511Z,
   "measure" => "SGT_KrediliHayatTarifeBul",
      "name" => "Operasyon Adet ",
     "value" => 151,
  "@version" => "1",
      "type" => "Count"
}
{
"@timestamp" => 2019-11-01T15:54:23.530Z,
   "measure" => "SGT_GetirAktifTeklifPoliceMusteriNoTarihIle",
      "name" => "Operasyon Sure",
     "value" => 0.16071883216500282,
  "@version" => "1",
      "type" => "Average"
}

I haven't verified that the results are correct, but it should get you started.

Hi Badger,

This is exactly what I need. Thank you very much for your quick reply.

I've changed the API request, the pipeline conf, and the Ruby script slightly, so that the beginning of the timeframe filter used in the API request is output as a new time field holding the original record timestamp. As a result, all record timestamps produced from a single XML response file are equal.

So, instead of

tf:Last5Min

I use a custom time filter for the previous minute range.

tf:CustomTimeframe?1572875400000:1572875460000
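Since the timeframe endpoints are epoch milliseconds, the previous-minute range can be computed just before each request. A minimal Ruby sketch (the helper name is mine; I'm assuming the API accepts a start:end pair in milliseconds, as above):

```ruby
# Build a tf:CustomTimeframe parameter covering the previous full minute.
# Hypothetical helper; assumes start/end are epoch milliseconds.
def previous_minute_timeframe(now = Time.now)
  minute_start = (now.to_i / 60) * 60    # truncate to the current minute
  from_ms = (minute_start - 60) * 1000   # previous minute start
  to_ms   = minute_start * 1000          # previous minute end
  "tf:CustomTimeframe?#{from_ms}:#{to_ms}"
end

puts previous_minute_timeframe(Time.at(1572875460))
# => tf:CustomTimeframe?1572875400000:1572875460000
```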

and my final output structure is below:

{
           "measure" => "SGT_HesaplaSigortaPrimi",
        "@timestamp" => 2019-11-05T08:39:43.841Z,
              "name" => "Banka Sigortacılığı Operasyon Adet ",
          "@version" => "1",
              "type" => "Count",
             "value" => 32,
  "record_timestamp" => 2019-11-04T13:50:00.000Z
}
{
           "measure" => "SGT_GetirAktifTeklifPoliceMusteriNoTarihIle",
        "@timestamp" => 2019-11-05T08:39:43.842Z,
              "name" => "Banka Sigortacılığı Operasyon Adet ",
          "@version" => "1",
              "type" => "Count",
             "value" => 11,
  "record_timestamp" => 2019-11-04T13:50:00.000Z
}

To obtain this result, here are the final filter conf and the Ruby script.

filter {
  xml {
    source => "message"
    target => "[@metadata][theXML]"
    force_array => false
    remove_field => [ "message" ]
    xpath => [ "/dashboardreport/source/filters/filter/text()", "report_filter_text" ]
  }
  dissect {
    mapping => { "report_filter_text" => "%{?filter_type}?%{from_millis}:%{?end_millis}" }
  }
  date {
    match => [ "from_millis", "UNIX_MS" ]
    target => "record_timestamp"
  }
  ruby {
    path => "/home/sa33670/code/dynatrace.rb"
  }
}

Ruby script:

Before:

a = event.get("[@metadata][theXML][data][chartdashlet]")
a.each { |x|
...
        anEvent = Hash[ "name", x["name"], "measure", y["measure"], "type", y["aggregation"] ]
        if y["aggregation"] == "Average"

After:

a = event.get("[@metadata][theXML][data][chartdashlet]")
recordTimestamp = event.get("record_timestamp")
a.each { |x|
...
        anEvent = Hash[ "name", x["name"], "measure", y["measure"], "type", y["aggregation"] ]
        anEvent["record_timestamp"] = recordTimestamp
        if y["aggregation"] == "Average"

Hi Badger,

There are event-processing errors in my Ruby filter for some of my input files, and I'm not able to detect the root cause because there is not enough information, even after setting "log.level: trace". I cannot figure out the difference between the successful inputs and the failed ones.

If you can show me how to log from the loop inside the Ruby script, I think I can find the source of the error. I've checked another post and found an example of it, but I didn't understand the file-suffix part.

The error is at line 25, as you can see below:

 [2019-11-13T15:02:11,610][ERROR][logstash.filters.ruby    ][main] Could not process event: no implicit conversion of String into Integer {:script_path=>"/home/xxx/code/dynatrace.rb", :class=>"TypeError", :backtrace=>["org/jruby/RubyArray.java:1483:in `[]'", "/home/xxx/code/dynatrace.rb:25:in `block in filter'", "org/jruby/RubyHash.java:1417:in `each'", "/home/xxx/code/dynatrace.rb:24:in `block in filter'", "org/jruby/RubyArray.java:1800:in `each'", "/home/xxx/code/dynatrace.rb:9:in `block in filter'", "org/jruby/RubyArray.java:1800:in `each'", "/home/xxx/code/dynatrace.rb:8:in `filter'", "/home/xxx/logstash-7.4.1/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.5/lib/logstash/filters/ruby/script/context.rb:55:in `execute_filter'", "/home/xxx/logstash-7.4.1/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.5/lib/logstash/filters/ruby/script.rb:30:in `execute'", "/home/xxx/logstash-7.4.1/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.5/lib/logstash/filters/ruby.rb:98:in `file_script'", "/home/xxx/logstash-7.4.1/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.5/lib/logstash/filters/ruby.rb:84:in `filter'", "/home/xxx/logstash-7.4.1/logstash-core/lib/logstash/filters/base.rb:143:in `do_filter'", "/home/xxx/logstash-7.4.1/logstash-core/lib/logstash/filters/base.rb:162:in `block in multi_filter'", "org/jruby/RubyArray.java:1800:in `each'", "/home/xxx/logstash-7.4.1/logstash-core/lib/logstash/filters/base.rb:159:in `multi_filter'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:115:in `multi_filter'", "/home/xxx/logstash-7.4.1/logstash-core/lib/logstash/java_pipeline.rb:243:in `block in start_workers'"]}
/home/xxx/logstash-7.4.1/vendor/bundle/jruby/2.5.0/gems/awesome_print-1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant ::Fixnum is deprecated

Logstash XML parsing output

I made some changes to your first version; the latest schema for the output event is the following:

{
      "chartdashlet" => "Banka Sigortacılığı Operasyon Adet ",
        "@timestamp" => 2019-11-13T11:27:03.550Z,
           "measure" => "SGT_GetirAktifTeklifPoliceMusteriNoTarihIle",
         "inputtype" => "dynatrace_dashboard_report",
          "@version" => "1",
  "measure_agg_type" => "Count",
             "value" => 2,
  "record_timestamp" => 2019-11-13T10:19:00.000Z,
         "dashboard" => "048_(SGT)_BANKA_SIGORTACILIĞI"
}

Final version of the script and the conf:

filter {
  xml {
    source => "message"
    target => "[@metadata][theXML]"
    force_array => false
    remove_field => [ "message" ]
    xpath => [ "/dashboardreport/source/filters/filter/text()" , "[@metadata][report_filter_text]" ]
  }
  dissect {
    mapping => { "[@metadata][report_filter_text]" => "%{?filter_type}?%{from_millis}:%{?end_millis}" }
  }
  date {
    match => [ "from_millis", "UNIX_MS" ]
    target => "record_timestamp"
  }
  ruby {
    path => "/home/xxx/code/dynatrace.rb"
  }
}
output {
  stdout {
    codec => rubydebug { metadata => true }
  }
 
}

I made these changes in order to set the original record timestamp from inside the XML file (I obtain it by parsing the "filter" node), and I added the dashboard name to each record.

def register(params)
end

def filter(event)
    theEvents = []
    xml =  event.get("[@metadata][theXML]")
    recordTimestamp = event.get("record_timestamp")
    xml["data"]["chartdashlet"].each { |dashlet|
        dashlet["measures"]["measure"].each { |y|
            measurement = y["measurement"]
            anEvent = Hash[ "inputtype", "dynatrace_dashboard_report", "dashboard", xml["name"], "chartdashlet", dashlet["name"], "measure", y["measure"], "measure_agg_type", y["aggregation"] ]
            anEvent["record_timestamp"] = recordTimestamp
            if y["aggregation"] == "Average"
                sum = 0.0
                count = 0
                measurement.each { |z|
                    sum += z["sum"].to_f
                    count += z["count"].to_i
                }
                anEvent["value"] = sum/count
                theEvents << LogStash::Event.new(anEvent)
            elsif y["aggregation"] == "Count"
                count = 0
                measurement.each { |z|
                    count += z["count"].to_i
                }
                anEvent["value"] = count
                theEvents << LogStash::Event.new(anEvent)
            end
        }
    }
    theEvents
end

Best,
Volkan

line 25:
I cannot see any value in the XML that is not convertible to an int.

      <measurement timestamp="1573640520000" avg="0.20227371652921042" min="0.1356492042541504" max="0.2539059519767761" sum="0.6068211495876312" count="3"/>
      <measurement timestamp="1573640540000" avg="0.1311503474911054" min="0.12279105931520462" max="0.14222320914268494" sum="0.3934510424733162" count="3"/>
      <measurement timestamp="1573640550000" avg="0.15329458378255367" min="0.10025250911712646" max="0.24541789293289185" sum="0.6131783351302147" count="4"/>
      <measurement timestamp="1573640560000" avg="176.46291141318423" min="0.10929568856954575" max="1234.3441162109375" sum="1235.2403798922896" count="7"/>
      <measurement timestamp="1573640520000" avg="0.12850795686244965" min="0.12850795686244965" max="0.12850795686244965" sum="0.12850795686244965" count="1"/>
      <measurement timestamp="1573640520000" avg="0.22263357788324356" min="0.18629144132137299" max="0.25897571444511414" sum="0.4452671557664871" count="2"/>
      <measurement timestamp="1573640530000" avg="4801.10693359375" min="4801.10693359375" max="4801.10693359375" sum="4801.10693359375" count="1"/>
      <measurement timestamp="1573640540000" avg="0.2167513519525528" min="0.2167513519525528" max="0.2167513519525528" sum="0.2167513519525528" count="1"/>
      <measurement timestamp="1573640560000" avg="0.20838549733161926" min="0.20838549733161926" max="0.20838549733161926" sum="0.20838549733161926" count="1"/>
      <measurement timestamp="1573640520000" avg="0.25897571444511414" min="0.25897571444511414" max="0.25897571444511414" sum="0.25897571444511414" count="1"/>
      <measurement timestamp="1573640530000" avg="2400.700741663575" min="0.29454973340034485" max="4801.10693359375" sum="4801.40148332715" count="2"/>
      <measurement timestamp="1573640540000" avg="0.2167513519525528" min="0.2167513519525528" max="0.2167513519525528" sum="0.2167513519525528" count="1"/>
      <measurement timestamp="1573640520000" avg="0.21644921600818634" min="0.1356492042541504" max="0.25897571444511414" sum="0.8657968640327454" count="4"/>
      <measurement timestamp="1573640530000" avg="2400.700741663575" min="0.29454973340034485" max="4801.10693359375" sum="4801.40148332715" count="2"/>
      <measurement timestamp="1573640540000" avg="0.15255059860646725" min="0.12279105931520462" max="0.2167513519525528" sum="0.610202394425869" count="4"/>
      <measurement timestamp="1573640550000" avg="0.15329458378255367" min="0.10025250911712646" max="0.24541789293289185" sum="0.6131783351302147" count="4"/>
      <measurement timestamp="1573640560000" avg="176.46291141318423" min="0.10929568856954575" max="1234.3441162109375" sum="1235.2403798922896" count="7"/>
      <measurement timestamp="1573640520000" avg="0.12850795686244965" min="0.12850795686244965" max="0.12850795686244965" sum="0.12850795686244965" count="1"/>
      <measurement timestamp="1573640520000" avg="0.22263357788324356" min="0.18629144132137299" max="0.25897571444511414" sum="0.4452671557664871" count="2"/>
      <measurement timestamp="1573640530000" avg="4801.10693359375" min="4801.10693359375" max="4801.10693359375" sum="4801.10693359375" count="1"/>
      <measurement timestamp="1573640540000" avg="0.2167513519525528" min="0.2167513519525528" max="0.2167513519525528" sum="0.2167513519525528" count="1"/>
      <measurement timestamp="1573640560000" avg="0.20838549733161926" min="0.20838549733161926" max="0.20838549733161926" sum="0.20838549733161926" count="1"/>
      <measurement timestamp="1573640520000" avg="0.25897571444511414" min="0.25897571444511414" max="0.25897571444511414" sum="0.25897571444511414" count="1"/>
      <measurement timestamp="1573640530000" avg="2400.700741663575" min="0.29454973340034485" max="4801.10693359375" sum="4801.40148332715" count="2"/>
      <measurement timestamp="1573640540000" avg="0.2167513519525528" min="0.2167513519525528" max="0.2167513519525528" sum="0.2167513519525528" count="1"/>
      <measurement timestamp="1573640520000" avg="0.20227371652921042" min="0.1356492042541504" max="0.2539059519767761" sum="0.6068211495876312" count="3"/>
      <measurement timestamp="1573640540000" avg="0.1311503474911054" min="0.12279105931520462" max="0.14222320914268494" sum="0.3934510424733162" count="3"/>
      <measurement timestamp="1573640550000" avg="0.15329458378255367" min="0.10025250911712646" max="0.24541789293289185" sum="0.6131783351302147" count="4"/>
      <measurement timestamp="1573640560000" avg="176.46291141318423" min="0.10929568856954575" max="1234.3441162109375" sum="1235.2403798922896" count="7"/>
      <measurement timestamp="1573640520000" avg="0.21644921600818634" min="0.1356492042541504" max="0.25897571444511414" sum="0.8657968640327454" count="4"/>
      <measurement timestamp="1573640530000" avg="2400.700741663575" min="0.29454973340034485" max="4801.10693359375" sum="4801.40148332715" count="2"/>
      <measurement timestamp="1573640540000" avg="0.15255059860646725" min="0.12279105931520462" max="0.2167513519525528" sum="0.610202394425869" count="4"/>
      <measurement timestamp="1573640550000" avg="0.15329458378255367" min="0.10025250911712646" max="0.24541789293289185" sum="0.6131783351302147" count="4"/>
      <measurement timestamp="1573640560000" avg="176.46291141318423" min="0.10929568856954575" max="1234.3441162109375" sum="1235.2403798922896" count="7"/>

Hi, I found the root cause after adding logger.trace() calls inside the Ruby script.

elsif y["aggregation"] == "Count"
            count = 0
            measurement.each { |z|
                logger.trace("zcount is: #{z["count"]}" )
                count += z["count"].to_i
            }
            logger.trace("count sum ends:")
            anEvent["value"] = count
            logger.trace("count sum is: #{anEvent["value"]}")
            theEvents << LogStash::Event.new(anEvent)
        end

Here is the output:

[2019-11-13T17:07:22,102][TRACE][logstash.filters.ruby.script.context][main] measure_name is: SGT_HesaplaSigortaPrimiVakifEmeklilik
[2019-11-13T17:07:22,108][TRACE][logstash.filters.ruby.script.context][main] zcount is: 3
[2019-11-13T17:07:22,109][TRACE][logstash.filters.ruby.script.context][main] zcount is: 3
[2019-11-13T17:07:22,109][TRACE][logstash.filters.ruby.script.context][main] zcount is: 4
[2019-11-13T17:07:22,109][TRACE][logstash.filters.ruby.script.context][main] zcount is: 7
[2019-11-13T17:07:22,109][TRACE][logstash.filters.ruby.script.context][main] count sum ends:
[2019-11-13T17:07:22,112][TRACE][logstash.filters.ruby.script.context][main] count sum is: 17
[2019-11-13T17:07:22,113][TRACE][logstash.filters.ruby.script.context][main] measure_name is: SGT_GetirAktifTeklifPoliceMusteriNoTarihIle
[2019-11-13T17:07:22,117][ERROR][logstash.filters.ruby    ][main] Could not process event: no implicit conversion of String into Integer {:script_path=>"/home/sa33670

This is because, when there is just a single measurement in a measure, there is no array holding each measurement, just a single object, as follows:
[1] is NOK
[2] is OK

 [1] {
                                  "thresholds" => "false",
                                 "measurement" => {
                                    "timestamp" => "1573640520000",
                                          "max" => "0.12850795686244965",
                                        "count" => "1",
                                          "min" => "0.12850795686244965",
                                          "avg" => "0.12850795686244965",
                                          "sum" => "0.12850795686244965"
                                },
                                "drawingorder" => "5",
                                        "unit" => "num",
                                       "color" => "#0000ff",
                                     "measure" => "SGT_GetirAktifTeklifPoliceMusteriNoTarihIle",
                                 "aggregation" => "Count"
                            },
                            [2] {
                                  "thresholds" => "false",
                                 "measurement" => [
                                    [0] {
                                        "timestamp" => "1573640520000",
                                              "max" => "0.25897571444511414",
                                            "count" => "2",
                                              "min" => "0.18629144132137299",
                                              "avg" => "0.22263357788324356",
                                              "sum" => "0.4452671557664871"
                                    },
                                    [1] {
                                        "timestamp" => "1573640530000",
                                              "max" => "4801.10693359375",
                                            "count" => "1",
                                              "min" => "4801.10693359375",
                                              "avg" => "4801.10693359375",
                                              "sum" => "4801.10693359375"
                                    },
                                    [2] {
                                        "timestamp" => "1573640540000",
                                              "max" => "0.2167513519525528",
                                            "count" => "1",
                                              "min" => "0.2167513519525528",
                                              "avg" => "0.2167513519525528",
                                              "sum" => "0.2167513519525528"
                                    },
                                    [3] {
                                        "timestamp" => "1573640560000",
                                              "max" => "0.20838549733161926",
                                            "count" => "1",
                                              "min" => "0.20838549733161926",
                                              "avg" => "0.20838549733161926",
                                              "sum" => "0.20838549733161926"
                                    }
                                ],

I have to change the script to handle this case.
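The two shapes can also be normalised up front, so that a single loop handles both; a minimal sketch (the helper name is mine):

```ruby
# With force_array => false, the xml filter parses a single <measurement>
# as a Hash and several of them as an Array of Hashes. Wrapping the Hash
# case in a one-element array lets the same loop sum both shapes.
def total_count(measurement)
  list = measurement.is_a?(Array) ? measurement : [measurement]
  list.sum { |z| z["count"].to_i }
end

puts total_count({ "count" => "1" })                       # => 1
puts total_count([{ "count" => "2" }, { "count" => "1" }]) # => 3
```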

Here is the final version of my script, handling the cases where a measure has no measurement at all (measurement is nil) and where it has only one (measurement is a single hash rather than an array):

def register(params)

end

def filter(event)
    theEvents = []
    xml =  event.get("[@metadata][theXML]")
    recordTimestamp = event.get("record_timestamp")

    xml["data"]["chartdashlet"].each { |dashlet|
        dashlet["measures"]["measure"].each { |y|
            #logger.trace("measure_name is: #{y["measure"]}")
            measurement = y["measurement"]
           
            anEvent = Hash[ "inputtype", "dynatrace_dashboard_report", "dashboard", xml["name"], "chartdashlet", dashlet["name"], "measure", y["measure"], "measure_agg_type", y["aggregation"] ]
            anEvent["record_timestamp"] = recordTimestamp
            
            if measurement.nil? #handles when there is no measurement under a measure
                anEvent["value"] = 0
            elsif y["aggregation"] == "Average"
                #do if more than 1 measurement
                if measurement.kind_of?(Array)
                    sum = 0.0
                    count = 0
                    measurement.each { |z|
                        sum += z["sum"].to_f
                        count += z["count"].to_i
                    }
                    anEvent["value"] = sum/count
                else #do when there is only 1 record
                    anEvent["value"] = measurement["avg"]
                end

            elsif y["aggregation"] == "Count"
                if measurement.kind_of?(Array)
                    count = 0
                    measurement.each { |z|
                        count += z["count"].to_i
                    }
                    anEvent["value"] = count
                else #handles when there is only 1 record
                   anEvent["value"] = measurement["count"]
                end
            end

            theEvents << LogStash::Event.new(anEvent)
        }
    }
    theEvents
end

For other file-based exceptions, Logstash adds a _rubyexception tag to the input event, and my strategy is to push these records directly into my index in order to keep track of exception counts.
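Such tagged events can be routed with a conditional on the tag in the output section; a sketch, assuming an elasticsearch output (the hosts and index name here are hypothetical):

```
output {
  if "_rubyexception" in [tags] {
    elasticsearch {
      hosts => [ "localhost:9200" ]
      index => "dynatrace-ruby-errors"
    }
  }
}
```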

For others who may need this kind of event processing: note that on a successful Ruby script execution, all input fields (except the ones present when I instantiate a new event in Ruby) are discarded, since I produce several events out of a single input event. When the script fails, however, all input fields are output, together with an extra _rubyexception tag:

        {
          "@timestamp": "2019-11-15T11:00:59.138Z",
          "input": {
            "type": "log"
          },
          "inputtype": "dynatrace_dashboard_report",
          "tags": [
            "beats_input_codec_plain_applied",
            "_rubyexception"
          ],
          "@version": "1",
          "record_timestamp": "2019-11-15T07:14:00.000Z",
          "log": {
            "file": {
              "path": "/home/xxx/input-filebeat/dynatrace_048_(SGT)_BANKA_SIGORTACILIĞI_20191115101400.out"
            },
            "offset": 0,
            "flags": [
              "multiline"
            ]
          },
          "from_millis": "1573802040000",
          "agent": {
            "type": "filebeat",
            "id": "8dcd2d3e-be27-4cc7-ae38-14a7a16874fb",
            "version": "7.4.2",
            "hostname": "somehost",
            "ephemeral_id": "430243c8-e494-407b-b1c0-57155a4cbc45"
          }
        }

The final conf:

 filter {
  xml {
    source => "message"
    target => "[@metadata][theXML]"
    force_array => false
    remove_field => [ "message", "host", "ecs" ]
    add_field => { "inputtype" => "dynatrace_dashboard_report" }
    xpath => [ "/dashboardreport/source/filters/filter/text()" , "[@metadata][report_filter_text]" ]
  }
  dissect {
    mapping => { "[@metadata][report_filter_text]" => "%{?filter_type}?%{from_millis}:%{?end_millis}" }
  }
  date {
    match => [ "from_millis", "UNIX_MS" ]
    target => "record_timestamp"
  }
  ruby {
    path => "/home/xxx/code/dynatrace.rb"
  }
}
output {
  stdout {
    codec => rubydebug { metadata => true }
  }
  if [inputtype] == "dynatrace_dashboard_report" {
    file {
      path => "/home/xxx/debug-logstash1.out"
      codec => "json"
    }
  }
}