1. RibbonRoutingFilter class
@Override
public Object run() {
    RequestContext context = RequestContext.getCurrentContext();
    this.helper.addIgnoredHeaders();
    try {
        RibbonCommandContext commandContext = buildCommandContext(context);
        ClientHttpResponse response = forward(commandContext);
        setResponse(response);
        return response;
    }
    // omitted (catch blocks)
}
Step into forward(commandContext) to reach the following code.
2. RibbonRoutingFilter class
protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
    Map<String, Object> info = this.helper.debug(context.getMethod(),
            context.getUri(), context.getHeaders(), context.getParams(),
            context.getRequestEntity());
    RibbonCommand command = this.ribbonCommandFactory.create(context);
    try {
        ClientHttpResponse response = command.execute();
        this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
        return response;
    }
    catch (HystrixRuntimeException ex) {
        return handleException(info, ex);
    }
}
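Analysis:
1) The first statement reads the request information from the context (method, URI, headers, params, request entity) and packs it into a map named info.
2) Step into the following line to reach step 3:
RibbonCommand command = this.ribbonCommandFactory.create(context);
3) This is the core logic. Step into command.execute() to reach step 4:
ClientHttpResponse response = command.execute();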
3. HttpClientRibbonCommandFactory class
@Override
public HttpClientRibbonCommand create(final RibbonCommandContext context) {
    ZuulFallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId());
    // omitted
    return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider,
            clientFactory.getClientConfig(serviceId));
}
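Analysis:
1) The fallback provider looked up here can be used for synchronous degradation (fallback); the project uses it later:
ZuulFallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId());
2) The method returns an HttpClientRibbonCommand object.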
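The degradation idea in point 1 can be illustrated with a small plain-Java sketch. It does not use Zuul or Hystrix: FallbackSketch, register and call are hypothetical names, and a Supplier<String> stands in for both the real command and the real fallback provider. It only shows the shape of the idea, namely looking up a fallback by service id and using it when the primary call fails.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch, not Zuul's or Hystrix's actual classes.
final class FallbackSketch {

    // serviceId -> fallback, playing the role of getFallbackProvider(serviceId)
    private final Map<String, Supplier<String>> fallbacks = new HashMap<>();

    void register(String serviceId, Supplier<String> fallback) {
        fallbacks.put(serviceId, fallback);
    }

    // Run the primary call; if it throws, return the registered fallback instead.
    String call(String serviceId, Supplier<String> primary) {
        try {
            return primary.get();
        } catch (RuntimeException ex) {
            Supplier<String> fallback = fallbacks.get(serviceId);
            if (fallback == null) {
                throw ex; // no fallback registered for this service: propagate the failure
            }
            return fallback.get();
        }
    }

    public static void main(String[] args) {
        FallbackSketch sketch = new FallbackSketch();
        sketch.register("ServiceB", () -> "fallback for ServiceB");
        System.out.println(sketch.call("ServiceB", () -> "hello"));
        System.out.println(sketch.call("ServiceB", () -> {
            throw new IllegalStateException("ServiceB is down");
        }));
    }
}
```

In the real flow the fallback is handed to the command at construction time, as the create method above shows; the sketch keeps the lookup and the call in one class only for brevity.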
4. HystrixCommand class
public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}
Step into queue() to reach the following code.
5. HystrixCommand class
public Future<R> queue() {
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    final Future<R> f = new Future<R>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            /*
            omitted
            */
        }
        @Override
        public boolean isCancelled() {
            return delegate.isCancelled();
        }
        @Override
        public boolean isDone() {
            return delegate.isDone();
        }
        @Override
        public R get() throws InterruptedException, ExecutionException {
            return delegate.get();
        }
        @Override
        public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return delegate.get(timeout, unit);
        }
    };
    if (f.isDone()) {
        try {
            f.get();
            return f;
        } catch (Exception e) {
            /*
            omitted
            */
        }
    }
    return f;
}
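Analysis:
1) Future is worth studying closely here. queue() wraps the Future obtained from toObservable().toBlocking().toFuture() in a second Future that delegates to it, and execute() in step 4 simply blocks on that Future's get().
2) Stepping further with the debugger leads to step 6.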
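The relationship between execute() and queue() can be reproduced with the JDK alone. The following is a minimal sketch, not Hystrix code: FutureDelegationSketch is a hypothetical class, CompletableFuture.supplyAsync stands in for toObservable().toBlocking().toFuture(), and cancel simply delegates because the original cancel body is omitted in the excerpt.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of the "wrap a Future and delegate" pattern.
final class FutureDelegationSketch {

    // Plays the role of queue(): start the work asynchronously and
    // return a Future that forwards every call to the underlying one.
    static Future<String> queue() {
        final Future<String> delegate = CompletableFuture.supplyAsync(() -> "response");
        return new Future<String>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return delegate.cancel(mayInterruptIfRunning); // assumption: plain delegation
            }
            @Override
            public boolean isCancelled() {
                return delegate.isCancelled();
            }
            @Override
            public boolean isDone() {
                return delegate.isDone();
            }
            @Override
            public String get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }
            @Override
            public String get(long timeout, TimeUnit unit)
                    throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }
        };
    }

    // Plays the role of execute(): the synchronous call is just queue().get().
    static String execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(execute()); // prints "response"
    }
}
```

The wrapper adds nothing in this sketch; its value in a real command is that the wrapping Future is the place to customize behavior such as cancellation without changing the delegate.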
6. AbstractRibbonCommand class
@Override
protected ClientHttpResponse run() throws Exception {
    final RequestContext context = RequestContext.getCurrentContext();
    RQ request = createRequest();
    RS response;
    boolean retryableClient = this.client instanceof AbstractLoadBalancingClient
            && ((AbstractLoadBalancingClient)this.client).isClientRetryable((ContextAwareRequest)request);
    if (retryableClient) {
        response = this.client.execute(request, config);
    } else {
        response = this.client.executeWithLoadBalancer(request, config);
    }
    context.set("ribbonResponse", response);
    if (this.isResponseTimedOut()) {
        if (response != null) {
            response.close();
        }
    }
    return new RibbonHttpResponse(response);
}
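Analysis:
1) retryableClient is false by default, so the if-else statement takes the else branch:
response = this.client.executeWithLoadBalancer(request, config);
2) Execution then jumps to step 7.
3) Inside step 7, the server that was selected is used to build the actual request URI:
URI finalUri = reconstructURIWithServer(server, request.getUri());
In this example the value of finalUri is:
http://localhost:9090/ServiceB/user/sayHello/1
4) Also inside step 7, this line calls the underlying HTTP component and sends the request to the selected machine:
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
5) Back in run(), the resulting response is stored in the context:
context.set("ribbonResponse", response);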
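The URI rewriting in point 3 can be sketched with java.net.URI. This is not Ribbon's reconstructURIWithServer: UriRewriteSketch and withServer are hypothetical names, the http scheme is hard-coded, and the sketch assumes the incoming request URI is just the path /ServiceB/user/sayHello/1 with no host of its own.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical sketch of combining a chosen server with a request path.
final class UriRewriteSketch {

    // Keep path, query and fragment of the original URI, but take
    // host and port from the server that the load balancer selected.
    static URI withServer(String host, int port, URI original) {
        try {
            return new URI("http", null, host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("/ServiceB/user/sayHello/1"); // assumed request URI
        System.out.println(withServer("localhost", 9090, original));
        // prints http://localhost:9090/ServiceB/user/sayHello/1
    }
}
```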
7. AbstractLoadBalancerAwareClient class
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
    try {
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    }
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        // omitted
    }
}
Stepping into this method and continuing through the breakpoints leads to step 8.
8. LoadBalancerCommand class
private Observable<Server> selectServer() {
    return Observable.create(new OnSubscribe<Server>() {
        @Override
        public void call(Subscriber<? super Server> next) {
            try {
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
                next.onNext(server);
                next.onCompleted();
            } catch (Exception e) {
                next.onError(e);
            }
        }
    });
}
Analysis:
1) Step into getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey) to reach step 9.
9. LoadBalancerContext class
public Server getServerFromLoadBalancer(@Nullable URI original,
        @Nullable Object loadBalancerKey) throws ClientException {
    /*
    omitted
    */
    ILoadBalancer lb = getLoadBalancer();
    if (host == null) {
        // Partial URI or no URI Case
        // well we have to just get the right instances from lb - or we fall back
        if (lb != null){
            Server svc = lb.chooseServer(loadBalancerKey);
            // omitted
            return svc;
        } else {
            // omitted
        }
    }
    // omitted
}
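Analysis:
1) This line obtains a server from the load balancer. loadBalancerKey is null by default here.
Server svc = lb.chooseServer(loadBalancerKey);
Resuming execution from this point returns to the method in step 6.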
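To give a feel for what a chooseServer call might do, here is a minimal round-robin sketch in plain Java. It is an assumption for illustration only: the walkthrough does not say which selection rule is configured, and RoundRobinSketch, its server list of "host:port" strings, and its ignored key parameter are all hypothetical rather than Ribbon's ILoadBalancer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin chooser, not Ribbon's implementation.
final class RoundRobinSketch {

    private final List<String> servers;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinSketch(List<String> servers) {
        this.servers = new ArrayList<>(servers); // defensive copy
    }

    // The key is accepted only to mirror the shape of chooseServer(loadBalancerKey);
    // this sketch ignores it, which matches the null key seen in the walkthrough.
    String chooseServer(Object key) {
        if (servers.isEmpty()) {
            return null; // nothing to choose from
        }
        // floorMod keeps the index non-negative even after the counter overflows
        int index = Math.floorMod(next.getAndIncrement(), servers.size());
        return servers.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch lb = new RoundRobinSketch(Arrays.asList("localhost:9090", "localhost:9091"));
        System.out.println(lb.chooseServer(null)); // localhost:9090
        System.out.println(lb.chooseServer(null)); // localhost:9091
        System.out.println(lb.chooseServer(null)); // localhost:9090
    }
}
```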