#!/usr/bin/env python
# -*- coding: utf-8 -*-
import math
import random
import time
import json
import base64
import sys
import os
import numpy as np
from aiokafka.producer import AIOKafkaProducer
import asyncio
loop = asyncio.get_event_loop()
import asyncio
from multiprocessing import Process
loop = asyncio.get_event_loop()
localtime = int(time.time())
CALLBACK_INVALID = 0
class Logger(object):
def __init__(self, file_name="Default.log", stream=sys.stdout):
self.terminal = stream
self.log = open(file_name, "a")
def write(self, message):
self.terminal.write(message)
self.log.write(message)
def flush(self):
pass
class MyEncoder(json.JSONEncoder):
def default(self, obj):
"""
只要检查到了是bytes类型的数据就把它转为str类型
:param obj:
:return:
"""
if isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, bytes):
return str(obj, encoding='utf-8');
def ChooseTopic(NextExecTime):
deltaT =int(NextExecTime - (int(time.time())))
print("deltaT=", deltaT)
if 2703360>deltaT > 0:
for j in range(5):
upperbound = int(math.pow(16, j))
if deltaT < upperbound *60:
m = int(math.pow(16, j - 1))
for g in range(16):
n = int(m+m * g)
if deltaT < n *60:
#topic = '{}min_test'.format(n)
topic = '-'.format(n)
return topic
else:
topic = '1min_'
return topic
async def send_many(num, loop):
producer = AIOKafkaProducer(
loop=loop, bootstrap_servers='1.0.1?:9092')
await producer.start()
try:
starttime = int(time.time())
batch = producer.create_batch()
i = 0
count = 0
while i < num:
pTask = {
"Msg_type": 1, # 消息类型(1 数据,2 end)
"user_id": (random.randint(1, 999999999) + 10),
"task_id": 100,
"version_id": 100,
"cur_element_id": 2000,
"userlist_id": 300000,
"record_id": 300000,
"next_exec_time": 1625810400,
# "next_exec_time": localtime + (random.randint(0, 15) + random.randint(0, 15)+random.randint(0, 30) + 10) * 60,
"enqueue_time": localtime
}
pTaskbase64 = base64.b64encode(json.dumps(pTask).encode())
print("?=", pTask["?"])
DelayTask = {
"unique_id": ('u{a}:t{b}:v{c}:e{d}'.format(a=pTask["?"],b=pTask["?"],c=pTask["?"],d=pTask["?"])),
"callback_type": ?,
"payload": pTaskbase64,
"enqueue_time": localtime,
"next_exec_time": pTask["?"],
}
Delayt = json.dumps(DelayTask, cls=MyEncoder, indent=4).encode("utf-8")
#key = json.dumps(DelayTask["?"]).encode()
#timestamp = json.dumps(DelayTask["?"]).encode()
topic = ChooseTopic(pTask["?"])
#print("topic=", topic)
metadata = batch.append(key=None, value=Delayt, timestamp=None)
if metadata is None:
partitions = await producer.partitions_for(topic)
partition = random.choice(tuple(partitions))
await producer.send_batch(batch, topic, partition=partition)
print("%d messages sent to partition %d"
% (batch.record_count(), partition))
batch = producer.create_batch()
continue
i += 1
count += 1
#partition = random.randint(0,15)
partitions = await producer.partitions_for(topic)
partition = random.choice(tuple(partitions))
await producer.send_batch(batch, topic,partition=partition)
except Exception as e:
print(e)
finally:
await producer.stop()
print("topic=", topic)
overtime=int(time.time())
#print("startime=",starttime)
print("runtime=",overtime-starttime)
print("count=", count)
def f(n):
log_path = 'Logs/'
if not os.path.exists(log_path):
os.makedirs(log_path)
# 日志文件名按照程序运行时间设置
log_file_name = log_path + 'log-' + time.strftime("%Y%m%d-%H%M%S", time.localtime()) + '.log'
# 记录正常的 print 信息
sys.stdout = Logger(log_file_name)
# 记录 traceback 异常信息
sys.stderr = Logger(log_file_name)
starttime = int(time.time())
loop = asyncio.get_event_loop()
loop.run_until_complete(send_many(n, loop))
loop.close()
overtime = int(time.time())
print("startime2=", starttime)
print("runtime=", overtime - starttime)
if __name__ == '__main__':
p_lst = []
for i in range(10):
p = Process(target=f,args=(100000,))
p.start()
print("i=", i)
p_lst.append(p)
版权声明:本文为xiaoruirui1294原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。