#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Load generator that publishes delay-task messages to Kafka.

Ten worker processes are started; each one builds ``n`` JSON-encoded
delay-task records, picks a topic from the task's ``next_exec_time`` and
ships the records to Kafka in batches through ``AIOKafkaProducer``.
Everything printed by a worker is also appended to a file under ``Logs/``.
"""
import asyncio
import base64
import json
import math
import os
import random
import sys
import time
from multiprocessing import Process

import numpy as np
from aiokafka.producer import AIOKafkaProducer

# Timestamp taken once at import time; used as "enqueue_time" of every record.
localtime = int(time.time())
CALLBACK_INVALID = 0

# NOTE(review): the address below is a scrubbed placeholder in the source
# ("1.0.1?") and is reproduced verbatim -- replace with the real broker.
BOOTSTRAP_SERVERS = '1.0.1?:9092'

# Delays (in seconds) at or above this value fall back to the default topic.
MAX_DELAY_SECONDS = 2703360


class Logger(object):
    """File-like object that writes every message to a stream and a log file."""

    def __init__(self, file_name="Default.log", stream=sys.stdout):
        self.terminal = stream
        self.log = open(file_name, "a")

    def write(self, message):
        """Write ``message`` to both the wrapped stream and the log file."""
        self.terminal.write(message)
        self.log.write(message)

    def flush(self):
        """Flush both sinks.

        multiprocessing flushes ``sys.stdout``/``sys.stderr`` and then leaves
        the child with ``os._exit``; a no-op here would drop whatever is still
        sitting in the log file's buffer.
        """
        self.terminal.flush()
        self.log.flush()


class MyEncoder(json.JSONEncoder):
    """JSON encoder that additionally understands numpy arrays and bytes."""

    def default(self, obj):
        """Convert ``obj`` into something the stock encoder can serialise.

        :param obj: object the default encoder could not handle
        :return: a list for ``np.ndarray``, a UTF-8 decoded ``str`` for bytes
        :raises TypeError: for any other unsupported type
        """
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        # Let the base class raise TypeError instead of silently emitting null.
        return super().default(obj)


def ChooseTopic(NextExecTime):
    """Return the delay topic for a task due at ``NextExecTime``.

    The remaining delay is bucketed in minutes: for the first power of 16
    (16, 256, 4096, 65536 minutes) that exceeds the delay, the smallest
    multiple ``n`` of the previous power with ``delay < n * 60`` is chosen.

    :param NextExecTime: Unix timestamp (seconds) at which the task is due
    :return: topic name; ``'1min_'`` when the delay is not positive or is
             ``MAX_DELAY_SECONDS`` or more
    """
    deltaT = int(NextExecTime - int(time.time()))
    print("deltaT=", deltaT)
    if 0 < deltaT < MAX_DELAY_SECONDS:
        # j == 0 in the original loop produced a step of 0 and never matched,
        # so the search starts at j == 1.
        for j in range(1, 5):
            upperbound = 16 ** j
            if deltaT >= upperbound * 60:
                continue
            step = 16 ** (j - 1)
            for g in range(1, 17):
                n = step * g
                if deltaT < n * 60:
                    # NOTE(review): the source had '-'.format(n), which drops
                    # n entirely; the pattern is restored to agree with the
                    # fallback topic '1min_' -- confirm the real topic names.
                    return '{}min_'.format(n)
    return '1min_'


def _build_delay_message():
    """Build one delay-task record; return ``(next_exec_time, encoded_value)``."""
    pTask = {
        "Msg_type": 1,  # message type (1 = data, 2 = end)
        "user_id": (random.randint(1, 999999999) + 10),
        "task_id": 100,
        "version_id": 100,
        "cur_element_id": 2000,
        "userlist_id": 300000,
        "record_id": 300000,
        "next_exec_time": 1625810400,
        # Randomised alternative kept from the original for reference:
        # "next_exec_time": localtime + (random.randint(0, 15)
        #     + random.randint(0, 15) + random.randint(0, 30) + 10) * 60,
        "enqueue_time": localtime,
    }
    payload = base64.b64encode(json.dumps(pTask).encode())
    # NOTE(review): the printed key was lost in the source ("?"); user_id is
    # assumed because it is the only field that varies between records.
    print("user_id=", pTask["user_id"])
    # NOTE(review): the keys used below were also scrubbed to "?" in the
    # source; they are reconstructed from the u/t/v/e prefixes of the
    # unique_id pattern and from CALLBACK_INVALID being otherwise unused.
    delay_task = {
        "unique_id": 'u{a}:t{b}:v{c}:e{d}'.format(
            a=pTask["user_id"],
            b=pTask["task_id"],
            c=pTask["version_id"],
            d=pTask["cur_element_id"],
        ),
        "callback_type": CALLBACK_INVALID,
        "payload": payload,  # bytes; MyEncoder turns it into str
        "enqueue_time": localtime,
        "next_exec_time": pTask["next_exec_time"],
    }
    value = json.dumps(delay_task, cls=MyEncoder, indent=4).encode("utf-8")
    return pTask["next_exec_time"], value


async def _send_batch(producer, batch, topic):
    """Send ``batch`` to a randomly chosen partition of ``topic``."""
    partitions = await producer.partitions_for(topic)
    partition = random.choice(tuple(partitions))
    await producer.send_batch(batch, topic, partition=partition)
    print("%d messages sent to partition %d" % (batch.record_count(), partition))


async def send_many(num, loop):
    """Generate ``num`` delay-task records and publish them in batches.

    :param num: number of records to publish
    :param loop: event loop handed to ``AIOKafkaProducer``
    """
    # NOTE(review): newer aiokafka releases appear to deprecate the explicit
    # loop argument -- confirm against the installed version.
    producer = AIOKafkaProducer(loop=loop, bootstrap_servers=BOOTSTRAP_SERVERS)
    await producer.start()

    # Bound before the try block so the summary in `finally` never hits an
    # unbound name (e.g. num == 0 or an early failure).
    starttime = int(time.time())
    count = 0
    topic = None
    # One open batch per topic: a batch is sent to a single topic, so records
    # destined for different topics must not share one.
    batches = {}
    try:
        while count < num:
            next_exec_time, value = _build_delay_message()
            topic = ChooseTopic(next_exec_time)
            batch = batches.get(topic)
            if batch is None:
                batch = batches[topic] = producer.create_batch()
            metadata = batch.append(key=None, value=value, timestamp=None)
            if metadata is None:
                # Batch is full: ship it, open a fresh one and generate the
                # record again on the next iteration (count is not advanced).
                await _send_batch(producer, batch, topic)
                batches[topic] = producer.create_batch()
                continue
            count += 1

        # Ship whatever is left over.
        for pending_topic, pending in batches.items():
            if pending.record_count():
                await _send_batch(producer, pending, pending_topic)
    except Exception as e:
        # Best-effort load generator: report the failure and still print the
        # summary below.
        print(e)
    finally:
        await producer.stop()
        print("topic=", topic)
        overtime = int(time.time())
        print("runtime=", overtime - starttime)
        print("count=", count)


def f(n):
    """Worker entry point: tee output to a log file and publish ``n`` records.

    :param n: number of records this process should publish
    """
    log_path = 'Logs/'
    # exist_ok avoids a race between the worker processes.
    os.makedirs(log_path, exist_ok=True)
    # The log file name is derived from the time the worker starts.
    log_file_name = log_path + 'log-' + time.strftime("%Y%m%d-%H%M%S", time.localtime()) + '.log'
    # Capture normal print output.
    sys.stdout = Logger(log_file_name, sys.stdout)
    # Capture traceback / exception output (kept on stderr).
    sys.stderr = Logger(log_file_name, sys.stderr)

    starttime = int(time.time())
    # Each process gets its own loop instead of reusing one inherited from
    # the parent process.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(send_many(n, loop))
    finally:
        loop.close()
    overtime = int(time.time())
    print("startime2=", starttime)
    print("runtime=", overtime - starttime)


if __name__ == '__main__':
    p_lst = []
    for i in range(10):
        p = Process(target=f, args=(100000,))
        p.start()
        print("i=", i)
        p_lst.append(p)
    # Wait for every worker so the parent exits only after all have finished.
    for p in p_lst:
        p.join()
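# 版权声明:本文为xiaoruirui1294原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
# (Copyright notice: this is an original article by xiaoruirui1294, released under the CC 4.0 BY-SA license; reposts must include a link to the original source and this notice.)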