使用flume对FTP服务器进行文件的获取解析入库mysql数据库

  • Post author:
  • Post category:mysql


使用flume对FTP服务器进行文件的获取解析入库mysql数据库

原创幼稚猿大叔 最后发布于2018-09-05 19:15:04 阅读数 2283  收藏

展开

使用flume对FTP服务器进行文件的获取解析入库mysql数据库

flume的基本介绍

flume的搭建和启动

FTP做source的插件使用

自定义sink将数据导入mysql中

flume的基本介绍

系统描述:

Flume是一个分布式的、可靠的、高可用的海量日志采集、聚合和传输的系统。

数据流模型:Source-Channel-Sink。

当 Flume 来源收到事件后,它会将这些事件存储在一个或多个通道 中。通道是一种被动存储,它将事件保留到被 Flume 接收器 使用为止。例如,一个文件通道使用了本地文件系统;接收器从通道提取事件,并将它放在一个外部存储库(比如 HDFS)中,或者将它转发到流中下一个 Flume 代理(下一个跃点)的 Flume 来源;给定代理中的来源和接收器与暂存在通道中的事件同步运行。

架构图:

网上很多关于flume的介绍我就不多说了。

Flume基本组件:

Event:消息的基本单位,有header和body组成

Agent:JVM进程,负责将一端外部来源产生的消息转 发到另一端外部的目的地

Source:从外部来源读入event,并写入channel

Channel:event暂存组件,source写入后,event将会一直保存,

Sink:从channel读入event,并写入目的地

1.3.1Source意为来源、源头。

主要作用:从外界采集各种类型的数据,将数据传递给Channel。

比如:监控某个文件只要增加数据就立即采集新增的数据、监控某个目录一旦有新文件产生就采集新文件的内容、监控某个端口等等。

常见采集的数据类型:

Exec Source、Avro Source、NetCat Source、Spooling Directory Source等

Source具体作用:

– **AvroSource:监听一个avro服务端口,采集Avro数据序列化后的数据;

– **Thrift Source:监听一个Thrift 服务端口,采集Thrift数据序列化后的数据;

– **Exec Source:基于Unix的command在标准输出上采集数据;tail -F 和tail -f 区别。基于log4j切割文件时的能否读取问题。

– **JMS Source:Java消息服务数据源,Java消息服务是一个与具体平台无关的API,这是支持jms规范的数据源采集;

– **Spooling Directory Source:通过文件夹里的新增的文件作为数据源的采集;

– **Kafka Source:从kafka服务中采集数据。

– **NetCat Source: 绑定的端口(tcp、udp),将流经端口的每一个文本行数据作为Event输入

– **HTTP Source:监听HTTP POST和 GET产生的数据的采集

Channel:

一个数据的存储池,中间通道。

主要作用:接受source传出的数据,向sink指定的目的地传输。Channel中的数据直到进入到下一个channel中或者进入终端才会被删除。当sink写入失败后,可以自动重写,不会造成数据丢失,因此很可靠。

channel的类型很多比如:内存中、jdbc数据源中、文件形式存储等。

常见采集的数据类型:

– **Memory Channel

– **File Channel

– **Spillable Memory Channel等

Channel具体作用:

– **Memory Channel:使用内存作为数据的存储。速度快

– **File Channel:使用文件来作为数据的存储。安全可靠

– **Spillable Memory Channel:使用内存和文件作为数据的存储,即:先存在内存中,如果内存中数据达到阀值则flush到文件中。

– **JDBC Channel:使用jdbc数据源来作为数据的存储。

– **Kafka Channel:使用kafka服务来作为数据的存储。

Sink:

主要作用:接受channel写入的数据以指定的形式表现出来(或存储或展示)。

sink的表现形式很多比如:打印到控制台、hdfs上、avro服务中、文件中等。

常见采集的数据类型:

 HDFS Sink

 Hive Sink

 Logger Sink

 Avro Sink

 Thrift Sink

 File Roll Sink

 HBaseSink

 Kafka Sink等

详细查看:

HDFSSink需要有hdfs的配置文件和类库。一般采取多个sink汇聚到一台采集机器负责推送到hdfs。

Sink具体作用:

Logger Sink:将数据作为日志处理(根据flume中的设置的日志的级别显示)。

HDFS Sink:将数据传输到hdfs集群中。

Avro Sink:数据被转换成Avro Event,然后发送到指定的服务端口上。

Thrift Sink:数据被转换成Thrift Event,然后发送到指定的的服务端口上。

File Roll Sink:数据传输到本地文件中。

Hive Sink:将数据传输到hive的表中。

IRC Sink:数据向指定的IRC服务和端口中发送。

Null Sink:取消数据的传输,即不发送到任何目的地。

HBaseSink:将数据发往hbase数据库中。

MorphlineSolrSink:数据发送到Solr搜索服务器(集群)。

ElasticSearchSink:数据发送到Elastic Search搜索服务器(集群)。

Kafka Sink:将数据发送到kafka服务中。(注意依赖类库)

Flume搭建和启动

首先官网下载二进制的安装包

http://flume.apache.org/download.html

直接解压 运行需要有jdk的环境,根据flume的版本不同所需要的jdk版本也不一样 1.7的flume至少需要的是1.7以上的jdk

然后到flume的bin目录下面就可以启动flume了

如:./flume-ng agent -c ../conf -f ../conf/g01-taildir-avro-sink.conf -n agent1 -Dflume.root.logger=INFO,console

-c/–conf 后跟配置目录,-f/–conf-file 后跟具体的配置文件,-n/–name 指定agent的名称

使用FTP做source

因为我的需求是从客户的ftp服务器中获取需要的文件数据所以需要有一个source从ftp获取,但是由于flime内置的souce中没有ftp,所以我们从github中down一个frp做source的插件使用

https://github.com/keedio/flume-ftp-source

!!!!感谢大神提供的插件啊!

下载之后解压然后打包生成jar包并且把另外两个依赖包

commons-net-3.3.jar和jsch-0.1.54.jar 加入之前解压的flume中的lib 中

然后编写配置文件 按照 插件里面的配置文件模板编写自己的soure的配置

如:

agent.sources.ftp2.type = org.keedio.flume.source.ftp.source.Source

agent.sources.ftp2.client.source = ftp

agent.sources.ftp2.name.server = 172.168.27.98 //ftp服务地址

agent.sources.ftp2.port = 21 //端口

agent.sources.ftp2.user = ftp_huge //ftp用户名

agent.sources.ftp2.password = 2970667 //密码

agent.sources.ftp2.working.directory = /OCS/CRM/bill/in //文件所在目录 (相对于根目录)

agent.sources.ftp2.filter.pattern = .+\.unl //文件的名称匹配 (java正则表达式)

agent.sources.ftp2.folder = /home/ftp_test //ftp的根目录

agent.sources.ftp2.file.name = ftp2-status-file.ser

agent.sources.ftp2.run.discover.delay=5000

agent.sources.ftp2.flushlines = true

agent.sources.ftp2.search.recursive = true

agent.sources.ftp2.processInUse = false

agent.sources.ftp2.processInUseTimeout = 30

agent.sources.ftp2.channels = ch2

自定义sink将数据导入mysql中

有了之前ftp插件引用的经验 现在我们需要自己定义一个sink将数据存入MySQL中

首先新建maven 项目,pom.xml 文件如下:

<?xml version=”1.0″ encoding=”UTF-8″?>

<project xmlns=”http://maven.apache.org/POM/4.0.0″

xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance”

xsi:schemaLocation=”http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd”>

<modelVersion>4.0.0</modelVersion>

<groupId>com.us</groupId>

<artifactId>flumeDemo</artifactId>

<version>1.0-SNAPSHOT</version>

<properties>        <maven.compiler.target>1.8</maven.compiler.target>

<maven.compiler.source>1.8</maven.compiler.source>

<version.flume>1.7.0</version.flume>

</properties>

<dependencies>

<dependency>

<groupId>org.apache.flume</groupId>

<artifactId>flume-ng-core</artifactId>

<version>${version.flume}</version>

</dependency>

<dependency>

<groupId>org.apache.flume</groupId>

<artifactId>flume-ng-configuration</artifactId>

<version>${version.flume}</version>

</dependency>

<!– https://mvnrepository.com/artifact/mysql/mysql-connector-java –>

<dependency>

<groupId>mysql</groupId>

<artifactId>mysql-connector-java</artifactId>

<version>6.0.5</version>

</dependency>

</dependencies>

</project>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

废话不多说直接上码

入口:MySqlSink

package com.tesnik.flume.sink;

import com.google.common.base.Preconditions;

import java.util.HashMap;

import java.util.Map;

import org.apache.flume.Channel;

import org.apache.flume.ChannelException;

import org.apache.flume.Context;

import org.apache.flume.CounterGroup;

import org.apache.flume.Event;

import org.apache.flume.EventDeliveryException;

import org.apache.flume.Transaction;

import org.apache.flume.Sink.Status;

import org.apache.flume.conf.Configurable;

import org.apache.flume.sink.AbstractSink;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class MySqlSink extends AbstractSink implements Configurable {


private static final Logger logger = LoggerFactory.getLogger(MySqlSink.class);

private String databaseName;

private String tableName;

private String partition;

private String iscustom;

private String password;

private String user;

private String driver;

private String db_url;

private CounterGroup counterGroup = new CounterGroup();

DAOClass daoClass = new DAOClass();

public MySqlSink() {


}

public void configure(Context context) {


this.db_url = context.getString(“url”);

this.password = context.getString(“password”);

this.user = context.getString(“user”);

this.driver = context.getString(“driver”);

this.tableName = context.getString(“tableName”);

this.partition = context.getString(“partition”);

this.iscustom = context.getString(“iscustom”);

this.databaseName = context.getString(“databaseName”);

Preconditions.checkState(this.password != null, “No password specified”);

Preconditions.checkState(this.user != null, “No user specified”);

}

public void start() {


logger.info(“Mysql sink starting”);

try {


this.daoClass.createConnection(this.driver, this.db_url, this.user, this.password);

} catch (Exception var2) {


logger.error(“Unable to create MySQL client using url:” + this.db_url + ” username:” + this.user + “. Exception follows.”, var2);

this.daoClass.destroyConnection(this.db_url, this.user);

return;

}

super.start();

logger.debug(“MySQL sink {} started”, this.getName());

}

public void stop() {


logger.info(“MySQL sink {} stopping”, this.getName());

this.daoClass.destroyConnection(this.db_url, this.user);

super.stop();

logger.debug(“MySQL sink {} stopped. Metrics:{}”, this.getName(), this.counterGroup);

}

public Status process() throws EventDeliveryException {


Status status = Status.READY;

Channel channel = this.getChannel();

Transaction transaction = channel.getTransaction();

try {


transaction.begin();

this.daoClass.createConnection(this.driver, this.db_url, this.user, this.password);

Event event = channel.take();

if (event == null) {


this.counterGroup.incrementAndGet(“event.empty”);

status = Status.BACKOFF;

} else {


Map<String, String> params = new HashMap();

params.put(“tableName”, this.tableName);

params.put(“partition”, this.partition);

params.put(“iscustom”, this.iscustom);

params.put(“databaseName”, this.databaseName);

this.daoClass.insertData(event, params);

this.counterGroup.incrementAndGet(“event.mysql”);

}

transaction.commit();

} catch (ChannelException var10) {


transaction.rollback();

logger.error(“Unable to get event from channel. Exception follows.”, var10);

status = Status.BACKOFF;

} catch (Exception var11) {


transaction.rollback();

logger.error(“Unable to communicate with MySQL server. Exception follows.”, var11);

status = Status.BACKOFF;

this.daoClass.destroyConnection(this.db_url, this.user);

} finally {


transaction.close();

}

return status;

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

具体操作类 ClassDao

//

// Source code recreated from a .class file by IntelliJ IDEA

// (powered by Fernflower decompiler)

//

package com.tesnik.flume.sink;

import com.tesnik.flume.sink.util.ImvnoUtil;

import java.io.IOException;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.sql.Statement;

import java.text.SimpleDateFormat;

import java.util.ArrayList;

import java.util.Date;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import org.apache.flume.Event;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class DAOClass {


private static final Logger logger = LoggerFactory.getLogger(DAOClass.class);

private static Map<String, String> columnsMap = new HashMap();

private static String INSERT_QUERY;

private Connection connection;

public DAOClass() {


}

public void insertData(Event event, Map<String, String> params) {


try {


String body = new String(event.getBody());

Map<String, String> headers = event.getHeaders();

String fileName = (String)headers.get(“fileName”);

String databaseName = (String)params.get(“databaseName”);

String tableName = (String)params.get(“tableName”);

String partition = (String)params.get(“partition”);

String iscustom = (String)params.get(“iscustom”);

if (!”false”.equals(iscustom)) {


if (“true”.equals(iscustom)) {


;

}

} else {


String columns = “”;

Statement st = this.connection.createStatement();

String nowDate;

if (columnsMap.get(tableName) == null) {


nowDate = “select COLUMN_NAME from INFORMATION_SCHEMA.Columns where table_name='” + tableName + “‘ and  table_schema='” + databaseName + “‘”;

for(ResultSet resultSet = st.executeQuery(nowDate); resultSet.next(); columns = columns + resultSet.getString(“COLUMN_NAME”) + “,”) {


;

}

columns = columns.substring(0, columns.length() – 1);

columnsMap.put(tableName, columns);

} else {


columns = (String)columnsMap.get(tableName);

}

nowDate = getNowDate();

String[] datas = body.split(partition);

List<String> dataList = new ArrayList();

String IID = ImvnoUtil.getIID();

dataList.add(IID);

String[] arr$ = datas;

int i1 = datas.length;

for(int i$ = 0; i$ < i1; ++i$) {


String data = arr$[i$];

dataList.add(data);

}

dataList.add(fileName);

dataList.add(nowDate);

String indexs = “”;

i1 = 0;

while(true) {


if (i1 >= dataList.size()) {


indexs = indexs.substring(0, indexs.length() – 1);

INSERT_QUERY = “INSERT INTO ” + tableName + “(” + columns + “) values (” + indexs + “)”;

break;

}

String ind = “”;

if (((String)dataList.get(i1)).equals(“”)) {


ind = “null,”;

} else {


ind = “‘” + (String)dataList.get(i1) + “‘,”;

}

indexs = indexs + ind;

++i1;

}

}

PreparedStatement insertStmnt = this.connection.prepareStatement(INSERT_QUERY);

insertStmnt.execute();

} catch (SQLException var20) {


var20.printStackTrace();

}

}

public void createConnection(String driver, String db_url, String user, String password) throws IOException {


if (this.connection == null) {


try {


Class.forName(driver);

this.connection = DriverManager.getConnection(db_url, user, password);

} catch (ClassNotFoundException var6) {


var6.printStackTrace();

} catch (SQLException var7) {


var7.printStackTrace();

}

}

}

public void destroyConnection(String db_url, String user) {


if (this.connection != null) {


logger.debug(“Destroying connection to: {}:{}”, db_url, user);

try {


this.connection.close();

} catch (SQLException var4) {


var4.printStackTrace();

}

}

this.connection = null;

}

public static String escapeSQL(String s) {


return s.replaceAll(“‘”, “\\'”);

}

public Connection getConnection() {


return this.connection;

}

private static String getNowDate() {


SimpleDateFormat df = new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”);

String INSERT_DATE = df.format(new Date());

return INSERT_DATE;

}

}


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

大功告成

最后打包扔进lib 并且加入mysql 驱动jar包

编写配置文件内容(根据自己业务情况)


agent.channels.ch1.type = memory

agent.channels.ch1.capacity = 100000000

agent.channels.ch1.transactionCapacity = 100000000

agent.sinks.k1.type = com.tesnik.flume.sink.MySqlSink

agent.sinks.k1.url = jdbc:mysql://172.168.27.58:3306/imvno?autoReconnect=true&failOverReadOnly=false

agent.sinks.k1.user= root

agent.sinks.k1.password= 123456

agent.sinks.k1.driver= com.mysql.jdbc.Driver

agent.sinks.k1.databaseName = imvno

agent.sinks.k1.tableName = mvno_cdma_prepaid

agent.sinks.k1.partition = \\|

agent.sinks.k1.iscustom = false

agent.sinks.k1.channel = ch1

运行起来 ok

————————————————

版权声明:本文为CSDN博主「幼稚猿大叔」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/qq_40015759/article/details/82429117