Flume拦截器初始化异常造成数据丢失

  • Post author:
  • Post category:其他


最近公司对老服务器进行退役,需要将Flume服务迁移至新服,在迁移过程中有一条链路因为拦截器初始化失败造成数据丢失。按推理Flume具有事务性,应该不会造成数据丢失,对此问题进行了排查,记录一下。

查阅资料的过程中

flume【源码分析】分析Flume的启动过程

文章给我提供了莫大的帮助,感谢大佬。


Flume具有事务性为什么会造成数据丢失?

背景描述:Flume迁移至新服务后,由于自定义拦截器初始化失败(因为需要使用到其他环境,未配置白名单),于是我停止异常的拦截器,解决问题后对拦截器进行重启,第二天就报警数据出现了差异。

在将数据进行修复后,百思不得其解,Flume具有事务性,按理说应该不会导致数据丢失才对,可是为什么会造成数据丢失?难道其事务性不稳定?带着疑问,我进行了部分源码的阅读。在

ChannelProcessor

中有这么一个方法:

public void processEventBatch(List<Event> events) {
    Preconditions.checkNotNull(events, "Event list must not be null");
	// 拦截器链拦截
    events = interceptorChain.intercept(events);
	// ......省略
    
    // source开始put数据
    // Process required channels
    for (Channel reqChannel : reqChannelQueue.keySet()) {
      Transaction tx = reqChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();

        List<Event> batch = reqChannelQueue.get(reqChannel);

        for (Event event : batch) {
          reqChannel.put(event);
        }

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        if (t instanceof Error) {
          LOG.error("Error while writing to required channel: " + reqChannel, t);
          throw (Error) t;
        } else if (t instanceof ChannelException) {
          throw (ChannelException) t;
        } else {
          throw new ChannelException("Unable to put batch on required " +
              "channel: " + reqChannel, t);
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }

    // Process optional channels
    for (Channel optChannel : optChannelQueue.keySet()) {
      Transaction tx = optChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();

        List<Event> batch = optChannelQueue.get(optChannel);

        for (Event event : batch) {
          optChannel.put(event);
        }

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        LOG.error("Unable to put batch on optional channel: " + optChannel, t);
        if (t instanceof Error) {
          throw (Error) t;
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }
  }

上面方法可知,拦截器事务是在拦截器链之后,而我在Flume拦截器初始化失败后就关闭了Flume服务,因为Flume已经消费了数据,Flume拦截器异常,数据没有进入channel,导致已经消费了kafka的数据丢失。


如何解决数据丢失问题?


答案很简单:在拦截器初始化异常时,进行捕获,关闭Flume就好了。

是的,原理很简单,那么如何关闭Flume?我用的

Syste,.exit(1)

,在测试时,发送异常是执行了关闭,能够看见执行了关闭的钩子函数,当时使用

ps

指令查看flume进程时,Flume并没有停止,甚至kill不掉,必须使用

kill -9

才有效。于是我使用

jstack

查看jvm信息,我发现

agent-shutdown-hook

钩子函数处于

BLOCKED

状态(阻塞状态)。于是我对源码研究了一下。

原因如下:

Flume的启动类Application中:

	private final ReentrantLock lifecycleLock;
	// 省略。。。。
	
	// 启动方法
	public void start() {
	    this.lifecycleLock.lock();
	    try {
	        Iterator var1 = this.components.iterator();
	
	        while(var1.hasNext()) {
	            LifecycleAware component = (LifecycleAware)var1.next();
	            this.supervisor.supervise(component, new AlwaysRestartPolicy(), LifecycleState.START);
	        }
	    } finally {
	        this.lifecycleLock.unlock();
	    }
	
	}
 
	 // 停止方法
     public void stop() {
        this.lifecycleLock.lock();
        this.stopAllComponents();
        try {
            this.supervisor.stop();
            if (this.monitorServer != null) {
                this.monitorServer.stop();
            }
        } finally {
            this.lifecycleLock.unlock();
        }

    }   

可以看见start与stop都i是有锁的,在Flume拦截器还没有初始化结束时,因拦截器异常就调用stop,而此时start还没有结束导致stop方法也无法执行,形成死锁,造成阻塞。

  private void stopAllComponents() {
    if (this.materializedConfiguration != null) {
      logger.info("Shutting down configuration: {}", this.materializedConfiguration); // 这行日志也会被打印
      for (Entry<String, SourceRunner> entry :
           this.materializedConfiguration.getSourceRunners().entrySet()) {
        try {
          logger.info("Stopping Source " + entry.getKey()); // 此处日志会打印,造成假的关闭现场
          supervisor.unsupervise(entry.getValue()); // 这个方法里面会发送死锁
        } catch (Exception e) {
          logger.error("Error while stopping {}", entry.getValue(), e);
        }
      }

//省略。。。。
  }

	// 看看unsupervise方法
  public synchronized void unsupervise(LifecycleAware lifecycleAware) {

    Preconditions.checkState(supervisedProcesses.containsKey(lifecycleAware),
        "Unaware of " + lifecycleAware + " - can not unsupervise");

    logger.debug("Unsupervising service:{}", lifecycleAware);

    synchronized (lifecycleAware) { // 需要锁,而此时锁还在start方法手上,造成死锁
      Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
      supervisoree.status.discard = true;
      this.setDesiredState(lifecycleAware, LifecycleState.STOP);
      logger.info("Stopping component: {}", lifecycleAware);
      lifecycleAware.stop();
    }
    // 省略。。。。。

而且锁是

private final

的,无法在外部进行改变。因此,如果初始化拦截器异常,可以使用

Syste.exit(1)

关闭,当时会造成死锁,可以阻止Flume启动消费数据。但问题是需要kill -9来杀死程序。

有大佬有更好的方法欢迎指正。



版权声明:本文为qq_40705355原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。