目录
4.编写JSONUtils类(在com.zj.flume.interceptor下)
5.编写ETLInterceptor类(在com.zj.flume.interceptor下)
6.编写TimeStampInterceptor类(在com.zj.flume.interceptor下)
一、IDEA部分
1.创建maven项目
2.创建包
com.zj.flume.interceptor
3.添加依赖
<dependencies>
<!-- Flume interceptor API (Interceptor, Event, Context). Scope "provided":
     the jar is available in the Flume installation's lib directory at runtime,
     so it is not bundled into the assembly jar. -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
<scope>provided</scope>
</dependency>
<!-- JSON parsing used by the interceptors on untrusted log lines.
     1.2.83 fixes the autoType deserialization bypass (CVE-2022-25845)
     that affects earlier 1.2.x releases such as 1.2.62. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compile for Java 8. -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Builds *-jar-with-dependencies.jar during "package" so that fastjson
     (compile scope) is bundled with the interceptor classes. The version is
     pinned for reproducible builds. -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
4.编写JSONUtils类(在com.zj.flume.interceptor下)
package com.zj.flume.interceptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
/**
 * Helpers for checking whether a raw log line is parseable JSON.
 */
public class JSONUtils {

    /**
     * Returns whether {@code log} is a non-blank, well-formed JSON text.
     *
     * <p>fastjson's {@code JSON.parse} returns {@code null} instead of throwing for
     * {@code null} and blank input. Those cases are therefore rejected explicitly;
     * otherwise empty log lines would be reported as valid JSON.
     *
     * @param log the raw text to check; may be {@code null}
     * @return {@code true} if the text is non-blank and fastjson parses it without
     *     throwing a {@link JSONException}
     */
    public static boolean isJSONValidate(String log) {
        // trim().isEmpty() rather than isBlank(): the project targets Java 8
        if (log == null || log.trim().isEmpty()) {
            return false;
        }
        try {
            JSON.parse(log);
            return true;
        } catch (JSONException e) {
            // malformed JSON text
            return false;
        }
    }
}
5.编写ETLInterceptor类(在com.zj.flume.interceptor下)
package com.zj.flume.interceptor;
import com.alibaba.fastjson.JSON;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
/**
 * Flume interceptor that drops events whose body is not valid JSON.
 *
 * <p>The validity check is delegated to {@link JSONUtils#isJSONValidate(String)}.
 */
public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // stateless: nothing to prepare
    }

    /**
     * Keeps the event only when its UTF-8 decoded body passes the JSON check.
     *
     * @param event the incoming event
     * @return the same event, or {@code null} to signal that it should be dropped
     */
    @Override
    public Event intercept(Event event) {
        String bodyText = new String(event.getBody(), StandardCharsets.UTF_8);
        return JSONUtils.isJSONValidate(bodyText) ? event : null;
    }

    /**
     * Removes, in place, every event that {@link #intercept(Event)} rejects.
     *
     * @param list the batch to filter; must support {@code Iterator.remove}
     * @return the same list instance, now filtered
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        for (Iterator<Event> it = list.iterator(); it.hasNext(); ) {
            Event candidate = it.next();
            if (intercept(candidate) == null) {
                it.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {
        // nothing to release
    }

    /**
     * Factory referenced in the agent config as
     * {@code com.zj.flume.interceptor.ETLInterceptor$Builder}.
     */
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }

        @Override
        public void configure(Context context) {
            // no configurable properties
        }
    }
}
6.编写TimeStampInterceptor类(在com.zj.flume.interceptor下)
package com.zj.flume.interceptor;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class TimeStampInterceptor implements Interceptor {
private ArrayList<Event> events = new ArrayList<>();
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();
String log = new String(event.getBody(), StandardCharsets.UTF_8);
JSONObject jsonObject = JSONObject.parseObject(log);
String ts = jsonObject.getString("ts");
headers.put("timestamp", ts);
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
events.clear();
for (Event event : list) {
events.add(intercept(event));
}
return events;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new TimeStampInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
7.打包(执行 mvn package,在 target 目录下生成包含依赖的 *-jar-with-dependencies.jar)
二、Linux部分
1.上传jar包到flume的lib目录下
2.编写.conf文件
flume生产者
file-flume-kafka
通过监控日志文件夹采集日志数据,再把数据传输到Kafka;使用ETLInterceptor拦截器过滤掉不是合法JSON的日志。
# Component names
a1.sources = r1
# Interceptor configuration: drop events whose body is not valid JSON
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.zj.flume.interceptor.ETLInterceptor$Builder
flume消费者
kafka-flume-hdfs
Flume从Kafka消费数据,再把数据写入Hadoop的HDFS;使用TimeStampInterceptor拦截器(时间戳拦截器),将日志中的ts字段写入事件头的timestamp。
# Components
a1.sources=r1
# Flume interceptor: copy the body's "ts" field into the "timestamp" header
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.zj.flume.interceptor.TimeStampInterceptor$Builder
本文参考多篇博客和网络资料写成,已找不到来源! 如有错误,恳请指正!!!