Require suggestion for the processing multi-line logs: filebeat or logstash

Hi Elastic team,

Im trying to process logs from liferay (a CMS application)

liferay logs daily single file contains above 1000 lines

Sample log lines inside log file

13:56:59,178 INFO  [ContainerBackgroundProcessor[StandardEngine[Catalina]]][HotDeployEvent:145] Plugin tf-seller-portlet requires marketplace-portlet
13:56:59,180 INFO  [ContainerBackgroundProcessor[StandardEngine[Catalina]]][PortletHotDeployListener:525] Unregistering portlets for tf-seller-portlet
13:56:59,314 INFO  [ContainerBackgroundProcessor[StandardEngine[Catalina]]][PortletHotDeployListener:560] 1 portlet for tf-seller-portlet was unregistered
13:56:59,317 INFO  [ContainerBackgroundProcessor[StandardEngine[Catalina]]][PluginPackageUtil:1016] Reading plugin package for tf-seller-portlet
13:05:34,755 WARN  [ajp-bio-8009-exec-1][SecurityPortletContainerWrapper:623] Reject process action for https://www.abc.com/login1111 on 49
13:05:34,767 WARN  [ajp-bio-8009-exec-2][SecurityPortletContainerWrapper:623] Reject process action for https://www.abc.com/login1111 on 49
13:35:36,194 ERROR [liferay/search_writer/SYSTEM_ENGINE-1][IndexAccessorImpl:336] Check Lucene directory failed for 0
org.apache.lucene.store.LockReleaseFailedException: Cannot forcefully unlock a NativeFSLock which is held by another indexer component: /opt/foss/data/lucene/0/write.lock
	at org.apache.lucene.store.NativeFSLock.release(NativeFSLockFactory.java:294)
	at org.apache.lucene.index.IndexWriter.unlock(IndexWriter.java:4654)
	at com.liferay.portal.search.lucene.IndexAccessorImpl._checkLuceneDir(IndexAccessorImpl.java:132)
	at com.liferay.portal.search.lucene.IndexAccessorImpl.<init>(IndexAccessorImpl.java:87)
	at com.liferay.portal.search.lucene.LuceneHelperImpl.getIndexAccessor(LuceneHelperImpl.java:391)
	at com.liferay.portal.search.lucene.LuceneHelperImpl.updateDocument(LuceneHelperImpl.java:791)
	at com.liferay.portal.search.lucene.LuceneHelperUtil.updateDocument(LuceneHelperUtil.java:428)
	at com.liferay.portal.search.lucene.LuceneIndexWriter.updateDocument(LuceneIndexWriter.java:118)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:23)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at com.liferay.portal.kernel.messaging.proxy.ProxyRequest.execute(ProxyRequest.java:85)
	at com.liferay.portal.kernel.messaging.proxy.ProxyMessageListener.receive(ProxyMessageListener.java:51)
	at com.liferay.portal.kernel.messaging.InvokerMessageListener.receive(InvokerMessageListener.java:72)
	at com.liferay.portal.kernel.messaging.ParallelDestination$1.run(ParallelDestination.java:69)
	at com.liferay.portal.kernel.concurrent.ThreadPoolExecutor$WorkerTask._runTask(ThreadPoolExecutor.java:682)
	at com.liferay.portal.kernel.concurrent.ThreadPoolExecutor$WorkerTask.run(ThreadPoolExecutor.java:593)
	at java.lang.Thread.run(Thread.java:745)

Trying to achieve

  1. Filter the single log file into ERROR , INFO and others accordingly and process them with grok
  2. Trying to use separate grok for only ERROR lines as these have stack trace
  3. Output all to elastic search and also output to separate files as INFO, ERROR accordingly

Below is pipeline config
my config is not proper

Q1) is it better to use filebeat for dividing the single log into INFO, ERROR etc.. and then process to logstash or directly input to logstash

input {
    
   file {
         path => "/home/foss/liferay/*.log"
         start_position => beginning
          
         codec => multiline {
                     pattern => "^%{TIME}"
                     negate => true
                     what => "previous"
                     auto_flush_interval => 5
                }
        }
}

filter {
    grok {
         match => { "message" => "%{TIME:logTime}\s*%{LOGLEVEL:logLevel}\s*\[%{GREEDYDATA:logThread}\]\s*\[%{GREEDYDATA:logClassName}:%{NUMBER:logLineNumber}\]\s*%{GREEDYDATA:logMessage}" }
                }

if "ERROR" in [logLevel]  {
		mutate {
			replace => { type => "ERROR" }
		}
    grok {
         match => {"mesage" => "%{TIME:logTime}\s*%{LOGLEVEL:logLevel}\s*\[%{GREEDYDATA:logThread}\]\s*\[%{GREEDYDATA:logClassName}:%{NUMBER:logLineNumber}\]\s*%{GREEDYDATA:logMessage}" }
         }
    }
    ruby {
        code => "
            event.set('datetime', [event.get('path').split('/')[-1].split('.')[-2], event.get('logTime').split(',')[0]].join(' '))
        "
    }
    date {
    	match => [ "datetime", "yyyy-MM-dd HH:mm:ss" ]
    }
}

output {
    elasticsearch {
        hosts => ["localhost:9200"] # load balances between the hosts
	    document_type => "default" # Defaults to "type" field
	    index => "liferay"
	    http_compression => true
    }

if [type] == "ERROR" {
    stdout {
         file =>  /home/foss/proccessed/"%{type}-%{datetime}"
    }
}

What do you expect by using filebeat? You want to remove logstash and do everything in filebeat?

You have 2 outputs in your logstash config. This is not supported in Filebeat.

Hi steffen,

Thank you for your response, So for the input part Im just trying to know how to proceed with my use-case should i go with filebeat first (whether it can separate lines according to INFO, ERROR & WARN along with forming fields ) then send it to logstash or should i use directly logstash

Both are viable options. Filebeat splits logs by lines and has a multiline input. Somewhat similar to the file input. Why exactly you want to use Filebeat? If both Filebeat and Logstash run on the same host, then there is no need for Filebeat. If you want to offload processing/parsing to another host, use Filebeat + Logstash.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.