#!/usr/bin/env python
"""Command-line AMQP test tool built on pika.

All settings live in module-level globals.  ``update_options`` fills them
from ``key=value`` strings; the rest of the file reads them directly.
"""
import atexit
import logging
import pdb
import random
import string
import sys
import threading
import time

import pika

_Log = logging.getLogger()
# logging.basicConfig(level=logging.DEBUG)  # uncomment for verbose logging

py_version = sys.version_info[0]

# Lower-cased protocol name -> attribute name on the ``ssl`` module.
_OPTIONAL_PROTOCOLS = {
    'sslv2': 'PROTOCOL_SSLv2',
    'sslv3': 'PROTOCOL_SSLv3',
    'tlsv1_1': 'PROTOCOL_TLSv1_1',
    'tlsv1_2': 'PROTOCOL_TLSv1_2',
}

cmd = None
hostname = None
msg_counts = []
msg_time_sums = []
time_start = 0
time_end = 0

################# options
exchange_name = ''
ex_type = None
queue_name = None
r_key = None
msg = None
msg_hdr = None
auto_ack_l = True
auto_delete_l = False
count = 1
msg_size = 100
prefetch = 0
allchs = False
durable_l = False
persistent_l = False
multiple_l = False
rate = 1000
thread_num = 1
confirm = True
quorum = False
reject_count = 0
cancel_count = 0
msg_send_time = False
interval = 0
port = 5672
priority = 0
sp = None
pra = False
sgc = False
use_ssl = False
user = 'guest'
password = 'guest'
old_pika = False
# channel_act_as_writer
casw = False
# need_notify_server_info_with_heartbeat
nsi = False
vhost = '/'
################


def is_true(value):
    """Return True when *value* spells an affirmative setting.

    Affirmative values are a positive decimal integer or one of
    ``true`` / ``yes`` / ``on``.  Matching ignores case and surrounding
    whitespace.  Empty or ``None`` input is False.
    """
    if not value:
        return False
    text = str(value).strip().lower()
    if text.isdigit():
        return int(text) > 0
    return text in ("true", "yes", "on")


# Option key on the command line -> (module global to set, converter).
_OPTION_TABLE = {
    "exchange": ("exchange_name", str),
    "exchange_type": ("ex_type", str),
    "queue": ("queue_name", str),
    "routing_key": ("r_key", str),
    "msg": ("msg", str),
    "msg_hdr": ("msg_hdr", str),
    "auto_delete": ("auto_delete_l", is_true),
    "auto_ack": ("auto_ack_l", is_true),
    "count": ("count", int),
    "prefetch": ("prefetch", int),
    "all_channels": ("allchs", is_true),
    "durable": ("durable_l", is_true),
    "persistent": ("persistent_l", is_true),
    "msg_size": ("msg_size", int),
    "rate": ("rate", int),
    "multiple": ("multiple_l", is_true),
    "thread_num": ("thread_num", int),
    "confirm": ("confirm", is_true),
    "quorum": ("quorum", is_true),
    "reject_count": ("reject_count", int),
    "cancel_count": ("cancel_count", int),
    "msg_send_time": ("msg_send_time", is_true),
    "interval": ("interval", int),
    "port": ("port", int),
    "priority": ("priority", int),
    "sp": ("sp", is_true),
    "pra": ("pra", is_true),
    "sgc": ("sgc", is_true),
    "use_ssl": ("use_ssl", is_true),
    "user": ("user", str),
    "password": ("password", str),
    "old_pika": ("old_pika", is_true),
    "casw": ("casw", is_true),
    "nsi": ("nsi", is_true),
    "vhost": ("vhost", str),
}


def update_options(options):
    """Apply ``key=value`` strings to the module-level settings.

    Args:
        options: iterable of ``key=value`` strings.  Empty items (for
            example from a trailing ``;``) are skipped.

    Only the first ``=`` separates key from value, so values may contain
    ``=`` themselves.  An item without any ``=`` prints the offending item
    and exits the process with status 0.  Unknown keys are reported and
    skipped.  Integer options raise ``ValueError`` on non-numeric values.
    """
    settings = globals()
    for opt in options:
        if not opt:
            continue
        key, sep, value = opt.partition('=')
        if not sep:
            print("invalid options:%s" % opt)
            sys.exit(0)
        spec = _OPTION_TABLE.get(key)
        if spec is None:
            print("unknown option ignored:%s" % key)
            continue
        target, convert = spec
        settings[target] = convert(value)


def stop11():
    """Close the module-level ``connection`` object, if one exists.

    Nothing visible in this file assigns a global ``connection``, so the
    lookup is guarded rather than assumed.
    """
    print(get_time_stamp() + "to close connection")
    conn = globals().get('connection')
    if conn is not None:
        conn.close()


@atexit.register
def stop():
    """At interpreter exit, print the seconds elapsed since ``time_start``.

    ``time_start`` stays 0 until something sets it; in that case the
    elapsed time is reported as 0 rather than as seconds since the epoch.
    """
    spend_time = int(time.time() - time_start) if time_start else 0
    print(get_time_stamp() + "stopped, spend:%s s" % spend_time)
def _parse_send_time(body):
    """Return the send timestamp embedded at the start of *body*, or None.

    The publisher in this file writes ``"<time.time()>;<payload>"``, so the
    timestamp is everything before the first ``;``.  Bodies without a ``;``
    fall back to the first 18 characters.
    """
    try:
        sep = b';' if isinstance(body, bytes) else ';'
        head, found, _ = body.partition(sep)
        stamp = head if found else body[:18]
        return float(stamp)
    except (AttributeError, TypeError, ValueError):
        return None


def record_msg_time(n, body):
    """Count one received message for slot *n* and accumulate its latency.

    Once a slot has reached ``count`` messages (and ``count > 1``) the next
    message starts a new measurement window: the counter restarts at 1 and
    the latency sum is replaced instead of extended.  Latency is recorded
    in microseconds, only when ``msg_send_time`` is enabled and the body
    carries a parseable timestamp.
    """
    new_start = count > 1 and msg_counts[n] >= count
    now = time.time()
    if new_start:
        msg_counts[n] = 1
    else:
        msg_counts[n] += 1

    if not msg_send_time:
        return
    s_time = _parse_send_time(body)
    if s_time is None:
        # Not one of our timestamped messages; count it, skip latency.
        return
    spend_time = (now - s_time) * 1000000
    if new_start:
        msg_time_sums[n] = spend_time
    else:
        msg_time_sums[n] += spend_time


def get_average_time(n):
    """Return the integer mean latency for slot *n* (0 with no messages)."""
    if not msg_counts[n]:
        return 0
    return int(msg_time_sums[n] / msg_counts[n])


def get_channel(n = 0):
    """Open a new blocking connection to ``hostname`` and return a channel.

    Args:
        n: worker index, used only in the confirmation message.

    Publisher confirms are enabled on the channel when ``confirm`` is set.
    The connection object itself is not returned or stored.
    """
    client_properties = None
    if not old_pika and (casw or nsi):
        capabilities = {}
        if nsi:
            capabilities['need_notify_server_info_with_heartbeat'] = True
        if casw:
            capabilities['channel_act_as_writer'] = True
        client_properties = {'capabilities': capabilities}

    credentials = pika.credentials.PlainCredentials(user, password)

    ssl_options = None
    if use_ssl:
        import ssl
        ssl_protocol = "TLSv1_2".lower()
        ssl_version = getattr(ssl, _OPTIONAL_PROTOCOLS[ssl_protocol])
        # NOTE(review): certificate locations are hard-coded to one
        # developer's home directory.
        ca_certs = r"/home/yaoxs/ssltest/ssl/client/amqp_ca.pem"
        keyfile = r"/home/yaoxs/ssltest/ssl/client/amqp_client_key.pem"
        certfile = r"/home/yaoxs/ssltest/ssl/client/amqp_client.pem"
        cert_reqs = ssl.CERT_REQUIRED
        if not old_pika:
            ssl_context = ssl.SSLContext(ssl_version)
            ssl_context.verify_mode = cert_reqs
            ssl_context.check_hostname = False
            ssl_context.load_verify_locations(ca_certs)
            ssl_context.load_cert_chain(certfile, keyfile)
            ssl_options = pika.SSLOptions(context = ssl_context,
                                          server_hostname = hostname)
        else:
            ssl_options = {
                "ssl_version": ssl_version,
                "ca_certs": ca_certs,
                "keyfile": keyfile,
                "certfile": certfile,
                "cert_reqs": cert_reqs,
            }

    # The keyword set differs between the two supported pika call styles;
    # pass exactly what the original branches passed.
    kwargs = {
        'host': hostname,
        'port': port,
        'virtual_host': vhost,
        'credentials': credentials,
    }
    if old_pika:
        kwargs['ssl'] = use_ssl
        kwargs['ssl_options'] = ssl_options
    else:
        kwargs['client_properties'] = client_properties
        if use_ssl:
            kwargs['ssl_options'] = ssl_options

    connection = pika.BlockingConnection(pika.ConnectionParameters(**kwargs))
    channel = connection.channel()
    if confirm:
        channel.confirm_delivery()
    print("[%s]connect to %s ok" % (n, hostname))
    return channel


def get_time_stamp():
    """Return local time as ``YYYY-mm-dd HH:MM:SS.mmm`` plus a trailing space."""
    tt = time.time()
    hms = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(tt))
    return "%s.%03d " % (hms, (tt - int(tt)) * 1000)


def _default_name(index):
    """Return the generated queue / routing-key name for *index*."""
    if quorum:
        return "q_q_%s" % (index)
    return "q_%s" % (index)


def _queue_name_for(index):
    """Return the configured queue name, or a generated one for *index*."""
    if queue_name is None:
        return _default_name(index)
    return queue_name


def _routing_key_for(index):
    """Return the configured routing key, or a generated one for *index*."""
    if r_key:
        return r_key
    return _default_name(index)


def _queue_arguments():
    """Build the ``arguments`` for queue_declare (None when none apply)."""
    arguments = {}
    if quorum:
        arguments['x-queue-type'] = 'quorum'
    if sp:
        arguments['x-speedy_channel'] = sp
    if sgc:
        arguments['x-single-active-consumer'] = sgc
    return arguments or None


def _batch_sizes(total, per_batch):
    """Split *total* operations into batches of at most *per_batch*.

    The final batch carries the remainder, so the sizes always add up to
    *total*.  A non-positive *per_batch* means a single batch.
    """
    if total <= 0:
        return []
    if per_batch <= 0 or per_batch >= total:
        return [total]
    full, rest = divmod(total, per_batch)
    sizes = [per_batch] * full
    if rest:
        sizes.append(rest)
    return sizes


def _for_each_queue_index(action):
    """Call ``action(index)`` for ``count`` indexes, pausing 1s per batch."""
    ji = 0
    for size in _batch_sizes(count, rate):
        for _ in range(size):
            if ji % 100 == 0:
                print(get_time_stamp() + "%s %s r_key:%s, %d"
                      % (cmd, exchange_name, r_key, ji))
            action(ji)
            ji += 1
        time.sleep(1)


def _create_queues(channel):
    """Declare ``count`` queues on *channel*."""
    def declare(ji):
        name = _queue_name_for(ji)
        print(get_time_stamp() + "%s %s" % (cmd, name))
        channel.queue_declare(queue=name, durable = durable_l,
                              arguments = _queue_arguments())
    _for_each_queue_index(declare)


def _delete_queues(channel):
    """Delete ``count`` queues on *channel*."""
    def delete(ji):
        name = _queue_name_for(ji)
        print(get_time_stamp() + "%s %s" % (cmd, name))
        channel.queue_delete(queue=name)
    _for_each_queue_index(delete)


def _bind_queues(channel):
    """Bind ``count`` queues to ``exchange_name`` on *channel*."""
    def bind(ji):
        name = _queue_name_for(ji)
        key = _routing_key_for(ji)
        print(get_time_stamp() + "%s %s %s r_key:%s"
              % (cmd, exchange_name, name, key))
        channel.queue_bind(exchange=exchange_name, queue=name,
                           routing_key=key)
    _for_each_queue_index(bind)


def _publish(channel, n):
    """Publish ``count`` messages on *channel* in batches of ``rate``.

    After each batch the worker sleeps ``interval`` seconds (1 second when
    ``interval`` is not positive).
    """
    global time_start

    if thread_num > 1:
        # Stagger the workers.  float() keeps the jitter fractional under
        # Python 2 integer division as well.
        fst_sleep = random.randint(1, thread_num) / float(thread_num)
        print("fst_sleep:%s" % (fst_sleep))
        time.sleep(fst_sleep)

    properties_l = None
    if persistent_l:
        properties_l = pika.BasicProperties(
            delivery_mode = 2,  # make message persistent
        )

    many_a_msg = 'a' * msg_size
    r_key_l = _routing_key_for(n)
    batches = _batch_sizes(count, rate)
    if time_start == 0:
        time_start = time.time()

    ji = 0
    for size in batches:
        for _ in range(size):
            ji += 1
            if msg_send_time:
                real_msg = "%s;%s" % (time.time(), many_a_msg)
            elif msg == "many_a":
                real_msg = many_a_msg
            elif msg is None:
                real_msg = "_unique_id:%s-%s" % (msg_hdr, ji)
            else:
                real_msg = msg
            channel.basic_publish(exchange=exchange_name,
                                  routing_key=r_key_l,
                                  body=real_msg,
                                  properties=properties_l)
        print(get_time_stamp() + "%s %s r_key_l:%s, %d"
              % (cmd, exchange_name, r_key_l, ji))
        if interval > 0:
            time.sleep(interval)
        else:
            time.sleep(1)
    print(get_time_stamp() + "send all %d msg, cnt1:%d, rate:%d"
          % (ji, len(batches), batches[0] if batches else 0))


def _consume(channel, n):
    """Register consumer *n* on *channel* and block in ``start_consuming``."""

    def callback_print(method, body, ack, ts = 0, tag = None):
        # Log every delivery with pra; otherwise the first 99 deliveries of
        # a single-threaded run plus every 100th delivery.
        if not (pra
                or (thread_num == 1 and method.delivery_tag < 100)
                or method.delivery_tag % 100 == 0):
            return
        if len(body) > 20:
            fmt = (" [%s] Received %r ack:%s delivery_tag:%s red:%s "
                   "sleep:%s, average_time:%s tag:%s count:%s")
            shown = body[:20]
        else:
            fmt = (" [%s] Received %r ack:%s delivery_tag:%s red:%s "
                   "sleep:%s average_time:%s tag:%s count:%s")
            shown = body
        print(get_time_stamp() + fmt
              % (n, shown, ack, method.delivery_tag, method.redelivered,
                 ts, get_average_time(n), tag, msg_counts[n]))

    def callback(ch, method, properties, body):
        # Auto-ack mode: record and log only.
        record_msg_time(n, body)
        ts = interval
        if ts > 0:
            time.sleep(ts)
        callback_print(method, body, False, ts)

    def callback_ack(ch, method, properties, body):
        # Manual-ack mode: optionally cancel this consumer once, then
        # reject every reject_count-th delivery and ack the rest.
        global cancel_count
        record_msg_time(n, body)
        ts = interval
        if ts > 0:
            time.sleep(ts)
        ack = True

        if cancel_count > 0 and cancel_count <= method.delivery_tag:
            ctag = "msg_test_ctag_%s" % n
            print(get_time_stamp() + 'basic_cancel %s, delivery_tag:%s'
                  % (ctag, method.delivery_tag))
            channel.basic_cancel(consumer_tag = ctag)
            cancel_count = 0
            print("canceled:%s, start %s consumer, clear cancel_count"
                  % (ctag, n+1))
            start_thread_do_again(n+1, channel)

        if reject_count > 0 and method.delivery_tag % reject_count == 0:
            callback_print(method, body, ack, ts, 'reject')
            ch.basic_reject(delivery_tag = method.delivery_tag)
        elif ack:
            callback_print(method, body, ack, ts)
            ch.basic_ack(delivery_tag = method.delivery_tag,
                         multiple=multiple_l)

    real_call_back = callback if auto_ack_l else callback_ack
    queue_name_l = _queue_name_for(n)

    if prefetch > 0:
        if old_pika:
            channel.basic_qos(prefetch_count=prefetch, all_channels=allchs)
        else:
            channel.basic_qos(prefetch_count=prefetch, global_qos = allchs)

    arguments = None
    if priority:
        arguments = {'x-priority': priority}

    ctag = "msg_test_ctag_%s" % n
    if old_pika:
        channel.basic_consume(real_call_back, queue_name_l,
                              no_ack = auto_ack_l, consumer_tag = ctag,
                              arguments = arguments)
    else:
        channel.basic_consume(queue_name_l, real_call_back,
                              auto_ack = auto_ack_l, consumer_tag = ctag,
                              arguments = arguments)
    print(get_time_stamp()
          + ' [*] Waiting for messages from %s. To exit press CTRL+C'
          % queue_name_l)
    channel.start_consuming()
    print(get_time_stamp() + '!!!quit consuing')
    print(get_time_stamp() + 'to exit')


def do(n = 0, channel_i = None):
    """Run the configured command ``cmd`` as worker *n*.

    Args:
        n: worker index; selects the statistics slot and the generated
            queue / routing-key / consumer-tag names.
        channel_i: accepted for callers that pass a channel, but not used;
            every call opens its own connection through ``get_channel``.
            NOTE(review): confirm this is intended before wiring it in.

    Any exception is printed with a timestamp and re-raised.  Commands
    that are unknown, or that lack the exchange settings they need, fall
    through to ``usage()``.
    """
    print(get_time_stamp() + "start the %s th thread" % (n))
    try:
        channel = get_channel(n)
        if cmd == "create_queue":
            _create_queues(channel)
        elif cmd == "delete_queue":
            _delete_queues(channel)
        elif cmd == "create_exchange" and exchange_name and ex_type:
            print(get_time_stamp() + "%s %s %s"
                  % (cmd, exchange_name, ex_type))
            channel.exchange_declare(exchange=exchange_name,
                                     exchange_type=ex_type,
                                     auto_delete=auto_delete_l,
                                     durable = durable_l)
        elif cmd == "delete_exchange" and exchange_name:
            print(get_time_stamp() + "%s %s" % (cmd, exchange_name))
            channel.exchange_delete(exchange=exchange_name)
        elif cmd == "bind" and exchange_name:
            _bind_queues(channel)
        elif cmd == "publish":
            _publish(channel, n)
        elif cmd == "consume":
            _consume(channel, n)
        else:
            usage()
    except Exception as exc:
        print(get_time_stamp() + "%s" % str(exc))
        raise


def usage():
    """Print the command-line synopsis."""
    print("usage: <cmd> <hostname> <key=value[;key=value...]>")
    print("cmd: create_queue | delete_queue | create_exchange | "
          "delete_exchange | bind | publish | consume")
    print("create_exchange needs exchange and exchange_type; "
          "delete_exchange and bind need exchange")


def print_msg_latency():
    """Print total message count and mean latency over all slots."""
    # Sum every slot, including the spare one a restarted consumer uses.
    msg_sum = sum(msg_counts)
    msg_time_sum = sum(msg_time_sums)
    if msg_time_sum == 0 or msg_sum == 0:
        print(get_time_stamp() + "no latency data, {%s %s %s}"
              % (confirm, not auto_ack_l, msg_size))
    else:
        print(get_time_stamp()
              + "%s msgs, {%s %s %s}, average latency: %s us, "
              % (msg_sum, confirm, not auto_ack_l, msg_size,
                 int(msg_time_sum / msg_sum)))


def task_print_msg_latency(dump):
    """Print latency statistics every 20 seconds, forever.

    Args:
        dump: unused placeholder argument.
    """
    while True:
        time.sleep(20)
        print_msg_latency()


def start_thread_do_again(n, channel_i = None):
    """Start ``do(n, channel_i)`` in a new thread."""
    worker = threading.Thread(target=do, args=(n, channel_i,))
    worker.start()


def main():
    """Parse ``<cmd> <hostname> <options>`` from argv and run the command.

    ``publish`` and ``consume`` with ``thread_num > 1`` run one ``do``
    worker per thread; ``consume`` additionally starts the periodic
    latency reporter.  Everything else runs a single ``do()`` inline.
    """
    global cmd
    global hostname
    global msg_counts
    global msg_time_sums

    args = sys.argv[1:]
    if len(args) < 3:
        usage()
        sys.exit(0)

    cmd, hostname = args[0], args[1]
    update_options(args[2].split(';'))

    # One statistics slot per worker plus one spare for a consumer that is
    # restarted with index n+1 after a cancel.
    msg_counts = [0] * (thread_num + 1)
    msg_time_sums = [0] * (thread_num + 1)

    if cmd in ("consume", "publish") and thread_num > 1:
        workers = [threading.Thread(target=do, args=(i,))
                   for i in range(thread_num)]
        for worker in workers:
            worker.start()
        if cmd == "consume":
            reporter = threading.Thread(target=task_print_msg_latency,
                                        args=(0,))
            reporter.start()
    else:
        do()


if __name__ == '__main__':
    main()
    # Linger loop kept from the original; the bound of -1 means it never
    # runs.  Raise ``Sum`` to keep the process alive that many seconds.
    cnt = 0
    Sum = -1
    while cnt <= Sum:
        cnt += 1
        print("has sleeped %d s" % (cnt))
        time.sleep(1)