Integrating Griffin with Livy, HDFS, Elasticsearch 7, and Kerberos





Operating system configuration:



1. Add a livy user on every machine (this can also be done via LDAP)



useradd livy -g hadoop



2. Change the owner of the Livy installation directory to the livy user



chown -R livy:hadoop livy



3. Create Livy's log and run directories



mkdir /var/log/livy



mkdir /var/run/livy



chown livy:hadoop /var/log/livy



chown livy:hadoop /var/run/livy



4. On the KDC node, create the Kerberos principals and keytabs for Livy, then copy the generated keytab files into Livy's conf directory



kadmin.local -q "addprinc -randkey livy/your_ip_host0@YOUR_REALM.COM"

kadmin.local -q "addprinc -randkey HTTP/your_ip_host0@YOUR_REALM.COM"

kadmin.local -q "xst -k /root/livy.keytab livy/your_ip_host0@YOUR_REALM.COM"

kadmin.local -q "xst -k /root/spnego.keytab HTTP/your_ip_host0@YOUR_REALM.COM"
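Before copying the keytabs into Livy's conf directory, it is worth checking that they were exported correctly. A minimal sanity check, using the paths and principals from the commands above:

klist -kt /root/livy.keytab

klist -kt /root/spnego.keytab

kinit -kt /root/livy.keytab livy/your_ip_host0@YOUR_REALM.COM

klist should list the principals just created, and kinit should obtain a ticket without prompting for a password.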



Livy configuration:



Edit livy.conf and set the following properties:



livy.server.port = 8998



livy.spark.master = yarn



livy.spark.deploy-mode = cluster



livy.server.session.timeout-check = true



livy.server.session.timeout-check.skip-busy = false



livy.server.session.timeout = 1h



livy.server.session.state-retain.sec = 600s



livy.impersonation.enabled = true



livy.server.recovery.mode = recovery



livy.server.recovery.state-store=filesystem



livy.server.recovery.state-store.url=/tmp/livy



livy.server.yarn.poll-interval = 20s



livy.ui.enabled = true



# Configure the Livy service to talk to YARN via Kerberos



livy.server.launch.kerberos.keytab = /bigdata/livy-0.7.1/conf/livy.keytab



livy.server.launch.kerberos.principal = livy/your_ip_host0@YOUR_REALM.COM




# Require Kerberos authentication for clients accessing Livy; jobs are then submitted to YARN as the user from the client's authenticated keytab. This setting also puts the Livy web UI behind Kerberos.



livy.server.auth.type = kerberos



livy.server.auth.kerberos.keytab = /bigdata/livy-0.7.1/conf/livy_http.keytab



livy.server.auth.kerberos.principal = HTTP/your_ip_host0@YOUR_REALM.COM



Modify the HDFS configuration:



CM -> HDFS -> Configuration -> Cluster-wide Advanced Configuration Snippet (Safety Valve) for core-site.xml:



hadoop.proxyuser.livy.groups = *



hadoop.proxyuser.livy.hosts = *



Starting and stopping Livy:



cd /bigdata/livy-0.7.1/bin



sudo -u livy ./livy-server start



sudo -u livy ./livy-server stop
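Once the server is up, a quick smoke test of both the REST API and SPNEGO authentication (hostnames here are placeholders, and curl must have been built with GSS-Negotiate support):

sudo -u livy ./livy-server status

kinit -kt /bigdata/livy-0.7.1/conf/livy.keytab livy/your_ip_host0@YOUR_REALM.COM

curl --negotiate -u : http://your_ip_host0:8998/sessions

An empty session list in the JSON response means the server and Kerberos authentication are working; an HTTP 401 usually points to a ticket or HTTP/ principal problem.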



Griffin configuration:



1. Modify application.properties as follows:



#



# Licensed to the Apache Software Foundation (ASF) under one



# or more contributor license agreements.  See the NOTICE file



# distributed with this work for additional information



# regarding copyright ownership.  The ASF licenses this file



# to you under the Apache License, Version 2.0 (the



# "License"); you may not use this file except in compliance



# with the License.  You may obtain a copy of the License at



#



#   http://www.apache.org/licenses/LICENSE-2.0



#



# Unless required by applicable law or agreed to in writing,



# software distributed under the License is distributed on an



# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY



# KIND, either express or implied.  See the License for the



# specific language governing permissions and limitations



# under the License.



#



spring.datasource.url=jdbc:mysql://your_ip_host1:3306/quartz?autoReconnect=true&useSSL=false



spring.datasource.username=quartz



spring.datasource.password=Pass_quartz



spring.jpa.generate-ddl=true



spring.datasource.driver-class-name=com.mysql.jdbc.Driver



spring.jpa.show-sql=true



# Hive metastore



hive.metastore.uris=thrift://your_ip_host1:9083



hive.metastore.dbname=default



hive.hmshandler.retry.attempts=15



hive.hmshandler.retry.interval=2000ms



#Hive jdbc



hive.jdbc.className=org.apache.hive.jdbc.HiveDriver



hive.jdbc.url=jdbc:hive2://your_ip_host0:30010/



hive.jdbc.username=admin



hive.jdbc.passwd=YOUR REALM


hive.need.kerberos=false



hive.keytab.user=hive/your_ip_host1@YOUR_REALM.COM



#hive.keytab.path=/root/keytabs/hive.keytab



hive.keytab.path=E:\\keytab\\ceshi\\hive.keytab


# Hive cache time



cache.evict.hive.fixedRate.in.milliseconds=900000



# Kafka schema registry



kafka.schema.registry.url=your_ip_host2:9092



# Update job instance state at regular intervals



jobInstance.fixedDelay.in.milliseconds=60000



# Expired time of job instance which is 7 days that is 604800000 milliseconds.Time unit only supports milliseconds



jobInstance.expired.milliseconds=604800000



# schedule predicate job every 5 minutes and repeat 12 times at most



#interval time unit s:second m:minute h:hour d:day,only support these four units



predicate.job.interval=5m



predicate.job.repeat.count=12



# external properties directory location



external.config.location=



# external BATCH or STREAMING env



external.env.location=



# login strategy ("default" or "ldap")



login.strategy=default



# ldap



ldap.url=ldap://your_ip_host0:389



ldap.email=dongzhuang17@YOUR_REALM.com



ldap.searchBase=ou=People,dc=YOUR_REALM,dc=com



ldap.searchPattern=(sAMAccountName={0})



# hdfs default name



fs.defaultFS=hdfs://your_ip_host1:8020



# elasticsearch



elasticsearch.host=IP



elasticsearch.port=9200



elasticsearch.scheme=http



# elasticsearch.user = user



# elasticsearch.password = password



# livy



livy.uri=http://your_ip_host0:8998/batches



livy.need.queue=false



livy.task.max.concurrent.count=20



livy.task.submit.interval.second=3



livy.task.appId.retry.count=3



livy.need.kerberos=false



livy.server.auth.kerberos.principal=livy/your_ip_host0@YOUR_REALM.COM



#livy.server.auth.kerberos.keytab=/bigdata/livy-0.7.1/conf/livy.keytab



livy.server.auth.kerberos.keytab=E:\\keytab\\ceshi\\livy.keytab



# yarn url



yarn.uri=http://your_ip_host1:8088



# griffin event listener



internal.event.listeners=GriffinJobEventHook


logging.file=logs/griffin-service.log


spring.cache.type=simple




spring.cache.cache-names=hive,jdbcHive
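Note that the spring.datasource settings above assume the quartz database and user already exist in MySQL. A minimal sketch for creating them, using the credentials from the properties above (adjust to your environment):

mysql -uroot -p -e "CREATE DATABASE quartz DEFAULT CHARACTER SET utf8;"

mysql -uroot -p -e "CREATE USER 'quartz'@'%' IDENTIFIED BY 'Pass_quartz'; GRANT ALL PRIVILEGES ON quartz.* TO 'quartz'@'%'; FLUSH PRIVILEGES;"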



2. Modify sparkProperties.json

{

  "file": "hdfs://your_ip_host1:8020/griffin/griffin-measure.jar",

  "className": "org.apache.griffin.measure.Application",

  "queue": "default",

  "numExecutors": 2,

  "executorCores": 1,

  "driverMemory": "1g",

  "executorMemory": "1g",

  "conf": {

    "spark.yarn.dist.files": "hdfs://your_ip_host1:8020/griffin/hive-site.xml"

  },

  "files": [

  ]

}
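These values are the defaults Griffin attaches to every batch it submits to Livy. Once the measure jar has been uploaded to HDFS (step 13), the Livy-to-YARN path can be verified by posting a batch by hand; this is only a plumbing check, since the measure jar expects env/dq JSON arguments at runtime and will not produce metrics when launched like this:

kinit -kt /bigdata/livy-0.7.1/conf/livy.keytab livy/your_ip_host0@YOUR_REALM.COM

curl --negotiate -u : -H "Content-Type: application/json" -X POST -d '{"file": "hdfs://your_ip_host1:8020/griffin/griffin-measure.jar", "className": "org.apache.griffin.measure.Application", "numExecutors": 2, "executorCores": 1, "driverMemory": "1g", "executorMemory": "1g"}' http://your_ip_host0:8998/batches

Livy should answer with a batch id in state "starting", and the application should then appear in the YARN ResourceManager UI.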



3. Modify env/env_batch.json



{
  "spark": {
    "log.level": "INFO"
  },
  "sinks": [
    {
      "name": "Console",
      "type": "Console",
      "config": {
        "max.log.lines": 10
      }
    },
    {
      "name": "Hdfs",
      "type": "Hdfs",
      "config": {
        "path": "hdfs://your_ip_host1:8020/griffin/persist/",
        "max.persist.lines": 10000,
        "max.lines.per.file": 10000
      }
    },
    {
      "name": "ElasticSearch",
      "type": "ElasticSearch",
      "config": {
        "method": "post",
        "api": "http://10.**.**.**:9200/griffin/",
        "connection.timeout": "1m",
        "retry": 10
      }
    }
  ],
  "griffin.checkpoint": []
}




4. Modify env_streaming.json



{
  "spark": {
    "log.level": "WARN",
    "checkpoint.dir": "hdfs://your_ip_host1:8020/griffin/checkpoint/${JOB_NAME}",
    "init.clear": true,
    "batch.interval": "1m",
    "process.interval": "5m",
    "config": {
      "spark.default.parallelism": 4,
      "spark.task.maxFailures": 5,
      "spark.streaming.kafkaMaxRatePerPartition": 1000,
      "spark.streaming.concurrentJobs": 4,
      "spark.yarn.maxAppAttempts": 5,
      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
      "spark.yarn.max.executor.failures": 120,
      "spark.yarn.executor.failuresValidityInterval": "1h",
      "spark.hadoop.fs.hdfs.impl.disable.cache": true
    }
  },
  "sinks": [
    {
      "name": "CONSOLE",
      "type": "CONSOLE",
      "config": {
        "max.log.lines": 100
      }
    },
    {
      "name": "HDFS",
      "type": "HDFS",
      "config": {
        "path": "hdfs://your_ip_host1:8020/griffin/persist",
        "max.persist.lines": 10000,
        "max.lines.per.file": 10000
      }
    },
    {
      "name": "ELASTICSEARCH",
      "type": "ELASTICSEARCH",
      "config": {
        "method": "post",
        "api": "http://10.**.*.**:9200/griffin"
      }
    }
  ],
  "griffin.checkpoint": [
    {
      "name": "zk_name",
      "type": "zk",
      "config": {
        "hosts": "your_ip_host0:2181,your_ip_host1:2181,your_ip_host2:2181",
        "namespace": "griffin/infocache",
        "lock.path": "lock",
        "mode": "persist",
        "init.clear": true,
        "close.clear": false
      }
    }
  ]
}



5. Modify pom.xml




<?xml version="1.0" encoding="UTF-8"?>



<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->



<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">



<modelVersion>4.0.0</modelVersion>



<!--
<parent>
<groupId>org.apache.griffin</groupId>
<artifactId>griffin</artifactId>
<version>0.7.0-SNAPSHOT</version>
</parent>-->


<parent>



<groupId>org.apache.griffin</groupId>



<artifactId>griffin</artifactId>



<version>0.6.0</version>



</parent>


<artifactId>service</artifactId>



<packaging>jar</packaging>


<name>Apache Griffin :: Web Service</name>


<properties>



<hadoop.version>3.0.0-cdh6.3.2</hadoop.version>



<hive.version>2.1.1-cdh6.3.2</hive.version>



<spring.boot.version>2.1.7.RELEASE</spring.boot.version>



<spring.security.kerberos.version>1.0.1.RELEASE</spring.security.kerberos.version>



<confluent.version>3.2.0</confluent.version>



<quartz.version>2.2.2</quartz.version>



<start-class>org.apache.griffin.core.GriffinWebApplication</start-class>



<powermock.version>2.0.2</powermock.version>



<spring-boot-maven-plugin.version>2.1.7.RELEASE</spring-boot-maven-plugin.version>



<derby.version>10.14.1.0</derby.version>



<eclipselink.version>2.6.0</eclipselink.version>



<mysql.java.version>5.1.47</mysql.java.version>



<postgresql.version>9.4.1212.jre7</postgresql.version>



<livy.core.version>0.7.1-incubating</livy.core.version>



<elasticsearch-rest-client.version>6.2.4</elasticsearch-rest-client.version>



<jackson-databind.version>2.9.9.3</jackson-databind.version>



</properties>


<repositories>



<repository>



<id>confluent</id>



<url>http://packages.confluent.io/maven/</url>



</repository>



<repository>



<id>cloudera</id>



<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>



</repository>



<repository>



<id>Hortonworks</id>



<url>https://repo.hortonworks.com/content/repositories/releases/</url>



</repository>



<repository>



<id>mvn</id>



<url>https://repo1.maven.org/maven2/</url>



</repository>



</repositories>


<dependencyManagement>



<dependencies>



<dependency>



<groupId>org.springframework.boot</groupId>



<artifactId>spring-boot-dependencies</artifactId>



<version>${spring.boot.version}</version>



<type>pom</type>



<scope>import</scope>



</dependency>



</dependencies>



</dependencyManagement>


<dependencies>



<dependency>



<groupId>org.springframework.boot</groupId>



<artifactId>spring-boot-starter-web</artifactId>



<exclusions>



<exclusion>



<groupId>org.springframework.boot</groupId>



<artifactId>spring-boot-starter-logging</artifactId>



</exclusion>



</exclusions>



</dependency>



<dependency>



<groupId>org.springframework.boot</groupId>



<artifactId>spring-boot-starter-log4j2</artifactId>



</dependency>



<!--        <dependency>
&lt;!&ndash; cache &ndash;&gt;
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>-->



<dependency>



<groupId>org.springframework.boot</groupId>



<artifactId>spring-boot-properties-migrator</artifactId>



<exclusions>



<exclusion>



<groupId>com.vaadin.external.google</groupId>



<artifactId>android-json</artifactId>



</exclusion>



</exclusions>



</dependency>


<dependency>



<groupId>org.springframework.boot</groupId>



<artifactId>spring-boot-starter-json</artifactId>



</dependency>


<dependency>



<groupId>org.springframework.boot</groupId>



<artifactId>spring-boot-starter-data-jpa</artifactId>



<exclusions>



<exclusion>



<groupId>org.hibernate</groupId>



<artifactId>*</artifactId>



</exclusion>



<exclusion>



<groupId>org.aspectj</groupId>



<artifactId>aspectjrt</artifactId>



</exclusion>



</exclusions>



</dependency>



<dependency>



<groupId>org.springframework</groupId>



<artifactId>spring-aspects</artifactId>



</dependency>



<dependency>



<groupId>org.springframework.security.kerberos</groupId>



<artifactId>spring-security-kerberos-client</artifactId>



<version>${spring.security.kerberos.version}</version>



</dependency>



<!--eclipse link-->



<dependency>



<groupId>org.eclipse.persistence</groupId>



<artifactId>org.eclipse.persistence.jpa</artifactId>



<version>${eclipselink.version}</version>



</dependency>



<dependency>



<groupId>org.postgresql</groupId>



<artifactId>postgresql</artifactId>



<version>${postgresql.version}</version>



</dependency>



<dependency>



<groupId>mysql</groupId>



<artifactId>mysql-connector-java</artifactId>



<version>${mysql.java.version}</version>



</dependency>



<dependency>



<groupId>com.h2database</groupId>



<artifactId>h2</artifactId>



</dependency>



<dependency>



<groupId>org.springframework.retry</groupId>



<artifactId>spring-retry</artifactId>



</dependency>


<dependency>



<groupId>com.fasterxml.jackson.core</groupId>



<artifactId>jackson-databind</artifactId>



<version>${jackson-databind.version}</version>



</dependency>


<!-- to access metastore from hive -->



<dependency>



<groupId>org.apache.hadoop</groupId>



<artifactId>hadoop-client</artifactId>



<version>${hadoop.version}</version>



<!--<scope>provided</scope>-->



<exclusions>



<exclusion>



<groupId>javax.servlet</groupId>



<artifactId>servlet-api</artifactId>



</exclusion>



<exclusion>



<groupId>org.slf4j</groupId>



<artifactId>slf4j-log4j12</artifactId>



</exclusion>



</exclusions>



</dependency>



<dependency>



<groupId>org.apache.hive</groupId>



<artifactId>hive-metastore</artifactId>



<version>${hive.version}</version>



<exclusions>



<exclusion>



<groupId>org.eclipse.jetty.aggregate</groupId>



<artifactId>jetty-all</artifactId>



</exclusion>



<exclusion>



<groupId>org.eclipse.jetty.orbit</groupId>



<artifactId>javax.servlet</artifactId>



</exclusion>



<exclusion>



<groupId>javax.servlet</groupId>



<artifactId>servlet-api</artifactId>



</exclusion>



<exclusion>



<groupId>de.ruedigermoeller</groupId>



<artifactId>fst</artifactId>



</exclusion>



</exclusions>



</dependency>


<!-- to access Hive using JDBC -->



<dependency>



<groupId>org.apache.hive</groupId>



<artifactId>hive-jdbc</artifactId>



<version>${hive.version}</version>



<exclusions>



<exclusion>



<groupId>org.eclipse.jetty.aggregate</groupId>



<artifactId>*</artifactId>



</exclusion>



<exclusion>



<groupId>org.eclipse.jetty.orbit</groupId>



<artifactId>javax.servlet</artifactId>



</exclusion>



<exclusion>



<groupId>javax.servlet</groupId>



<artifactId>servlet-api</artifactId>



</exclusion>



<exclusion>



<groupId>org.mortbay.jetty</groupId>



<artifactId>servlet-api-2.5</artifactId>



</exclusion>



<exclusion>



<groupId>org.slf4j</groupId>



<artifactId>slf4j-log4j12</artifactId>



</exclusion>



<exclusion>



<groupId>org.eclipse.jetty</groupId>



<artifactId>jetty-runner</artifactId>



</exclusion>



</exclusions>



</dependency>


<!-- to access confluent schema registry -->



<dependency>



<groupId>io.confluent</groupId>



<artifactId>kafka-schema-registry-client</artifactId>



<version>${confluent.version}</version>



<exclusions>



<exclusion>



<groupId>org.slf4j</groupId>



<artifactId>slf4j-log4j12</artifactId>



</exclusion>



</exclusions>



</dependency>


<!--schedule-->



<dependency>



<groupId>org.springframework</groupId>



<artifactId>spring-context-support</artifactId>



</dependency>



<dependency>



<groupId>org.quartz-scheduler</groupId>



<artifactId>quartz</artifactId>



<version>${quartz.version}</version>



</dependency>



<dependency>



<groupId>org.quartz-scheduler</groupId>



<artifactId>quartz-jobs</artifactId>



<version>${quartz.version}</version>



</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.livy/livy-core -->



<dependency>



<groupId>org.apache.livy</groupId>



<artifactId>livy-core_2.11</artifactId>



<version>0.7.1-incubating</version>



</dependency>


<!-- test -->



<dependency>



<groupId>org.springframework.boot</groupId>



<artifactId>spring-boot-starter-test</artifactId>



<scope>test</scope>



<exclusions>



<exclusion>



<groupId>com.vaadin.external.google</groupId>



<artifactId>android-json</artifactId>



</exclusion>



</exclusions>



</dependency>


<dependency>



<groupId>junit</groupId>



<artifactId>junit</artifactId>



</dependency>


<dependency>



<groupId>org.powermock</groupId>



<artifactId>powermock-api-mockito2</artifactId>



<version>${powermock.version}</version>



<scope>test</scope>



</dependency>



<dependency>



<groupId>org.powermock</groupId>



<artifactId>powermock-module-junit4</artifactId>



<version>${powermock.version}</version>



<scope>test</scope>



</dependency>


<dependency>



<groupId>org.elasticsearch.client</groupId>



<artifactId>elasticsearch-rest-client</artifactId>



<version>${elasticsearch-rest-client.version}</version>



</dependency>



</dependencies>



<profiles>



<!--if you need mysql, please uncomment mysql-connector-java -->



<!--<profile>-->
<!--<id>mysql</id>-->
<!--<activation>-->
<!--<property>-->
<!--<name>mysql</name>-->
<!--</property>-->
<!--</activation>-->
<!--</profile>-->



<profile>



<id>dev</id>



<activation>



<property>



<name>dev</name>



</property>



</activation>



</profile>



<profile>



<id>postgresql</id>



<activation>



<activeByDefault>true</activeByDefault>



<property>



<name>prod</name>



</property>



</activation>



</profile>



</profiles>



<build>



<plugins>



<plugin>



<groupId>com.ethlo.persistence.tools</groupId>



<artifactId>eclipselink-maven-plugin</artifactId>



<version>2.7.0</version>



<executions>



<execution>



<phase>process-classes</phase>



<goals>



<goal>weave</goal>



</goals>



</execution>



</executions>



<dependencies>



<dependency>



<groupId>org.eclipse.persistence</groupId>



<artifactId>org.eclipse.persistence.jpa</artifactId>



<version>${eclipselink.version}</version>



</dependency>



</dependencies>



</plugin>



<plugin>



<groupId>org.apache.maven.plugins</groupId>



<artifactId>maven-jar-plugin</artifactId>



<version>3.1.1</version>



<executions>



<execution>



<phase>package</phase>



<goals>



<goal>jar</goal>



</goals>



<configuration>



<classifier>lib</classifier>



</configuration>



</execution>



</executions>



</plugin>



<plugin>



<groupId>org.springframework.boot</groupId>



<artifactId>spring-boot-maven-plugin</artifactId>



<version>${spring-boot-maven-plugin.version}</version>



<executions>



<execution>



<goals>



<goal>repackage</goal>



</goals>



</execution>



</executions>



<configuration>



<mainClass>org.apache.griffin.core.GriffinWebApplication</mainClass>



<executable>true</executable>



</configuration>



</plugin>



<plugin>



<groupId>org.apache.maven.plugins</groupId>



<artifactId>maven-assembly-plugin</artifactId>



<configuration>



<appendAssemblyId>false</appendAssemblyId>



<skipAssembly>false</skipAssembly>



<outputDirectory>../target</outputDirectory>



<descriptors>



<descriptor>src/main/resources/assembly/assembly.xml</descriptor>



</descriptors>



</configuration>



<executions>



<execution>



<id>assembly</id>



<phase>package</phase>



<goals>



<goal>single</goal>



</goals>



</execution>



</executions>



</plugin>



<plugin>



<groupId>org.apache.maven.plugins</groupId>



<artifactId>maven-compiler-plugin</artifactId>



</plugin>


</plugins>



</build>


</project>



6. Modify the initHiveMetastoreClient() method of HiveMetaStoreProxy.java

    public IMetaStoreClient initHiveMetastoreClient() throws MalformedURLException {

        //System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");

        System.setProperty("java.security.krb5.conf", "E:\\keytab\\ceshi\\krb5.conf");

        HiveConf hiveConf = new HiveConf();

        // switch to the Kerberos-enabled configuration

  

        //hiveConf.addResource(new File("/etc/hive/conf/core-site.xml").toURI().toURL());

        //hiveConf.addResource(new File("/etc/hive/conf/hive-site.xml").toURI().toURL());

        hiveConf.addResource(new File("E:\\keytab\\ceshi\\core-site.xml").toURI().toURL());

        hiveConf.addResource(new File("E:\\keytab\\ceshi\\hive-site.xml").toURI().toURL());

        UserGroupInformation.setConfiguration(hiveConf);

        try {

  

            ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(keytabUser,

                    keytabPath);

            UserGroupInformation.setLoginUser(ugi);

        } catch (Exception e) {

            e.printStackTrace();

        }

  

        // original code (replaced above by the Kerberos login)

  /*        hiveConf.set("hive.metastore.local", "false");

        hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES,

            3);

        hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, uris);

        hiveConf.setIntVar(HiveConf.ConfVars.HMSHANDLERATTEMPTS, attempts);

        hiveConf.setVar(HiveConf.ConfVars.HMSHANDLERINTERVAL, interval);*/

  

        try {

            client = HiveMetaStoreClient.newSynchronizedClient(new HiveMetaStoreClient(hiveConf));

            /*List<String> databases = client.getAllDatabases();

            databases.forEach(x->{

                System.out.println("############"+x);

            });*/

        } catch (Exception e) {

            LOGGER.error("Failed to connect hive metastore. {}", e);

        }

        return client;

    }



7. Modify the init() method in HiveMetaStoreServiceJdbcImpl.java

@Value("${hive.jdbc.username}")

  private String username;

  

  @Value("${hive.jdbc.passwd}")

  private String passwd;

public void init() {

    if (needKerberos != null && needKerberos.equalsIgnoreCase("true")) {

        LOGGER.info("Hive need Kerberos Auth.");

  

        Configuration conf = new Configuration();

        conf.set("hadoop.security.authentication", "Kerberos");

        UserGroupInformation.setConfiguration(conf);

        try {

            UserGroupInformation.loginUserFromKeytab(keytabUser, keytabPath);

        } catch (IOException e) {

            LOGGER.error("Register Kerberos has error. {}", e.getMessage());

        }

    }else{

        props = new Properties();

        props.setProperty("user", username);

        props.setProperty("password", passwd);

        try {

            Class.forName(hiveClassName);

            if (conn == null) {

                System.out.println("####################init: connecting to Hive");

                conn = DriverManager.getConnection(hiveUrl,props);

                System.out.println("####################init: Hive connection established");

            }

        } catch (ClassNotFoundException | SQLException e) {

            e.printStackTrace();

        }

  

    }

}



8. Our Elasticsearch is 7.x, so the following changes are needed:



MetricStoreImpl.java

// change the type from "accuracy" to "_doc"

  private static final String TYPE = "_doc";



ElasticSearchSink.scala

// append _doc to the api path
def func(): (Long, Future[Boolean]) = {

  import scala.concurrent.ExecutionContext.Implicits.global

  (timeStamp, Future(HttpUtil.doHttpRequest(api + "_doc", method, params, header, data)))

  

}
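Since Elasticsearch 7 removed mapping types, the griffin index also has to be created without one. A sketch adapted from the index template in the Griffin docs with the type level dropped; your_es_host and the shard/replica counts are placeholders:

curl -X PUT "http://your_es_host:9200/griffin" -H "Content-Type: application/json" -d '{"settings": {"index": {"number_of_shards": 5, "number_of_replicas": 2}}, "mappings": {"properties": {"name": {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}}, "tmst": {"type": "date"}}}}'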



9. Regex validation: the front end and the back end do not match, because the front end hard-codes the regular expression. It is hard to believe this was open-sourced with so many issues.



Modify \griffin\ui\angular\src\app\measure\create-measure\pr\pr.component.spec.ts: in the transferRule(rule, col) function, change the second-to-last case to:



case "Regular Expression Detection Count":
  return (
    `count(source.${col.name}) AS \`${col.name}_regexcount\` WHERE source.${col.name} RLIKE ${col.regex}`
  );

Previously this was hard-coded as RLIKE '^[0-9]{4}$'.



10. Modify \griffin\ui\angular\dist\main.bundle.js: in the function PrComponent.prototype.transferRule = function (rule, col), change the second-to-last case to:



case "Regular Expression Detection Count":
  return ("count(source." + col.name + ") AS `" + col.name + "_regexcount` WHERE source." + col.name + " RLIKE '" + col.regex + "'");
  //return ("count(source." + col.name + ") AS `" + col.name + "_regexcount` WHERE source." + col.name + " RLIKE '^[0-9]{4}$'");



11. Create the tables in MySQL (the official SQL script forgets to create the DATACONNECTOR table, which is infuriating):



DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;



DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;



DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;



DROP TABLE IF EXISTS QRTZ_LOCKS;



DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;



DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;



DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;



DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;



DROP TABLE IF EXISTS QRTZ_TRIGGERS;



DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;



DROP TABLE IF EXISTS QRTZ_CALENDARS;



DROP TABLE IF EXISTS DATACONNECTOR;


CREATE TABLE QRTZ_JOB_DETAILS(



SCHED_NAME VARCHAR(120) NOT NULL,



JOB_NAME VARCHAR(200) NOT NULL,



JOB_GROUP VARCHAR(200) NOT NULL,



DESCRIPTION VARCHAR(250) NULL,



JOB_CLASS_NAME VARCHAR(250) NOT NULL,



IS_DURABLE VARCHAR(1) NOT NULL,



IS_NONCONCURRENT VARCHAR(1) NOT NULL,



IS_UPDATE_DATA VARCHAR(1) NOT NULL,



REQUESTS_RECOVERY VARCHAR(1) NOT NULL,



JOB_DATA BLOB NULL,



PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP))



ENGINE=InnoDB CHARACTER SET utf8;


CREATE TABLE QRTZ_TRIGGERS (



SCHED_NAME VARCHAR(120) NOT NULL,



TRIGGER_NAME VARCHAR(200) NOT NULL,



TRIGGER_GROUP VARCHAR(200) NOT NULL,



JOB_NAME VARCHAR(200) NOT NULL,



JOB_GROUP VARCHAR(200) NOT NULL,



DESCRIPTION VARCHAR(250) NULL,



NEXT_FIRE_TIME BIGINT(13) NULL,



PREV_FIRE_TIME BIGINT(13) NULL,



PRIORITY INTEGER NULL,



TRIGGER_STATE VARCHAR(16) NOT NULL,



TRIGGER_TYPE VARCHAR(8) NOT NULL,



START_TIME BIGINT(13) NOT NULL,



END_TIME BIGINT(13) NULL,



CALENDAR_NAME VARCHAR(200) NULL,



MISFIRE_INSTR SMALLINT(2) NULL,



JOB_DATA BLOB NULL,



PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),



FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)



REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP))



ENGINE=InnoDB CHARACTER SET utf8;


CREATE TABLE QRTZ_SIMPLE_TRIGGERS (



SCHED_NAME VARCHAR(120) NOT NULL,



TRIGGER_NAME VARCHAR(200) NOT NULL,



TRIGGER_GROUP VARCHAR(200) NOT NULL,



REPEAT_COUNT BIGINT(7) NOT NULL,



REPEAT_INTERVAL BIGINT(12) NOT NULL,



TIMES_TRIGGERED BIGINT(10) NOT NULL,



PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),



FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)



REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))



ENGINE=InnoDB CHARACTER SET utf8;


CREATE TABLE QRTZ_CRON_TRIGGERS (



SCHED_NAME VARCHAR(120) NOT NULL,



TRIGGER_NAME VARCHAR(200) NOT NULL,



TRIGGER_GROUP VARCHAR(200) NOT NULL,



CRON_EXPRESSION VARCHAR(120) NOT NULL,



TIME_ZONE_ID VARCHAR(80),



PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),



FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)



REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))



ENGINE=InnoDB CHARACTER SET utf8;


CREATE TABLE QRTZ_SIMPROP_TRIGGERS



(



SCHED_NAME VARCHAR(120) NOT NULL,



TRIGGER_NAME VARCHAR(200) NOT NULL,



TRIGGER_GROUP VARCHAR(200) NOT NULL,



STR_PROP_1 VARCHAR(512) NULL,



STR_PROP_2 VARCHAR(512) NULL,



STR_PROP_3 VARCHAR(512) NULL,



INT_PROP_1 INT NULL,



INT_PROP_2 INT NULL,



LONG_PROP_1 BIGINT NULL,



LONG_PROP_2 BIGINT NULL,



DEC_PROP_1 NUMERIC(13,4) NULL,



DEC_PROP_2 NUMERIC(13,4) NULL,



BOOL_PROP_1 VARCHAR(1) NULL,



BOOL_PROP_2 VARCHAR(1) NULL,



PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),



FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)



REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))



ENGINE=InnoDB CHARACTER SET utf8;


CREATE TABLE QRTZ_BLOB_TRIGGERS (



SCHED_NAME VARCHAR(120) NOT NULL,



TRIGGER_NAME VARCHAR(200) NOT NULL,



TRIGGER_GROUP VARCHAR(200) NOT NULL,



BLOB_DATA BLOB NULL,



PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),



INDEX (SCHED_NAME,TRIGGER_NAME, TRIGGER_GROUP),



FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)



REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))



ENGINE=InnoDB CHARACTER SET utf8;


CREATE TABLE QRTZ_CALENDARS (



SCHED_NAME VARCHAR(120) NOT NULL,



CALENDAR_NAME VARCHAR(200) NOT NULL,



CALENDAR BLOB NOT NULL,



PRIMARY KEY (SCHED_NAME,CALENDAR_NAME))



ENGINE=InnoDB CHARACTER SET utf8;


CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (



SCHED_NAME VARCHAR(120) NOT NULL,



TRIGGER_GROUP VARCHAR(200) NOT NULL,



PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP))



ENGINE=InnoDB CHARACTER SET utf8;


CREATE TABLE QRTZ_FIRED_TRIGGERS (



SCHED_NAME VARCHAR(120) NOT NULL,



ENTRY_ID VARCHAR(95) NOT NULL,



TRIGGER_NAME VARCHAR(200) NOT NULL,



TRIGGER_GROUP VARCHAR(200) NOT NULL,



INSTANCE_NAME VARCHAR(200) NOT NULL,



FIRED_TIME BIGINT(13) NOT NULL,



SCHED_TIME BIGINT(13) NOT NULL,



PRIORITY INTEGER NOT NULL,



STATE VARCHAR(16) NOT NULL,



JOB_NAME VARCHAR(200) NULL,



JOB_GROUP VARCHAR(200) NULL,



IS_NONCONCURRENT VARCHAR(1) NULL,



REQUESTS_RECOVERY VARCHAR(1) NULL,



PRIMARY KEY (SCHED_NAME,ENTRY_ID))



ENGINE=InnoDB CHARACTER SET utf8;


CREATE TABLE QRTZ_SCHEDULER_STATE (



SCHED_NAME VARCHAR(120) NOT NULL,



INSTANCE_NAME VARCHAR(200) NOT NULL,



LAST_CHECKIN_TIME BIGINT(13) NOT NULL,



CHECKIN_INTERVAL BIGINT(13) NOT NULL,



PRIMARY KEY (SCHED_NAME,INSTANCE_NAME))



ENGINE=InnoDB CHARACTER SET utf8;


CREATE TABLE QRTZ_LOCKS (



SCHED_NAME VARCHAR(120) NOT NULL,



LOCK_NAME VARCHAR(40) NOT NULL,



PRIMARY KEY (SCHED_NAME,LOCK_NAME))



ENGINE=InnoDB CHARACTER SET utf8;


CREATE TABLE DATACONNECTOR (



ID int,



CONFIG text,



CREATEDDATE VARCHAR(200),



DATAFRAMENAME VARCHAR(200),



DATATIMEZONE VARCHAR(200),



DATAUNIT VARCHAR(200),



MODIFIEDDATE VARCHAR(200),



NAME VARCHAR(200),



TYPE VARCHAR(200),



VERSION VARCHAR(200),



PRIMARY KEY (id))



ENGINE=InnoDB CHARACTER SET utf8;


CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);



CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);


CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);



CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);



CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);



CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);



CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);



CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);



CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);



CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);



CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);



CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);



CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);



CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);


CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);



CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);



CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);



CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);



CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);



CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);


commit;



12. Build and package by running the following Maven command in IDEA's terminal:

mvn -Dmaven.test.skip=true clean install



If some dependencies cannot be downloaded, fetch them manually from a Maven repository in your browser and install them locally.
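A sketch of installing a manually downloaded jar into the local repository (the coordinates here are placeholders; use the groupId/artifactId/version of the artifact that failed to download):

mvn install:install-file -Dfile=/path/to/downloaded.jar -DgroupId=some.group -DartifactId=some-artifact -Dversion=1.0.0 -Dpackaging=jar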



13. After packaging, upload measure/target/measure-0.6.0.jar to HDFS and rename it griffin-measure.jar.



I uploaded it to /griffin/griffin-measure.jar; make sure the path matches what the configuration files above use. hive-site.xml also needs to be uploaded, and the checkpoint and persist directories need to be created. The persist directory is where metric data is stored.
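The corresponding HDFS commands might look like this (paths follow the configuration above; on a Kerberized cluster run kinit first as a user with write access to /griffin):

hdfs dfs -mkdir -p /griffin/checkpoint /griffin/persist

hdfs dfs -put measure/target/measure-0.6.0.jar /griffin/griffin-measure.jar

hdfs dfs -put /etc/hive/conf/hive-site.xml /griffin/hive-site.xml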



# hdfs dfs -ls /griffin



Found 4 items



drwxrwxrwx   - livy  supergroup          0 2021-06-22 16:26 /griffin/checkpoint

-rw-r--r--   3 admin supergroup   46538594 2021-06-24 15:32 /griffin/griffin-measure.jar

-rwxrwxrwx   3 admin supergroup       6982 2021-06-16 17:47 /griffin/hive-site.xml

drwxrwxrwx   - admin supergroup          0 2021-07-01 14:54 /griffin/persist



14. Griffin is now installed.



15. Startup:



You can start it directly from IDEA for debugging; on the server, go to /bigdata/griffin-0.6.0/service/target and run:



nohup java -jar service-0.6.0.jar > out.log 2>&1 &
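The service listens on Spring Boot's default port 8080 unless server.port is set in application.properties. Assuming that default, a quick check that it came up:

tail -n 100 out.log

curl -sI http://localhost:8080/

A 200 response (the Angular UI) means the service started; errors in out.log usually point at the MySQL or Hive metastore configuration.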



16. Create measures and jobs:



Accuracy measures the match rate between a source table and a target table;



Data Profiling computes statistics according to configured rules, for example null counts, averages, maximums, distinct counts, counts of values matching a regex, and so on.



Publish did not seem to do anything useful; the last option is custom, which I did not test.



17.







18. A simple regex check of phone numbers:







19. Schedule each measure as a recurring job (I created a job for every measure):







20. Results:



Regex:







Accuracy:







Data profiling:










Copyright notice: this is an original article by m0_37534613, released under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.