微服务日志增加链路追踪requestId

  • Post author:
  • Post category:其他



目录


背景


网关


全局请求过滤器


鉴权过滤器/全局异常处理器改造


响应头处理过滤器


ThreadContext工具类


Common公共包中新增全局过滤器


公共过滤器


sevrlet 相关常量类


各个服务配置


FeignConfiguration


logback.xml配置


继续完善


使子线程能够继承线程上下文映射


扩展Hystrix


其他


设置filter执行顺序


背景

以下内容主要在原框架的基础上做部分改造,以及新增。改造是因为要兼容原filter等内容,但是实际上自己重新开始做的话也差不多。

实现了微服务下的日志链路追踪以及微服务中请求的耗时监控。

网关

网关中的全局请求过滤器、鉴权过滤器、全局异常处理器改造,增加响应头处理过滤器。

(ps:根据项目不同,过滤器、拦截器、处理器可能不同,但是原理是相同的,当前的系统只需要修改上面三个地方就可以了)

全局请求过滤器

import com.gene.gateway.utils.LogGlobalFilterUtil;
import org.apache.logging.log4j.ThreadContext;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;


@Component
@Slf4j
public class GatewayFilter implements GlobalFilter, Ordered {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        try {
            ServerHttpRequest serverHttpRequest = exchange.getRequest();
            ServerHttpResponse serverHttpResponse = exchange.getResponse();
            LogGlobalFilterUtil.setThreadContext(serverHttpRequest, serverHttpResponse);
            log.info("请求地址:{}", exchange.getRequest().getURI().getPath());
            return chain.filter(exchange);
        } finally {
            ThreadContext.remove(LogGlobalFilterUtil.REQUEST_ID_HEADER);
        }
    }

    @Override
    public int getOrder() {
        return 0;
    }
}

1.在请求和响应头中放入RequestId

2.在ThreadContext中放入RequestId(log4j2 1.x版本用MDC,2.x版本用ThreadContext)

ServerHttpRequest serverHttpRequest = exchange.getRequest();
ServerHttpResponse serverHttpResponse = exchange.getResponse();
LogGlobalFilterUtil.setThreadContext(serverHttpRequest, serverHttpResponse);

避免内存溢出

​
finally { ThreadContext.remove(LogGlobalFilterUtil.REQUEST_ID_HEADER); }

鉴权过滤器/全局异常处理器改造

跟全局请求过滤器对于request、response和ThreadContext的处理逻辑相同,只是其他业务逻辑不同.

(个人觉得这里并不完善,可以通过改造异步线程池以及设置request的子线程共享去实现在全局请求过滤器中处理一次就可以,本人没有试过,只是有个思路)

响应头处理过滤器

该过滤器的主要作用是处理响应中的header值重复的,因为在网关处理过一次response的header,在各个微服务中也要处理response的header,调用链上多次的处理会导致值重复,所以在返回的时候去重。

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.stream.Collectors;

/**
 * @description: 头部信息去重过滤器
 * @author: gene
 * @date: 2021/11/30 13:14
 */
@Component
@Slf4j
public class HeaderFilter implements GlobalFilter, Ordered {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return chain.filter(exchange).then(
                Mono.defer(() -> {
                    HttpHeaders httpHeaders = exchange.getResponse().getHeaders();
                    httpHeaders.forEach((k, v) -> {
                        if (null != v && v.size() > 1) {
                            List<String> list = v.stream().distinct().collect(Collectors.toList());
                            httpHeaders.replace(k, list);
                        }
                    });
                    return chain.filter(exchange);
                })
        );
    }

    @Override
    public int getOrder() {
        // 指定此过滤器位于NettyWriteResponseFilter之后, 待处理完响应体后接着处理响应头
        return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER + 1;
    }
}

指定此过滤器位于NettyWriteResponseFilter之后, 待处理完响应体后接着处理响应头

NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER + 1

ThreadContext工具类

import org.apache.logging.log4j.ThreadContext;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;

import java.util.UUID;
import java.util.function.Consumer;

/**
 * @description: ThreadContext工具类
 * @author: gene
 * @date: 2021/11/29 17:43
 */
public class LogGlobalFilterUtil {
    /**
     * 响应头/MDCkey, RequestId
     */
    public static final String REQUEST_ID_HEADER = "RequestId";

    public static void setThreadContext(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse) {

        String requestId = serverHttpRequest.getHeaders().getFirst(REQUEST_ID_HEADER);
        if (null == requestId) {
            requestId = UUID.randomUUID().toString();
            String finalRequestId = requestId;
            // requestId放入请求中
            Consumer<HttpHeaders> headersConsumer = httpHeaders -> httpHeaders.add(REQUEST_ID_HEADER, finalRequestId);
            serverHttpRequest.mutate().headers(headersConsumer);
        }
        // requestId放入响应中 重置一遍以请求中的为准
        serverHttpResponse.getHeaders().remove(REQUEST_ID_HEADER);
        serverHttpResponse.getHeaders().add(REQUEST_ID_HEADER, requestId);
        ThreadContext.put(REQUEST_ID_HEADER, requestId);
    }
}

Common公共包中新增全局过滤器

拦截所有请求,优先级高于spring中的security相关的鉴权过滤器,顺便做了一下简单的统计耗时。

公共过滤器

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.ThreadContext;
import org.apache.tomcat.util.http.MimeHeaders;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.lang.reflect.Field;
import java.time.Duration;
import java.time.LocalTime;
import java.util.UUID;

import static com.gene.common.constant.ServletConstant.COYOTE_REQUEST;
import static com.gene.common.constant.ServletConstant.HEADERS;
import static com.gene.common.constant.ServletConstant.REQUEST;
import static com.gene.common.constant.ServletConstant.REQUEST_ID_HEADER;

/**
 * @description: 日志过滤器
 * @author: gene
 * @date: 2021/11/29 16:45
 */
@WebFilter(filterName = "geneLogFilter", urlPatterns = "/*")
@Component
@Slf4j
public class GeneLogFilter implements Filter, Ordered {
    @Override
    public void init(FilterConfig filterConfig) throws ServletException {

    }

    @SneakyThrows
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        LocalTime startTime = LocalTime.now();
        HttpServletRequest httpRequest = (HttpServletRequest) request;
        HttpServletResponse httpResponse = (HttpServletResponse) response;

        String requestId = httpRequest.getHeader(REQUEST_ID_HEADER);
        if (requestId == null) {
            requestId = UUID.randomUUID().toString();
            reflectSetparam(httpRequest, requestId);
        }
        // TODO: 2021/11/29 这里可以扩展日志打印IP,参数等
        ThreadContext.put(REQUEST_ID_HEADER, requestId);

        if (!requestId.equals(httpResponse.getHeader(REQUEST_ID_HEADER))) {
            httpResponse.setHeader(REQUEST_ID_HEADER, requestId);
        }

        try {
            chain.doFilter(request, response);
        } finally {
            // 计算请求耗时
            LocalTime endTime = LocalTime.now();
            Duration total = Duration.between(startTime, endTime);
            log.info("请求URL:{},耗时:{}ms", httpRequest.getRequestURI(), total.toMillis());
            ThreadContext.remove(REQUEST_ID_HEADER);
        }
    }

    @Override
    public void destroy() {

    }

    private void reflectSetparam(HttpServletRequest request, String value)
            throws NoSuchFieldException, IllegalAccessException {
        Class<? extends HttpServletRequest> requestClass = request.getClass();
        Field field = requestClass.getDeclaredField(REQUEST);
        field.setAccessible(true);
        Object o = field.get(request);
        Field coyoteRequest = o.getClass().getDeclaredField(COYOTE_REQUEST);
        coyoteRequest.setAccessible(true);
        Object o1 = coyoteRequest.get(o);
        Field headers = o1.getClass().getDeclaredField(HEADERS);
        headers.setAccessible(true);
        MimeHeaders o2 = (MimeHeaders)headers.get(o1);
        o2.addValue(REQUEST_ID_HEADER).setString(value);
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}

利用反射修改请求头

reflectSetparam(httpRequest, requestId);

private void reflectSetparam(HttpServletRequest request, String value)
            throws NoSuchFieldException, IllegalAccessException {
        Class<? extends HttpServletRequest> requestClass = request.getClass();
        Field field = requestClass.getDeclaredField(REQUEST);
        field.setAccessible(true);
        Object o = field.get(request);
        Field coyoteRequest = o.getClass().getDeclaredField(COYOTE_REQUEST);
        coyoteRequest.setAccessible(true);
        Object o1 = coyoteRequest.get(o);
        Field headers = o1.getClass().getDeclaredField(HEADERS);
        headers.setAccessible(true);
        MimeHeaders o2 = (MimeHeaders)headers.get(o1);
        o2.addValue(REQUEST_ID_HEADER).setString(value);
    }

优先级最高的过滤器

@Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }

sevrlet 相关常量类

/**
 * @description: sevrlet 相关常量类
 * @author: gene
 * @date: 2021/11/29 17:31
 */
public class ServletConstant {
    /**
     * 响应头/ThreadContext key, RequestId
     */
    public static final String REQUEST_ID_HEADER = "RequestId";

    /**
     * REQUEST
     */
    public static final String REQUEST = "request";

    /**
     * COYOTE_REQUEST
     */
    public static final String COYOTE_REQUEST = "coyoteRequest";

    /**
     * HEADERS
     */
    public static final String HEADERS = "headers";
}

各个服务配置

FeignConfiguration

@Configuration
public class FeignConfiguration implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate requestTemplate) {
        RequestAttributes requestAttributes = getRequestAttributesSafely();
        if (null == requestAttributes) {
            return;
        }
        ServletRequestAttributes servletRequestAttributes = (ServletRequestAttributes) requestAttributes;
        HttpServletRequest req = servletRequestAttributes.getRequest();
        Map<String, Collection<String>> headerMap = new HashMap();
        // 获取需要传递的头信息
        String requestId = req.getHeader(REQUEST_ID_HEADER);
        headerMap.put(REQUEST_ID_HEADER, Arrays.asList(requestId));
        requestTemplate.headers(headerMap);
    }

    public RequestAttributes getRequestAttributesSafely() {
        try {
            return RequestContextHolder.currentRequestAttributes();
        } catch (IllegalStateException e) {
            return null;
        }
    }
}


实现RequestInterceptor的主要目的是让requestId在feign接口之前传递,这点很重要,当然这只是传递方式的一种

。方式参考:

  • 如果采取的是zuul等作为网关的话,请求经过网关之后,就生成requestqId,每个请求都需要经过网关才可以。
  • 如果直接使用http客户端对其他系统做调用,不管采用的是httpclient还是okhttp,皆可做全局全局配置,即构造一个HttpReqProxy的对象,在Proxy对象里面将获取到的请求头设置进去,方便传递。
  • 采用Spring的RestTemplate,这种时候可以编写一个工具类,在工具类内注入RestTemplate或者new一个RestTemplate,然后通过 HttpServletRequest request =((ServletRequestAttributes)RequestContextHolder.getRequestAttributes()).getRequest(); 获取到request对象,从而获取请求头设置到RestTemplate中。


无法传递的原因:

开启hystrix后,feign请求会运行在hystrix管理的另一线程下。也就是说从请求方模块发送到服务方的请求参数可以正常传递,但是HttpServletRequest request对象并没有正常传递,从一个线程到另一个线程中时,request并没有跟随一起。需要手工组装request请求中的值。


如果你也遇到该异常:

No thread-bound request found: Are you referring to request attributes outside of an actual web…

网络上的解决思路是:

//异步调用,设置子线程共享 
ServletRequestAttributes servletRequestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); 
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {     
RequestContextHolder.setRequestAttributes(servletRequestAttributes,true);//设置子线程共享 
//此处你的业务逻辑 
});

但是我们的场景可能不适应,由于项目的不同我这里的解决方案是:

RequestAttributes requestAttributes = getRequestAttributesSafely();
public RequestAttributes getRequestAttributesSafely() {
        try {
            return RequestContextHolder.currentRequestAttributes();
        } catch (IllegalStateException e) {
            return null;
        }
    }

logback.xml配置

log4j2和logback配置差不多的

<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
    <contextName>gene-text</contextName>

    <property name="LOG_PATTERN" value="%d %-5level [%thread] [%X{RequestId}] %logger{0} %L : %msg%n" />

    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${LOG_PATTERN}</pattern>
        </encoder>
    </appender>
    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>/logs/gene-text/foundation.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>/logs/gene-text/foundation.%d{yyyy-MM-dd}.%i.log
            </fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy
                    class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>500MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>${LOG_PATTERN}</pattern>
        </encoder>
    </appender>
    <root level="info">
        <appender-ref ref="console"/>
        <appender-ref ref="file"/>
    </root>
    <logger name="org.springframework.web" level="WARN"/>
    <logger name="org.springframework.controllers" level="WARN"/>
</configuration>

主要是在pattern中添加[%X{RequestId}],样式可以自己定义。

继续完善

使子线程能够继承线程上下文映射

1.将系统属性`log4j2.isThreadContextMapInheritable` 设置为`true`,使子线程能够继承线程上下文映射。

  • -DisThreadContextMapInheritable=true
  • System.setProperty(“isThreadContextMapInheritable”, “true”);


如果通过系统参数的方式不起作用,就需要手动复制了,可以参考下面的扩展线程池的方式,该方式是将主线程MDC中的参数复制给子线程,同样的ThreadContext也可以

2.扩展ThreadPoolTaskExecutor线程池,实现将父线程的ThreadContext内容复制给子线程。

如果你使用的是MDC,需要用下面的方式

import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
import java.util.Map;
 
/**
 * 这是{@link ThreadPoolTaskExecutor}的一个简单替换,可以在每个任务之前设置子线程的MDC数据。
 * <p/>
 * 在记录日志的时候,一般情况下我们会使用MDC来存储每个线程的特有参数,如身份信息等,以便更好的查询日志。
 * 但是Logback在最新的版本中因为性能问题,不会自动的将MDC的内存传给子线程。所以Logback建议在执行异步线程前
 * 先通过MDC.getCopyOfContextMap()方法将MDC内存获取出来,再传给线程。
 * 并在子线程的执行的最开始调用MDC.setContextMap(context)方法将父线程的MDC内容传给子线程。
 * <p>
 * https://logback.qos.ch/manual/mdc.html
 *
 */
public class MdcThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
 
    /**
     * 所有线程都会委托给这个execute方法,在这个方法中我们把父线程的MDC内容赋值给子线程
     * https://logback.qos.ch/manual/mdc.html#managedThreads
     *
     * @param runnable
     */
    @Override
    public void execute(Runnable runnable) {
        // 获取父线程MDC中的内容,必须在run方法之前,否则等异步线程执行的时候有可能MDC里面的值已经被清空了,这个时候就会返回null
        Map<String, String> context = MDC.getCopyOfContextMap();
        super.execute(() -> run(runnable, context));
    }
 
    /**
     * 子线程委托的执行方法
     *
     * @param runnable {@link Runnable}
     * @param context  父线程MDC内容
     */
    private void run(Runnable runnable, Map<String, String> context) {
        // 将父线程的MDC内容传给子线程
        MDC.setContextMap(context);
        try {
            // 执行异步操作
            runnable.run();
        } finally {
            // 清空MDC内容
            MDC.clear();
        }
    }
}

扩展Hystrix

扩展Hystrix线程池隔离支持日志链路跟踪

/**
 * Hystrix线程池隔离支持日志链路跟踪
 *
 */
public class MdcHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
 
    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return new MdcAwareCallable(callable, MDC.getCopyOfContextMap());
    }
 
    private class MdcAwareCallable<T> implements Callable<T> {
 
        private final Callable<T> delegate;
 
        private final Map<String, String> contextMap;
 
        public MdcAwareCallable(Callable<T> callable, Map<String, String> contextMap) {
            this.delegate = callable;
            this.contextMap = contextMap != null ? contextMap : new HashMap();
        }
 
        @Override
        public T call() throws Exception {
            try {
                MDC.setContextMap(contextMap);
                return delegate.call();
            } finally {
                MDC.clear();
            }
        }
    }
}

配置Hystrix

@Configuration
public class HystrixConfig {
 
    //用来拦截处理HystrixCommand注解
    @Bean
    public HystrixCommandAspect hystrixAspect() {
        return new HystrixCommandAspect();
    }
 
    @PostConstruct
    public void init() {
        HystrixPlugins.getInstance().registerConcurrencyStrategy(new MdcHystrixConcurrencyStrategy());
    }
 
}

(对于线程池扩展和Hystrix扩展参考:

MDC机制实现日志的链路追踪_luoqinglong的专栏-CSDN博客

其他

设置filter执行顺序

filter优先级高于interceptor

我们可以给filter设置order来控制执行顺序,数字越小优先级越高

设置order的方式有三种

  • @Order(0)
  • 实现Ordered接口
  • FilterRegistrationBean 注册的时候设置顺序,由于本文中在过滤器上直接使用了@WebFilter(filterName = “geneLogFilter”, urlPatterns = “/*”)注解来完成的注册,所以没有这个注册的配置类
@Configuration
public class RequestBodyConfig {

    @Bean
    public FilterRegistrationBean requestBodyFilterRegistration(){
        FilterRegistrationBean registration = new FilterRegistrationBean();
        //添加过滤器
        registration.setFilter(new RequestBodyFilter());
        //设置过滤路径,/*所有路径
        registration.addUrlPatterns("/*");
        registration.setName("requestBodyFilter");
        //设置优先级
        registration.setOrder(0);
        return registration;
    }
}



版权声明:本文为gxdogjava原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。