Added multi-line real-time output for connection rate testing.

This commit is contained in:
Joe Testa 2024-04-22 13:56:50 -04:00
parent 3c459f1428
commit 986f83653d

View File

@ -329,6 +329,7 @@ class DHEat:
# If the user passed --conn-rate-test, then we'll perform an interactive rate test against the target.
interactive = False
multiline_output = False
if aconf.conn_rate_test_enabled:
interactive = True
max_connections = 999999999999999999
@ -338,6 +339,11 @@ class DHEat:
DHEat.WHITEB = "\033[1;97m"
DHEat.BLUEB = "\033[1;94m"
# Enable multi-line output only if we're running in the Bash shell. This might work in other shells, too, but they are untested.
shell = os.getenv("SHELL", default="")
if shell.endswith("/bash") or shell == "bash":
multiline_output = True
rate_str = ""
if aconf.conn_rate_test_target_rate > 0:
rate_str = " at a max rate of %s%u%s connections per second" % (DHEat.WHITEB, aconf.conn_rate_test_target_rate, DHEat.CLEAR)
@ -346,6 +352,10 @@ class DHEat:
print("Performing non-disruptive rate test against %s[%s]:%u%s with %s%u%s concurrent sockets%s. No Diffie-Hellman requests will be sent." % (DHEat.WHITEB, aconf.host, aconf.port, DHEat.CLEAR, DHEat.WHITEB, concurrent_sockets, DHEat.CLEAR, rate_str))
print()
# Make room for the multi-line output.
if multiline_output:
print("\n\n\n\n")
else: # We'll do a non-interactive test as part of a standard audit.
# Ensure that the server supports at least one DH algorithm. Otherwise, this test is pointless.
server_dh_kex = []
@ -361,6 +371,7 @@ class DHEat:
num_attempted_connections = 0
num_opened_connections = 0
num_exceeded_maxstartups = 0
socket_dict: Dict[socket.socket, float] = {}
start_timer = time.time()
now = start_timer
@ -378,7 +389,14 @@ class DHEat:
if interactive:
if (now - last_update) >= 1.0:
seconds_running = now - start_timer
print("%s%s%s Run time: %s%.1f%s; TCP SYNs: %s%u%s; Compl. conns: %s%u%s; TCP SYNs/sec: %s%.1f%s; Compl. conns/sec: %s%.1f%s \r" % (DHEat.WHITEB, spinner[spinner_index], DHEat.CLEAR, DHEat.WHITEB, seconds_running, DHEat.CLEAR, DHEat.WHITEB, num_attempted_connections, DHEat.CLEAR, DHEat.WHITEB, num_opened_connections, DHEat.CLEAR, DHEat.BLUEB, num_attempted_connections / seconds_running, DHEat.CLEAR, DHEat.BLUEB, num_opened_connections / seconds_running, DHEat.CLEAR), end="")
if multiline_output:
print("\033[5ARun time: %s%.1f%s seconds" % (DHEat.WHITEB, seconds_running, DHEat.CLEAR))
print("TCP SYNs: %s%u%s (total); %s%.1f%s (per second)" % (DHEat.WHITEB, num_attempted_connections, DHEat.CLEAR, DHEat.BLUEB, num_attempted_connections / seconds_running, DHEat.CLEAR))
print("Completed connections: %s%u%s (total); %s%.1f%s (per second)" % (DHEat.WHITEB, num_opened_connections, DHEat.CLEAR, DHEat.BLUEB, num_opened_connections / seconds_running, DHEat.CLEAR))
print("\"Exceeded MaxStartups\" responses: %s%u%s (total); %s%.1f%s (per second)" % (DHEat.WHITEB, num_exceeded_maxstartups, DHEat.CLEAR, DHEat.BLUEB, num_exceeded_maxstartups / seconds_running, DHEat.CLEAR))
print("%s%s%s" % (DHEat.WHITEB, spinner[spinner_index], DHEat.CLEAR))
else:
print("%s%s%s Run time: %s%.1f%s; TCP SYNs: %s%u%s; Compl. conns: %s%u%s; TCP SYNs/sec: %s%.1f%s; Compl. conns/sec: %s%.1f%s \r" % (DHEat.WHITEB, spinner[spinner_index], DHEat.CLEAR, DHEat.WHITEB, seconds_running, DHEat.CLEAR, DHEat.WHITEB, num_attempted_connections, DHEat.CLEAR, DHEat.WHITEB, num_opened_connections, DHEat.CLEAR, DHEat.BLUEB, num_attempted_connections / seconds_running, DHEat.CLEAR, DHEat.BLUEB, num_opened_connections / seconds_running, DHEat.CLEAR), end="")
last_update = now
spinner_index = (spinner_index + 1) % 4
@ -438,7 +456,10 @@ class DHEat:
# If we received the SSH header, we'll count this as an opened connection.
if buf.startswith(b"SSH-"):
num_opened_connections += 1
out.d("Number of opened connections: %u (max: %u)." % (num_opened_connections, max_connections))
# out.d("Number of opened connections: %u (max: %u)." % (num_opened_connections, max_connections))
elif buf == b"Exceeded":
num_exceeded_maxstartups += 1
# out.d("Number of \"Exceeded MaxStartups\": %u" % num_exceeded_maxstartups)
_close_socket(socket_dict, s)