grpc java 拦截器的使用(包含server&client)

  • Post author:
  • Post category:java


server端

package com.quantfn.portfolio.insterceptor;

import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.core.RedisTemplate;

import io.grpc.ForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.netty.util.internal.StringUtil;

public class UserAuthServerInsterceptor implements ServerInterceptor{



    RedisTemplate<String, String> redisTemplate;

    @SuppressWarnings("unchecked")
    public UserAuthServerInsterceptor(ApplicationContext context) {
        redisTemplate = context.getBean("redisTemplate", RedisTemplate.class);
    }

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
                                                                 Metadata headers, ServerCallHandler<ReqT, RespT> next) {
        //获取客户端参数
        Metadata.Key<String> token = Metadata.Key.of("token", Metadata.ASCII_STRING_MARSHALLER);
        Metadata.Key<String> user_Id = Metadata.Key.of("userId", Metadata.ASCII_STRING_MARSHALLER);
        String tokenStr = headers.get(token);
        if (StringUtil.isNullOrEmpty(tokenStr)){
            System.err.println("未收到客户端token,关闭此连接");
            call.close(Status.DATA_LOSS,headers);
        }
        //获得token去中查询redis 查询
        String userId = redisTemplate.opsForValue().get(tokenStr);
        if(StringUtil.isNullOrEmpty(userId)){
             System.err.println("客户端token错误,关闭此连接");
             call.close(Status.DATA_LOSS,headers);
        }
        //服务端写回参数
        ServerCall<ReqT, RespT> serverCall = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void sendHeaders(Metadata headers) {
                headers.put(user_Id,userId);
                super.sendHeaders(headers);
            }
        };
        return next.startCall(serverCall,headers);
    }
}

client端

/*
 * Copyright 2015, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.quantfn.portfolio.insterceptor;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import java.util.logging.Logger;

/**
 * An interceptor that attaches the login token to every outgoing call (as the
 * {@code token} request header) and logs the headers received from the server.
 */
public class UserAuthClientInterceptor implements ClientInterceptor {

    private static final Logger logger = Logger.getLogger(UserAuthClientInterceptor.class.getName());

    /**
     * Placeholder token used by the no-argument constructor. Real callers should
     * pass the token obtained at login to {@link #UserAuthClientInterceptor(String)}.
     */
    private static final String DEFAULT_TOKEN = "A2D05E5ED2414B1F8C6AEB19F40EF77C";

    Metadata.Key<String> token = Metadata.Key.of("token", Metadata.ASCII_STRING_MARSHALLER);

    /** Value sent in the {@code token} header on every call. */
    private final String tokenValue;

    /**
     * Creates an interceptor that sends the built-in placeholder token.
     * Kept so existing {@code new UserAuthClientInterceptor()} callers keep working.
     */
    public UserAuthClientInterceptor() {
        this(DEFAULT_TOKEN);
    }

    /**
     * Creates an interceptor that sends the given token.
     *
     * @param tokenValue the token obtained after login; must not be {@code null}
     * @throws NullPointerException if {@code tokenValue} is {@code null}
     */
    public UserAuthClientInterceptor(String tokenValue) {
        // Fail at construction time rather than on the first call.
        if (tokenValue == null) {
            throw new NullPointerException("tokenValue");
        }
        this.tokenValue = tokenValue;
    }

    /**
     * Wraps the call created by {@code next} so that the token header is added
     * before the call starts and response headers are logged when they arrive.
     *
     * @param method      descriptor of the method being called
     * @param callOptions options for this call
     * @param next        the channel used to create the underlying call
     * @return the wrapping client call
     */
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions, Channel next) {
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {

            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                // Attach the token obtained after login.
                headers.put(token, tokenValue);
                super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
                    @Override
                    public void onHeaders(Metadata responseHeaders) {
                        // If you do not need to receive headers from the server, you can use
                        // io.grpc.stub.MetadataUtils#attachHeaders directly to send the header.
                        logger.info("header received from server:" + responseHeaders);
                        super.onHeaders(responseHeaders);
                    }
                }, headers);
            }
        };
    }
}

最后需要分别注册两个拦截器:server 端拦截器通过 addService 添加到 server 的启动代码中;

client 端拦截器通过 ClientInterceptors.intercept 包装 channel,添加到 client 端。

这里只贴出添加拦截器部分的代码:

server端

// Wrap the bound service definition with the server interceptor, then register the wrapped definition via addService.
.addService(ServerInterceptors.intercept(IUserAuthServiceGrpc.bindService(new UserAuthServiceImpl(context)), new UserAuthServerInsterceptor(context)))

client端

// Build the underlying channel to host:port.
ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();
        // Wrap the channel so calls made through it pass through UserAuthClientInterceptor.
        Channel channel1 = ClientInterceptors.intercept(channel, new UserAuthClientInterceptor());
        // Create the blocking stub on the intercepted channel, not on the raw one.
        blockingStub = IUserAuthServiceGrpc.newBlockingStub(channel1);



版权声明:本文为mengxb12138原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。