Hi Elastic team,
I'm trying to process logs from Liferay (a CMS application).
Liferay writes a single log file per day, and each file contains more than 1000 lines.
Sample lines from the log file:
13:56:59,178 INFO [ContainerBackgroundProcessor[StandardEngine[Catalina]]][HotDeployEvent:145] Plugin tf-seller-portlet requires marketplace-portlet
13:56:59,180 INFO [ContainerBackgroundProcessor[StandardEngine[Catalina]]][PortletHotDeployListener:525] Unregistering portlets for tf-seller-portlet
13:56:59,314 INFO [ContainerBackgroundProcessor[StandardEngine[Catalina]]][PortletHotDeployListener:560] 1 portlet for tf-seller-portlet was unregistered
13:56:59,317 INFO [ContainerBackgroundProcessor[StandardEngine[Catalina]]][PluginPackageUtil:1016] Reading plugin package for tf-seller-portlet
13:05:34,755 WARN [ajp-bio-8009-exec-1][SecurityPortletContainerWrapper:623] Reject process action for https://www.abc.com/login1111 on 49
13:05:34,767 WARN [ajp-bio-8009-exec-2][SecurityPortletContainerWrapper:623] Reject process action for https://www.abc.com/login1111 on 49
13:35:36,194 ERROR [liferay/search_writer/SYSTEM_ENGINE-1][IndexAccessorImpl:336] Check Lucene directory failed for 0
org.apache.lucene.store.LockReleaseFailedException: Cannot forcefully unlock a NativeFSLock which is held by another indexer component: /opt/foss/data/lucene/0/write.lock
at org.apache.lucene.store.NativeFSLock.release(NativeFSLockFactory.java:294)
at org.apache.lucene.index.IndexWriter.unlock(IndexWriter.java:4654)
at com.liferay.portal.search.lucene.IndexAccessorImpl._checkLuceneDir(IndexAccessorImpl.java:132)
at com.liferay.portal.search.lucene.IndexAccessorImpl.<init>(IndexAccessorImpl.java:87)
at com.liferay.portal.search.lucene.LuceneHelperImpl.getIndexAccessor(LuceneHelperImpl.java:391)
at com.liferay.portal.search.lucene.LuceneHelperImpl.updateDocument(LuceneHelperImpl.java:791)
at com.liferay.portal.search.lucene.LuceneHelperUtil.updateDocument(LuceneHelperUtil.java:428)
at com.liferay.portal.search.lucene.LuceneIndexWriter.updateDocument(LuceneIndexWriter.java:118)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:23)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.liferay.portal.kernel.messaging.proxy.ProxyRequest.execute(ProxyRequest.java:85)
at com.liferay.portal.kernel.messaging.proxy.ProxyMessageListener.receive(ProxyMessageListener.java:51)
at com.liferay.portal.kernel.messaging.InvokerMessageListener.receive(InvokerMessageListener.java:72)
at com.liferay.portal.kernel.messaging.ParallelDestination$1.run(ParallelDestination.java:69)
at com.liferay.portal.kernel.concurrent.ThreadPoolExecutor$WorkerTask._runTask(ThreadPoolExecutor.java:682)
at com.liferay.portal.kernel.concurrent.ThreadPoolExecutor$WorkerTask.run(ThreadPoolExecutor.java:593)
at java.lang.Thread.run(Thread.java:745)
What I am trying to achieve:
- Split the events of the single log file into ERROR, INFO and other levels, and parse them with grok
- Use a separate grok only for ERROR events, since these carry a stack trace
- Output everything to Elasticsearch, and also write INFO and ERROR events to separate files
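For the third goal, this is a minimal sketch of the per-level routing I have in mind. It assumes grok has already populated a logLevel field, and the file name OTHER.log is just a placeholder I picked for everything that is neither INFO nor ERROR.

```conf
output {
  # Every event goes to Elasticsearch regardless of level
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "liferay"
  }
  # INFO and ERROR events each get their own file; %{logLevel} is resolved per event
  if [logLevel] in ["INFO", "ERROR"] {
    file {
      path => "/home/foss/proccessed/%{logLevel}.log"
    }
  } else {
    # Placeholder file name for all remaining levels (WARN, DEBUG, ...)
    file {
      path => "/home/foss/proccessed/OTHER.log"
    }
  }
}
```

The idea is to keep a single pipeline and let conditionals in the output section do the splitting, rather than splitting the file before it is parsed.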
Q1) Is it better to use Filebeat to split the single log into INFO, ERROR, etc. and then ship it to Logstash, or to read the file directly in Logstash?
Below is my pipeline config. It does not work properly yet:
input {
  file {
    path => "/home/foss/liferay/*.log"
    start_position => "beginning"
    # Lines that do not start with a time (stack trace lines) are appended to the previous event
    codec => multiline {
      pattern => "^%{TIME}"
      negate => true
      what => "previous"
      auto_flush_interval => 5
    }
  }
}
filter {
  # Parse the first line of every event: time, level, thread, class:line, message
  grok {
    match => { "message" => "%{TIME:logTime}\s*%{LOGLEVEL:logLevel}\s*\[%{GREEDYDATA:logThread}\]\s*\[%{GREEDYDATA:logClassName}:%{NUMBER:logLineNumber}\]\s*%{GREEDYDATA:logMessage}" }
  }
  if [logLevel] == "ERROR" {
    mutate {
      replace => { "type" => "ERROR" }
    }
    # ERROR events carry a stack trace after the first line; capture everything
    # after the first newline into its own field (stackTrace is a name chosen here)
    grok {
      match => { "message" => "\n(?<stackTrace>(.|\r|\n)*)" }
      tag_on_failure => []
    }
  }
  # Build "yyyy-MM-dd HH:mm:ss" from the date in the file name and the time in the log line
  # (assumes the date is the second-to-last dot-separated part of the file name)
  ruby {
    code => "
      event.set('datetime', [event.get('path').split('/')[-1].split('.')[-2], event.get('logTime').split(',')[0]].join(' '))
    "
  }
  date {
    match => [ "datetime", "yyyy-MM-dd HH:mm:ss" ]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"] # load balances between the hosts
    document_type => "default" # Defaults to "type" field
    index => "liferay"
    http_compression => true
  }
  if [type] == "ERROR" {
    # stdout has no file option; the file output writes events to disk
    file {
      path => "/home/foss/proccessed/%{type}-%{datetime}"
    }
  }
}
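Regarding Q1, the Filebeat variant I am considering would replace the file input with a beats input. This is only a sketch: port 5044 is the conventional Beats port and is an assumption here, not something I have set up yet.

```conf
input {
  # Filebeat would ship the log lines to this port; stack trace lines would have to be
  # joined by Filebeat's own multiline settings before they arrive here
  beats {
    port => 5044
  }
}
```

As far as I understand, the multiline codec should not be used on the beats input, so the joining of stack trace lines would move to the Filebeat side. The field holding the source file path also has a different name depending on the Filebeat version, so the ruby filter that reads path would need to be adjusted.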