最近团队考虑用flink + clickhouse做一些事情,然后由我负责做一些调研工作。
我们大数据平台提供的flink最新是1.10版本的,看了官网,还没有支持clickhouse的connector,但可以通过JDBC方法访问。看了一圈,flink写clickhouse可以通过JDBCAppendTableSink这个类,不能直接使用flink sql。而且这个类有一点缺陷,就是没有提供时间间隔配置flush数据。要是能像flink sql连接mysql那样就好了。基于这样的目的,就研究了flink-jdbc的源码。flink1.10并不提供其他JDBC的外部系统的扩展类,要想flink sql支持clickhouse,需要修改源码,添加clinkhouse的类型:
/**
* Default Jdbc dialects.
*/
public final class JDBCDialects {
private static final List<JDBCDialect> DIALECTS = Arrays.asList(
new DerbyDialect(),
new MySQLDialect(),
new PostgresDialect(),
new ClickHouseDialect()
);
/**
* Fetch the JDBCDialect class corresponding to a given database url.
*/
public static Optional<JDBCDialect> get(String url) {
for (JDBCDialect dialect : DIALECTS) {
if (dialect.canHandle(url)) {
return Optional.of(dialect);
}
}
return Optional.empty();
}
......
//clickhouse的delete和update与标准的sql方式不一样,因为用不到,所以这里就不实现了
//upsert目前不知道该如何处理,初步想法是upsert也是使用insert的方式处理,clickhouse表使用ReplacingMergeTree引擎
private static class ClickHouseDialect implements JDBCDialect{
@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:clickhouse:");
}
@Override
public Optional<String> defaultDriverName() {
return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
}
@Override
public String quoteIdentifier(String identifier) {
return "`" + identifier + "`";
}
}
}
打包替换掉原来的flink-jdbc包就可以了
版权声明:本文为zhangdongan1991原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。