共享内存+inotify机制实现多进程低延迟数据共享

  • Post author:
  • Post category:其他


本文是对

共享内存实现多进程低延迟队列 10us_Sweet_Oranges的博客-CSDN博客

的部分修正。


起因


之前的博客写过通过“inotify +file”的形式来实现多进程队列(跨进程共享)的文章。这种方式在通常情况下表现不错,但是这里存在一个问题就是“当消费者过慢,会产生大量的击穿内核高速缓冲区io,导致消费者卡在读取数据的瓶颈上,无法使用负载均衡等手段来提高处理能力。”

为了解决上述问题,引入了共享内存,众所周知,这是所有ipc中最快的通信方式,从根本上解决这个问题。下面通过实现一个producer 和 consumer 程序,来展示我的设计思路。


生产者


由于物理内存有限,生产者会使用一个环形缓冲区来保证热点数据始终在内存中(类似A/B缓存这个长度为2的最小环形队列一样)。

同时为了保证消费者的接入配置最小化,生产者将配置通过一个固定大小的结构体映射到内存中(类似我们的A/B缓冲区,前面加了一个header,header中放元信息,而A/B缓冲区放树)。消费者首先映射结构体读取配置信息,从结构体中的得知缓冲区大小后执行mremap进行重新调整大小,这样消费者只需要知道共享内存的地址(一个文件名),就可以实现消费(我们的A/B缓冲区不存在重新调整大小的情况,因为已经将数据源单帧数据大小写到配置文件中了)。同时采用了消息计数,来标识消费者是否已经处理所有消息,触发等待。当新数据到达后,唤醒消费者。我们选择了通过向指定文件写入一个字节的内容触发inotify,虽然通过信号量也可以实现,但是使用信号量会导致生产者要多开一个线程实现管理,引入额外的复杂度。

#include <sys/stat.h>
#include <fcntl.h>
#include <sys/inotify.h>
#include <functional>
#include <unistd.h>
#include <cstring>
#include <string>
#include <sys/mman.h>
#include <sys/time.h>
#include <iostream>
#include <semaphore.h>

#include <gflags/gflags.h>

DEFINE_int64(shm_size, 6, "shm_size m");//gflag中的内容
DEFINE_string(inotify_file, "/tmp/writer.txt", "inotify file path");
DEFINE_string(shm_file, "test", "shm file path");
DEFINE_string(shm_key, "", "shm key");

using namespace std;

class Producer
{
public:
    Producer(const std::string &inotify_path, const std::string &shm_path) : inotify_path_(inotify_path), shm_path_(shm_path)
    {
        shm_size_ = FLAGS_shm_size * 1024 * 1024; // 1g; // 1g
    }

    bool Init(const std::string &key)
    {
        fd_ = open(inotify_path_.c_str(), O_WRONLY | O_APPEND | O_CREAT | O_TRUNC, 0644);
        if (fd_ < 0)
        {
            printf("1. open inotify path failed\n");
            return false;
        }
        else
        {
            printf("1. open inotify path successed\n");
        }
        // 打开共享内存
        shm_fd_ = shm_open(shm_path_.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0777);
        if (shm_fd_ < 0)
        {
            printf("2. shm_open  failed\n");
            return false;
        }
        else
        {
            printf("2. open shm path successed\n");
        }
        uint64_t size = shm_size_ + sizeof(SHM_Data);
        printf("size = %ld\n", size);

        if (ftruncate(shm_fd_, size) == -1)
        {
            printf("3. ftruncate  failed\n");
            return false;
        }
        else
        {
            printf("3. ftruncate  successed\n");
        }

        shm_data_ = (SHM_Data *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
        if (shm_data_ == MAP_FAILED)
        {
            printf("4. mmap  failed\n");
            return false;
        }
        else
        {
            printf("4. mmap  successed\n");
        }
        shm_data_->total = 0;
        shm_data_->size = shm_size_;
        memcpy(shm_data_->inotify_name, inotify_path_.c_str(), inotify_path_.size());
        memcpy(shm_data_->key, key.c_str(), key.size());
        return true;
    }
    void Write(const char *line)
    {
        for (int i = 0; line[i] != '\0'; i++)
        {
            if (current_offset_ >= shm_size_)
            {
                current_offset_ = 0;
            }
            shm_data_->buffer[current_offset_++] = line[i];
        }
        if (current_offset_ >= shm_size_)
        {
            current_offset_ = 0;
        }
        shm_data_->buffer[current_offset_++] = '\0';
        shm_data_->total++;

        struct timeval tv;
        struct timezone tz;

        write(fd_, "8", 1);

        gettimeofday(&tv, &tz);
        //std::cout << "second : \t" << tv.tv_sec << std::endl;                                 //秒
        std::cout <<count++ << " second : \t" << tv.tv_sec * 1000 +tv.tv_usec/1000<<"." << tv.tv_usec%1000  << std::endl; // 微秒
    }

private:
    struct SHM_Data
    {
        uint64_t total;         // 记录消息总数
        char inotify_name[512]; // inotify 文件名
        char key[64];           // 当前数据标识
        uint64_t size;          // 环形缓冲区大小
        char buffer[];          // 环形缓冲区
    };
    SHM_Data *shm_data_ = nullptr; // 共享内存
    int fd_;
    int shm_fd_;
    uint64_t shm_size_ = 0;
    uint64_t buffer_size_ = 0;
    uint64_t total_read = 0;
    uint64_t current_offset_ = 0;
    int count = 0;//tmp
    std::string inotify_path_;
    std::string shm_path_;
};

int main(int argc, char *argv[])
{
    char line[10] = "123456789";
    gflags::ParseCommandLineFlags(&argc, &argv, true);

    Producer producer(FLAGS_inotify_file, FLAGS_shm_file);
    producer.Init(FLAGS_shm_key);//FLAGS开头的这些指的是用户命令行输入的,如果不输入,默认就是空字符串
    for(int i =0;i<200;i++)//测试两百次
    {
         producer.Write(line);
         sleep(1);
    }
   // producer.Write(line);
    std::cout << "write finished" << std::endl;
}

消费者

消费者实现就相对简单一些,读取配置结构体(header),执行mremap调整大小。 如果机器性能足够,可以选择不等待inotify,类似自旋锁的方式。这种方式测试发现新消息能在10us左右被消费者感知,使用inoitfy新消息感知需要40us左右。

#include <sys/stat.h>
#include <fcntl.h>
#include <sys/inotify.h>
#include <functional>
#include <unistd.h>
#include <cstring>
#include <string>
#include <iostream>
#include <sys/mman.h>
#include <sys/time.h>
#include <semaphore.h>
#include <gflags/gflags.h>

DEFINE_string(shm_file, "test", "shm file path");
DEFINE_bool(shm_nowait, false, "shm no wait mode");

#define EVENT_SIZE (sizeof(struct inotify_event))
#define BUF_LEN (10 * (EVENT_SIZE + FILENAME_MAX + 1))
using namespace std;

class Consumer
{

public:
    Consumer(int tag, const std::string &shm_path)
        : tag_(tag), shm_path_(shm_path)
    {
        line_size_ = 1024;
        inotify_buffer_ = new char[BUF_LEN];
        line_ = new char[line_size_];
    }
    ~Consumer()
    {
        delete[] line_;
        delete[] inotify_buffer_;
    }
    bool Init(const std::string &key)
    {
        shm_fd_ = shm_open(shm_path_.c_str(), O_RDWR, 0777);
        if (shm_fd_ < 0)
        {
            printf("1. shm_open  failed\n");
            return false;
        }
        else
        {
            printf("1. shm_open  successed\n");
        }

        SHM_Data *shm_info_ = (SHM_Data *)mmap(NULL, sizeof(SHM_Data), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
        if (shm_info_ == MAP_FAILED)
        {
            printf("2. mmap info failed\n");
            return false;
        }
        else
        {
            printf("2. mmap info successed\n");
        }
        printf("info size=%ld inotify_name=%s,key=%s\n", shm_info_->size, shm_info_->inotify_name, shm_info_->key);
        if (strcasecmp(shm_info_->key, key.c_str()) != 0)
        {
            printf("3. key not match \n");
            return false;
        }
        else
        {
            printf("3. key matched \n");
        }
        // 开始监听文件变化
        inotify_fd_ = inotify_init();
        if (inotify_fd_ < 0)
        {
            printf("4. inotify_init  failed\n");
            return false;
        }
        else
        {
            printf("4. inotify_init  successed\n");
        }
        shm_size_ = shm_info_->size;
        uint64_t real_size = shm_info_->size + sizeof(SHM_Data);
        printf("realsize = %ld\n", real_size);

        inotify_add_watch(inotify_fd_, shm_info_->inotify_name, IN_MODIFY | IN_CREATE | IN_DELETE);

        shm_data_ = (SHM_Data *)mremap(shm_info_, sizeof(SHM_Data), real_size, MREMAP_MAYMOVE);
        if (shm_data_ == MAP_FAILED)
        {
            printf("5. mmap data failed\n");
            return false;
        }
        else
        {
            printf("5. mmap data successed\n");
        }
        return true;
    }

    void Loop()
    {
        while (true)
        {
            while (total_read < shm_data_->total)
            {
               // printf("5---------------\n");
                for (int i = 0; i < line_size_; i++)
                {
                    if (current_offset_ >= shm_size_)
                    {
                        current_offset_ = 0;
                    }
                    line_[i] = shm_data_->buffer[current_offset_++];
                    if (line_[i] == '\0')
                    {
                        break;
                    }
                }
                total_read++;
                printf("current_offset=%d, total=%d, read=%d, %s\n", current_offset_, shm_data_->total, total_read, line_);
            }
           // printf("6.-------\n");
            if (!FLAGS_shm_nowait)
            {
                //printf("7.-------\n");
                read(inotify_fd_, inotify_buffer_, BUF_LEN);
                struct timeval tv;
                struct timezone tz;

                gettimeofday(&tv, &tz);
               // std::cout<< "second : \t" << tv.tv_sec << std::endl;                                 //秒
                std::cout <<count++ << " second : \t" << tv.tv_sec * 1000 +tv.tv_usec/1000<<"." << tv.tv_usec%1000  << std::endl; // 微秒
            }
        }
    }

private:
    struct SHM_Data
    {
        uint64_t total;         // 记录消息总数
        char inotify_name[512]; // inotify 文件名
        char key[64];           // 当前数据标识
        uint64_t size;          // 环形缓冲区大小
        char buffer[];          // 环形缓冲区
    };
    SHM_Data *shm_data_ = nullptr; // 共享对象指针
    uint64_t shm_size_ = 0;        // 共享内存大小
    uint64_t line_size_ = 0;       // 每条数据最大值
    uint64_t total_read = 0;       // 当前读取总记录数
    uint64_t current_offset_ = 0;  // 当前读取的偏移量

    int count = 0;//tmp

    std::string shm_path_;

    int inotify_fd_;
    int shm_fd_;
    int tag_;

    char *line_;
    char *inotify_buffer_;
};
DEFINE_string(shm_key, "", "shm key");
int main(int argc, char *argv[])
{
    gflags::ParseCommandLineFlags(&argc, &argv, true);
    Consumer consumer(2, FLAGS_shm_file);
    if (!consumer.Init(FLAGS_shm_key))
    {
        return 1;
    }
    consumer.Loop();
}

Loop()这部分要改成回调机制。


编译

因为这里使用了gflag,所以需要先编译安装gflag。过程参考

linux下编译、安装和使用gflags_I_belong_to_jesus的博客-CSDN博客_gflags编译安装

对于本文的两个程序,使用 g++ producer.cpp -o pro -I /usr/local/include -L /usr/local/lib/ -lgflags -lrt 即可编译。

结果如下:

可以发现,这种机制下,生产者生产完数据后,消费者能在100us内感知到,这个效率还是非常高的。

使用top也看不出来这两个进程对CPU的明显消耗。

本文的消费者目前采用的是while循环来持续监听inotify,这个机制还是要改造成事件机制比较好。

参考链接:https://blog.csdn.net/Sweet_Oranges/article/details/107082050



版权声明:本文为jinking01原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。