目录
- 环境:jdk1.8,MySQL 5.5.41,mysql-connector-java-5.1.26.jar,lombok-1.18.6.jar
- 需求:我们常用各种数据库连接池,如druid、c3p0、dbcp、tomcat-jdbc或是SpringBoot默认使用的hikari等等,但是数据库连接池的实现原理是怎样的,我们可以通过自己实现一个简单的数据库连接池,来理解它的底层机制。
- 准备工作:(1)建表ticket;(2)封装一Jdbc工具类MyJdbcConnect,用于获取和关闭Jdbc连接。
一、准备工作
(1)建表ticket,插入一条测试记录,如下:
(2)封装一Jdbc工具类
MyJdbcConnect
,用于获取和关闭Jdbc连接。代码如下:
package com.szh.jdbcpool;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
public class MyJdbcConnect {
private static String driverClass = "com.mysql.jdbc.Driver";
private static String url = "jdbc:mysql://127.0.0.1:3306/cjia2?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai";
private static String username = "root";
private static String password = "root";
static {
try {
Class.forName(driverClass);
} catch (ClassNotFoundException e) {
log.error(e.getMessage());
}
}
private Connection connection;
public MyJdbcConnect() {
try {
connection = DriverManager.getConnection(url, username, password);
} catch (SQLException e) {
log.error(e.getMessage());
}
}
public void close() {
try {
if (connection != null && !connection.isClosed()) {
connection.close();
}
} catch (SQLException e) {
log.error(e.getMessage());
}
}
}
二、非连接池方式
我们先使用最传统的方式, 使200个线程同时去获取Jdbc连接并查询唯一的一张车票ticket,代码如下:
package com.szh;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import com.szh.jdbcpool.MyJdbcConnect;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NonPoolTests {
final static int threadNum = 200;
private final static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(threadNum);
public static void main(String[] args) {
for (int i = 0; i < threadNum; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
COUNT_DOWN_LATCH.await();
String sql = "select * from ticket limit 1";
Connection connection = new MyJdbcConnect().getConnection();
ResultSet resultSet = connection.createStatement().executeQuery(sql);
resultSet.next();
log.info("{} 查询结果:{}", Thread.currentThread().getName(), resultSet.getString("ticket_no"));
} catch (InterruptedException | SQLException e) {
log.error(e.getMessage());
}
}
}).start();
COUNT_DOWN_LATCH.countDown();
}
}
}
运行一下,看看200个线程是否都能成功获取到Jdbc连接并查询成功,结果如下:
23:17:10.388 [Thread-64] INFO com.szh.NonPoolTests - Thread-64 查询结果:G7001
23:17:10.391 [Thread-28] ERROR com.szh.jdbcpool.MyJdbcConnect - Data source rejected establishment of connection, message from server: "Too many connections"
Exception in thread "Thread-154" java.lang.NullPointerException
at com.szh.NonPoolTests$1.run(NonPoolTests.java:27)
at java.lang.Thread.run(Thread.java:745)
...
结果表明
,
数据源拒绝建立连接,来自服务器的消息:“连接太多”
。显而易见,需要同时建立的Jdbc连接太多,而Jdbc连接的建立又较费资源和时间,所以必须使用数据库连接池达到Jdbc连接复用,以解决高并发问题。
三、自定义连接池方式
我们常用各种数据库连接池,所以对它的常用配置属性比较了解,如最大连接数量
maxActive
、超时等待时间
maxWait
和最大空闲连接数量
maxIdle
等等,但是各个配置是如何生效的,底层的运行机制是怎样?
3.1 自定义连接池
接下来,我们针对这3个配置来实现一个自己的简单的数据库连接池
MyPool
,代码如下:
package com.szh.jdbcpool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class MyPool {
int maxActive;
long maxWait;
int maxIdle;
// 数据库连接池里的Jdbc连接有2种状态,一是正在使用中,二是用过以后又被返还的空闲中
LinkedBlockingQueue<MyJdbcConnect> busy;
LinkedBlockingQueue<MyJdbcConnect> idle;
// 目前在池子中已创建的连接数(不能大于最大连接数maxActive)
AtomicInteger createdCount = new AtomicInteger(0);
/**
* 连接池初始化
*
* @param maxActive
* 最大连接数量,连接数量不能超过该值
* @param maxWait
* 超时等待时间以毫秒为单位 6000毫秒/1000等于60秒,当连接超过该时间便认为其是空闲连接
* @param maxIdle
* 最大空闲连接,当空闲连接超过该值时就挨个关闭多余的连接,但不能小于minldle
*/
public void init(int maxActive, long maxWait, int maxIdle) {
this.maxActive = maxActive;
this.maxWait = maxWait;
this.maxIdle = maxIdle;
this.busy = new LinkedBlockingQueue<>();
this.idle = new LinkedBlockingQueue<>();
}
/**
* 从连接池中获取数据库连接。忽略poll、offer的结果判断。
*/
public MyJdbcConnect getResource() throws Exception {
MyJdbcConnect myJdbcConnect = idle.poll();
// 有空闲的可以用
if (myJdbcConnect != null) {
boolean offerResult = busy.offer(myJdbcConnect);
return myJdbcConnect;
}
// 没有空闲的,看当前已建立的连接数是否已达最大连接数maxActive
if (createdCount.get() < maxActive) {
// 已建立9个,maxActive=10。3个线程同时进来..
if (createdCount.incrementAndGet() <= maxActive) {
myJdbcConnect = new MyJdbcConnect();
boolean offerResult = busy.offer(myJdbcConnect);
return myJdbcConnect;
} else {
createdCount.decrementAndGet();
}
}
// 达到了最大连接数,需等待释放连接
myJdbcConnect = idle.poll(maxWait, TimeUnit.MILLISECONDS);
if (myJdbcConnect != null) {
boolean offerResult = busy.offer(myJdbcConnect);
return myJdbcConnect;
} else {
throw new Exception("等待超时!");
}
}
/**
* 将数据库连接返还给连接池。忽略poll、offer的结果判断。
*/
public void returnResource(MyJdbcConnect jdbcConnect) {
if (jdbcConnect == null) {
return;
}
// 忽略连接状态的检查
// jdbcConnect.getConnection().isClosed()
boolean removeResult = busy.remove(jdbcConnect);
if (removeResult) {
// 控制空闲连接的数量
if (maxIdle <= idle.size()) {
jdbcConnect.close();
createdCount.decrementAndGet();
return;
}
boolean offerResult = idle.offer(jdbcConnect);
if (!offerResult) {
jdbcConnect.close();
createdCount.decrementAndGet();
}
} else {
// 无法复用
jdbcConnect.close();
createdCount.decrementAndGet();
}
}
}
3.2 运行测试自定义连接池
对非连接池方式的
NonPoolTests
稍加改造,初始化连接池,以连接池的方式获取和返还连接,如下:
package com.szh;
import java.sql.Connection;
import java.sql.ResultSet;
import java.util.concurrent.CountDownLatch;
import com.szh.jdbcpool.MyJdbcConnect;
import com.szh.jdbcpool.MyPool;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class PoolTests {
final static int threadNum = 200;
private final static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(threadNum);
public static void main(String[] args) {
MyPool pool = new MyPool();
pool.init(20, 2000, 10);
for (int i = 0; i < threadNum; i++) {
new Thread(new Runnable() {
@Override
public void run() {
MyJdbcConnect connect = null;
try {
COUNT_DOWN_LATCH.await();
String sql = "select * from ticket limit 1";
connect = pool.getResource();
Connection connection = connect.getConnection();
ResultSet resultSet = connection.createStatement().executeQuery(sql);
resultSet.next();
log.info("{} 查询结果:{}", Thread.currentThread().getName(), resultSet.getString("ticket_no"));
} catch (Exception e) {
log.error(e.getMessage());
} finally {
pool.returnResource(connect);
}
}
}).start();
COUNT_DOWN_LATCH.countDown();
}
}
}
运行一下,看看200个线程是否都能成功获取到Jdbc连接并查询成功,结果如下:
23:52:24.044 [Thread-6] INFO com.szh.PoolTests - Thread-6 查询结果:G7001
...
23:52:24.249 [Thread-198] INFO com.szh.PoolTests - Thread-198 查询结果:G7001
结果表明
,200个线程均成功从连接池中获取到连接并成功查询,可以有效复用Jdbc连接,减轻服务器和数据库的资源压力。当然,不同的连接池使用的思路都有不同,本文只是一种实现方案。
3.3 技术总结答疑
(1)
在手写Jdbc连接池的过程中,使用到了哪些关键技术和设计模式?
答:关键技术(JDBC、多线程、阻塞队列BlockingQueue、原子操作类AtomicInteger、计数器CountDownLatch);设计模式(
享元模式
)。
(2)
为何使用队列
Queue
而不用别的数据结构或集合类?
答:首先说明为什么使用队列
Queue
,它具有这个特性,先进先出刚好满足Jdbc连接的时效性。以下是队列的API:
(3)
Queue
的实现那么多,为何使用阻塞队列
BlockingQueue
而不用非阻塞队列
ConcurrentLinkedQueue
等?
答:这便牵扯到
阻塞非阻塞队列的区别
了,阻塞队列在offer的时候,若队列已经满了,则阻塞住(加锁)一段时间等待有空闲位置;同理,阻塞队列在poll的时候,若队列为空,也阻塞住(加锁)一段时间等待队列有元素;
阻塞队列
的这个特性刚好可以满足连接池的
maxWait
属性的需求,因为,从数据库连接池的设计来看,当发生上面两种情景时,我们应该先等待连接池一段时间以更大的概率来获得和复用宝贵的Jdbc连接(更多的复用正是各种池或享元模式的核心),而不能没有一点耐心地直接要求连接池返回给我们一个成功与否的结果,而这正是
非阻塞队列
的特性。从源码分析,也不难看出,阻塞队列的offer/poll函数可接收timeout的阻塞等待时长,内部使用ReentrantLock和Condition的加锁机制达到队列阻塞效果;反观非阻塞队列的offer/poll函数实现,则没有这样的阻塞处理。
(4)
为何用
LinkedBlockingQueue
而不用
ArrayBlockingQueue
?
答:这便牵扯到
链表和数组的区别
了,链表便于节点增删,数组便于查找。而对数据库连接池的设计来说,不存在要根据索引查找(RandomAccess随机查找)队列里的某个中间位置的元素,更多的是队列头部元素(即最早进来的Jdbc连接对象)的offer/poll操作。
(5)
为何用
Queue
而不用
Deque
?二者有何区别和关联?
答:
Queue
很常见,而
Deque
则相对少一些,Deque即Double ended Queue,用作双端队列的场景,它是Queue的子接口。显然,从Jdbc连接对象的时效性来看,只能先进先出,最后进的没有理由也要求先出,所以Deque不适用。
(6)
最大等待时间
maxWait
的实现核心poll(long timeout, TimeUnit unit)内部如何实现自动通知?
答:依赖于
ReentrantLock
和
Condition
的加锁机制。可翻阅jdk源码
LinkedBlockingQueue<E>.poll(long timeout, TimeUnit unit)
。
(7)
AtomicInteger
为何能实现线程安全?
答:归功于
CAS机制
(compareAndSwap,无锁机制,乐观锁)。如果有多个线程要对内存中的数值10进行自增操作,那么,每个线程都会去内存中获取并记录下原本的数值10,然后再记录下自增以后的期望值11,等到真正要做自增操作时,会先比较内存中变量的最新值和自己记录过的原本的数值10,若相等,则这个线程可以对该变量进行原子自增;若不相等,代表该数值10已被别的线程自增过,则需要再次记录最新的修改后的数值(假设)11,同样再记录下自增以后的期望值12。