最核心的逻辑来了:RibbonRoutingFilter是如何将请求转发给服务的

  • Post author:
  • Post category:其他




1.RibbonRoutingFilter类

@Override
public Object run() {
	RequestContext context = RequestContext.getCurrentContext();
	this.helper.addIgnoredHeaders();
	try {
		RibbonCommandContext commandContext = buildCommandContext(context);
		ClientHttpResponse response = forward(commandContext);
		setResponse(response);
		return response;
	}
	//省略(catch信息)
}

点击forward(commandContext)进入如下代码。



2.RibbonRoutingFilter类

protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
	Map<String, Object> info = this.helper.debug(context.getMethod(),
			context.getUri(), context.getHeaders(), context.getParams(),
			context.getRequestEntity());

	RibbonCommand command = this.ribbonCommandFactory.create(context);
	try {
		ClientHttpResponse response = command.execute();
		this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
		return response;
	}
	catch (HystrixRuntimeException ex) {
		return handleException(info, ex);
	}
}

分析:

1)第1行是解析context里面的信息,封装为一个map。

2)点击下面这行代码,进入

第3步

RibbonCommand command = this.ribbonCommandFactory.create(context);  

3)核心逻辑,点击 command.execute()进入

第4步

ClientHttpResponse response = command.execute();     	



3.HttpClientRibbonCommandFactory类

@Override
public HttpClientRibbonCommand create(final RibbonCommandContext context) {
	ZuulFallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId());
	//省略
	return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider,
			clientFactory.getClientConfig(serviceId));
}

分析:

1)这个可以用于做同步降级,后面项目会用到

ZuulFallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId());

2)返回的是一个HttpClientRibbonCommand对象



4.HystrixCommand类

public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

点击queue()进入如下代码



5.HystrixCommand类

public Future<R> queue() {     
    final Future<R> delegate = toObservable().toBlocking().toFuture();
	
    final Future<R> f = new Future<R>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            /*
			省略
			*/
		}

        @Override
        public boolean isCancelled() {
            return delegate.isCancelled();
		}

        @Override
        public boolean isDone() {
            return delegate.isDone();
		}

        @Override
        public R get() throws InterruptedException, ExecutionException {
            return delegate.get();
        }

        @Override
        public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return delegate.get(timeout, unit);
        }        	
    };

    if (f.isDone()) {
        try {
            f.get();
            return f;
        } catch (Exception e) {
          		/*
				省略
				*/
          }
    }

    return f;
}

分析:

1)

好好研究下Future。


2)进一步断点进入

第6步



6.AbstractRibbonCommand类

@Override
protected ClientHttpResponse run() throws Exception {
	final RequestContext context = RequestContext.getCurrentContext();

	RQ request = createRequest();
	RS response;
	
	boolean retryableClient = this.client instanceof AbstractLoadBalancingClient
			&& ((AbstractLoadBalancingClient)this.client).isClientRetryable((ContextAwareRequest)request);
	
	if (retryableClient) {
		response = this.client.execute(request, config);
	} else {
		response = this.client.executeWithLoadBalancer(request, config);
	}
	context.set("ribbonResponse", response);

	if (this.isResponseTimedOut()) {
		if (response != null) {
			response.close();
		}
	}

	return new RibbonHttpResponse(response);
}

分析:

1)retryableClient默认为false,所以在if-else语句中执行的是

response = this.client.executeWithLoadBalancer(request, config);

2)这里进一步执行会跳转到

第7步中


3)根据拿到的server,构造实际的请求

URI finalUri = reconstructURIWithServer(server, request.getUri());    

finalUri的值为:


http://localhost:9090/ServiceB/user/sayHello/1


4)这行代码调用底层的http组件,发送请求到对应的机器上去。

return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));

5)将执行结果response设置到context中去。

context.set("ribbonResponse", response);



7.AbstractRibbonCommand类

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
     LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

      try {
          return command.submit(
              new ServerOperation<T>() {
                  @Override
                  public Observable<T> call(Server server) {
                      URI finalUri = reconstructURIWithServer(server, request.getUri());
                      S requestForServer = (S) request.replaceUri(finalUri);
                      try {
                          return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                      } 
                      catch (Exception e) {
                          return Observable.error(e);
                      }
                  }
              })
              .toBlocking()
              .single();
      } catch (Exception e) {
        //省略
      }
      
  }

进入这个方法一直断点会到

第8步中



8.LoadBalancerCommand类

private Observable<Server> selectServer() {
    return Observable.create(new OnSubscribe<Server>() {
        @Override
        public void call(Subscriber<? super Server> next) {
            try {
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                next.onNext(server);
                next.onCompleted();
            } catch (Exception e) {
                next.onError(e);
            }
        }
    });
}

分析:

1)点击getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey)进入



9.LoadBalancerContext类

 public Server getServerFromLoadBalancer(@Nullable URI original, 
 			@Nullable Object loadBalancerKey) throws ClientException {
       /*
	省略
	*/
       ILoadBalancer lb = getLoadBalancer();
       if (host == null) {
           // Partial URI or no URI Case
           // well we have to just get the right instances from lb - or we fall back
           if (lb != null){
               Server svc = lb.chooseServer(loadBalancerKey);
               //省略
               return svc;
           } else {
            	//省略
             }
   }

分析:

1)这行代码是拿到一个server。这里loadBalancerKey默认为null。

Server svc = lb.chooseServer(loadBalancerKey);

这里全速前进会返回到

第6步中的方法中