Distributed Logging System Setup Notes (Filebeat -> Elasticsearch -> Kibana)



Quickly set up the simplest possible distributed logging system with Docker.



Install Elasticsearch

I deploy it directly with Portainer (a Docker web UI): just copy and paste the file below to add a stack.


elasticsearch.yml is shown below; alternatively, run the shell command docker-compose -f elasticsearch.yml up -d

version: '3.0'
services:
  elasticsearch:
    image: elasticsearch:7.9.3
    container_name: elasticsearch
    ports:
      - 9200:9200
      - 9300:9300
    environment:
      discovery.type: single-node
      cluster.routing.allocation.disk.threshold_enabled: "false"
      #Max shards per node (defaults to 1000)
      cluster.max_shards_per_node: 100000
      #Snapshot repository path
      path.repo.0: /usr/share/elasticsearch/data/backup
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - /data/local/elasticsearch/9200/data:/usr/share/elasticsearch/data
    deploy:
      mode: replicated
      replicas: 1
      resources:
        limits:
          memory: 2060M
        reservations:
          memory: 1024M
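If the Elasticsearch container exits complaining that vm.max_map_count is too low (a common bootstrap requirement when running Elasticsearch in Docker), raise the kernel limit on the Docker host first:

#Raise the mmap count limit Elasticsearch needs (run on the Docker host)
sysctl -w vm.max_map_count=262144
#Persist it across reboots
echo "vm.max_map_count=262144" >> /etc/sysctl.conf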

Visit http://<host-ip>:9200 to confirm Elasticsearch is running.
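You can also check from the shell; the root endpoint should return the cluster-info JSON (substitute your own host IP):

#Quick check that Elasticsearch is responding
curl http://192.168.0.202:9200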




Install Filebeat


  1. Download the version of Filebeat that matches your Elasticsearch.
  2. Extract it with tar -zxvf filebeat-7.10.0-linux-x86_64.tar.gz to get the Filebeat directory.
  3. Edit the filebeat.yml configuration:
# ============================== Filebeat inputs ===============================
#Reference: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-log.html
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /logs/*/*/request/*.log
  fields_under_root: true
  json: # JSON decoding only applies when each log line contains exactly one JSON object.
    keys_under_root: true  # By default the decoded JSON is placed under a "json" key in the output document; if enabled, the keys are copied to the top level instead. Defaults to false.
    add_error_key: true  # If enabled, Filebeat adds an error key to the event when JSON decoding fails.
    message_key: logType # Top-level key of the JSON object used for filtering and multiline aggregation; it must be present and its value must be a string, otherwise no filtering or multiline aggregation happens.
    ignore_decoding_error: false   # If true, JSON decoding errors are not logged (false = log them).
    overwrite_keys: true  # If keys_under_root and this setting are enabled, values from the decoded JSON overwrite the fields Filebeat normally adds (type, source, offset, etc.) on conflict.
  #ignore_older: 5m
  #close_older: 1h   # Close the file handle if the file has not been updated within this period. Defaults to 1h; further changes are only picked up on the next scan.
  #force_close_files:  # Filebeat keeps the file handle open until close_older is reached, which is a problem if the file is deleted within that window; set force_close_files to true so the handle is closed as soon as the file name changes.
  scan_frequency: 10s  # How often Filebeat checks the configured paths for updates (e.g. new files). 0s detects changes as fast as possible (at higher CPU cost). Defaults to 10s.
  #tail_files: true     # If true, Filebeat starts reading from the end of each file and ships every newly appended line as an event.
  #backoff:   # After Filebeat reaches EOF on a file, how long it waits before checking that file for updates again. Defaults to 1s.
  #max_backoff:  # Maximum wait before re-checking a file for updates after EOF. Defaults to 10s.
  fields:
    #Custom field; used below to pick the target index
    type: request_log
# ============================== Filebeat modules ==============================
filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml
  # Set to true to enable config reloading
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
  #index.codec: best_compression
  #_source.enabled: false
#Disable the custom template
#setup.template.enabled: false 
setup.template.overwrite: false  # Whether to overwrite an existing template. Defaults to false.
setup.template.name: "request_log"
setup.template.pattern: "request_log*"
setup.ilm.enabled: false
# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  enabled: true
  hosts: ["192.168.0.202:9200"]
  #%{[type]} below is the field defined above; because fields_under_root: true it sits at the top level, otherwise it would be %{[fields.type]}. source is a field carried in the document itself.
  #index: "default-%{[type]}-%{[source]}-%{[agent.version]}-%{+yyyy.MM.dd}"
  indices:
    #Only the request_log files carry a source field of their own.
    - index: "request_log-%{[source]}-%{+yyyy.MM.dd}"
      when.equals:
        type: "request_log"
# ================================= Processors =================================
processors:
  #- add_host_metadata:
  #    when.not.contains.tags: forwarded
  #- add_cloud_metadata: ~
  #- add_docker_metadata: ~
  #- add_kubernetes_metadata: ~
  - drop_fields:    # Drop some built-in fields; agent.version and @timestamp cannot actually be dropped.
      fields: ["input","log","host","ecs","@timestamp",
      "agent.ephemeral_id","agent.hostname","agent.id","agent.type","agent.version","agent.name","type"]
  4. Start Filebeat:
#Grant execute permission
chmod +x /usr/local/filebeat/filebeat-7.10.0-linux-x86_64/filebeat
#Start Filebeat in the background and append its output to filebeat.log; type exit to leave the session afterwards, so an abnormally closed session does not kill the process
cd /usr/local/filebeat/filebeat-7.10.0-linux-x86_64 && nohup ./filebeat -e >> filebeat.log 2>&1 &
#Start in the foreground without any modules; when troubleshooting, do not run it in the background
./filebeat -e
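Once Filebeat has shipped a few events, the indices defined above should appear in Elasticsearch; a quick check against the same host configured in filebeat.yml:

#List the request_log indices created by the config above
curl "http://192.168.0.202:9200/_cat/indices/request_log-*?v"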



Writing logs

Each log line should ideally be a single JSON object; that way Filebeat needs no custom splitting and no multiline pattern.

My log format:

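For illustration only (the field values here are made up, several fields are omitted, and the real entry is written as a single line), one entry in requestAop.log produced by the RequestAop class further down looks roughly like this:

{
  "logType": "___rest___",
  "source": "supermanex-app",
  "startTime": "2020-09-18 14:00:00.123",
  "endTime": "2020-09-18 14:00:00.456",
  "exTime": 333,
  "req_url": "/user/login",
  "req_type": "POST",
  "req_ip": "10.0.0.15",
  "http_code": "200",
  "state": true,
  "instance": { "ip": "192.168.0.10", "host": "app-01", "port": "8080" },
  "res": { "message": "success", "state": true }
}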



Kibana

  1. Installation: the same as for ES; I install it with Docker.

    kibana.yml
version: '3.0'
services:  
  kibana:
    image: kibana:7.9.3
    container_name: kibana
    environment:
      - TZ=Asia/Shanghai
      - XPACK_GRAPH_ENABLED=true
      - TIMELION_ENABLED=true
      - XPACK_MONITORING_COLLECTION_ENABLED="true"
      - ELASTICSEARCH_URL=http://192.168.0.202:9200  # Environment variable: Elasticsearch endpoint
      - I18N_LOCALE=zh-CN                  # Use the Chinese locale
    ports:
      - "5601:5601"
    volumes:
      - /data/local/kibana/config:/usr/share/kibana/config
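Kibana takes a minute or two to start; after that, http://<host-ip>:5601 should load in a browser, and its status API should respond (again, substitute your own host IP):

#Check that Kibana is up
curl http://192.168.0.202:5601/api/status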
  2. Visualization


    Note: if an extra filebeat-xxx index shows up under index management, the setup is most likely misconfigured and the data failed to parse; check that every log line is valid JSON first, and only then revisit the filebeat.yml configuration.

Finally, the log4j configuration and the Java request-logging AOP code.

Excerpt from log4j.properties (note the %m%n pattern: each call writes only the raw JSON message plus a newline, which is exactly what the Filebeat JSON input expects)

#Configure a dedicated appender for RequestAop
log4j.logger.RequestAop=DEBUG,requestAop
log4j.appender.requestAop=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAop.File=${logPath}logs/supermanex-app/request/requestAop.log
log4j.appender.requestAop.DatePattern='_'yyyy_MM_dd'.log'
log4j.appender.requestAop.Threshold=INFO
log4j.appender.requestAop.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAop.layout.ConversionPattern=%m%n
#Do not inherit the parent logger's appenders
log4j.additivity.RequestAop=false

Excerpt from RequestAop.java

package ;

/**
 * Description:
 *
 * @author
 * Created: 2020/9/15
 * Copyright:
 */

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import eu.bitwalker.useragentutils.Browser;
import eu.bitwalker.useragentutils.UserAgent;
import eu.bitwalker.useragentutils.Version;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.aop.ProxyMethodInvocation;
import org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.http.*;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 *
 * @author
 * 2020-09-18 13:56:20
 */
@Aspect
@Component
@Slf4j(topic="RequestAop")
@Order(1)   // set the aspect's ordering
public class RequestAop {

    //===============================================  Whether duplicate-submission prevention is enabled
    @Value("${system.repeatSubmitAop.enable:false}")
    boolean enableRepeatSubmitAop;
    //=============================================== Whether request logging is enabled
    @Value("${system.log.enable:false}")
    boolean logEnable;
    //=============================================== Whether to keep the response's data field in the log
    @Value("${system.log.saveReponseData:false}")
    boolean saveReponseData;

    @Autowired
    private RedisDistributedLock redisDistributedLock;

    static RestTemplate restTemplate;
    static SimpleDateFormat df;
    static{
        df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        df.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
        restTemplate = new RestTemplate();
    }
    String hostName="";
    String localIP="";
    @Value("${server.port}")
    String localPort;
    @Value("${spring.application.name}")
    String source;
    @PostConstruct
    public void post(){
        try{
            hostName=LocalHostUtil.getHostName();
            localIP=LocalHostUtil.getLocalIP();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Pointcut("execution(* com.wudao.app.controller.*Controller.*(..))")
    public void init() {

    }

    @Before("init()")
    public void beforeAdvice(JoinPoint joinPoint) {
        // intercepted before the method is entered
    }

    @Around("init()")
    public Object around(ProceedingJoinPoint pjp) throws Throwable  {
        long startTime = System.currentTimeMillis();
        ThreadLocalUtil.setFalg("req");
        HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
        Object obj = null;
        String message = "success";
        String http_code = "200";
        boolean state = true;
        JSONObject json = new JSONObject();
        JSONObject response = new JSONObject();
        if(logEnable) {
            json = new JSONObject();
            json.put("source", source);
            json.put("startTime", df.format(new Date(startTime)));
            String url = request.getRequestURI();
            json.put("logType", "___rest___");
            json.put("req_url", url);
            String urlArgs = this.urlArgs2String(request, request.getParameterNames(), url);
            json.put("urlArgs", urlArgs);       //url中的参数
            this.bodyArgs2String(pjp.getArgs(), json,url);
        }
        Method method = null;
        try {
            method = getMethod(pjp);
            if(method.isAnnotationPresent(AvoidRepeatSubmit.class)&&enableRepeatSubmitAop){    // duplicate-submission prevention
                obj = avoidRepeatSubmit(pjp,request,method.getAnnotation(AvoidRepeatSubmit.class));
            }else{      // default path
                obj = pjp.proceed();
            }
            return obj;
        } catch (Throwable e) {
            /*if(e instanceof StatusException){
                StatusException e2 = (StatusException)e;
                LogUtil.Error("Error!", e2);
                return new Status(e2.stateMessage);
            }else{
                // LogUtil.Error("ExceptionError", e);
                return new Status(StateMessage.UN_KNOW_REASON);
            }*/
            /*if(e instanceof CustomException) {
             *//**
             * An exception thrown deliberately by the application was intercepted
             *//*
                CustomException ex = (CustomException) e;
                this.logInfo2Kafka(request, url, bodyArgs, urlArgs, ex.getMsg(), 400, endTime - startTime, false);
                throw ex;
            } else {
                *//**
             * An unknown exception was intercepted
             *//*
                StringWriter stringWriter = new StringWriter();
                e.printStackTrace(new PrintWriter(stringWriter));
                this.logInfo2Kafka(request, url, bodyArgs, urlArgs, "Uncaught exception: " + stringWriter.toString(), 500, endTime - startTime, false);
                throw new CustomException("Uncaught exception: " + e.getMessage());
            }*/
            Exception ex = (Exception) e;
            message="未知异常";
            LogUtil.Error("未知异常",ex);
            http_code="500";
            state = false;
            response.put("message",message);
            response.put("state",state);
            throw e;
        }finally {
            if(logEnable){
                try{
                    //==============================================================  append the request log
                    List log = ThreadLocalUtil.getLogMsg(); // messages recorded via LogUtil.Error() and LogUtil.tag()
                    json.put("tag", log);       // tags/messages collected during the request
                    ThreadLocalUtil.remove();
                    json.put("http_code", http_code);
                    if(method.isAnnotationPresent(ApiOperation.class)){ //写入api中的描述
                        json.put("operation",method.getAnnotation(ApiOperation.class).value());
                    }
                    long endTime = System.currentTimeMillis();
                    String ip = IpUtils.getIpAddr(request);
                    HttpSession httpSession = request.getSession();
                    String sessionId = httpSession.getId();
                    json.put("req_ip", ip);       //ip地址
                    json.put("sessionId", sessionId);       //ip地址
                    json.put("req_type", request.getMethod());   //请求类型
                    json.put("state", state);
                    json.put("endTime", df.format(new Date(endTime)));
                    json.put("exTime", endTime - startTime);   //	执行耗时
                    json.put("channel", request.getHeader("channel"));
                    json.put("userAgentStr", request.getHeader("User-Agent"));
                    JSONObject instance = new JSONObject();
                    instance.put("ip",localIP);
                    instance.put("host", hostName);
                    instance.put("port", localPort);
                    json.put("instance", instance);
                    if(state){
                        response = (JSONObject) JSONObject.toJSON(obj);
                        if(!saveReponseData){
                            response.remove("data");
                        }else{
                            response.put("data",JSON.toJSONString(response.get("data")));
                        }
                    }
                    json.put("res", response);  //响应
                    saveLog(json);
                    //System.out.println("saveLog耗时"+(System.currentTimeMillis()-endTime));
                }catch(Exception e){
                    LogUtil.Error("requestAop异常",e);
                }
            }
        }
    }


    private Method getMethod(ProceedingJoinPoint pjp)
            throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException {
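        // Pulls the target Method out of the AspectJ join point by reading Spring's internal methodInvocation field via reflection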
        MethodInvocationProceedingJoinPoint methodPjp = (MethodInvocationProceedingJoinPoint) pjp;
        Field field = methodPjp.getClass().getDeclaredField("methodInvocation");
        field.setAccessible(true);
        ProxyMethodInvocation invocation = (ProxyMethodInvocation) field.get(methodPjp);
        return invocation.getMethod();
    }
    /**
     *  Prevent duplicate submissions
     * @param pjp
     * @param request
     * @return
     * @throws Throwable
     */
    public Object avoidRepeatSubmit(ProceedingJoinPoint pjp,HttpServletRequest request,AvoidRepeatSubmit avoidRepeatSubmit) throws Throwable {
        /*long lockSeconds = avoidRepeatSubmit.lockTime();*/
        Assert.notNull(request);
        HttpSession httpSession = request.getSession();
        String sessionId = httpSession.getId();
        String path = request.getServletPath();
        String ip = IpUtils.getIpAddr(request);
        String key = sessionId + ip+ path;
        long lockSeconds = avoidRepeatSubmit.value();
        //long lockSeconds=1000*60*5; // lock expiry of 5 minutes, in milliseconds
        boolean state = true;
        try{
            state = redisDistributedLock.setLock(key, sessionId, lockSeconds);
        }catch (Exception e){// if acquiring the lock throws, fall back to true
            state = true;
            LogUtil.Error("try lock err", e);
        }
        if (state) {
            log.info("try lock  success, key = [{}], clientId = [{}], 锁自动释放时间lockSeconds=[{}]", key, sessionId,lockSeconds);
            Object result;
            try {
                result = pjp.proceed();
            } finally {
                boolean r = redisDistributedLock.releaseLock(key, sessionId);
                log.info("release lock  {}, key = [{}], clientId = [{}]", r, key, sessionId);
            }
            return result;
        } else {
            long ttl = RedisCache.ttl(RedisDistributedLock.REDIS_LOCK_PREFIX+key);
            log.info("try lock failed, key=[{}],锁剩余自动释放时间={}s", key,ttl);
            return new Status(StateMessage.REPEAT_SUBMIT);
        }
    }


    /**
     * Parse browser info from the User-Agent string
     * @return
     */
    public JSONObject getDeviceByUserAgent(String agent){
        JSONObject userAgentJson = new JSONObject();
        UserAgent userAgent = UserAgent.parseUserAgentString(agent);
        Browser browser = userAgent.getBrowser();
        Version version = userAgent.getBrowserVersion();
        userAgentJson.put("system",userAgent.getOperatingSystem());
        userAgentJson.put("browserName",browser.getName());
        userAgentJson.put("browserVersion",version!=null?version.getVersion():"UNKNOWN");
        return userAgentJson;
    }
    /**
     * Write the assembled log entry
     * @param json
     */

    public void saveLog(JSONObject json){
        json.put("userAgent",getDeviceByUserAgent(json.get("userAgentStr").toString()));    //解析浏览器信息
        log.info(json.toJSONString());

    }


    /**
     * Read parameters from the request body
     * @author Rex.Tan
     * @date 2019-09-12 14:54:31
     * @param bodyArgs
     * @param json
     * @return
     */
    private String bodyArgs2String(Object[] bodyArgs, JSONObject json,String url) {
        try {
            if(bodyArgs != null && bodyArgs.length > 0) {
                for (Object obj : bodyArgs) {
                    if (obj instanceof BaseReq) {
                        BaseReq baseReq = ((BaseReq) obj);
                        String baseToken = baseReq.getBaseToken();
                        if(StringUtils.isNotBlank(baseToken)){  // resolve the user
                            json.put("user",getUserInfo(baseToken));
                        }
                    }
                }
                json.put("bodyArgs", JSONArray.toJSONString(bodyArgs));
            }
        } catch(Exception e) {
            LogUtil.Error("=============序列化requestBody中的参数出错, " + url,e);
        }
        return "";
    }

    /**
     * Read parameters from the URL query string
     * @author Tan Ling
     * @date 2019-09-12 14:54:40
     * @param request
     * @param urlArgs
     * @param url
     * @return
     */
    private String urlArgs2String(HttpServletRequest request, Enumeration<String> urlArgs, String url) {
        JSONObject urlArgsJson = new JSONObject();
        try {
            if(urlArgs != null) {
                while(urlArgs.hasMoreElements()) {
                    try {
                        String paraName = (String) urlArgs.nextElement();
                        urlArgsJson.put(paraName, request.getParameter(paraName));
                    } catch(Exception e) {
                        LogUtil.Error("=============记录url中的参数出错,"+url,e);
                        break;
                    }
                }
            }
            return urlArgsJson.toJSONString();
        }catch(Exception e) {
            e.printStackTrace();
            LogUtil.Error("=============记录url中的参数出错, " + url,e);
        }
        return "";
    }

}




Copyright notice: this is an original article by chen_cxl, licensed under CC 4.0 BY-SA; please include a link to the original source and this notice when reposting.