使用docker快速搭建最简单的分布式日志系统
安装es
我这里用portainer(docker可视化工具)直接搭建,复制粘贴添加一个stack即可
elasticsearch.yml 如下,也可以输入shell命令 docker-comspoe -f elasticsearch.yml up -d
version: '3.0'
services:
elasticsearch:
image: elasticsearch:7.9.3
container_name: elasticsearch
ports:
- 9200:9200
- 9300:9300
environment:
discovery.type: single-node
cluster.routing.allocation.disk.threshold_enabled: "false"
#分片个数 默认1000
cluster.max_shards_per_node: 100000
#存储库位置
path.repo.0: /usr/share/elasticsearch/data/backup
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- /data/local/elasticsearch/9200/data:/usr/share/elasticsearch/data
deploy:
mode: replicated
replicas: 1
resources:
limits:
memory: 2060M
reservations:
memory: 1024M
访问ip:9200
安装Filebeat
-
下载对应版本的filebeat
-
tar -zxvf filebeat-7.10.0-linux-x86_64.tar.gz 解压之后得到目录
- 修改filebeat.yml配置
# ============================== Filebeat inputs ===============================
#参考 https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-log.html
filebeat.inputs:
- type: log
enabled: true
paths:
- /logs/*/*/request/*.log
fields_under_root: true
json: #当每行有一个JSON对象时,JSON解码才有效。
keys_under_root: true #解码后的JSON放置在输出文档中的“ json”键下。如果启用此设置,则将密钥复制到输出文档的顶层。默认为false。
add_error_key: true #如果keys_under_root和启用此设置,则在发生冲突时,来自解码的JSON对象的值将覆盖Filebeat通常添加的字段(类型,源,偏移量等)。
message_key: logType #JSON对象的顶层key 必须包含才写入并且与该密钥关联的值必须是字符串,否则将不会进行过滤或多行聚合。
ignore_decoding_error: false #是否记录json转换异常日志
overwrite_keys: true
#ignore_older: 5m
#close_older: 1h #如果一个文件在某个时间段内没有发生过更新,则关闭监控的文件handle。默认1h,change只会在下一次scan才会被发现
#force_close_files: #Filebeat会在没有到达close_older之前一直保持文件的handle,如果在这个时间窗内删除文件会有问题,所以可以把force_close_files设置为true,只要filebeat检测到文件名字发生变化,就会关掉这个handle。
scan_frequency: 10s #Filebeat以多快的频率去prospector指定的目录下面检测文件更新(比如是否有新增文件),如果设置为0s,则Filebeat会尽可能快地感知更新(占用的CPU会变高)。默认是10s。
#tail_files: true #如果设置为true,Filebeat从文件尾开始监控文件新增内容,把新增的每一行文件作为一个事件依次发送
#backoff: #Filebeat检测到某个文件到了EOF之后,每次等待多久再去检测文件是否有更新,默认为1s。
#max_backoff: #Filebeat检测到某个文件到了EOF之后,等待检测文件更新的最大时间,默认是10秒。
fields:
#定义自定义类型 后面建索引拿这个判断
type: request_log
# ============================== Filebeat modules ==============================
filebeat.config.modules:
# Glob pattern for configuration loading
path: ${path.config}/modules.d/*.yml
# Set to true to enable config reloading
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
#index.codec: best_compression
#_source.enabled: false
#禁用自定义模板
#setup.template.enabled: false
setup.template.overwrite: false #是否覆盖现有模板。默认为false
setup.template.name: "request_log"
setup.template.pattern: "request_log*"
setup.ilm.enabled: false
# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
enabled: true
hosts: ["192.168.0.202:9200"]
#这里的%{[type]} 是上面指定的 因为设置了fields_under_root: true 所以字段在最顶层 否则是%{[fields.type]} source是文档带过来的变量
#index: "default-%{[type]}-%{[source]}-%{[agent.version]}-%{+yyy.MM.dd}"
indices:
#这里的%{[type]} 是上面指定的 因为设置了fields_under_root: true 所以字段在最顶层 否则是%{[fields.type]} source是文档带过来的变量 只有request_log里的文件自带source字段
- index: "request_log-%{[source]}-%{+yyyy.MM.dd}"
when.equals:
type: "request_log"
# ================================= Processors =================================
processors:
#- add_host_metadata:
# when.not.contains.tags: forwarded
#- add_cloud_metadata: ~
#- add_docker_metadata: ~
#- add_kubernetes_metadata: ~
- drop_fields: #删除自带的某些字段 agent.version" 和 @timestamp删除不掉
fields: ["input","log","host","ecs","@timestamp",
"agent.ephemeral_id","agent.hostname","agent.id","agent.type",agent.version","agent.name","type"]
- 启动filebeat
#增加执行权限
chmod +x /usr/local/filebeat/filebeat-7.10.0-linux-x86_64/filebeat
#后台启动filebeat 并追加日志到filebeat.log 运行完之后exit退出回话,纺织回话异常关闭导致程序停止
cd /usr/local/filebeat/filebeat-7.10.0-linux-x86_64 && nohup ./filebeat -e >> filebeat.log 2>&1 &
#前台启动 不使用任何模组 排查问题的时候就不要后台启动了
./filebeat -e
日志写入
建议日志每行为json格式,这样filebeat不需要自定义切割,也不需要定义多行日志截取方式啥的
我的日志格式
kibana
-
安装 跟es一样 我这里使用docker安装
kibana.yml
version: '3.0'
services:
kibana:
image: kibana:7.9.3
container_name: kibana
environment:
- TZ=Asia/Shanghai
- XPACK_GRAPH_ENABLED=true
- TIMELION_ENABLED=true
- XPACK_MONITORING_COLLECTION_ENABLED="true"
- ELASTICSEARCH_URL=http://192.168.0.202:9200 #环境变量:elasticsearch 请求地址
- I18N_LOCALE=zh-CN #指定中文
ports:
- "5601:5601"
volumes:
- /data/local/kibana/config:/usr/share/kibana/config
-
可视化
需要注意的是 如果索引管理下多了filebeat-xxx,很可能是没配置好,数据解析出错了,需要检查日志每行是否是json,json没问题再看filebeat.yml配置
最后附上log4j配置和java 请求Aop代码
log4j.properties部分代码
#RequestAop 单独配置一个appender
log4j.logger.RequestAop=DEBUG,requestAop
log4j.appender.requestAop=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAop.File=${logPath}logs/supermanex-app/request/requestAop.log
log4j.appender.requestAop.DatePattern='_'yyyy_MM_dd'.log'
log4j.appender.requestAop.Threshold=INFO
log4j.appender.requestAop.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAop.layout.ConversionPattern=%m%n
#不继承父Logger的appender
log4j.additivity.RequestAop=false
RequestAop.java部分代码
package ;
/**
* 功能描述:
*
* @author
* 创建时间: 2020/9/15
* 版权:
*/
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import eu.bitwalker.useragentutils.Browser;
import eu.bitwalker.useragentutils.UserAgent;
import eu.bitwalker.useragentutils.Version;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.aop.ProxyMethodInvocation;
import org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.http.*;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.text.SimpleDateFormat;
import java.util.*;
/**
*
* @author
* 2020年9月18日13:56:20
*/
@Aspect
@Component
@Slf4j(topic="RequestAop")
@Order(1) //设置加载顺序
public class RequestAop {
//=============================================== 是否开启防重复提交功能
@Value("${system.repeatSubmitAop.enable:false}")
boolean enableRepeatSubmitAop;
//=============================================== 是否开启日志
@Value("${system.log.enable:false}")
boolean logEnable;
//=============================================== 是否保存响应中的data数据
@Value("${system.log.saveReponseData:false}")
boolean saveReponseData;
@Autowired
private RedisDistributedLock redisDistributedLock;
static RestTemplate restTemplate;
static SimpleDateFormat df;
static{
df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
df.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
restTemplate = new RestTemplate();
}
String hostName="";
String localIP="";
@Value("${server.port}")
String localPort;
@Value("${spring.application.name}")
String source;
@PostConstruct
public void post(){
try{
hostName=LocalHostUtil.getHostName();
localIP=LocalHostUtil.getLocalIP();
}catch (Exception e){
e.printStackTrace();
}
}
@Pointcut("execution(* com.wudao.app.controller.*Controller.*(..))")
public void init() {
}
@Before("init()")
public void beforeAdvice(JoinPoint joinPoint) {
// 进入方法前拦截
}
@Around("init()")
public Object around(ProceedingJoinPoint pjp) throws Throwable {
long startTime = System.currentTimeMillis();
ThreadLocalUtil.setFalg("req");
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
Object obj = null;
String message = "success";
String http_code = "200";
boolean state = true;
JSONObject json = new JSONObject();
JSONObject response = new JSONObject();
if(logEnable) {
json = new JSONObject();
json.put("source", source);
json.put("startTime", df.format(new Date(startTime)));
String url = request.getRequestURI();
json.put("logType", "___rest___");
json.put("req_url", url);
String urlArgs = this.urlArgs2String(request, request.getParameterNames(), url);
json.put("urlArgs", urlArgs); //url中的参数
this.bodyArgs2String(pjp.getArgs(), json,url);
}
Method method = null;
try {
method = getMethod(pjp);
if(method.isAnnotationPresent(AvoidRepeatSubmit.class)&&enableRepeatSubmitAop){ //禁止重复提交处理
obj = avoidRepeatSubmit(pjp,request,method.getAnnotation(AvoidRepeatSubmit.class));
}else{ //默认处理
obj = pjp.proceed();
}
return obj;
} catch (Throwable e) {
/*if(e instanceof StatusException){
StatusException e2 = (StatusException)e;
LogUtil.Error("Error!", e2);
return new Status(e2.stateMessage);
}else{
// LogUtil.Error("ExceptionError", e);
return new Status(StateMessage.UN_KNOW_REASON);
}*/
/*if(e instanceof CustomException) {
*//**
* 拦截到主动抛出的异常
*//*
CustomException ex = (CustomException) e;
this.logInfo2Kafka(request, url, bodyArgs, urlArgs, ex.getMsg(), 400, endTime - startTime, false);
throw ex;
} else {
*//**
* 拦截到未知异常
*//*
StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
this.logInfo2Kafka(request, url, bodyArgs, urlArgs, "未捕获异常: " + stringWriter.toString(), 500, endTime - startTime, false);
throw new CustomException("未捕获异常: " + e.getMessage());
}*/
Exception ex = (Exception) e;
message="未知异常";
LogUtil.Error("未知异常",ex);
http_code="500";
state = false;
response.put("message",message);
response.put("state",state);
throw e;
}finally {
if(logEnable){
try{
//============================================================== 追加日志
List log = ThreadLocalUtil.getLogMsg(); //拿到LogUtil.Error(),LogUtil.tag()的日志
json.put("tag", log); //ip地址
ThreadLocalUtil.remove();
json.put("http_code", http_code);
if(method.isAnnotationPresent(ApiOperation.class)){ //写入api中的描述
json.put("operation",method.getAnnotation(ApiOperation.class).value());
}
long endTime = System.currentTimeMillis();
String ip = IpUtils.getIpAddr(request);
HttpSession httpSession = request.getSession();
String sessionId = httpSession.getId();
json.put("req_ip", ip); //ip地址
json.put("sessionId", sessionId); //ip地址
json.put("req_type", request.getMethod()); //请求类型
json.put("state", state);
json.put("endTime", df.format(new Date(endTime)));
json.put("exTime", endTime - startTime); // 执行耗时
json.put("channel", request.getHeader("channel"));
json.put("userAgentStr", request.getHeader("User-Agent"));
JSONObject instance = new JSONObject();
instance.put("ip",localIP);
instance.put("host", hostName);
instance.put("port", localPort);
json.put("instance", instance);
if(state){
response = (JSONObject) JSONObject.toJSON(obj);
if(!saveReponseData){
response.remove("data");
}else{
response.put("data",JSON.toJSONString(response.get("data")));
}
}
json.put("res", response); //响应
saveLog(json);
//System.out.println("saveLog耗时"+(System.currentTimeMillis()-endTime));
}catch(Exception e){
LogUtil.Error("requestAop异常",e);
}
}
}
}
private Method getMethod(ProceedingJoinPoint pjp)
throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException {
MethodInvocationProceedingJoinPoint methodPjp = (MethodInvocationProceedingJoinPoint) pjp;
Field field = methodPjp.getClass().getDeclaredField("methodInvocation");
field.setAccessible(true);
ProxyMethodInvocation invocation = (ProxyMethodInvocation) field.get(methodPjp);
return invocation.getMethod();
}
/**
* 防止重复请求
* @param pjp
* @param request
* @return
* @throws Throwable
*/
public Object avoidRepeatSubmit(ProceedingJoinPoint pjp,HttpServletRequest request,AvoidRepeatSubmit avoidRepeatSubmit) throws Throwable {
/*long lockSeconds = avoidRepeatSubmit.lockTime();*/
Assert.notNull(request);
HttpSession httpSession = request.getSession();
String sessionId = httpSession.getId();
String path = request.getServletPath();
String ip = IpUtils.getIpAddr(request);
String key = sessionId + ip+ path;
long lockSeconds = avoidRepeatSubmit.value();
//long lockSeconds=1000*60*5; //锁过期时间5分钟 单位毫秒
boolean state = true;
try{
state = redisDistributedLock.setLock(key, sessionId, lockSeconds);
}catch (Exception e){//拿锁时异常 就默认true
state = true;
LogUtil.Error("try lock err", e);
}
if (state) {
log.info("try lock success, key = [{}], clientId = [{}], 锁自动释放时间lockSeconds=[{}]", key, sessionId,lockSeconds);
Object result;
try {
result = pjp.proceed();
} finally {
boolean r = redisDistributedLock.releaseLock(key, sessionId);
log.info("release lock {}, key = [{}], clientId = [{}]", r, key, sessionId);
}
return result;
} else {
long ttl = RedisCache.ttl(RedisDistributedLock.REDIS_LOCK_PREFIX+key);
log.info("try lock failed, key=[{}],锁剩余自动释放时间={}s", key,ttl);
return new Status(StateMessage.REPEAT_SUBMIT);
}
}
/**
* 获得浏览器信息
* @return
*/
public JSONObject getDeviceByUserAgent(String agent){
JSONObject userAgentJson = new JSONObject();
UserAgent userAgent = UserAgent.parseUserAgentString(agent);
Browser browser = userAgent.getBrowser();
Version version = userAgent.getBrowserVersion();
userAgentJson.put("system",userAgent.getOperatingSystem());
userAgentJson.put("browserName",browser.getName());
userAgentJson.put("browserVersion",version!=null?version.getVersion():"UNKNOWN");
return userAgentJson;
}
/**
* 保存日志
* @param json
*/
public void saveLog(JSONObject json){
json.put("userAgent",getDeviceByUserAgent(json.get("userAgentStr").toString())); //解析浏览器信息
log.info(json.toJSONString());
}
/**
* 读取requestBody中的参数
* @author Rex.Tan
* @date 2019年9月12日 下午2:54:31
* @param bodyArgs
* @param json
* @return
*/
private String bodyArgs2String(Object[] bodyArgs, JSONObject json,String url) {
try {
if(bodyArgs != null && bodyArgs.length > 0) {
for (Object obj : bodyArgs) {
if (obj instanceof BaseReq) {
BaseReq baseReq = ((BaseReq) obj);
String baseToken = baseReq.getBaseToken();
if(StringUtils.isNotBlank(baseToken)){ //获得user
json.put("user",getUserInfo(baseToken));
}
}
}
json.put("bodyArgs", JSONArray.toJSONString(bodyArgs));
}
} catch(Exception e) {
LogUtil.Error("=============序列化requestBody中的参数出错, " + url,e);
}
return "";
}
/**
* 读取url中的参数
* @author Tan Ling
* @date 2019年9月12日 下午2:54:40
* @param request
* @param urlArgs
* @param url
* @return
*/
private String urlArgs2String(HttpServletRequest request, Enumeration<String> urlArgs, String url) {
JSONObject urlArgsJson = new JSONObject();
try {
if(urlArgs != null) {
while(urlArgs.hasMoreElements()) {
try {
String paraName = (String) urlArgs.nextElement();
urlArgsJson.put(paraName, request.getParameter(paraName));
} catch(Exception e) {
LogUtil.Error("=============记录url中的参数出错,"+url,e);
break;
}
}
}
return urlArgsJson.toJSONString();
}catch(Exception e) {
e.printStackTrace();
LogUtil.Error("=============记录url中的参数出错, " + url,e);
}
return "";
}
}
版权声明:本文为chen_cxl原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。