Kafka是一个高吞吐量的分布式发布订阅消息系统,它的应用场景很多,如日志采集、消息系统、运营指标等。在日志采集的场景中,我们项目的重要服务可能会通过集群进行部署,每个服务有它自己的日志记录产生,这些日志都是散落在它们自己的服务器上,这种日志记录不集中的形式给我们分析日志的时候带来了很大的不便,因此我们需要通过日志采集将这些散落在各个服务器上的日志记录集中起来,便于我们在解决问题时进行日志分析和查看。
本文以Spring Boot + Kafka + ELK的方式实现日志采集,在此之前先简单介绍一下什么是ELK。
一、什么是ELK?
ELK是Elasticsearch + Logstash + Kibana三个组件的简称。
1、
Elasticsearch
:是一个分布式可扩展的实时搜索和分析引擎。
2、
Logstash
:是一款强大的数据处理工具,它可以实现数据传输,格式处理,格式化输出,还有强大的插件功能,常用于日志处理。
3、
Kibana
:是一个针对Elasticsearch的开源分析及可视化平台,用来搜索、查看交互存储在Elasticsearch索引中的数据。使用Kibana,可以通过各种图表进行高级数据分析及展示。
这三个组件的下载地址:
https://www.elastic.co/cn/downloads/past-releases/
,为了避免版本冲突的问题,下载的时候三个组件的版本最好选择相同的。顺便附上Kafka的安装教程地址:
https://blog.csdn.net/weixin_37968613/article/details/104606277
下面再来看一下Kafka + ELK是怎样帮我们实现日志采集的。
|
日志通过Spring boot的logback写入到Kafka中,Logstash订阅了对应的日志主题接收到日志记录,在经过日志过滤后写入Elasticsearch中,最终我们可以通过Kibana直接在页面上可视化的查看分析我们的日志记录。这里不把日志直接传到elasticsearch中,而是先传到kafka再传给elasticsearch,是因为Logstash在这里面相当于一根数据管道,而kafka正凭借其高吞吐量的特性可以缓解数据进入Logstash时的压力。
二、pom依赖
下面是日志采集需要用到的依赖。
<!-- kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!--以下依赖是logback接入kafka所需依赖,日志采集实例-->
<dependency>
<groupId>com.github.danielwegener</groupId>
<artifactId>logback-kafka-appender</artifactId>
<version>0.2.0-RC1</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>4.11</version>
</dependency>
<!--以上依赖是logback接入kafka所需依赖,日志采集实例-->
三、启动Zookeeper
在上面Kafka的安装教程链接中可以知道在安装Kafka的时候我们就要安装Zookeeper,因为kafka是依赖于Zookeeper的。所以在启动Kafka之前要先启动Zookeeper,如果按照教程已经配置好了系统环境变量,那么在cmd中直接输入命令:zkServer即可启动。
四、启动Kafka
通过上面教程安装kafka并修改配置后,在kafka根目录下打开cmd,输入命令:
.\bin\windows\kafka-server-start.bat .\config\server.properties
即可启动kafka。
接着在kafka根目录下再打开一个cmd创建一个主题:collectionlog,用于接收日志记录:
.\bin\windows\kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic collectionlog
,主题名称可以随意,但要记住这个名称后面要用到。
五、启动Elasticsearch
进入elasticsearch的bin目录下点击elasticsearch.bat启动即可。
六、启动Logstash
需要先在Logstash的根目录下创建一个名为:
core.conf
的配置文件,内容如下。该配置是kafka和ELK的关联配置,Logstash接收kafka对应的主题消息,再发往es,注意里面的topic名称要改为刚才创建的主题。
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
kafka {
#此处是kafka的ip和端口(千万不要写成zookeeper的ip端口,否则无法收集到日志)
bootstrap_servers => "localhost:9092"
#此处是我前面提到的topic名称
topics => ["collectionlog"]
auto_offset_reset => "latest"
}
}
output {
elasticsearch {
#es的Ip和端口
hosts => ["localhost:9200"]
}
}
然后进入bin目录下打开cmd,输入命令:
logstash.bat -f ..\core.conf
启动即可。
七、启动Kibana
进入Kibana的bin目录下点击启动kibana.bat即可。后面我们可以通过
http://localhost:5601
查看到收集的日志记录。
八、logback-spring.xml配置
下面回到Spring Boot项目中,在resources目录下创建logback-spring.xml配置文件,这里面的topic名称也需要修改为刚才创建的主题,内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<!-- scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true。 scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,
默认单位是毫秒当scan为true时,此属性生效。默认的时间间隔为1分钟。 debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。
默认值为false。 -->
<!-- <configuration scan="false" scanPeriod="60 seconds" debug="false"> -->
<configuration>
<!--设置上下文名称,用于区分不同应用程序的记录。一旦设置不能修改, 可以通过%contextName来打印日志上下文名称 -->
<contextName>kafka-log-test</contextName>
<!-- 定义日志的根目录 -->
<property name="logDir" value="F:\Kafka\kafka-log" />
<!-- 定义日志文件名称 -->
<property name="logName" value="kafkaLog"></property>
<!-- ConsoleAppender 表示控制台输出 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
<pattern>%red(%d{yyyy-MM-dd HH:mm:ss}) %green([%thread]) %highlight(%-5level) %boldMagenta(%logger{10}) - %cyan(%msg%n)</pattern>
</encoder>
</appender>
<!-- 异常错误日志记录到文件 -->
<appender name="logfile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- <Encoding>UTF-8</Encoding> -->
<File>${logDir}/${logName}.log</File>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${logDir}/history/test_log.%d{yyyy-MM-dd}.rar</FileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
<!-- 此处为kafka的Topic名称,千万不要写错 -->
<topic>collectionlog</topic>
<!-- we don't care how the log messages will be partitioned -->
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
<!-- use async delivery. the application threads are not blocked by logging -->
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
<!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
<!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
<!-- bootstrap.servers is the only mandatory producerConfig -->
<producerConfig>bootstrap.servers=localhost:9092</producerConfig>
<!-- don't wait for a broker to ack the reception of a batch. -->
<producerConfig>acks=0</producerConfig>
<!-- wait up to 1000ms and collect log messages before sending them as a batch -->
<producerConfig>linger.ms=1000</producerConfig>
<!-- even if the producer buffer runs full, do not block the application but start to drop messages -->
<producerConfig>max.block.ms=0</producerConfig>
<!-- define a client-id that you use to identify yourself against the kafka broker -->
<producerConfig>client.id=0</producerConfig>
</appender>
<root level="INFO">
<appender-ref ref="CONSOLE" />
<appender-ref ref="logfile"/>
<appender-ref ref="kafkaAppender" />
</root>
</configuration>
然后在application.yml中配置。
logging:
config: classpath:logback-spring.xml
九、测试接口
最后在controller中写一个测试接口,打印一条日志。
package com.hedong.controller;
import com.hedong.api.KafkaLogCollectionApi;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RestController;
/**
* @author: DoNg
* @description: DoNg
* @create: 2021-01-30 15:06
**/
@Slf4j
@RestController
public class KafkaLogCollectionController implements KafkaLogCollectionApi {
@Override
public void logCollection(String content) {
log.info(content);
}
}
九、测试
启动Spring Boot并通过接口打印一条日志,然后在浏览器中打开Kibana:
http://localhost:5601
可以看到我们的日志记录已经被收集到elasticsearch中了