Manipulation with Logstash

Hi All,
I am new to Logstash manipulations and I have no idea how to do the below.
I have a sample data as below:

    Column:Type
    Incident Response P3
    Incident Resolution L1.5  P2
    ...

The column data is not always the same so a grok may not help me.
I want to extract the word 'Response' and 'Resolution' into a new column 'SLA type'

Im looking for something very alike to the below SQL statement:

case when Type like '%Resolution%' then Resolution
when Type like '%Response%' then Response 
end as SLA_Type

How do i manipulate this in Logstash?
Any help is appreciated.

Thanks in advance!

Katara.

Hi Katara,

how are the data ingested in Logstash? What is the input?

You can do a simple pipeline like

input {
  whatever your input plugin is
}

filter {}

output {
  stdout{}
}

and paste here what is returned on the standard output.

It'd help us understand which kind of document we're talking about.

Hi @Fabio-sama,
Below is my conf. I'm using an API input.
input {

http_poller {
    urls => {
    snowinc => {
    url => "https://service-now.com"
    user => "your_user"
    password => "yourpassword"
    headers => {Accept => "application/json"}
    }
}
request_timeout => 60
metadata_target => "http_poller_metadata"
schedule => { cron => "* * * * * UTC"}
codec => "json"
}
}
filter
   {
   json {source => "result" }
   split{ field => ["result"] }
}
output {
  elasticsearch {
    hosts => ["yourelastuicIP"]
    index => "incidentsnow"
    action=>update
    document_id => "%{[result][number]}"
    doc_as_upsert =>true
}
        stdout { codec => rubydebug }
}

The output for the json url looks like the below:

{"result":[
{
"made_sla":"true",
"Type":"incident resolution p3",
"sys_updated_on":"2019-12-23 05:00:00",
"number":"INC0010275",
"category":"Network"} ,
{
"made_sla":"true",
"Type":"incident resolution l1.5 p4",
"sys_updated_on":"2019-12-24 07:00:00",
"number":"INC0010567",
"category":"DB"}]}

Please let me know if anything I can use to extract the response and resolution words only, from the type column and put it as a new column "sla_type"

Thanks in advance!

Katara!

I'm sorry but I'm not getting what is your need. You posted this output

{
    "result": [
        {
            "made_sla": "true",
            "Type": "incident resolution p3",
            "sys_updated_on": "2019-12-23 05:00:00",
            "number": "INC0010275",
            "category": "Network"
        },
        {
            "made_sla": "true",
            "Type": "incident resolution l1.5 p4",
            "sys_updated_on": "2019-12-24 07:00:00",
            "number": "INC0010567",
            "category": "DB"
        }
    ]
}

Would you like to do something like

 - if there is the word resolution in the field Type then add a new field sla_type with value resolution; 
 - if there is the word response in the field Type then add a new field sla_type with value response

?

Can you paste here what returns from the API call WITHOUT applying any filter? Feel free to blur sensitive data, I'm only interested in the structure.

For example, supposing your input message is

{
    "result": [
        {
            "made_sla": "true",
            "Type": "incident resolution p3",
            "sys_updated_on": "2019-12-23 05:00:00",
            "number": "INC0010275",
            "category": "Network"
        },
        {
            "made_sla": "true",
            "Type": "incident resolution l1.5 p4",
            "sys_updated_on": "2019-12-24 07:00:00",
            "number": "INC0010567",
            "category": "DB"
        }
    ]
}

This configuration:

filter {
  json {
    source => "message" 
  }

  ruby {
    code => "
      jsons = event.get('result')
      jsons.map do |j|
        j['sla_type'] = 'Response' if j['Type'].match(/response/i)
        j['sla_type'] = 'Resolution' if j['Type'].match(/resolution/i)
      end
      event.set('result', jsons)
    "
  }
}

Would give you this output:

{
    "result": [
        {
            "category": "Network",
            "number": "INC0010275",
            "sys_updated_on": "2019-12-23 05:00:00",
            "made_sla": "true",
            "Type": "incident response p3",
            "sla_type": "Response"
        },
        {
            "category": "DB",
            "number": "INC0010567",
            "sys_updated_on": "2019-12-24 07:00:00",
            "made_sla": "true",
            "Type": "incident resolution l1.5 p4",
            "sla_type": "Resolution"
        }
    ],
    "message": "{\"result\": [{\"made_sla\": \"true\",\"Type\": \"incident response p3\",\"sys_updated_on\": \"2019-12-23 05:00:00\",\"number\": \"INC0010275\",\"category\": \"Network\"},{\"made_sla\": \"true\",\"Type\": \"incident resolution l1.5 p4\",\"sys_updated_on\": \"2019-12-24 07:00:00\",\"number\": \"INC0010567\",\"category\": \"DB\"}]}",
    "@timestamp": "2020-01-16T15:05:35.878Z",
    "@version": "1"
}
1 Like

Hello @Fabio-sama,
The data I uploaded is the actual data when I hit the API.
And yes, I'm exactly looking to do what your are suggesting.

I will try this out and get back with my results :slight_smile: Thank you so much for helping me out! :slight_smile:

Perfect! Then my previous answer should fit your needs.

Just check whether you need to put "message" in the json filter or if what returns from the API is already seen as a json.

But the core part to add a sla_type according to what is written in the Type field is there.

@Fabio-sama,
My data is already in json form, so your answer should work for mine :slight_smile:

I will try this out!

Katara.

Hi @Fabio-sama,
the filter seems to work but however,
only one row of data is getting uploaded.

Here's my config

input {
  http_poller {
    urls => {
    snowsla => {
  url => "https://service-now.com"
 user => "your_user"
 password => "yourpassword"
    headers => {Accept => "application/json"}
    }
}
request_timeout => 60
metadata_target => "http_poller_metadata"
schedule => { cron => " * * * * * UTC"}
#codec => "json"
}
}
filter
   {
   json
       {
       source => "result"
      }
ruby {
    code => "
      jsons = event.get('result')
      jsons.map do |j|
        j['sla_type'] = 'Response' if j['sla'].match(/Response/i)
        j['sla_type'] = 'Resolution' if j['sla'].match(/Resolution/i)
      end
      event.set('result', jsons)
    "
  }
date {
  match => ["[result][sys_updated_on]","yyyy-MM-dd HH:mm:ss"]
  target => "sys_created_on"
     }

}
output {
  elasticsearch {
    hosts => ["eshost:9200"]
    index => "test"
    action=>update
    document_id => "%{[result][number]}"
    doc_as_upsert =>true
}
        stdout { codec => rubydebug }
}

if i add

split{ field => ["result"] }

to my filter, then the sla_type column isn't manipulated anymore.
Please guide me on where the issue is possibly.

Can you please share the stdout of this pipeline (edit the authentication info in output obviously):

input {
  http_poller {
    urls => {
      snowsla => {
        url => "https://service-now.com"
        user => "your_user"
        password => "yourpassword"
        headers => {Accept => "application/json"}
      }
    }
  request_timeout => 60
  metadata_target => "http_poller_metadata"
  schedule => { cron => " * * * * * UTC"}
  }
}

filter {
}

output {
  stdout {}
}

I'll ask for another one once you post it.

@Fabio-sama I actually have a lot of Java dependencies to look into to bring this into a console output.
I dont want to keep you waiting.

However, I ran this as a service,
Here's my logs:

[2020-01-17T05:45:35,592][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.2.0"}
[2020-01-17T05:45:39,853][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge] A gauge metric of an unknown type (org.jruby.RubyArray) has been create for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.
[2020-01-17T05:45:39,857][INFO ][logstash.javapipeline    ] Starting pipeline {:pipeline_id=>"test", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, :thread=>"#<Thread:0x48649bd4 run>"}
[2020-01-17T05:45:39,887][INFO ][logstash.inputs.http_poller] Registering http_poller Input {:type=>nil, :schedule=>{"cron"=>" * * * * * UTC"}, :timeout=>nil}
[2020-01-17T05:45:39,915][INFO ][logstash.javapipeline    ] Pipeline started {"pipeline.id"=>"test"}
[2020-01-17T05:45:40,002][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:test], :non_running_pipelines=>[]}
[2020-01-17T05:45:40,252][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Is this any helpful?

Mhmh no I'm afraid it is not.

I need what comes next to it. The actual output of the pipeline without applying any filter (basically I wanna know how Logstash is interpreting the API response).

Hi @Fabio-sama,
In the mean time, I tried

if "response" in [result][type] {
    mutate {
        add_field => { "SLA_Type" => "Response" }
    }
}
if "resolution" in [result][type] {
    mutate {
        add_field => { "SLA_Type" => "Resolution" }
    }
}

This is a response for my question from Stackoverflow.
https://stackoverflow.com/questions/59769519/logstash-configuration-for-word-extraction/59788755#59788755
This seems to work fine as well.

I think the issue is that the Document ID wasn't considered properly when I received a single row only. When i apply the above, the document ID is properly recognized.
I will work on to see if I can rectify it somehow. Not sure of the cause yet.

Thank you very much for all the time and help you've given me :slight_smile:

Katara.

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.