A Brief Look at the Atlas HiveHook Source Code
This article introduces the hook types available in Hive and where each fires in the HiveQL lifecycle, then walks through the HiveHook flow in Atlas.
Hook Types in Hive
A hook is a mechanism for intercepting events, messages, or function calls during processing. Hive hooks tie into Hive's internal workings without requiring Hive to be recompiled; in that sense, they provide a way to extend Hive and integrate external functionality. In other words, Hive hooks can be used to run or inject code at various steps of query processing. Hive provides the following hook types (a minimal hook implementation is sketched after the list):
- Pre-execution / Post-execution hooks: invoked before and after the execution engine runs the query tasks
- Failure-execution hooks: invoked when query execution fails
- Pre-driver-run / Post-driver-run hooks: invoked before and after the driver runs the query
- Pre-semantic-analyzer / Post-semantic-analyzer hooks: invoked before and after the semantic analyzer runs on the HiveQL query string
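The execution-stage hooks (pre, post, failure) all implement Hive's ExecuteWithHookContext interface and are registered through the hive.exec.pre.hooks, hive.exec.post.hooks, and hive.exec.failure.hooks properties (driver-run and semantic-analyzer hooks use hive.exec.driver.run.hooks and hive.semantic.analyzer.hook, respectively). A minimal sketch, with a hypothetical class and package name:

    package com.example.hooks; // hypothetical package

    import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
    import org.apache.hadoop.hive.ql.hooks.HookContext;

    // Minimal sketch of a custom post-execution hook: it logs the operation
    // name and user of every completed query.
    // Registered via: hive.exec.post.hooks=com.example.hooks.LoggingPostHook
    public class LoggingPostHook implements ExecuteWithHookContext {
        @Override
        public void run(HookContext hookContext) throws Exception {
            // HookContext carries the query plan, operation name, inputs/outputs, user info, etc.
            System.out.println("operation=" + hookContext.getOperationName()
                    + ", user=" + hookContext.getUserName());
        }
    }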
The HiveQL Lifecycle
Different kinds of hooks are exposed at the various stages of the HiveQL execution flow; by implementing them you can inject code into HiveQL execution and extend its functionality. Atlas implements a post-execution hook that parses the HookContext to extract metadata.
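In practice, per the Atlas setup documentation, the hook is registered by adding org.apache.atlas.hive.hook.HiveHook to the hive.exec.post.hooks property in hive-site.xml and putting the Atlas hook jars on Hive's classpath.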
How the Atlas HiveHook Works
Atlas's HiveHook implements the run() method of the ExecuteWithHookContext interface provided by Hive. From Hive's hookContext it derives the operation (oper), converts the context into the corresponding event based on that operation, and sends the resulting messages to Kafka through the parent class AtlasHook's notifyEntities() method.
    // org.apache.atlas.hive.hook.HiveHook (simplified)
    public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
        private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>();

        static { // the HiveOperation enum records every Hive operation
            for (HiveOperation hiveOperation : HiveOperation.values()) {
                OPERATION_MAP.put(hiveOperation.getOperationName(), hiveOperation);
            }
        }

        public void run(HookContext hookContext) throws Exception {
            // the HookContext records which operation triggered this event
            HiveOperation oper = OPERATION_MAP.get(hookContext.getOperationName());
            AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext, getKnownObjects(), isSkipTempTables());

            BaseHiveEvent event = null;
            switch (oper) { // create the event that matches the operation
                case CREATEDATABASE:
                    event = new CreateDatabase(context);
                    break;
                ...
            }

            if (event != null) {
                final UserGroupInformation ugi = hookContext.getUgi() == null ? Utils.getUGI() : hookContext.getUgi();

                // hand the event's notification messages to Kafka via notifyEntities()
                super.notifyEntities(ActiveEntityFilter.apply(event.getNotificationMessages()), ugi);
            }
        }
    }
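The switch simply dispatches each HiveOperation to a dedicated BaseHiveEvent subclass (CreateDatabase, CreateTable, DropDatabase, DropTable, and so on); each subclass knows how to turn the inputs and outputs of that particular operation into Atlas entity notification messages.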
HiveHook's parent class AtlasHook initializes a KafkaNotification through NotificationProvider.get(), and its notifyEntities() method delegates the work to notifyEntitiesInternal(), either synchronously or asynchronously.
    // in org.apache.atlas.hook.AtlasHook
    // notificationInterface is initialized to a KafkaNotification:
    notificationInterface = NotificationProvider.get();

    // inside NotificationProvider.get():
    Configuration conf = ApplicationProperties.get();
    KafkaNotification kafka = new KafkaNotification(conf);
    notificationProvider = kafka;
    return notificationProvider;
    public static void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries, MessageSource source) {
        if (executor == null) { // send synchronously
            notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger, source);
        } else {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    // notificationInterface is the KafkaNotification; it serializes messages to JSON and sends them to Kafka
                    notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger, source);
                }
            });
        }
    }
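In the Atlas source, the executor exists only when asynchronous notification is enabled (the atlas.notification.hook.asynchronous property). A minimal sketch of that sync-vs-async setup, with simplified names that are not the actual Atlas code:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.commons.configuration.Configuration;

    // Sketch only: the executor is created when async mode is on; otherwise it
    // stays null and notifyEntities() takes the synchronous branch shown above.
    class NotifierSetup {
        static ExecutorService buildExecutor(Configuration atlasProperties) {
            boolean isAsync = atlasProperties.getBoolean("atlas.notification.hook.asynchronous", true);
            if (!isAsync) {
                return null;
            }
            return Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, "Atlas Notifier");
                t.setDaemon(true); // don't keep the Hive client JVM alive
                return t;
            });
        }
    }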
The notificationInterface instance is, in effect, a KafkaNotification:
1. AbstractNotification implements NotificationInterface: it defines createNotificationMessages(), which serializes messages to JSON, and declares the abstract method sendInternal(); it also implements the interface's send() method, which calls sendInternal().
2. KafkaNotification extends AbstractNotification: it overrides sendInternal() to deliver the JSON messages to Kafka.
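The split between the two classes is a classic template method. A simplified sketch (illustrative only, not the actual Atlas code; the Sketch-suffixed class names and the toJson() helper are stand-ins):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // The base class fixes the algorithm (serialize, then send); the subclass
    // supplies the Kafka transport.
    abstract class AbstractNotificationSketch {
        public void send(List<Object> messages) throws Exception {
            List<String> jsonMessages = new ArrayList<>();
            for (Object message : messages) {
                jsonMessages.add(toJson(message)); // createNotificationMessages() plays this role in Atlas
            }
            sendInternal(jsonMessages); // transport is deferred to the subclass
        }

        protected abstract void sendInternal(List<String> jsonMessages) throws Exception;

        private String toJson(Object message) { return message.toString(); } // placeholder serializer
    }

    class KafkaNotificationSketch extends AbstractNotificationSketch {
        private final KafkaProducer<String, String> producer;
        private final String topic; // Atlas hook messages go to the ATLAS_HOOK topic

        KafkaNotificationSketch(KafkaProducer<String, String> producer, String topic) {
            this.producer = producer;
            this.topic = topic;
        }

        @Override
        protected void sendInternal(List<String> jsonMessages) {
            for (String json : jsonMessages) {
                producer.send(new ProducerRecord<>(topic, json)); // fire-and-forget send
            }
        }
    }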
The notificationInterface instance therefore knows how to serialize messages to JSON and send them to Kafka. notifyEntitiesInternal() calls notificationInterface.send():
    static void notifyEntitiesInternal(List<HookNotification> messages, int maxRetries, UserGroupInformation ugi,
                                       NotificationInterface notificationInterface,
                                       boolean shouldLogFailedMessages, FailedMessagesLogger logger, MessageSource source) {
        if (ugi == null) {
            notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages, source);
        } else {
            PrivilegedExceptionAction<Object> privilegedNotify = new PrivilegedExceptionAction<Object>() {
                @Override
                public Object run() throws Exception {
                    // send() serializes the messages to JSON (createNotificationMessages())
                    // and hands them to KafkaNotification.sendInternal() for delivery
                    notificationInterface.send(
                            NotificationInterface.NotificationType.HOOK, // a notification coming from a hook
                            messages,
                            source);
                    return messages;
                }
            };

            ugi.doAs(privilegedNotify);
        }
    }
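Wrapping the send in ugi.doAs() runs the Kafka delivery under the identity of the user who issued the query rather than the Hive service user, which is what allows the producer to authenticate as that user on Kerberos-secured clusters.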