flink写入mysql的两种方式

  • Post author:
  • Post category:mysql

方式一 通过JDBCOutputFormat

在flink中没有现成的用来写入MySQL的sink,但是flink提供了一个类,JDBCOutputFormat,通过这个类,如果你提供了jdbc的driver,则可以当做sink使用。

JDBCOutputFormat其实是flink的batch api,但也可以用来作为stream的api使用,社区也推荐通过这种方式来进行。

JDBCOutputFormat用起来很简单,只需要一个prepared statement,driver和database connection,就可以开始使用了。

1 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
2  .setDrivername("com.mysql.jdbc.Driver")
3  .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
4  .setQuery(query)
5  .finish();

如下的sql语句可以作为prepared statement:

String query = "INSERT INTO public.cases (caseid, tracehash) VALUES (?, ?)";

对应的表的结构:

1 CREATE TABLE cases
2 (
3  caseid VARCHAR(255),
4  tracehash VARCHAR(255)
5 );

但有一点要明确,JDBCOutputFormat只能处理Row,而Row是对prepared statement的参数的一个包装类。这意味着我们需要将流中的case转换为row,通过map就能做的。

1 DataStream<Case> cases = ...
2 
3   DataStream<Row> rows = cases.map((MapFunction<Case, Row>) aCase -> {
4    Row row = new Row(2); // our prepared statement has 2 parameters
5    row.setField(0, aCase.getId()); //first parameter is case ID
6    row.setField(1, aCase.getTraceHash()); //second paramater is tracehash
7    return row;
8   });

这样,我们就能添加sink了:

1 rows.writeUsingOutputFormat(jdbcOutput);

这样,你就可以将数据写入mysql了。

但是在你在流上附加了窗口之后,可能会得到下面的报错:

1 "Unknown column type for column %s. Best effort approach to set its value: %s."

因为窗口处理的类型,没有明确的类型定义,如下修改之前的定义,显式的指定类型:

1 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
2  .setDrivername("com.mysql.jdbc.Driver")
3  .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
4  .setQuery(query)
5  .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR }) //set the types
6  .finish();

JDBCOutputFormat has a batchInterval, which you can specify on the JDBCOutputFormatBuilder. If, however, I specify a batch interval of 5000, I would potentially never write anything to the database, or wait a very long time until anything was written.

JDBCOutputFormat 还有一个很有用的参数,batchInterval,见名知意,就是多少数据提交一次,尽量高效率的向数据库提交数据。当然还有比如timeout等其他参数,可以探索。

方式二 通过自定义sink提交

我们通过继承RichSinkFunction<IN>来实现自定义sink:

 1 public class RichCaseSink extends RichSinkFunction<Case> {
 2 
 3   private static final String UPSERT_CASE = "INSERT INTO public.cases (caseid, tracehash) "
 4       + "VALUES (?, ?) "
 5       + "ON CONFLICT (caseid) DO UPDATE SET "
 6       + "  tracehash=?";
 7 
 8   private PreparedStatement statement;
 9 
10 
11   @Override
12   public void invoke(Case aCase) throws Exception {
13 
14     statement.setString(1, aCase.getId());
15     statement.setString(2, aCase.getTraceHash());
16     statement.setString(3, aCase.getTraceHash());
17     statement.addBatch();
18     statement.executeBatch();
19   }
20 
21   @Override
22   public void open(Configuration parameters) throws Exception {
23     Class.forName("com.mysql.jdbc.Driver");
24     Connection connection =
25         DriverManager.getConnection("jdbc:mysql://localhost:5432/casedb?user=signavio&password=signavio");
26 
27     statement = connection.prepareStatement(UPSERT_CASE);
28   }
29 
30 }

这样,就可以在流上添加sink 了:

1 DataStream<Case> cases = ...
2 cases.addSink(new RichCaseSink());

当然,上面的实现很简略,没有给出批量提交或者超时提交,这个都可以很容易的添加,比如close()中关闭连接。

但是上面的实现中,最大的问题还是没有跟flink的状态管理相结合,这个才是重头戏。

方式二 加强版的自定义sink

在checkpoint的时候保存数据,继承接口CheckpointedFunction :

 1 @Override
 2 public void snapshotState(FunctionSnapshotContext context) throws Exception {
 3   long checkpointId = context.getCheckpointId();
 4   List<Case> cases = pendingCasesPerCheckpoint.get(checkpointId);
 5   if(cases == null){
 6     cases = new ArrayList<>();
 7     pendingCasesPerCheckpoint.put(checkpointId, cases);
 8   }
 9   cases.addAll(pendingCases);
10   pendingCases.clear();
11 }

在消息到达的时候不插入数据,只是留存数据:

1 @Override
2 public void invoke(Case aCase) throws Exception {
3   pendingCases.add(aCase);
4 }

这样,通过继承CheckpointListener,我们就能在某个checkpoint完成的时候插入数据:

 1 @Override
 2 public void notifyCheckpointComplete(long checkpointId) throws Exception {
 3 
 4  Iterator<Map.Entry<Long, List<Case>>> pendingCheckpointsIt =
 5    pendingCasesPerCheckpoint.entrySet().iterator();
 6 
 7  while (pendingCheckpointsIt.hasNext()) {
 8 
 9   Map.Entry<Long, List<Case>> entry = pendingCheckpointsIt.next();
10   Long pastCheckpointId = entry.getKey();
11   List<Case> pendingCases = entry.getValue();
12 
13   if (pastCheckpointId <= checkpointId) {
14 
15    for (Case pendingCase : pendingCases) {
16     statement.setString(1, pendingCase.getId());
17     statement.setString(2, pendingCase.getTraceHash());
18     statement.setString(3, pendingCase.getTraceHash());
19     statement.addBatch();
20    }
21    pendingCheckpointsIt.remove();
22   }
23  }
24  statement.executeBatch();
25 
26 }

前提,是需要设置checkpoint,比如:

ExecutionEnvironment env = ...
env.enableCheckpointing(10000L);

这样,每隔10s,当一个checkpoint做成功,就会插入一次数据。

当然,上面的代码验证可用,但不建议在生产环境使用,生产环境需要考虑更多的问题。

转载于:https://www.cnblogs.com/029zz010buct/p/10486104.html