Java架构之redis集群使用场景

  • Post author:
  • Post category:java


  1. 性能和并发、分布式

    1. 让所有请求先访问缓存,如果缓存有数据,就不访问数据库,没有数据再访问数据库,适用于sql’请求次数多,但结果不经常改变
  2. 缺点

    1. 高并发下缓存与数据库一致性的问题

        1. 原因:先删缓存后更新数据库,删缓存成功更新数据失败,导致缓存没有数据,db是旧数据;先更新数据成功后删缓存失败,导致缓存是旧数据
        2. 解决:分布式锁、选择先更新数据再删缓存+消息队列弥补删缓存失败重试

          1. Setex key seconds value设置key并设置过期时间
          2. Expire key value设置过期时间,防止死锁,如果key是已经过期将会自动删除key
          3. Del key删除key
          4. 如果一个商品的key存入到缓存中,说明该商品被上锁了,执行完方法后解锁,最好是在访问数据库时加锁
        3. 分为最终一致性和强一致性。如果对数据有强一致性要求,不能放缓存。Redis所作的一切,只能保证最终一致性,方案也只能降低不一致发生的概率,无法完全避免
    2. 缓存雪崩、缓存击穿、缓存并发竞争问题

      1. 大并发项目才会遇到
      2. 缓存击穿:黑客故意请求缓存中不存在的数据,所有请求都怼到数据库上

        1. 互斥锁,缓存失效时,先获得锁再请求数据库,没得到锁则休眠一段时间重试
      3. 缓存雪崩:缓存同一时间大面积失效

        1. 给缓存的失效时间,加上一个随机值,避免集体失效
  3. 单线程的redis为什么很快

    1. 属于内存操作,单线程操作避免了频繁的上下文切换,采用了非阻塞io多路复用机制
    2. 非阻塞io多路复用机制

      1. 一个线程,根据每个socket的io流的状态,来管理多个io流。Io多路复用程序会同时监听多个socket,当被监听的socket准备好执行accept、read、write、close等操作,与这些操作对应的文件事件就会产生,io多路复用器会把所有产生事件的socket压入一个队列种,然后有序的每次仅一个socket的方式传送给文件事件分派器,文件事件分派其接收到socket之后会根据socket产生的事件类型调用对应的事件处理器进行处理

  1. 文件事件处理器

    1. Socket、
    2. io多路复用程序、
    3. 文件事件分派器dispather、
    4. 事件处理器handler

      1. 连接应答处理器:处理客户端连接请求
      2. 命令请求处理器:处理客户端传递的命令,如set、ipush等
      3. 命令回复处理器:用户返回客户端命令执行结果,如set、get
    5. 事件种类

      1. ae_readable:与两个事件处理器结合使用
      2. ae_writeable:当服务器有数据需要回传给客户端时,服务端将命令回复处理器与socket的ae_writeable事件关联起来
      3. 客户端与服务端交互过程
  2. 代码实现

    1. 请求监听器

      1. public class Acceptor implements Runnable {
      2. private final ServerSocketChannel ssc;
      3. private final Selector selector;
      4. public Acceptor(Selector selector, ServerSocketChannel ssc) {
      5. this.ssc=ssc;
      6. this.selector=selector;
      7. }
      8. @Override
      9. public void run() {
      10. try {
      11. SocketChannel sc= ssc.accept(); // 接受client連線請求
      12. if(sc!=null) {
      13. sc.configureBlocking(false); // 設置為非阻塞
      14. SelectionKey sk =sc.register(selector, SelectionKey.OP_READ); // SocketChannel向selector註冊一個OP_READ事件,然後返回該通道的key
      15. selector.wakeup(); // 使一個阻塞住的selector操作立即返回
      16. sk.attach(new TCPHandler(sk, sc)); // 給定key一個附加的TCPHandler對象
    2. 事件派发器

      1. public class TCPReactor implements Runnable {
      2. private final ServerSocketChannel ssc;
      3. private final Selector selector;
      4. public TCPReactor(int port) throws IOException {
      5. selector = Selector.open();
      6. ssc = ServerSocketChannel.open();
      7. InetSocketAddress addr = new InetSocketAddress(port);
      8. ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口
      9. ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞
      10. SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件
      11. sk.attach(new Acceptor(selector, ssc)); // 給定key一個附加的Acceptor對象
      12. }
      13. @Override
      14. public void run() {
      15. while (!Thread.interrupted()) { // 在線程被中斷前持續運行
      16. try {
      17. if (selector.select() == 0) // 若沒有事件就緒則不往下執行
      18. continue;
      19. } catch (IOException e) {
      20. // TODO Auto-generated catch block
      21. e.printStackTrace();
      22. }
      23. Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合
      24. Iterator<SelectionKey> it = selectedKeys.iterator();
      25. while (it.hasNext()) {
      26. dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度
      27. it.remove();
      28. }
      29. }
      30. }
      31. private void dispatch(SelectionKey key) {
      32. Runnable r = (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程
      33. if (r != null)
      34. r.run();
      35. }
      36. }
    3. 事件处理器

      1. public class TCPHandler implements Runnable {
      2. private final SelectionKey sk;
      3. private final SocketChannel sc;
      4. int state;
      5. public TCPHandler(SelectionKey sk, SocketChannel sc) {
      6. this.sk = sk;
      7. this.sc = sc;
      8. state = 0; // 初始狀態設定為READING
      9. }
      10. @Override
      11. public void run() {
      12. try {
      13. if (state == 0)
      14. read(); // 讀取網絡數據
      15. else
      16. send(); // 發送網絡數據
      17. } catch (IOException e) {
      18. System.out.println(“[Warning!] A client has been closed.”);
      19. closeChannel();
      20. }
      21. }
      22. private void closeChannel() {
      23. try {
      24. sk.cancel();
      25. sc.close();
      26. } catch (IOException e1) {
      27. e1.printStackTrace();
      28. }
      29. }
      30. private synchronized void read() throws IOException {
      31. byte[] arr = new byte[1024];
      32. ByteBuffer buf = ByteBuffer.wrap(arr);
      33. int numBytes = sc.read(buf); // 讀取字符串
      34. if(numBytes == -1)
      35. {
      36. System.out.println(“[Warning!] A client has been closed.”);
      37. closeChannel();
      38. return;
      39. }
      40. String str = new String(arr); // 將讀取到的byte內容轉為字符串型態
      41. if ((str != null) && !str.equals(” “)) {
      42. process(str); // 邏輯處理
      43. System.out.println(sc.socket().getRemoteSocketAddress().toString()
      44. + ” > ” + str);
      45. state = 1; // 改變狀態
      46. sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件
      47. sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
      48. }
      49. }
      50. private void send() throws IOException  {
      51. // get message from message queue
      52. String str = “Your message has sent to ”
      53. + sc.socket().getLocalSocketAddress().toString() + “\r\n”;
      54. ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0,所以不需要再flip()
      55. while (buf.hasRemaining()) {
      56. sc.write(buf); // 回傳給client回應字符串,發送buf的position位置 到limit位置為止之間的內容
      57. }
      58. state = 0; // 改變狀態
      59. sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件
      60. sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
      61. }
      62. }
      63. 客户端代码
      64. public class Client {
      65. public static void main(String[] args) {
      66. String hostname=args[0];
      67. int port = Integer.parseInt(args[1]);
      68. try {
      69. Socket client = new Socket(hostname, port); // 連接至目的地
      70. PrintWriter out = new PrintWriter(client.getOutputStream());
      71. BufferedReader in=new BufferedReader(new InputStreamReader(client.getInputStream()));
      72. BufferedReader stdIn=new BufferedReader(new InputStreamReader(System.in));
      73. String input;
      74. while((input=stdIn.readLine()) != null) { // 讀取輸入
      75. out.println(input); // 發送輸入的字符串
      76. out.flush(); // 強制將緩衝區內的數據輸出
      77. if(input.equals(“exit”))
      78. {
      79. break;
      80. }
      81. System.out.println(“server: “+in.readLine());
      82. }
      83. client.close();
      84. } catch (UnknownHostException e) {
      85. System.err.println(“Don’t know about host: ” + hostname);
      86. } catch (IOException e) {
      87. System.err.println(“Couldn’t get I/O for the socket connection”);
  1. Redis的瓶颈不是cpu,而是内存或者带宽
  2. 数据类型

    1. String、hash、list、set、sorted set
  3. 过期策略:Redis采用的是定期删除+惰性删除策略

    1. 默认每隔100ms检查,是否有过期的key,有酒删除,但是并不是将所有key都检查一次,而是随机抽取检查,因此定期删除会导致很多key到时间没删除
    2. 于是,惰性删除配合酒完美了。只要客户端请求了key,如果是过期的酒会自动删除
    3. 如果定期删除没删除key,客户端也没有即时去请求key,redis的内存也还是会越来越高,这样就会采用内存淘汰机制,再redis.conf有一行配置#maxmemory-policy volatile-lru就是配置内存淘汰策略的

      1. Noeviction:当内存不足以容纳新写入数据时,新写入操作会报错
      2. Allkeys-lru:当内存不足以容纳新写入数据时,在键空间中,移除最近最少使用的key,推荐使用
      3. Allkeys-random:当内存不足以容纳新写入数据时,在键空间中,随机移除某个key,
      4. Volatile-lru:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,移除最近最少使用的key,这种情况一般把redis既当作缓存,又做持久化存储的时候才用,不推荐
      5. Volatile-random:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,随机移除某个key,不推荐
      6. Volatile-ttl:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,有更早过期时间的key优先移除,不推荐
    4. 如果没有设置expire的key,不满足先决条件,那么以上的策略行为和noeviction没区别
  4. Redis三种集群方式

    1. 主从复制

      1. 包含一个主数据库,一个或多个从数据库实例,
      2. 客户端可对主数据库进行读写,怼从数据库进行读,主数据写入的数据会实时自动同步给从数据库
      3. 工作机制:

        1. 从数据库启动后,向主数据库发送sync命令,主数据库接收到命令后通过bgsave保存快照,并使用缓冲区记录快照这段时间执行的写命令
        2. 主数据库将保存的快照文件发送给从数据库,并继续记录执行的写命令
        3. 从数据库接收到快照文件后,加载快照文件,载入数据
        4. 主数据库快照发送完后开始向从数据库发送缓冲区的写命令,保持数据一致性
      4. 部署:

        1. Redis.conf主要配置

          1. Protected-mode no #关闭保护模式
          2. Prot 6379 设置监听端口
          3. Timeout 30客户端连接空闲多久后端口连接
          4. Daemonize yes后台运行
          5. Pidfile redis_9379.pid
          6. Logfile redis.log
          7. #以下持久化配置
          8. Save 900 1 #900秒至少一次写操作则执行bgsave进行rdb持久化
          9. Save 300 10
          10. Save 60 10000
          11. #如果禁用rdb持久化,可以添加save
          12. Rdbcompression yes 是否怼rdb文件进行压缩,建议no
          13. Dofilename dump.rdb rdb文件名称
          14. Dir /redis/datas #rdb文件保存路径,aof也保存在这里
          15. #aof配置
          16. Appendonly yes
          17. Appendfsync everysec
          18. #设置密码
          19. Requirepass 123456
        2. 配置主从复制只需调整salve的配置即可

          1. Replicaof 127.0.0.1:6379 #master的ip,prot
          2. Masterauth 1234556 #master的密码
          3. Replica-serve-stale-data no #如果slave无法与master同步,设置为从不可读方便监控脚本
        3. 启动主从数据库

          1. Redis-server master.conf
          2. Redis-server salve1.conf
          3. Redis-server salve2.conf
          4. 从主写,从从读
        4. 执行info replication可以查看连接该为数据库的其他库连接信息
    2. 哨兵模式
    3. Redis-cluster集群

      1. 特点:客户端与redis节点直连,不需要中间代理层,客户端不需要连接集群所有节点,连接集群中任何一个可用节点
      1. 引入主从复制模式,一个主节点对应一个或多个从节点,当主节点宕机,就会启用从节点
      2. 当其他主节点ping一个主节点时,如果半数以上的主节点超时,那么认为主节点宕机了,就会切换到从节点
      3. 部署

        1. 复制6个redis,再每个reidis的redis.conf中配置

          1. Port 7100 #7100,7200,7300,7400,7500,7600
          2. Daemonize yes #后台运行
          3. Pidfile redis_7100.pid #pidfile对应7100,7200,7300,7400,7500,7600
          4. Cluster-enabled yes #开启集群模式
          5. Masterauth password #如果设置了密码,指定master密码
          6. Cluster-config-file nodes_7100.conf #集群的配置文件,对应6个
          7. Cluster-node-timeout 15000 请求超时,默认15秒
        2. 启动6个实例

          1. Redis-server redis_7100.conf 对应6个节点
        3. 通过命令将6个节点组成一个三主节点三从节点的集群

          1. Redis-cli-cluster create –cluster-replicas 1 127.0.0.1:7100
          2. 127.0.0.1:7200 127.0.0.1:7300 127.0.0.1:7400 127.0.0.1:7500
          3. 127.0.0.1:7600 -a password
        4. 连接7100设置一个值

          1. Redis-cli -p 7100 -c – a password  c表示集群
          2. Cluster nodes可查看集群的节点信息
          3. 通过7200 :kill -9 pid杀死进程来验证集群的高可用,重新进入集群执行cluster nodes可以看到7200fail了,但是7400成了master。重新启动7200,就成了slave从节点



版权声明:本文为a617332635原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。