Kafka 压测:生产者多进程异步批量循环生产用户

  • Post author:
  • Post category:其他


#!/usr/bin/env python
# -*- coding: utf-8 -*-
import math
import random
import time
import json
import base64
import sys
import os
import numpy as np
from aiokafka.producer import AIOKafkaProducer
import asyncio
loop = asyncio.get_event_loop()
import asyncio
from multiprocessing import Process


# NOTE(review): this repeats the identical call already made among the imports
# above; the functions visible in this file take or fetch their own `loop`
# and do not read this module-level name.
loop = asyncio.get_event_loop()
# Timestamp in whole seconds, captured once when the module is loaded;
# send_many() writes it into the "enqueue_time" field of every message.
localtime = int(time.time())
# NOTE(review): not referenced by any visible code -- presumably intended for
# the "callback_type" field built in send_many(); confirm before relying on it.
CALLBACK_INVALID = 0

class Logger(object):
    """File-like object that duplicates everything written to it.

    Every message goes both to ``stream`` (standard output by default) and to
    a log file opened in append mode, so an instance can be assigned to
    ``sys.stdout`` / ``sys.stderr`` to tee output into a file.
    """

    def __init__(self, file_name="Default.log", stream=sys.stdout):
        """Open ``file_name`` for appending and remember ``stream``.

        :param file_name: path of the log file; created if it does not exist.
        :param stream: text stream that keeps receiving the output.
        """
        self.terminal = stream
        # Explicit encoding so non-ASCII output does not depend on the locale.
        self.log = open(file_name, "a", encoding="utf-8")

    def write(self, message):
        """Write ``message`` to both the wrapped stream and the log file."""
        self.terminal.write(message)
        self.log.write(message)

    def flush(self):
        """Flush the wrapped stream and the log file.

        This must really flush: a ``multiprocessing`` child flushes
        ``sys.stdout`` / ``sys.stderr`` and then exits without closing open
        files, so a no-op here would drop whatever is still buffered.
        """
        self.terminal.flush()
        self.log.flush()

    def close(self):
        """Close the log file (flushing it); the wrapped stream stays open."""
        if not self.log.closed:
            self.log.close()

class MyEncoder(json.JSONEncoder):
    """JSON encoder that additionally handles numpy arrays and ``bytes``."""

    def default(self, obj):
        """Convert values the stock encoder cannot serialize.

        :param obj: value that ``json`` does not know how to encode.
        :return: a list for ``numpy.ndarray``; a ``str`` for ``bytes``
            (decoded as UTF-8).
        :raises TypeError: for any other type, raised by the base class.
        """
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        # Defer to the base class so unsupported types fail loudly instead of
        # being silently written out as ``null``.
        return super().default(obj)

def ChooseTopic(NextExecTime):
    """Pick the topic name for a task due at ``NextExecTime``.

    The whole seconds remaining until ``NextExecTime`` are computed and
    printed. If that value is not strictly between 0 and 2703360, the
    fallback topic ``'1min_'`` is returned. Otherwise minute buckets are
    scanned in increasing order (1..16 in steps of 1, then up to 256 in steps
    of 16, then steps of 256 and 4096) and the topic for the first bucket
    whose size in minutes exceeds the remaining time is returned.

    :param NextExecTime: due time as a Unix timestamp in seconds.
    :return: the topic name.
    """
    remaining = int(NextExecTime - (int(time.time())))
    print("deltaT=", remaining)

    # Overdue, due right now, or too far in the future: use the fallback.
    if not (0 < remaining < 2703360):
        return '1min_'

    for exponent in range(5):
        if remaining >= 16 ** exponent * 60:
            continue
        # For exponent 0 the step truncates to 0, so no bucket matches and
        # the scan moves on to the next power.
        step = int(16 ** (exponent - 1))
        for multiple in range(1, 17):
            minutes = step * multiple
            if remaining < minutes * 60:
                # NOTE(review): the literal has no replacement field, so the
                # bucket size is ignored and the result is always '-'. An
                # earlier template was '{}min_test'; confirm the real name.
                return '-'.format(minutes)

async def send_many(num, loop):
    """Build ``num`` delay-task messages and send them to Kafka in batches.

    Each message is a JSON-encoded ``DelayTask`` whose ``payload`` is the
    base64 of a JSON-encoded ``pTask``. Messages are appended to a producer
    batch; whenever ``batch.append`` returns ``None`` the batch is sent to a
    randomly chosen partition of the current topic, a fresh batch is created
    and the same message is retried. Whatever remains in the last batch is
    sent after the loop.

    Exceptions raised inside the ``try`` are printed rather than propagated.
    The producer is always stopped, after which the topic, the elapsed whole
    seconds and the number of appended messages are printed.

    NOTE(review): the ``?`` tokens in this function (dictionary keys, the
    ``callback_type`` value, part of the broker address) look like redacted
    placeholders. ``"callback_type": ?`` is not valid Python, so the function
    cannot run as written -- restore the real values before use.

    NOTE(review): when ``num`` is 0 the loop body never runs, so ``topic`` is
    unbound both at the final send and at the ``print`` in ``finally``.

    :param num: number of messages to append before returning.
    :param loop: event loop handed to ``AIOKafkaProducer``.
    """
    producer = AIOKafkaProducer(
        loop=loop, bootstrap_servers='1.0.1?:9092')
    await producer.start()
    try:
        starttime = int(time.time())
        batch = producer.create_batch()
        # i counts messages successfully appended; count mirrors it and is
        # only used for the summary printed in ``finally``.
        i = 0
        count = 0
        while i < num:
            # Inner task record: user_id is random, the other fields are fixed.
            pTask = {
                "Msg_type": 1,  # message type (1 = data, 2 = end)
                "user_id": (random.randint(1, 999999999) + 10),
                "task_id": 100,
                "version_id": 100,
                "cur_element_id": 2000,
                "userlist_id": 300000,
                "record_id": 300000,
                "next_exec_time": 1625810400,
                # "next_exec_time": localtime + (random.randint(0, 15) + random.randint(0, 15)+random.randint(0, 30) + 10) * 60,
                "enqueue_time": localtime
            }
            # JSON text -> bytes -> base64 (still bytes; MyEncoder turns it
            # into str when DelayTask is serialized below).
            pTaskbase64 = base64.b64encode(json.dumps(pTask).encode())
            print("?=", pTask["?"])
            # Outer envelope carrying the encoded task as its payload.
            DelayTask = {
                "unique_id": ('u{a}:t{b}:v{c}:e{d}'.format(a=pTask["?"],b=pTask["?"],c=pTask["?"],d=pTask["?"])),
                "callback_type": ?,
                "payload": pTaskbase64,
                "enqueue_time": localtime,
                "next_exec_time": pTask["?"],
            }
            Delayt = json.dumps(DelayTask, cls=MyEncoder, indent=4).encode("utf-8")
            #key = json.dumps(DelayTask["?"]).encode()
            #timestamp = json.dumps(DelayTask["?"]).encode()
            # NOTE(review): the topic is recomputed per message, but a batch
            # is sent as a whole to whichever topic was computed last.
            topic = ChooseTopic(pTask["?"])
            #print("topic=", topic)

            metadata = batch.append(key=None, value=Delayt, timestamp=None)
            if metadata is None:
                # The message was not accepted into the batch: ship the batch,
                # start a new one and retry this message (i is not advanced).
                partitions = await producer.partitions_for(topic)
                partition = random.choice(tuple(partitions))
                await producer.send_batch(batch, topic, partition=partition)
                print("%d messages sent to partition %d"
                      % (batch.record_count(), partition))
                batch = producer.create_batch()
                continue
            i += 1
            count += 1

        #partition = random.randint(0,15)
        # Send whatever is left in the final, partially filled batch.
        partitions = await producer.partitions_for(topic)
        partition = random.choice(tuple(partitions))
        await producer.send_batch(batch, topic,partition=partition)

    except Exception as e:
        # Best-effort: report the error and fall through to the cleanup.
        print(e)
    finally:
        await producer.stop()
        print("topic=", topic)
        overtime=int(time.time())
        #print("startime=",starttime)
        print("runtime=",overtime-starttime)
        print("count=", count)

def f(n):
    """Worker entry point: produce ``n`` messages and log the run.

    Redirects this process's ``sys.stdout`` and ``sys.stderr`` through
    ``Logger`` into a timestamped file under ``Logs/``, runs ``send_many``
    to completion on an event loop created here, then prints the start time
    and the elapsed whole seconds.

    :param n: number of messages passed to ``send_many``.
    """
    log_path = 'Logs/'
    # Several workers start at the same time; exist_ok avoids the
    # check-then-create race where a sibling creates the directory in between.
    os.makedirs(log_path, exist_ok=True)
    # The log file name is derived from the time the worker starts.
    log_file_name = log_path + 'log-' + time.strftime("%Y%m%d-%H%M%S", time.localtime()) + '.log'
    # Capture regular print output.
    sys.stdout = Logger(log_file_name)
    # Capture traceback output.
    sys.stderr = Logger(log_file_name)

    starttime = int(time.time())
    # Create the loop explicitly in this process instead of relying on the
    # implicit lookup/creation done by asyncio.get_event_loop().
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(send_many(n, loop))
    finally:
        # Release the loop's resources even if the run raised.
        loop.close()
    overtime = int(time.time())
    print("startime2=", starttime)
    print("runtime=", overtime - starttime)

if __name__ == '__main__':
    # Fan the load test out over 10 worker processes, 100000 messages each.
    p_lst = []
    for i in range(10):
        p = Process(target=f,args=(100000,))
        p.start()
        print("i=", i)
        p_lst.append(p)
    # Wait for every worker explicitly instead of relying on the implicit
    # join performed when the parent interpreter exits.
    for p in p_lst:
        p.join()









版权声明:本文为xiaoruirui1294原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。