最近公司对老服务器进行退役,需要将Flume服务迁移至新服,在迁移过程中有一条链路因为拦截器初始化失败造成数据丢失。按推理Flume具有事务性,应该不会造成数据丢失,对此问题进行了排查,记录一下。
查阅资料的过程中
flume【源码分析】分析Flume的启动过程
文章给我提供了莫大的帮助,感谢大佬。
Flume具有事务性为什么会造成数据丢失?
背景描述:Flume迁移至新服务后,由于自定义拦截器初始化失败(因为需要使用到其他环境,未配置白名单),于是我停止异常的拦截器,解决问题后对拦截器进行重启,第二天就报警数据出现了差异。
在将数据进行修复后,百思不得其解,Flume具有事务性,按理说应该不会导致数据丢失才对,可是为什么会造成数据丢失?难道其事务性不稳定?带着疑问,我进行了部分源码的阅读。在
ChannelProcessor
中有这么一个方法:
public void processEventBatch(List<Event> events) {
Preconditions.checkNotNull(events, "Event list must not be null");
// 拦截器链拦截
events = interceptorChain.intercept(events);
// ......省略
// source开始put数据
// Process required channels
for (Channel reqChannel : reqChannelQueue.keySet()) {
Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin();
List<Event> batch = reqChannelQueue.get(reqChannel);
for (Event event : batch) {
reqChannel.put(event);
}
tx.commit();
} catch (Throwable t) {
tx.rollback();
if (t instanceof Error) {
LOG.error("Error while writing to required channel: " + reqChannel, t);
throw (Error) t;
} else if (t instanceof ChannelException) {
throw (ChannelException) t;
} else {
throw new ChannelException("Unable to put batch on required " +
"channel: " + reqChannel, t);
}
} finally {
if (tx != null) {
tx.close();
}
}
}
// Process optional channels
for (Channel optChannel : optChannelQueue.keySet()) {
Transaction tx = optChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin();
List<Event> batch = optChannelQueue.get(optChannel);
for (Event event : batch) {
optChannel.put(event);
}
tx.commit();
} catch (Throwable t) {
tx.rollback();
LOG.error("Unable to put batch on optional channel: " + optChannel, t);
if (t instanceof Error) {
throw (Error) t;
}
} finally {
if (tx != null) {
tx.close();
}
}
}
}
上面方法可知,拦截器事务是在拦截器链之后,而我在Flume拦截器初始化失败后就关闭了Flume服务,因为Flume已经消费了数据,Flume拦截器异常,数据没有进入channel,导致已经消费了kafka的数据丢失。
如何解决数据丢失问题?
答案很简单:在拦截器初始化异常时,进行捕获,关闭Flume就好了。
是的,原理很简单,那么如何关闭Flume?我用的
Syste,.exit(1)
,在测试时,发送异常是执行了关闭,能够看见执行了关闭的钩子函数,当时使用
ps
指令查看flume进程时,Flume并没有停止,甚至kill不掉,必须使用
kill -9
才有效。于是我使用
jstack
查看jvm信息,我发现
agent-shutdown-hook
钩子函数处于
BLOCKED
状态(阻塞状态)。于是我对源码研究了一下。
原因如下:
Flume的启动类Application中:
private final ReentrantLock lifecycleLock;
// 省略。。。。
// 启动方法
public void start() {
this.lifecycleLock.lock();
try {
Iterator var1 = this.components.iterator();
while(var1.hasNext()) {
LifecycleAware component = (LifecycleAware)var1.next();
this.supervisor.supervise(component, new AlwaysRestartPolicy(), LifecycleState.START);
}
} finally {
this.lifecycleLock.unlock();
}
}
// 停止方法
public void stop() {
this.lifecycleLock.lock();
this.stopAllComponents();
try {
this.supervisor.stop();
if (this.monitorServer != null) {
this.monitorServer.stop();
}
} finally {
this.lifecycleLock.unlock();
}
}
可以看见start与stop都i是有锁的,在Flume拦截器还没有初始化结束时,因拦截器异常就调用stop,而此时start还没有结束导致stop方法也无法执行,形成死锁,造成阻塞。
private void stopAllComponents() {
if (this.materializedConfiguration != null) {
logger.info("Shutting down configuration: {}", this.materializedConfiguration); // 这行日志也会被打印
for (Entry<String, SourceRunner> entry :
this.materializedConfiguration.getSourceRunners().entrySet()) {
try {
logger.info("Stopping Source " + entry.getKey()); // 此处日志会打印,造成假的关闭现场
supervisor.unsupervise(entry.getValue()); // 这个方法里面会发送死锁
} catch (Exception e) {
logger.error("Error while stopping {}", entry.getValue(), e);
}
}
//省略。。。。
}
// 看看unsupervise方法
public synchronized void unsupervise(LifecycleAware lifecycleAware) {
Preconditions.checkState(supervisedProcesses.containsKey(lifecycleAware),
"Unaware of " + lifecycleAware + " - can not unsupervise");
logger.debug("Unsupervising service:{}", lifecycleAware);
synchronized (lifecycleAware) { // 需要锁,而此时锁还在start方法手上,造成死锁
Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
supervisoree.status.discard = true;
this.setDesiredState(lifecycleAware, LifecycleState.STOP);
logger.info("Stopping component: {}", lifecycleAware);
lifecycleAware.stop();
}
// 省略。。。。。
而且锁是
private final
的,无法在外部进行改变。因此,如果初始化拦截器异常,可以使用
Syste.exit(1)
关闭,当时会造成死锁,可以阻止Flume启动消费数据。但问题是需要kill -9来杀死程序。
有大佬有更好的方法欢迎指正。