异步的AsyncHttpClient使用详解

  • Post author:
  • Post category:其他


背景

前面的一篇文章【同步的HttpClient使用详解】中,介绍了服务端通过HttpClient进行网络请求的方式,也讲述了在并发量大的情况下如何使用HttpClient的连接池来提高性能。此方法虽然很有效,但是当访问量极大或网络状况不好时,仍会出现某些网络请求较慢、导致其它请求被阻塞的情况。为此,本文引入异步的HttpClient包,将网络请求变成异步请求,使其不影响其它请求。

异步httpClient需要的jar包

 <!-- httpclient -->
        <!-- Apache HttpComponents artifacts listed by this article for the async client examples. -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.1</version>
        </dependency>
        
        <!-- httpcore -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.6</version>
        </dependency>
        
        <!-- httpmime -->
        <!-- NOTE(review): pinned to 4.3.1 while httpclient is 4.5.1; confirm the two versions are meant to be combined. -->
        <!-- NOTE(review): no org.apache.http.entity.mime import appears in the code shown in this article; confirm this artifact is needed. -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpmime</artifactId>
            <version>4.3.1</version>
        </dependency>

        <!-- httpasyncclient -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpasyncclient</artifactId>
            <version>4.1.3</version>
        </dependency>


注意:由于异步的HttpClient比较新,请尽量使用本文中列出的jar包版本,或更高的版本。

使用方法

为了更好地使用,这里简单地使用了工厂模式,将同步的httpclient与异步的httpclient通过工厂进行实例化。

1、定义异步的httpclient

package com.studyproject.httpclient;

import java.nio.charset.CodingErrorAction;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;

import javax.net.ssl.SSLContext;

import org.apache.http.Consts;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.MalformedChallengeException;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.Lookup;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ssl.SSLContexts;
import org.apache.http.impl.auth.BasicSchemeFactory;
import org.apache.http.impl.auth.DigestSchemeFactory;
import org.apache.http.impl.auth.KerberosSchemeFactory;
import org.apache.http.impl.auth.NTLMSchemeFactory;
import org.apache.http.impl.auth.SPNegoSchemeFactory;
import org.apache.http.impl.client.BasicCookieStore;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.conn.NoopIOSessionStrategy;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOReactorException;

/**
 * Holds two pooled asynchronous HTTP clients: one that connects directly and
 * one that is configured with an HTTP proxy.
 *
 * <p>Both clients are created once in the constructor. Construction is
 * best-effort: when creation throws, the stack trace is printed and the
 * client(s) that were not created yet stay {@code null}, so the getters may
 * return {@code null}.
 */
public class HttpAsyncClient {

	// Socket (read) timeout in milliseconds: 5 seconds; adjust to the workload.
	private static final int SOCKET_TIMEOUT_MS = 5000;

	// Connect timeout in milliseconds.
	private static final int CONNECT_TIMEOUT_MS = 2000;

	// Maximum number of connections in the pool.
	private static final int POOL_SIZE = 3000;

	// Maximum concurrent connections per host: 1500. If the backend is a
	// single application server, configure 3000 instead.
	private static final int MAX_PER_ROUTE = 1500;

	// Fallback per-route limit used when MAX_PER_ROUTE is not positive.
	private static final int FALLBACK_MAX_PER_ROUTE = 10;

	// HTTP proxy settings; placeholders that must be replaced before the
	// proxy client can be used.
	private String host = "";
	private int port = 0;
	private String username = "";
	private String password = "";

	// Async client without a proxy.
	private CloseableHttpAsyncClient asyncHttpClient;

	// Async client that goes through the proxy.
	private CloseableHttpAsyncClient proxyAsyncHttpClient;

	/**
	 * Creates the direct client first and the proxy client second. A failure
	 * in either step is printed and leaves the remaining field(s) unset.
	 */
	public HttpAsyncClient() {
		try {
			this.asyncHttpClient = createAsyncClient(false);
			// NOTE(review): host/port default to ""/0; confirm HttpHost accepts
			// these values or replace them before relying on the proxy client.
			this.proxyAsyncHttpClient = createAsyncClient(true);
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

	/**
	 * Builds a new pooled asynchronous client with its own I/O reactor and
	 * connection manager.
	 *
	 * @param proxy {@code true} to route requests through the configured
	 *              proxy host and port, {@code false} for direct connections
	 * @return a new client; it is not started by this method
	 * @throws IOReactorException if the connecting I/O reactor cannot be created
	 */
	public CloseableHttpAsyncClient createAsyncClient(boolean proxy)
			throws KeyManagementException, UnrecoverableKeyException,
			NoSuchAlgorithmException, KeyStoreException,
			MalformedChallengeException, IOReactorException {

		SSLContext sslContext = SSLContexts.createDefault();

		// NOTE(review): the proxy credentials are registered for AuthScope.ANY,
		// so they are not limited to the proxy host; confirm this is intended.
		CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
		credentialsProvider.setCredentials(AuthScope.ANY,
				new UsernamePasswordCredentials(username, password));

		// Session strategies for the http and https schemes.
		Registry<SchemeIOSessionStrategy> sessionStrategyRegistry = RegistryBuilder
				.<SchemeIOSessionStrategy> create()
				.register("http", NoopIOSessionStrategy.INSTANCE)
				.register("https", new SSLIOSessionStrategy(sslContext))
				.build();

		// One I/O thread per available processor.
		IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
				.setIoThreadCount(Runtime.getRuntime().availableProcessors())
				.build();

		// Connection pool and its size limits.
		ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(
				ioReactorConfig);
		PoolingNHttpClientConnectionManager conMgr = new PoolingNHttpClientConnectionManager(
				ioReactor, null, sessionStrategyRegistry, null);

		if (POOL_SIZE > 0) {
			conMgr.setMaxTotal(POOL_SIZE);
		}

		// Maximum concurrency per host.
		if (MAX_PER_ROUTE > 0) {
			conMgr.setDefaultMaxPerRoute(MAX_PER_ROUTE);
		} else {
			conMgr.setDefaultMaxPerRoute(FALLBACK_MAX_PER_ROUTE);
		}

		// Connection-level settings: UTF-8, ignoring malformed/unmappable input.
		ConnectionConfig connectionConfig = ConnectionConfig.custom()
				.setMalformedInputAction(CodingErrorAction.IGNORE)
				.setUnmappableInputAction(CodingErrorAction.IGNORE)
				.setCharset(Consts.UTF_8).build();
		conMgr.setDefaultConnectionConfig(connectionConfig);

		// Request-level settings: connect and socket timeouts.
		RequestConfig requestConfig = RequestConfig.custom()
				.setConnectTimeout(CONNECT_TIMEOUT_MS)
				.setSocketTimeout(SOCKET_TIMEOUT_MS).build();

		Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder
				.<AuthSchemeProvider> create()
				.register(AuthSchemes.BASIC, new BasicSchemeFactory())
				.register(AuthSchemes.DIGEST, new DigestSchemeFactory())
				.register(AuthSchemes.NTLM, new NTLMSchemeFactory())
				.register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory())
				.register(AuthSchemes.KERBEROS, new KerberosSchemeFactory())
				.build();

		if (proxy) {
			return HttpAsyncClients.custom().setConnectionManager(conMgr)
					.setDefaultCredentialsProvider(credentialsProvider)
					.setDefaultAuthSchemeRegistry(authSchemeRegistry)
					.setProxy(new HttpHost(host, port))
					.setDefaultCookieStore(new BasicCookieStore())
					.setDefaultRequestConfig(requestConfig).build();
		}

		// The timeouts must be applied to the direct client as well; without
		// this call the request configuration built above is silently unused.
		return HttpAsyncClients.custom().setConnectionManager(conMgr)
				.setDefaultCredentialsProvider(credentialsProvider)
				.setDefaultAuthSchemeRegistry(authSchemeRegistry)
				.setDefaultCookieStore(new BasicCookieStore())
				.setDefaultRequestConfig(requestConfig).build();

	}

	/**
	 * @return the direct (non-proxy) client, or {@code null} if its creation
	 *         failed in the constructor
	 */
	public CloseableHttpAsyncClient getAsyncHttpClient() {
		return asyncHttpClient;
	}

	/**
	 * @return the proxy client, or {@code null} if its creation failed in the
	 *         constructor
	 */
	public CloseableHttpAsyncClient getProxyAsyncHttpClient() {
		return proxyAsyncHttpClient;
	}
}


2、定义同步的httpclient

package com.studyproject.httpclient;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.List;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpVersion;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.scheme.PlainSocketFactory;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeRegistry;
import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Synchronous HTTP helper built on pooled {@code DefaultHttpClient} instances.
 * Offers GET and POST helpers and keeps a second client that carries the proxy
 * credentials.
 *
 * <p>NOTE(review): the https scheme registered here accepts every certificate
 * and every host name (trust-all manager plus ALLOW_ALL_HOSTNAME_VERIFIER).
 * That disables server authentication; confirm it is acceptable for the target
 * environment.
 */
public class HttpSyncClient {

	private static final Logger logger = LoggerFactory
			.getLogger(HttpSyncClient.class);

	// Socket (read) timeout in milliseconds: 1 second; adjust to the workload.
	private static final int SOCKET_TIMEOUT_MS = 1000;

	// Connect timeout in milliseconds.
	private static final int CONNECT_TIMEOUT_MS = 2000;

	// Maximum number of connections in the pool.
	private static final int MAX_TOTAL_CONNECTIONS = 4000;

	// Maximum concurrent connections per host.
	private static final int MAX_CONNECTIONS_PER_ROUTE = 1500;

	// NOTE(review): this is the platform default charset, so request encoding
	// (and the fallback for response decoding) varies between machines;
	// confirm whether a fixed charset such as UTF-8 is wanted.
	private static final String DEFAULT_ENCODING = Charset.defaultCharset()
			.name();

	// Proxy settings; placeholders that must be replaced before use.
	private String host = "";
	private int port = 0;
	private String username = "";
	private String password = "";

	private DefaultHttpClient httpClient;

	private DefaultHttpClient proxyHttpClient;

	/**
	 * Creates both clients eagerly; intended to run once at application
	 * start-up.
	 */
	public HttpSyncClient() {

		this.httpClient = createClient(false);

		this.proxyHttpClient = createClient(true);
	}

	/**
	 * Builds a new client with its own connection pool and parameter set.
	 *
	 * @param proxy {@code true} to register the proxy user name and password
	 *              for the configured proxy host and port
	 * @return the newly created client
	 */
	public DefaultHttpClient createClient(boolean proxy) {

		SchemeRegistry schemes = new SchemeRegistry();
		schemes.register(new Scheme("http", 80, PlainSocketFactory
				.getSocketFactory()));
		try {
			SSLContext sslContext = SSLContext.getInstance("SSL");
			// Trust manager that performs no certificate checks at all.
			X509TrustManager trustAll = new X509TrustManager() {
				@Override
				public void checkClientTrusted(X509Certificate[] chain,
						String authType) throws CertificateException {
				}

				@Override
				public void checkServerTrusted(X509Certificate[] chain,
						String authType) throws CertificateException {
				}

				@Override
				public X509Certificate[] getAcceptedIssuers() {
					// An empty array rather than null, so callers that
					// iterate over the issuers do not fail.
					return new X509Certificate[0];
				}
			};
			sslContext.init(null, new TrustManager[] { trustAll },
					new java.security.SecureRandom());
			SSLSocketFactory sslFactory = new SSLSocketFactory(sslContext,
					SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
			schemes.register(new Scheme("https", 443, sslFactory));
		} catch (Exception e) {
			// Best-effort: the client is still created, without the https scheme.
			logger.error("failed to register the https scheme", e);
		}

		// Each client gets its own pool and parameters; nothing is shared
		// through static state between the direct and the proxy client.
		PoolingClientConnectionManager pool = new PoolingClientConnectionManager(
				schemes);
		pool.setMaxTotal(MAX_TOTAL_CONNECTIONS);
		pool.setDefaultMaxPerRoute(MAX_CONNECTIONS_PER_ROUTE);

		HttpParams params = new BasicHttpParams();
		params.setParameter(CoreProtocolPNames.PROTOCOL_VERSION,
				HttpVersion.HTTP_1_1);
		// Connect timeout.
		params.setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT,
				CONNECT_TIMEOUT_MS);
		// Read timeout.
		params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT,
				SOCKET_TIMEOUT_MS);
		// With NoDelay enabled, buffered data is sent as promptly as possible
		// instead of being held back for bandwidth efficiency; this suits
		// latency-sensitive use.
		params.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true);
		params.setBooleanParameter(
				CoreConnectionPNames.STALE_CONNECTION_CHECK, true);

		DefaultHttpClient client = new DefaultHttpClient(pool, params);

		if (proxy) {
			// NOTE(review): only credentials are registered here; no proxy
			// route is configured on the client in this method. Confirm the
			// proxy host is set elsewhere.
			client.getCredentialsProvider().setCredentials(
					new AuthScope(host, port),
					new UsernamePasswordCredentials(username, password));
		}

		return client;

	}

	/** @return the client without proxy credentials */
	public DefaultHttpClient getHttpClient() {
		return httpClient;
	}

	/** @return the client that carries the proxy credentials */
	public DefaultHttpClient getProxyClient() {
		return proxyHttpClient;
	}

	/**
	 * Sends a GET request and returns the response body.
	 *
	 * @param url        target URL
	 * @param parameters query parameters to append, or {@code null} for none
	 * @return the response body when the status is 200; otherwise, or when an
	 *         I/O or URI error is logged, the empty string
	 */
	public String httpGet(String url, List<BasicNameValuePair> parameters) {

		DefaultHttpClient client = getHttpClient();
		HttpGet getMethod = null;
		try {
			getMethod = new HttpGet(url);

			if (null != parameters && !parameters.isEmpty()) {
				getMethod.setURI(appendQuery(getMethod.getURI(), parameters));
				logger.debug("httpGet-getUrl:{}", getMethod.getURI());
			}

			return readBody(client.execute(getMethod));

		} catch (UnsupportedEncodingException e) {
			logger.error(Thread.currentThread().getName()
					+ "httpGet Send Error,Code error:" + e.getMessage(), e);
		} catch (ClientProtocolException e) {
			logger.error(Thread.currentThread().getName()
					+ "httpGet Send Error,Protocol error:" + e.getMessage(), e);
		} catch (IOException e) {
			logger.error(Thread.currentThread().getName()
					+ "httpGet Send Error,IO error:" + e.getMessage(), e);
		} catch (URISyntaxException e) {
			logger.error(Thread.currentThread().getName()
					+ "httpGet Send Error,URI error:" + e.getMessage(), e);
		} finally {
			// Return the connection to the pool. The null check keeps a
			// failure in the HttpGet constructor from being masked by an NPE.
			if (getMethod != null) {
				getMethod.releaseConnection();
			}
		}
		return "";

	}

	/**
	 * Sends a POST request and returns the response body.
	 *
	 * @param url         target URL
	 * @param parameters  query parameters to append to the URL, or
	 *                    {@code null} for none
	 * @param requestBody request body; skipped when blank
	 * @return the response body when the status is 200; otherwise, or when an
	 *         I/O or URI error is logged, the empty string
	 */
	public String httpPost(String url, List<BasicNameValuePair> parameters,
			String requestBody) {

		DefaultHttpClient client = getHttpClient();
		HttpPost postMethod = null;
		try {
			postMethod = new HttpPost(url);

			if (null != parameters && !parameters.isEmpty()) {
				postMethod.setURI(appendQuery(postMethod.getURI(), parameters));
				logger.debug("httpPost-getUrl:{}", postMethod.getURI());
			}

			if (StringUtils.isNotBlank(requestBody)) {
				postMethod.setEntity(new StringEntity(requestBody,
						DEFAULT_ENCODING));
			}

			return readBody(client.execute(postMethod));

		} catch (UnsupportedEncodingException e) {
			logger.error(Thread.currentThread().getName()
					+ "httpPost Send Error,Code error:" + e.getMessage(), e);
		} catch (ClientProtocolException e) {
			logger.error(Thread.currentThread().getName()
					+ "httpPost Send Error,Protocol error:" + e.getMessage(), e);
		} catch (IOException e) {
			logger.error(Thread.currentThread().getName()
					+ "httpPost Send Error,IO error:" + e.getMessage(), e);
		} catch (URISyntaxException e) {
			logger.error(Thread.currentThread().getName()
					+ "httpPost Send Error,URI error:" + e.getMessage(), e);
		} finally {
			// Return the connection to the pool. The null check keeps a
			// failure in the HttpPost constructor from being masked by an NPE.
			if (postMethod != null) {
				postMethod.releaseConnection();
			}
		}
		return "";

	}

	/**
	 * Returns {@code base} with the form-encoded parameters appended, using
	 * {@code &} when the URI already contains a query and {@code ?} otherwise.
	 */
	private URI appendQuery(URI base, List<BasicNameValuePair> parameters)
			throws IOException, URISyntaxException {
		String query = EntityUtils.toString(new UrlEncodedFormEntity(
				parameters, DEFAULT_ENCODING));
		String current = base.toString();
		String separator = current.indexOf('?') >= 0 ? "&" : "?";
		return new URI(current + separator + query);
	}

	/**
	 * Returns the body of a 200 response, or the empty string for any other
	 * status or a missing body. The charset declared by the response is used
	 * when present; DEFAULT_ENCODING is only the fallback.
	 */
	private String readBody(HttpResponse response) throws IOException {
		if (response.getStatusLine().getStatusCode() != 200) {
			return "";
		}
		HttpEntity entity = response.getEntity();
		if (entity == null) {
			return "";
		}
		String body = EntityUtils.toString(entity, DEFAULT_ENCODING);
		return body == null ? "" : body;
	}
}


3、定义httpClient工厂类

package com.studyproject.httpclient;

/**
 * Singleton factory that hands out the shared asynchronous and synchronous
 * HTTP client holders. All three objects are created eagerly when this class
 * is initialised.
 */
public class HttpClientFactory {

	// Shared holders; the async holder is created before the sync one.
	private static final HttpAsyncClient ASYNC_CLIENT = new HttpAsyncClient();

	private static final HttpSyncClient SYNC_CLIENT = new HttpSyncClient();

	// The single factory instance.
	private static final HttpClientFactory INSTANCE = new HttpClientFactory();

	// Not instantiable from outside; use getInstance().
	private HttpClientFactory() {
	}

	/** @return the single factory instance */
	public static HttpClientFactory getInstance() {
		return INSTANCE;
	}

	/** @return the shared asynchronous client holder */
	public HttpAsyncClient getHttpAsyncClientPool() {
		return ASYNC_CLIENT;
	}

	/** @return the shared synchronous client holder */
	public HttpSyncClient getHttpSyncClientPool() {
		return SYNC_CLIENT;
	}

}


4、定义httpclient业务逻辑处理类,对外的方法可以通过这个类来封装

package com.studyproject.httpclient;

import java.io.IOException;
import java.net.URI;
import java.util.List;

import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.ParseException;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.client.BasicCookieStore;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Business-level wrapper around the shared asynchronous HTTP client: builds
 * the request, dispatches it, and offers a helper to read a response body.
 */
public class HttpClientService {

	private static final Logger LOG = LoggerFactory
			.getLogger(HttpClientService.class);

	// Charset used for both the POST form body and the URL query parameters.
	private static final String REQUEST_CHARSET = "UTF-8";

	/**
	 * Dispatches an asynchronous GET or POST request through the shared
	 * non-proxy client.
	 *
	 * <p>Failures while building or dispatching the request are logged and
	 * reported through {@code callback.failed}; they are not rethrown.
	 *
	 * @param baseUrl   target URL; must not be {@code null}
	 * @param isPost    {@code true} for POST, {@code false} for GET
	 * @param urlParams parameters appended to the URL as a query string, or
	 *                  {@code null} for none
	 * @param postBody  form fields sent as the POST body, or {@code null} for
	 *                  none; ignored for GET
	 * @param callback  receives the outcome of the request
	 * @throws Exception if {@code baseUrl} is {@code null}
	 */
	protected void exeAsyncReq(String baseUrl, boolean isPost,
			List<BasicNameValuePair> urlParams,
			List<BasicNameValuePair> postBody,
			FutureCallback<HttpResponse> callback) throws Exception {

		if (baseUrl == null) {
			LOG.warn("we don't have base url, check config");
			throw new IllegalArgumentException("missing base url");
		}

		try {
			CloseableHttpAsyncClient hc = HttpClientFactory.getInstance()
					.getHttpAsyncClientPool().getAsyncHttpClient();

			hc.start();

			HttpRequestBase httpMethod;
			if (isPost) {
				HttpPost post = new HttpPost(baseUrl);
				if (null != postBody) {
					LOG.debug("exeAsyncReq post postBody={}", postBody);
					post.setEntity(new UrlEncodedFormEntity(postBody,
							REQUEST_CHARSET));
				}
				httpMethod = post;
			} else {
				httpMethod = new HttpGet(baseUrl);
			}

			if (null != urlParams && !urlParams.isEmpty()) {
				// Encode the query with the same charset as the POST body, so
				// non-ASCII values are not sent in a different encoding.
				String query = EntityUtils.toString(new UrlEncodedFormEntity(
						urlParams, REQUEST_CHARSET));
				String current = httpMethod.getURI().toString();
				// Extend an existing query instead of adding a second '?'.
				String separator = current.indexOf('?') >= 0 ? "&" : "?";
				httpMethod.setURI(new URI(current + separator + query));
			}

			LOG.debug("exeAsyncReq getparams:{}", httpMethod.getURI());

			// Every request gets its own context with a fresh cookie store.
			HttpClientContext localContext = HttpClientContext.create();
			localContext.setAttribute(HttpClientContext.COOKIE_STORE,
					new BasicCookieStore());

			hc.execute(httpMethod, localContext, callback);

		} catch (Exception e) {
			// The request was never dispatched, so the client will not invoke
			// the callback; report the failure explicitly instead of dropping it.
			LOG.error("exeAsyncReq failed to dispatch request to " + baseUrl,
					e);
			if (callback != null) {
				callback.failed(e);
			}
		}

	}

	/**
	 * Reads the response body as a string, decoding as UTF-8 by default.
	 *
	 * @param response the response to read
	 * @return the body, or {@code null} when the response has no entity or the
	 *         body cannot be read
	 */
	protected String getHttpContent(HttpResponse response) {

		HttpEntity entity = response.getEntity();
		if (entity == null) {
			return null;
		}

		try {
			return EntityUtils.toString(entity, "utf-8");
		} catch (ParseException e) {
			LOG.warn("the response's content inputstream is corrupt", e);
		} catch (IOException e) {
			LOG.warn("the response's content inputstream is corrupt", e);
		}
		return null;
	}
}


5、使用httpclient,这里只介绍了异步的用法

package com.studyproject.httpclient;

import java.util.ArrayList;
import java.util.List;

import org.apache.http.HttpResponse;
import org.apache.http.client.utils.HttpClientUtils;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.message.BasicNameValuePair;

/**
 * Demo of the asynchronous HTTP client: issues one GET request and prints the
 * outcome from the callback.
 */
public class HttClientUseDemo extends HttpClientService {

	public static void main(String[] args) {

		// NOTE(review): nothing in this demo closes the shared async client;
		// confirm whether the JVM is expected to exit after the callback runs.
		new HttClientUseDemo().getConfCall();
	}

	/**
	 * Sends the sample GET request with a single {@code appid} parameter. The
	 * URL is a placeholder and must be replaced with a real endpoint.
	 */
	public void getConfCall() {

		String url = "http://220.181.14.110/xxxxx/xxxxx/searchbyappid.do";

		List<BasicNameValuePair> urlParams = new ArrayList<BasicNameValuePair>();
		urlParams.add(new BasicNameValuePair("appid", "2"));
		exeHttpReq(url, false, urlParams, null, new GetConfCall());
	}

	/**
	 * Dispatches the request through {@code exeAsyncReq}; any exception it
	 * throws is printed and not rethrown.
	 *
	 * @param baseUrl   target URL
	 * @param isPost    {@code true} for POST, {@code false} for GET
	 * @param urlParams query parameters, or {@code null}
	 * @param postBody  POST form fields, or {@code null}
	 * @param callback  receives the outcome of the request
	 */
	public void exeHttpReq(String baseUrl, boolean isPost,
			List<BasicNameValuePair> urlParams,
			List<BasicNameValuePair> postBody,
			FutureCallback<HttpResponse> callback) {

		try {
			System.out.println("enter exeAsyncReq");
			exeAsyncReq(baseUrl, isPost, urlParams, postBody, callback);
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

	/**
	 * Callback handed to the asynchronous client. It is an inner class because
	 * it calls {@code getHttpContent} on the enclosing instance.
	 */
	class GetConfCall implements FutureCallback<HttpResponse> {

		/**
		 * Called when the request has completed: prints the status code and
		 * the body, then closes the response.
		 */
		@Override
		public void completed(HttpResponse response) {

			System.out.println(response.getStatusLine().getStatusCode());
			System.out.println(getHttpContent(response));

			HttpClientUtils.closeQuietly(response);

		}

		/**
		 * Called when the request has been cancelled.
		 */
		@Override
		public void cancelled() {
			// Report the outcome so a cancelled request is not silent.
			System.err.println("request cancelled");
		}

		/**
		 * Called when the request has failed.
		 */
		@Override
		public void failed(Exception e) {
			// Report the cause so a failed request is not silent.
			System.err.println("request failed: " + e);
		}

	}
}

上述代码中的部分参数(例如代理的host、port、username、password,以及示例中的请求URL)在实际使用时需要替换为真实的值。



版权声明:本文为angjunqiang原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。