Handling multiline with filebeat

Hello,

I have the following snippet, and I am trying to capture all of it as a multiline

<< JESI>> [ERROR] [TIME:29 Mar 2022 04:34:53][Tid:OUTBOUND_RECOVERY_01339]Exception @[NodeId=6;Element=Channel@amdocs/eai/adaptor/outbound/QueryAccountByCustID.xml]Error=Exception @[NodeId=6;Element=Channel@amdocs/eai/adaptor/outbound/QueryAccountByCustID.xml]Error=amdocs.ecommerce.esi.exceptions.CommunicationException: [LoginFault [ApiFault exceptionCode='INVALID_LOGIN'
exceptionMessage='Invalid username, password, security token; or user locked out.'
]
]

amdocs.ecommerce.esi.exceptions.CommunicationException: [LoginFault [ApiFault exceptionCode='INVALID_LOGIN'
exceptionMessage='Invalid username, password, security token; or user locked out.'
]
]

    at amdocs.ecommerce.esi.proxy.SFDCProxy.connect(SFDCProxy.java:206)
    at amdocs.ecommerce.esi.handlers.channels.SFDCProxyChannel.publish(SFDCProxyChannel.java:78)
    at amdocs.ecommerce.esi.utils.flow.invoke.Channel.doExecute(Channel.java:305)
    at amdocs.ecommerce.esi.utils.flow.BaseElement.execute(BaseElement.java:216)
    at amdocs.ecommerce.esi.utils.flow.ProcessFlow.execute(ProcessFlow.java:1004)
    at amdocs.ecommerce.esi.utils.flow.ProcessFlowEngine.execute(ProcessFlowEngine.java:638)
    at amdocs.ecommerce.esi.utils.flow.FLOW.doExecute(FLOW.java:339)
    at amdocs.ecommerce.esi.utils.flow.BaseElement.execute(BaseElement.java:216)
    at amdocs.ecommerce.esi.utils.flow.TRY.doExecute(TRY.java:81)
    at amdocs.ecommerce.esi.utils.flow.BaseElement.execute(BaseElement.java:216)
    at amdocs.ecommerce.esi.utils.flow.TRYCATCH.doExecute(TRYCATCH.java:218)
    at amdocs.ecommerce.esi.utils.flow.BaseElement.execute(BaseElement.java:216)
    at amdocs.ecommerce.esi.utils.flow.ELSE.doExecute(ELSE.java:76)
    at amdocs.ecommerce.esi.utils.flow.BaseElement.execute(BaseElement.java:216)
    at amdocs.ecommerce.esi.utils.flow.IF.doExecute(IF.java:147)
    at amdocs.ecommerce.esi.utils.flow.BaseElement.execute(BaseElement.java:216)
    at amdocs.ecommerce.esi.utils.flow.ProcessFlow.execute(ProcessFlow.java:1004)
    at amdocs.ecommerce.esi.utils.flow.ProcessFlowEngine.execute(ProcessFlowEngine.java:677)
    at amdocs.ecommerce.esi.eai.api.flows.sub.GenericProcessFlowHandler.executeFlow(GenericProcessFlowHandler.java:51)

You can use a multiline configuration:

- type: log
  paths:
    - "/var/log/application.log"
  ignore_older: 1h
  fields_under_root: true
  multiline:
    pattern: '^<< JESI>>'
    negate: true
    match: after
    timeout: 5s

Thanks for the reply. I went ahead and implemented it, but it seems now that events are appended to each other as one document in Elasticsearch. Could I be doing something wrong? I am sending the events from Filebeat to Kafka, then I read the Kafka topic and write it to ES.

<< JESI>> [INFO] [TIME:6 Apr 2022 17:55:19][Tid:ADAPTOR_POST_SUB_01_05323][ID=20220406175519_239735_dfwlnpkgjesi-01_40820_bfebb84e-29b8-4666-afc6-001acc567c7c;Flow=amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]callPrimaryMCByRepIdService from Input : 1
<< JESI>> [INFO] [TIME:6 Apr 2022 17:55:19][Tid:ADAPTOR_POST_SUB_01_05323][ID=20220406175519_239735_dfwlnpkgjesi-01_40820_bfebb84e-29b8-4666-afc6-001acc567c7c;Flow=amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]primaryMcID from Input : 195280
<< JESI>> [INFO] [TIME:6 Apr 2022 17:55:19][Tid:ADAPTOR_POST_SUB_01_05323][ID=20220406175519_239736_dfwlnpkgjesi-01_40820_4a1ee972-dbee-4a4e-bb6f-7289e7879431;Flow=amdocs/eai/adaptor/outbound/QueryUserByRepID.xml]Query = SELECT Id FROM User where Dex_kGen_Employee_ID__c = '195280' LIMIT 1
<< JESI>> [INFO] [TIME:6 Apr 2022 17:55:19][Tid:ADAPTOR_POST_SUB_01_05323][ID=20220406175519_239736_dfwlnpkgjesi-01_40820_4a1ee972-dbee-4a4e-bb6f-7289e7879431;Flow=amdocs/eai/adaptor/outbound/QueryUserByRepID.xml]Query = [SELECT Id FROM User where Dex_kGen_Employee_ID__c = '195280' LIMIT 1]
java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor172.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at amdocs.ecommerce.esi.proxy.SFDCProxy.connect(SFDCProxy.java:198)
	at amdocs.ecommerce.esi.handlers.channels.SFDCProxyChannel.publish(SFDCProxyChannel.java:78)
	at amdocs.ecommerce.esi.utils.flow.invoke.Channel.doExecute(Channel.java:305)
	at amdocs.ecommerce.esi.utils.flow.BaseElement.execute(BaseElement.java:216)
	at amdocs.ecommerce.esi.utils.flow.ProcessFlow.execute(ProcessFlow.java:1004)
	at amdocs.ecommerce.esi.utils.flow.ProcessFlowEngine.execute(ProcessFlowEngine.java:638)
	at amdocs.ecommerce.esi.utils.flow.FLOW.doExecute(FLOW.java:339)
	at amdocs.ecommerce.esi.utils.flow.BaseElement.execute(BaseElement.java:216)
	at amdocs.ecommerce.esi.utils.flow.THEN.doExecute(THEN.java:79)
	at amdocs.ecommerce.esi.utils.flow.BaseElement.execute(BaseElement.java:216)
	at amdocs.ecommerce.esi.utils.flow.IF.doExecute(IF.java:141)
	at amdocs.ecommerce.esi.utils.flow.BaseElement.execute(BaseElement.java:216)

Also, is there a way for me to drop events via filebeat, if an event with specific string match is found?

How do you mean, appended in kibana?
Can you provide an example (JSON export)?

{
  "_index": "kgjesi-2022.04.07",
  "_type": "kg-jesi-logs-st1",
  "_id": "AYAFBDR9l1hx9GHPv1PX",
  "_version": 1,
  "_score": 2,
  "_source": {
    "@timestamp": "2022-04-07T17:13:08.782Z",
    "beat": {
      "hostname": "dfwlnpkgjesi-01",
      "name": "dfwlnpkgjesi-01"
    },
    "input_type": "log",
    "message": "<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]Exception @[NodeId=6;Element=Channel@amdocs/eai/adaptor/outbound/QueryAccountByCustID.xml]Error=Exception @[NodeId=6;Element=Channel@amdocs/eai/adaptor/outbound/QueryAccountByCustID.xml]Error=amdocs.ecommerce.esi.exceptions.CommunicationException: [LoginFault [ApiFault  exceptionCode='INVALID_LOGIN'\n exceptionMessage='Invalid username, password, security token; or user locked out.'\n]\n]\n\n<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]\nFlow:[Id=20220407121257_103850_dfwlnpkgjesi-01_40820_bd76cc0f-9412-40bb-894e-f28f5fa31478;name=amdocs/eai/adaptor/outbound/QueryAccountByCustID.xml;nodeId=6;name=]\nMessage:Execute - Process (20220407121257_103850_dfwlnpkgjesi-01_40820_bd76cc0f-9412-40bb-894e-f28f5fa31478-amdocs/eai/adaptor/outbound/QueryAccountByCustID.xml) Terminated with Error!!!amdocs.ecommerce.esi.exceptions.CommunicationException: [LoginFault [ApiFault  exceptionCode='INVALID_LOGIN'\n exceptionMessage='Invalid username, password, security token; or user locked out.'\n]\n]\n\n<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]Exception @[NodeId=6.2.1.1.8.1.1.1.2;Element=FLOW@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Exception @[NodeId=6.2.1.1.8.1.1.1.2;Element=FLOW@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=amdocs.ecommerce.esi.exceptions.CommunicationException: [LoginFault [ApiFault  exceptionCode='INVALID_LOGIN'\n exceptionMessage='Invalid username, password, security token; or user locked out.'\n]\n]\n\n<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]Exception @[NodeId=6.2.1.1.8.1.1.1;Element=TRY@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Exception @[NodeId=6.2.1.1.8.1.1.1;Element=TRY@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=amdocs.ecommerce.esi.exceptions.CommunicationException: [LoginFault [ApiFault  
exceptionCode='INVALID_LOGIN'\n exceptionMessage='Invalid username, password, security token; or user locked out.'\n]\n]\n\n<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]\nFlow:[Id=20220407121257_103849_dfwlnpkgjesi-01_40820_5baefb47-c2c4-487d-a753-c9af60335f6a;name=amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml;nodeId=6.2.1.1.8.1.1.2.1;name=]\nMessage:Execute - [handleRetrieveAccountContactException] Java code Failed!\n<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]Exception @[NodeId=6.2.1.1.8.1.1.2.1;Element=CODE@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Exception @[NodeId=6.2.1.1.8.1.1.2.1;Element=CODE@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Code=JESIERR-4;Description=[LoginFault [ApiFault  exceptionCode='INVALID_LOGIN'\n exceptionMessage='Invalid username, password, security token; or user locked out.'\n]\n]\n\n<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]Exception @[NodeId=6.2.1.1.8.1.1.2;Element=CATCH@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Exception @[NodeId=6.2.1.1.8.1.1.2;Element=CATCH@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Code=JESIERR-4;Description=[LoginFault [ApiFault  exceptionCode='INVALID_LOGIN'\n exceptionMessage='Invalid username, password, security token; or user locked out.'\n]\n]\n\n<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]Exception @[NodeId=6.2.1.1.8.1.1;Element=TRYCATCH@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Exception @[NodeId=6.2.1.1.8.1.1;Element=TRYCATCH@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Code=JESIERR-4;Description=[LoginFault [ApiFault  exceptionCode='INVALID_LOGIN'\n exceptionMessage='Invalid username, password, security token; or user locked out.'\n]\n]\n\n<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]Exception 
@[NodeId=6.2.1.1.8.1;Element=IF-THEN@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Exception @[NodeId=6.2.1.1.8.1;Element=IF-THEN@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Code=JESIERR-4;Description=[LoginFault [ApiFault  exceptionCode='INVALID_LOGIN'\n exceptionMessage='Invalid username, password, security token; or user locked out.'\n]\n]\n\n<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]Exception @[NodeId=6.2.1.1.8;Element=IF@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Exception @[NodeId=6.2.1.1.8;Element=IF@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Code=JESIERR-4;Description=[LoginFault [ApiFault  exceptionCode='INVALID_LOGIN'\n exceptionMessage='Invalid username, password, security token; or user locked out.'\n]\n]\n\n<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]Exception @[NodeId=6.2.1.1;Element=TRY@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Exception @[NodeId=6.2.1.1;Element=TRY@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Code=JESIERR-4;Description=[LoginFault [ApiFault  exceptionCode='INVALID_LOGIN'\n exceptionMessage='Invalid username, password, security token; or user locked out.'\n]\n]\n\n<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]\nFlow:[Id=20220407121257_103849_dfwlnpkgjesi-01_40820_5baefb47-c2c4-487d-a753-c9af60335f6a;name=amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml;nodeId=6.2.1.2.1;name=]\nMessage:Execute - [handleExecuteException] Java code Failed!\n<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]Exception @[NodeId=6.2.1.2.1;Element=CODE@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Exception @[NodeId=6.2.1.2.1;Element=CODE@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Code=JESIERR-1;Description=amdocs.ecommerce.common.exceptions.SystemException:\n<< JESI>> [ERROR] [TIME:7 Apr 2022 
12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]Exception @[NodeId=6.2.1.2;Element=CATCH@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Exception @[NodeId=6.2.1.2;Element=CATCH@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Code=JESIERR-1;Description=amdocs.ecommerce.common.exceptions.SystemException:\n<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]Exception @[NodeId=6.2.1;Element=TRYCATCH@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Exception @[NodeId=6.2.1;Element=TRYCATCH@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Code=JESIERR-1;Description=amdocs.ecommerce.common.exceptions.SystemException:\n<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]Exception @[NodeId=6.2;Element=IF-ELSE@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Exception @[NodeId=6.2;Element=IF-ELSE@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Code=JESIERR-1;Description=amdocs.ecommerce.common.exceptions.SystemException:\n<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]Exception @[NodeId=6;Element=IF@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Exception @[NodeId=6;Element=IF@amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml]Error=Code=JESIERR-1;Description=amdocs.ecommerce.common.exceptions.SystemException:\n<< JESI>> [ERROR] [TIME:7 Apr 2022 12:13:07][Tid:ADAPTOR_POST_SUB_01_06319]\nFlow:[Id=20220407121257_103849_dfwlnpkgjesi-01_40820_5baefb47-c2c4-487d-a753-c9af60335f6a;name=amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml;nodeId=6.2.1.2.1;name=]\nMessage:Execute - Process (20220407121257_103849_dfwlnpkgjesi-01_40820_5baefb47-c2c4-487d-a753-c9af60335f6a-amdocs/eai/adaptor/outbound/OUTBOUND_UPSERT_CUST.xml) Terminated with Error!!!Code=JESIERR-1;Description=amdocs.ecommerce.common.exceptions.SystemException:\n<< JESI>> [ERROR] [TIME:7 Apr 2022 
12:13:07][Tid:ADAPTOR_POST_SUB_01_06319][EG:Name=ADAPTOR_POST_SUB_01_06;Type=JDBC(1649351587317)][InterfaceException] Code[JESIERR-1] Description[amdocs.ecommerce.common.exceptions.SystemException:] StackTrace[amdocs.ecommerce.esi.exceptions.SubscribeException: \n\tat amdocs.ecommerce.esi.handlers.SubscribeMessageManagerImpl.postSubscribe(SubscribeMessageManagerImpl.java:704)\n\tat amdocs.ecommerce.esi.api.EJBInterface.EJBInterfaceAPIBean.postSubscribe(EJBInterfaceAPIBean.java:265)\n\tat amdocs.ecommerce.esi.api.EJBInterface.EJBInterfaceAPIOfflineImpl.postSubscribe(EJBInterfaceAPIOfflineImpl.java:970)\n\tat amdocs.ecommerce.esi.handlers.events.SubscribeTimerEventHandler.callFlow(SubscribeTimerEventHandler.java:227)\n\tat amdocs.ecommerce.esi.handlers.events.SubscribeTimerEventHandler.doBeanExecute(SubscribeTimerEventHandler.java:282)\n\tat amdocs.ecommerce.esi.api.EventGeneratorManager.EventGeneratorManagerBean.executeOnEventRecord(EventGeneratorManagerBean.java:163)\n\tat amdocs.ecommerce.esi.api.EventGeneratorManager.EventGeneratorManagerOfflineImpl.executeOnEventRecord(EventGeneratorManagerOfflineImpl.java:2239)\n\tat amdocs.ecommerce.esi.handlers.events.JDBCPublishTimerHandler.executeOne(JDBCPublishTimerHandler.java:1312)\n\tat amdocs.ecommerce.esi.handlers.events.JDBCPublishTimerHandler.execute(JDBCPublishTimerHandler.java:1225)\n\tat amdocs.ecommerce.esi.handlers.events.BaseGenericEvent.processEvent(BaseGenericEvent.java:651)\n\tat amdocs.ecommerce.esi.api.EventGeneratorManager.EventGeneratorManagerBean.processEvent(EventGeneratorManagerBean.java:227)\n\tat amdocs.ecommerce.esi.api.EventGeneratorManager.EventGeneratorManagerOfflineImpl.processEvent(EventGeneratorManagerOfflineImpl.java:112)\n\tat amdocs.ecommerce.esi.handlers.events.BaseGenericEvent.callEGBean(BaseGenericEvent.java:821)\n\tat amdocs.ecommerce.esi.handlers.events.BaseTimerEvent.onTimer(BaseTimerEvent.java:502)\n\tat 
amdocs.ecommerce.esi.handlers.events.BaseTimerEvent.run(BaseTimerEvent.java:342)\n\tat amdocs.ecommerce.esi.handlers.events.GenericJavaTimerTask.run(Ge\n<< JESI>> [INFO] [TIME:7 Apr 2022 12:13:08][Tid:BANK_ADP_POST_SUB_01_00388][ID=20220407121308_103867_dfwlnpkgjesi-01_40820_d0ee945e-71c1-4305-ad17-4ca6a63d8b19;Flow=amdocs/eai/adaptor/outbound/authrequest/NEW_ORDER_AUTH.xml]... Checking CHASE_SERVER = 1.",
    "offset": 15902514,
    "source": "/kgnusr/rhd/prdaweb/jesiPRD_PUB/jesidomain/logs/JESI_amdocsOnline.log",
    "tags": [
      "jesiPRD_PUB"
    ],
    "type": "kg-jesi-logs-st1",
    "@version": "1",
    "kafka": {
      "msg_size": 10806,
      "topic": "kg-jesi-logs-st1",
      "consumer_group": "logstash",
      "partition": 7,
      "key": null
    },
    "platform": "kgjesi"
  },
  "fields": {
    "@timestamp": [
      1649351588782
    ]
  }
}

You can see the new lines in the message field

Could you perhaps share your Filebeat and Logstash configurations?

Here is the filebeat config

filebeat:
  prospectors:
    -
      paths:
        - /kgnusr/rhd/prdcweb/prdAcrm1_01/WLServers/config/logs/online.log       
      document_type: kg-acrm-logs-st1
      tags: ["prdAcrm1_01"]
      multiline.pattern: '^<<JESI>>'
      multiline.negate: true
      multiline.match: after
      exclude_files: ['\.gz$']

    -
      paths:
        - /kgnusr/rhd/prdcweb/prdAcrm2_01/WLServers/config/logs/online.log
      document_type: kg-acrm-logs-st1
      tags: ["prdAcrm2_01"]
      multiline.pattern: '^<<JESI>>'
      multiline.negate: true
      multiline.match: after
      exclude_files: ['\.gz$']

    -
      paths:
        - /kgnusr/rhd/prdcweb/prdAcrm3_01/WLServers/config/logs/online.log
      document_type: kg-acrm-logs-st1
      tags: ["prdAcrm3_01"]
      multiline.pattern: '^<<JESI>>'
      multiline.negate: true
      multiline.match: after
      exclude_files: ['\.gz$']

    -
      paths:
        - /kgnusr/rhd/prdcweb/prdAcrm4_01/WLServers/config/logs/online.log
      document_type: kg-acrm-logs-st1
      tags: ["prdAcrm4_01"]
      multiline.pattern: '^<<JESI>>'
      multiline.negate: true
      multiline.match: after
      exclude_files: ['\.gz$']

  close_older: 480m
logging:
    level: debug 

    # enable file rotation with default configuration
    to_files: true

    # do not log to syslog
    to_syslog: false

    files:
      path: /filebeat-5.0.0-alpha3-linux-x64/var/log
      name: filebeat.log
      keepfiles: 7


output:

  kafka:
    hosts: ["blah:9092"]
    topics:
       - topic: '%{[type]}'
    use_type: true 
    worker: 3 
    compression: none 

And the logstash

input {
   kafka {
          zk_connect =>  "blah:2181"
          white_list => "kg-acrm-logs-st1"
          decorate_events => true

          codec => json 
         }
      }

filter {

  mutate {
     add_field => { "platform" => "kgacrm" }
         }

  if "prdAcrm" in [tags] {

        if [type] == "kg-acrm-logs-st1" {
           mutate {
              add_field => { "kg_host" => "%{[beat][hostname]}" }
              remove_field => ["[beat]"]
                  }
        }
           date {
             match => [ "timestamp", "MMM dd HH:mm:ss","MMM  d HH:mm:ss","MMM dd yyyy HH:mm:ss","MMM  d yyyy HH:mm:ss"]
             target => "@timestamp"
             timezone => "America/Chicago"
        }      
  }


}


output {

         elasticsearch {
             hosts => [ "blah:9200", "blah2:9200" ]
             index => "%{platform}-%{+YYYY.MM.dd}"
                     }

         stdout { codec => rubydebug }
       } 
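Added example (not from any poster in this thread and not Filebeat's own code): a simplified Python model of the `negate: true` / `match: after` grouping, run over a constructed sample shortened from the log lines quoted above. It compares the `^<< JESI>>` pattern from the suggested configuration with the `^<<JESI>>` pattern in the posted prospectors; `group_multiline` and `merged_events` are hypothetical helper names, and `timeout` and line limits are not modelled.

```python
import re

# Constructed sample, shortened from the log lines quoted in this thread.
SAMPLE_LOG = """\
<< JESI>> [INFO] [TIME:6 Apr 2022 17:55:19][Tid:ADAPTOR_POST_SUB_01_05323]primaryMcID from Input : 195280
<< JESI>> [ERROR] [TIME:29 Mar 2022 04:34:53][Tid:OUTBOUND_RECOVERY_01339]Error=amdocs.ecommerce.esi.exceptions.CommunicationException: [LoginFault [ApiFault exceptionCode='INVALID_LOGIN'
exceptionMessage='Invalid username, password, security token; or user locked out.'
]
]
    at amdocs.ecommerce.esi.proxy.SFDCProxy.connect(SFDCProxy.java:206)
    at amdocs.ecommerce.esi.handlers.channels.SFDCProxyChannel.publish(SFDCProxyChannel.java:78)
<< JESI>> [INFO] [TIME:6 Apr 2022 17:55:19][Tid:ADAPTOR_POST_SUB_01_05323]callPrimaryMCByRepIdService from Input : 1
"""

# Header that starts every record in the sample (note the space after "<<").
HEADER = re.compile(r"^<< JESI>>", re.MULTILINE)


def group_multiline(lines, pattern, negate=True):
    """Simplified model of multiline grouping with match: after.

    A line is a continuation when (pattern matches) != negate, so with
    negate: true every line that does NOT match the pattern is appended
    to the event that precedes it. The first line always opens an event.
    """
    regex = re.compile(pattern)
    events, current = [], []
    for line in lines:
        continuation = (regex.search(line) is not None) != negate
        if current and continuation:
            current.append(line)
            continue
        if current:
            events.append("\n".join(current))
        current = [line]
    if current:
        events.append("\n".join(current))
    return events


def merged_events(events):
    """Return events whose message holds more than one '<< JESI>>' header."""
    return [e for e in events if len(HEADER.findall(e)) > 1]


if __name__ == "__main__":
    lines = SAMPLE_LOG.splitlines()
    for pattern in (r"^<< JESI>>", r"^<<JESI>>"):
        events = group_multiline(lines, pattern)
        print(f"{pattern!r}: {len(events)} event(s), "
              f"{len(merged_events(events))} with several headers")
```

The second helper also works as a downstream check: a document whose `message` holds several headers carries a single `@timestamp` for all of them, so counts of events such as `INVALID_LOGIN` come out low.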
