-
性能和并发、分布式
- 让所有请求先访问缓存,如果缓存有数据,就不访问数据库,没有数据再访问数据库,适用于sql’请求次数多,但结果不经常改变
-
缺点
-
高并发下缓存与数据库一致性的问题
-
- 原因:先删缓存后更新数据库,删缓存成功更新数据失败,导致缓存没有数据,db是旧数据;先更新数据成功后删缓存失败,导致缓存是旧数据
-
解决:分布式锁、选择先更新数据再删缓存+消息队列弥补删缓存失败重试
- Setex key seconds value设置key并设置过期时间
- Expire key value设置过期时间,防止死锁,如果key是已经过期将会自动删除key
- Del key删除key
- 如果一个商品的key存入到缓存中,说明该商品被上锁了,执行完方法后解锁,最好是在访问数据库时加锁
- 分为最终一致性和强一致性。如果对数据有强一致性要求,不能放缓存。Redis所作的一切,只能保证最终一致性,方案也只能降低不一致发生的概率,无法完全避免
-
-
缓存雪崩、缓存击穿、缓存并发竞争问题
- 大并发项目才会遇到
-
缓存击穿:黑客故意请求缓存中不存在的数据,所有请求都怼到数据库上
- 互斥锁,缓存失效时,先获得锁再请求数据库,没得到锁则休眠一段时间重试
-
缓存雪崩:缓存同一时间大面积失效
- 给缓存的失效时间,加上一个随机值,避免集体失效
-
高并发下缓存与数据库一致性的问题
-
单线程的redis为什么很快
- 属于内存操作,单线程操作避免了频繁的上下文切换,采用了非阻塞io多路复用机制
-
非阻塞io多路复用机制
- 一个线程,根据每个socket的io流的状态,来管理多个io流。Io多路复用程序会同时监听多个socket,当被监听的socket准备好执行accept、read、write、close等操作,与这些操作对应的文件事件就会产生,io多路复用器会把所有产生事件的socket压入一个队列种,然后有序的每次仅一个socket的方式传送给文件事件分派器,文件事件分派其接收到socket之后会根据socket产生的事件类型调用对应的事件处理器进行处理
-
文件事件处理器
- Socket、
- io多路复用程序、
- 文件事件分派器dispather、
-
事件处理器handler
- 连接应答处理器:处理客户端连接请求
- 命令请求处理器:处理客户端传递的命令,如set、ipush等
- 命令回复处理器:用户返回客户端命令执行结果,如set、get
-
事件种类
- ae_readable:与两个事件处理器结合使用
- ae_writeable:当服务器有数据需要回传给客户端时,服务端将命令回复处理器与socket的ae_writeable事件关联起来
- 客户端与服务端交互过程
-
代码实现
-
请求监听器
- public class Acceptor implements Runnable {
- private final ServerSocketChannel ssc;
- private final Selector selector;
- public Acceptor(Selector selector, ServerSocketChannel ssc) {
- this.ssc=ssc;
- this.selector=selector;
- }
- @Override
- public void run() {
- try {
- SocketChannel sc= ssc.accept(); // 接受client連線請求
- if(sc!=null) {
- sc.configureBlocking(false); // 設置為非阻塞
- SelectionKey sk =sc.register(selector, SelectionKey.OP_READ); // SocketChannel向selector註冊一個OP_READ事件,然後返回該通道的key
- selector.wakeup(); // 使一個阻塞住的selector操作立即返回
- sk.attach(new TCPHandler(sk, sc)); // 給定key一個附加的TCPHandler對象
-
事件派发器
- public class TCPReactor implements Runnable {
- private final ServerSocketChannel ssc;
- private final Selector selector;
- public TCPReactor(int port) throws IOException {
- selector = Selector.open();
- ssc = ServerSocketChannel.open();
- InetSocketAddress addr = new InetSocketAddress(port);
- ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口
- ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞
- SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件
- sk.attach(new Acceptor(selector, ssc)); // 給定key一個附加的Acceptor對象
- }
- @Override
- public void run() {
- while (!Thread.interrupted()) { // 在線程被中斷前持續運行
- try {
- if (selector.select() == 0) // 若沒有事件就緒則不往下執行
- continue;
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合
- Iterator<SelectionKey> it = selectedKeys.iterator();
- while (it.hasNext()) {
- dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度
- it.remove();
- }
- }
- }
- private void dispatch(SelectionKey key) {
- Runnable r = (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程
- if (r != null)
- r.run();
- }
- }
-
事件处理器
- public class TCPHandler implements Runnable {
- private final SelectionKey sk;
- private final SocketChannel sc;
- int state;
- public TCPHandler(SelectionKey sk, SocketChannel sc) {
- this.sk = sk;
- this.sc = sc;
- state = 0; // 初始狀態設定為READING
- }
- @Override
- public void run() {
- try {
- if (state == 0)
- read(); // 讀取網絡數據
- else
- send(); // 發送網絡數據
- } catch (IOException e) {
- System.out.println(“[Warning!] A client has been closed.”);
- closeChannel();
- }
- }
- private void closeChannel() {
- try {
- sk.cancel();
- sc.close();
- } catch (IOException e1) {
- e1.printStackTrace();
- }
- }
- private synchronized void read() throws IOException {
- byte[] arr = new byte[1024];
- ByteBuffer buf = ByteBuffer.wrap(arr);
- int numBytes = sc.read(buf); // 讀取字符串
- if(numBytes == -1)
- {
- System.out.println(“[Warning!] A client has been closed.”);
- closeChannel();
- return;
- }
- String str = new String(arr); // 將讀取到的byte內容轉為字符串型態
- if ((str != null) && !str.equals(” “)) {
- process(str); // 邏輯處理
- System.out.println(sc.socket().getRemoteSocketAddress().toString()
- + ” > ” + str);
- state = 1; // 改變狀態
- sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件
- sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
- }
- }
- private void send() throws IOException {
- // get message from message queue
- String str = “Your message has sent to ”
- + sc.socket().getLocalSocketAddress().toString() + “\r\n”;
- ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0,所以不需要再flip()
- while (buf.hasRemaining()) {
- sc.write(buf); // 回傳給client回應字符串,發送buf的position位置 到limit位置為止之間的內容
- }
- state = 0; // 改變狀態
- sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件
- sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
- }
- }
- 客户端代码
- public class Client {
- public static void main(String[] args) {
- String hostname=args[0];
- int port = Integer.parseInt(args[1]);
- try {
- Socket client = new Socket(hostname, port); // 連接至目的地
- PrintWriter out = new PrintWriter(client.getOutputStream());
- BufferedReader in=new BufferedReader(new InputStreamReader(client.getInputStream()));
- BufferedReader stdIn=new BufferedReader(new InputStreamReader(System.in));
- String input;
- while((input=stdIn.readLine()) != null) { // 讀取輸入
- out.println(input); // 發送輸入的字符串
- out.flush(); // 強制將緩衝區內的數據輸出
- if(input.equals(“exit”))
- {
- break;
- }
- System.out.println(“server: “+in.readLine());
- }
- client.close();
- } catch (UnknownHostException e) {
- System.err.println(“Don’t know about host: ” + hostname);
- } catch (IOException e) {
- System.err.println(“Couldn’t get I/O for the socket connection”);
-
请求监听器
-
-
- Redis的瓶颈不是cpu,而是内存或者带宽
-
数据类型
- String、hash、list、set、sorted set
-
过期策略:Redis采用的是定期删除+惰性删除策略
- 默认每隔100ms检查,是否有过期的key,有酒删除,但是并不是将所有key都检查一次,而是随机抽取检查,因此定期删除会导致很多key到时间没删除
- 于是,惰性删除配合酒完美了。只要客户端请求了key,如果是过期的酒会自动删除
-
如果定期删除没删除key,客户端也没有即时去请求key,redis的内存也还是会越来越高,这样就会采用内存淘汰机制,再redis.conf有一行配置#maxmemory-policy volatile-lru就是配置内存淘汰策略的
- Noeviction:当内存不足以容纳新写入数据时,新写入操作会报错
- Allkeys-lru:当内存不足以容纳新写入数据时,在键空间中,移除最近最少使用的key,推荐使用
- Allkeys-random:当内存不足以容纳新写入数据时,在键空间中,随机移除某个key,
- Volatile-lru:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,移除最近最少使用的key,这种情况一般把redis既当作缓存,又做持久化存储的时候才用,不推荐
- Volatile-random:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,随机移除某个key,不推荐
- Volatile-ttl:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,有更早过期时间的key优先移除,不推荐
- 如果没有设置expire的key,不满足先决条件,那么以上的策略行为和noeviction没区别
-
Redis三种集群方式
-
主从复制
- 包含一个主数据库,一个或多个从数据库实例,
- 客户端可对主数据库进行读写,怼从数据库进行读,主数据写入的数据会实时自动同步给从数据库
-
工作机制:
- 从数据库启动后,向主数据库发送sync命令,主数据库接收到命令后通过bgsave保存快照,并使用缓冲区记录快照这段时间执行的写命令
- 主数据库将保存的快照文件发送给从数据库,并继续记录执行的写命令
- 从数据库接收到快照文件后,加载快照文件,载入数据
- 主数据库快照发送完后开始向从数据库发送缓冲区的写命令,保持数据一致性
-
部署:
-
Redis.conf主要配置
- Protected-mode no #关闭保护模式
- Prot 6379 设置监听端口
- Timeout 30客户端连接空闲多久后端口连接
- Daemonize yes后台运行
- Pidfile redis_9379.pid
- Logfile redis.log
- #以下持久化配置
- Save 900 1 #900秒至少一次写操作则执行bgsave进行rdb持久化
- Save 300 10
- Save 60 10000
- #如果禁用rdb持久化,可以添加save
- Rdbcompression yes 是否怼rdb文件进行压缩,建议no
- Dofilename dump.rdb rdb文件名称
- Dir /redis/datas #rdb文件保存路径,aof也保存在这里
- #aof配置
- Appendonly yes
- Appendfsync everysec
- #设置密码
- Requirepass 123456
-
配置主从复制只需调整salve的配置即可
- Replicaof 127.0.0.1:6379 #master的ip,prot
- Masterauth 1234556 #master的密码
- Replica-serve-stale-data no #如果slave无法与master同步,设置为从不可读方便监控脚本
-
启动主从数据库
- Redis-server master.conf
- Redis-server salve1.conf
- Redis-server salve2.conf
- 从主写,从从读
- 执行info replication可以查看连接该为数据库的其他库连接信息
-
Redis.conf主要配置
- 哨兵模式
-
Redis-cluster集群
- 特点:客户端与redis节点直连,不需要中间代理层,客户端不需要连接集群所有节点,连接集群中任何一个可用节点
-
主从复制
-
-
- 引入主从复制模式,一个主节点对应一个或多个从节点,当主节点宕机,就会启用从节点
- 当其他主节点ping一个主节点时,如果半数以上的主节点超时,那么认为主节点宕机了,就会切换到从节点
-
部署
-
复制6个redis,再每个reidis的redis.conf中配置
- Port 7100 #7100,7200,7300,7400,7500,7600
- Daemonize yes #后台运行
- Pidfile redis_7100.pid #pidfile对应7100,7200,7300,7400,7500,7600
- Cluster-enabled yes #开启集群模式
- Masterauth password #如果设置了密码,指定master密码
- Cluster-config-file nodes_7100.conf #集群的配置文件,对应6个
- Cluster-node-timeout 15000 请求超时,默认15秒
-
启动6个实例
- Redis-server redis_7100.conf 对应6个节点
-
通过命令将6个节点组成一个三主节点三从节点的集群
- Redis-cli-cluster create –cluster-replicas 1 127.0.0.1:7100
- 127.0.0.1:7200 127.0.0.1:7300 127.0.0.1:7400 127.0.0.1:7500
- 127.0.0.1:7600 -a password
-
连接7100设置一个值
- Redis-cli -p 7100 -c – a password c表示集群
- Cluster nodes可查看集群的节点信息
- 通过7200 :kill -9 pid杀死进程来验证集群的高可用,重新进入集群执行cluster nodes可以看到7200fail了,但是7400成了master。重新启动7200,就成了slave从节点
-
复制6个redis,再每个reidis的redis.conf中配置
-
版权声明:本文为a617332635原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。