flink写clickhouse

  • Post author:
  • Post category:其他


最近团队考虑用flink + clickhouse做一些事情,然后由我负责做一些调研工作。

我们大数据平台提供的flink最新是1.10版本的,看了官网,还没有支持clickhouse的connector,但可以通过JDBC方法访问。看了一圈,flink写clickhouse可以通过JDBCAppendTableSink这个类,不能直接使用flink sql。而且这个类有一点缺陷,就是没有提供时间间隔配置flush数据。要是能像flink sql连接mysql那样就好了。基于这样的目的,就研究了flink-jdbc的源码。flink1.10并不提供其他JDBC的外部系统的扩展类,要想flink sql支持clickhouse,需要修改源码,添加clinkhouse的类型:

/**
 * Default Jdbc dialects.
 */
public final class JDBCDialects {

	private static final List<JDBCDialect> DIALECTS = Arrays.asList(
		new DerbyDialect(),
		new MySQLDialect(),
		new PostgresDialect(),
		new ClickHouseDialect()
	);

	/**
	 * Fetch the JDBCDialect class corresponding to a given database url.
	 */
	public static Optional<JDBCDialect> get(String url) {
		for (JDBCDialect dialect : DIALECTS) {
			if (dialect.canHandle(url)) {
				return Optional.of(dialect);
			}
		}
		return Optional.empty();
	}

	......
    //clickhouse的delete和update与标准的sql方式不一样,因为用不到,所以这里就不实现了
    //upsert目前不知道该如何处理,初步想法是upsert也是使用insert的方式处理,clickhouse表使用ReplacingMergeTree引擎
	private static class ClickHouseDialect implements JDBCDialect{

		@Override
		public boolean canHandle(String url) {
			return url.startsWith("jdbc:clickhouse:");
		}

		@Override
		public Optional<String> defaultDriverName() {
			return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
		}

		@Override
		public String quoteIdentifier(String identifier) {
			return "`" + identifier + "`";
		}

	}
}

打包替换掉原来的flink-jdbc包就可以了



版权声明:本文为zhangdongan1991原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。