Writing a Custom Flume Interceptor and Storing the Collected Logs in Kafka (Case Study)
1. Add the POM dependencies
To call Flume from your own code, add the Flume jar dependencies to the project. The POM is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cn.com.toto.stormlogPro</artifactId>
        <groupId>stormlogPro</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>cn.com.toto.flume</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.6.0</version>
            <!-- Exclude this dependency when packaging; the Flume agent already provides it -->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>cn.com.toto.stromlogpro.log4j.LogInfoBuilder</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2. Code for the custom interceptor
package cn.com.toto.stromlogpro.flume;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * A custom interceptor for click-stream log collection.
 *
 * 1. Implement the Interceptor.Builder interface.
 * 2. Interceptor.Builder has a configure method, which reads the relevant keys from the configuration file.
 * 3. Interceptor.Builder has a build method, which creates the custom AppInterceptor.
 * 4. AppInterceptor has two intercept methods, one for batches and one for single events;
 *    the batch method delegates to the single-event method.
 * 5. Each event needs an appId added to it. Because appId is variable, it has to be passed
 *    in through the AppInterceptor constructor.
 * 6. Give the custom AppInterceptor a constructor that takes the required parameters.
 *
 * @author tuzq
 * @create 2017-06-25 12:48
 */
public class AppInterceptor implements Interceptor {

    // 4. Member variable appId, which holds the value read from the configuration file
    private String appId;

    public AppInterceptor(String appId) {
        this.appId = appId;
    }

    /**
     * Processes a single event, prefixing the log line with the system ID.
     *
     * @param event the incoming event
     * @return the same event, with its body rewritten when the body is not blank
     */
    @Override
    public Event intercept(Event event) {
        // Decode and re-encode with the same charset so non-ASCII text survives on any platform
        String message = new String(event.getBody(), StandardCharsets.UTF_8);
        // Processing logic: blank bodies are passed through unchanged
        if (StringUtils.isNotBlank(message)) {
            message = "aid:" + appId + "||msg:" + message;
            event.setBody(message.getBytes(StandardCharsets.UTF_8));
        }
        return event;
    }

    /**
     * Processes a batch of events by delegating to the single-event method.
     *
     * @param list the incoming events
     * @return the processed events
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> resultList = new ArrayList<Event>();
        for (Event event : list) {
            Event r = intercept(event);
            if (r != null) {
                resultList.add(r);
            }
        }
        return resultList;
    }

    @Override
    public void initialize() {
    }

    @Override
    public void close() {
    }

    public static class AppInterceptorBuilder implements Interceptor.Builder {

        // 1. Holds the appId read from the configuration file
        private String appId;

        @Override
        public Interceptor build() {
            // 3. Construct the interceptor
            return new AppInterceptor(appId);
        }

        @Override
        public void configure(Context context) {
            // 2. If appId is not configured, fall back to "default",
            //    which stands for the click-stream alerting system
            this.appId = context.getString("appId", "default");
            System.out.println("appId:" + appId);
        }
    }
}

The LogInfoBuilder code is as follows:
package cn.com.toto.stromlogpro.log4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.logging.Logger;

/**
 * Generates simulated log content for this project.
 *
 * @author tuzq
 * @create 2017-06-25 13:51
 */
public class LogInfoBuilder {

    // Note: this is a java.util.logging logger. The log4j appenders defined in this
    // package only receive messages logged through org.apache.log4j.Logger.
    private final static Logger logger = Logger.getLogger("msg");

    public static void main(String[] args) {
        Random random = new Random();
        List<String> list = logInfoList();
        // Loops forever, logging one randomly chosen sample message per iteration
        while (true) {
            logger.info(list.get(random.nextInt(list.size())));
        }
    }

    private static List<String> logInfoList() {
        List<String> list = new ArrayList<String>();
        list.add("aid:1||msg:error: Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao");
        list.add("java.sql.SQLException: You have an error in your SQL syntax;");
        list.add("error Unable to connect to any of the specified MySQL hosts.");
        list.add("error:Servlet.service() for servlet action threw exception java.lang.NullPointerException");
        list.add("error:Exception in thread main java.lang.ArrayIndexOutOfBoundsException: 2");
        list.add("error:NoSuchMethodError: com/starit/.");
        list.add("error:java.lang.NoClassDefFoundError: org/coffeesweet/test01/Test01");
        list.add("error:java.lang.NoClassDefFoundError: org/coffeesweet/test01/Test01");
        list.add("error:Java.lang.IllegalStateException");
        list.add("error:Java.lang.IllegalMonitorStateException");
        list.add("error:Java.lang.NegativeArraySizeException");
        list.add("error:java.sql.SQLException: You have an error in your SQL syntax;");
        list.add("error:Java.lang.TypeNotPresentException ");
        list.add("error:Java.lang.UnsupprotedOperationException ");
        list.add("error Java.lang.IndexOutOfBoundsException");
        list.add("error Java.lang.ClassNotFoundException");
        list.add("error java.lang.ExceptionInInitializerError ");
        list.add("error:java.lang.IncompatibleClassChangeError ");
        list.add("error:java.lang.LinkageError ");
        list.add("error:java.lang.OutOfMemoryError ");
        list.add("error java.lang.StackOverflowError");
        list.add("error: java.lang.UnsupportedClassVersionError");
        list.add("error java.lang.ClassCastException");
        list.add("error: java.lang.CloneNotSupportedException");
        list.add("error: java.lang.EnumConstantNotPresentException ");
        list.add("error java.lang.IllegalMonitorStateException ");
        list.add("error java.lang.IllegalStateException ");
        list.add("error java.lang.IndexOutOfBoundsException ");
        list.add("error java.lang.NumberFormatException ");
        list.add("error java.lang.RuntimeException ");
        list.add("error java.lang.TypeNotPresentException ");
        list.add("error MetaSpout.java:9: variable i might not have been initialized");
        list.add("error MyEvaluator.java:1: class Test1 is public, should be declared in a file named Test1.java ");
        list.add("error Main.java:5: cannot find symbol ");
        list.add("error NoClassDefFoundError: asa wrong name: ASA ");
        list.add("error Test1.java:54: 'void' type not allowed here");
        list.add("error Test5.java:8: missing return statement");
        list.add("error:Next.java:66: cannot find symbol ");
        list.add("error symbol : method createTempFile(java.lang.String,java.lang.String,java.lang.String) ");
        list.add("error invalid method declaration; return type required");
        list.add("error array required, but java.lang.String found");
        list.add("error Exception in thread main java.lang.NumberFormatException: null 20. .");
        list.add("error non-static method cannot be referenced from a static context");
        list.add("error Main.java:5: non-static method fun1() cannot be referenced from a static context");
        list.add("error continue outside of loop");
        list.add("error MyAbstract.java:6: missing method body, or declare abstract");
        list.add("error Main.java:6: Myabstract is abstract; cannot be instantiated");
        list.add("error MyInterface.java:2: interface methods cannot have body ");
        list.add("error Myabstract is abstract; cannot be instantiated");
        list.add("error asa.java:3: modifier static not allowed here");
        list.add("error possible loss of precision found: long required:byte var=varlong");
        list.add("error java.lang.NegativeArraySizeException ");
        list.add("error java.lang.ArithmeticException: by zero");
        list.add("error java.lang.ArithmeticException");
        list.add("error java.lang.ArrayIndexOutOfBoundsException");
        list.add("error java.lang.ClassNotFoundException");
        list.add("error java.lang.IllegalArgumentException");
        list.add("error fatal error C1010: unexpected end of file while looking for precompiled header directive");
        list.add("error fatal error C1083: Cannot open include file: R…….h: No such file or directory");
        list.add("error C2011:C……clas type redefinition");
        list.add("error C2018: unknown character 0xa3");
        list.add("error C2057: expected constant expression");
        list.add("error C2065: IDD_MYDIALOG : undeclared identifier IDD_MYDIALOG");
        list.add("error C2082: redefinition of formal parameter bReset");
        list.add("error C2143: syntax error: missing : before ");
        list.add("error C2146: syntax error : missing ';' before identifier dc");
        list.add("error C2196: case value '69' already used");
        list.add("error C2509: 'OnTimer' : member function not declared in 'CHelloView'");
        list.add("error C2555: 'B::f1': overriding virtual function differs from 'A::f1' only by return type or calling convention");
        list.add("error C2511: 'reset': overloaded member function 'void (int)' not found in 'B'");
        list.add("error C2660: 'SetTimer' : function does not take 2 parameters");
        list.add("error warning C4035: 'f……': no return value");
        list.add("error warning C4553: '= =' : operator has no effect; did you intend '='");
        list.add("error C4716: 'CMyApp::InitInstance' : must return a value");
        list.add("error LINK : fatal error LNK1168: cannot open Debug/P1.exe for writing");
        list.add("error LNK2001: unresolved external symbol public: virtual _ _thiscall C (void)");
        list.add("error java.lang.IllegalArgumentException: Path index.jsp does not start with");
        list.add("error org.apache.struts.action.ActionServlet.process(ActionServlet.java:148");
        list.add("error org.apache.jasper.JasperException: Exception in JSP");
        list.add("error The server encountered an internal error () that prevented it from fulfilling this request");
        list.add("error org.apache.jasper.servlet.JspServletWrapper.handleJspException(JspServletWrapper.java:467");
        list.add("error javax.servlet.http.HttpServlet.service(HttpServlet.java:803)");
        list.add("error javax.servlet.jsp.JspException: Cannot find message resources under key org.apache.struts.action.MESSAGE");
        list.add("error Stacktrace: org.apache.jasper.servlet.JspServletWrapper.handleJspException(JspServletWrapper.java:467)");
        list.add("error javax.servlet.ServletException: Cannot find bean org.apache.struts.taglib.html.BEAN in any scope");
        list.add("error no data found");
        list.add("error exception in thread main org.hibernate.MappingException: Unknown entity:.");
        list.add("error using namespace std;");
        list.add("error C2065: 'cout' : undeclared identifier");
        list.add("error main already defined in aaa.obj");
        list.add("error syntax error : missing ';' before '}'");
        list.add("error cout : undeclared identifier");
        list.add("error weblogic.servlet.internal.WebAppServletContext$ServletInvocationAction.run(WebAp ");
        list.add("error Caused by: java.lang.reflect.InvocationTargetException");
        list.add("error Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao");
        list.add("error at com.starit.gejie.Util.Trans.BL_getSysNamesByType(Trans.java:220)");
        return list;
    }
}

The MyDailyRollingFileAppender code is as follows:
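package cn.com.toto.stromlogpro.log4j;

/**
 * Created by toto on 2017/6/25.
 */

import org.apache.log4j.DailyRollingFileAppender;
import org.apache.log4j.Priority;

/**
 * @author tuzq
 * @create 2017-06-25 13:58
 */
public class MyDailyRollingFileAppender extends DailyRollingFileAppender {

    /**
     * Accepts only events whose level equals the appender's threshold exactly,
     * instead of everything at or above it. A Threshold must be configured on
     * the appender, because getThreshold() returns null when none is set.
     */
    @Override
    public boolean isAsSevereAsThreshold(Priority priority) {
        return getThreshold().equals(priority);
    }
}

The MyRollingFileAppender code is as follows: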
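package cn.com.toto.stromlogpro.log4j;

/**
 * Created by toto on 2017/6/25.
 */

import org.apache.log4j.Priority;
import org.apache.log4j.RollingFileAppender;

/**
 * @author tuzq
 * @create 2017-06-25 14:01
 */
public class MyRollingFileAppender extends RollingFileAppender {

    /**
     * Accepts only events whose level equals the appender's threshold exactly,
     * instead of everything at or above it. A Threshold must be configured on
     * the appender, because getThreshold() returns null when none is set.
     */
    @Override
    public boolean isAsSevereAsThreshold(Priority priority) {
        return getThreshold().equals(priority);
    }
}

3. The Flume conf file, which sinks the collected logs to Kafka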
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/data/flume_sources/click_log/info.log
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.com.toto.stromlogpro.flume.AppInterceptor$AppInterceptorBuilder
# This property passes a parameter (the system ID) to the custom Flume interceptor
a1.sources.r1.interceptors.i1.appId = 1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = log_monitor
a1.sinks.k1.brokerList = hadoop1:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
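With this configuration, every non-blank line tailed from info.log reaches the log_monitor topic in the form aid:<appId>||msg:<original line>. The following is a minimal sketch of that record format, written without any Flume or Kafka dependency so it can be run on its own. The class TaggedLogLine and its tag and parse methods are hypothetical names introduced only for illustration; they are not part of the original project. The sketch splits at the first "||msg:" separator, because one of the sample lines in LogInfoBuilder already starts with "aid:1||msg:" and would therefore carry the prefix twice after passing through the interceptor.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not in the original project) illustrating the record format. */
final class TaggedLogLine {
    private static final String PREFIX = "aid:";
    private static final String SEPARATOR = "||msg:";

    final String appId;
    final String message;

    private TaggedLogLine(String appId, String message) {
        this.appId = appId;
        this.message = message;
    }

    /**
     * Mirrors what AppInterceptor does to an event body: blank bodies are returned
     * unchanged, everything else gets the "aid:<appId>||msg:" prefix.
     * trim().isEmpty() is used here as an approximation of StringUtils.isNotBlank.
     */
    static byte[] tag(String appId, byte[] body) {
        String message = new String(body, StandardCharsets.UTF_8);
        if (message.trim().isEmpty()) {
            return body;
        }
        return (PREFIX + appId + SEPARATOR + message).getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Splits a record at the FIRST separator, so a message that itself contains
     * "||msg:" stays intact. Returns null when the record is not in the expected format.
     */
    static TaggedLogLine parse(byte[] record) {
        String text = new String(record, StandardCharsets.UTF_8);
        int sep = text.indexOf(SEPARATOR);
        if (!text.startsWith(PREFIX) || sep < 0) {
            return null;
        }
        String appId = text.substring(PREFIX.length(), sep);
        String message = text.substring(sep + SEPARATOR.length());
        return new TaggedLogLine(appId, message);
    }

    public static void main(String[] args) {
        byte[] body = "error java.lang.RuntimeException".getBytes(StandardCharsets.UTF_8);
        byte[] tagged = tag("1", body);
        // prints "aid:1||msg:error java.lang.RuntimeException"
        System.out.println(new String(tagged, StandardCharsets.UTF_8));

        TaggedLogLine line = parse(tagged);
        // prints "1 -> error java.lang.RuntimeException"
        System.out.println(line.appId + " -> " + line.message);
    }
}
```

A downstream consumer of the log_monitor topic could apply the same split to route messages by system ID. Whether the tailed lines carry extra fields such as a timestamp or level depends on the log4j layout, which is not shown in this article.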