From 13d15baa2a025efe6527c5f3db77ea9c6909c1f1 Mon Sep 17 00:00:00 2001 From: Joe Testa Date: Mon, 1 Feb 2021 13:10:06 -0500 Subject: [PATCH] Added multi-threaded scanning support. --- README.md | 3 + src/ssh_audit/auditconf.py | 6 + src/ssh_audit/output.py | 87 ------ src/ssh_audit/outputbuffer.py | 157 +++++++++-- src/ssh_audit/ssh_audit.py | 266 +++++++++++------- src/ssh_audit/ssh_socket.py | 8 +- src/ssh_audit/utils.py | 4 +- ssh-audit.1 | 7 +- .../dropbear_2019.78_test1.json | 2 +- .../expected_results/openssh_4.0p1_test1.json | 2 +- .../expected_results/openssh_5.6p1_test1.json | 2 +- .../expected_results/openssh_5.6p1_test2.json | 2 +- .../expected_results/openssh_5.6p1_test3.json | 2 +- .../expected_results/openssh_5.6p1_test4.json | 2 +- .../expected_results/openssh_5.6p1_test5.json | 2 +- .../expected_results/openssh_8.0p1_test1.json | 2 +- .../expected_results/openssh_8.0p1_test2.json | 2 +- .../expected_results/openssh_8.0p1_test3.json | 2 +- .../tinyssh_20190101_test1.json | 2 +- test/test_auditconf.py | 3 +- test/test_build_struct.py | 2 +- test/test_errors.py | 14 +- test/{test_output.py => test_outputbuffer.py} | 107 +++---- test/test_ssh1.py | 22 +- test/test_ssh2.py | 14 +- 25 files changed, 442 insertions(+), 280 deletions(-) delete mode 100644 src/ssh_audit/output.py rename test/{test_output.py => test_outputbuffer.py} (67%) diff --git a/README.md b/README.md index 630c592..4986903 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ - historical information from OpenSSH, Dropbear SSH and libssh; - policy scans to ensure adherence to a hardened/standard configuration; - runs on Linux and Windows; +- supports Python 3.6 - 3.9; - no dependencies ## Usage @@ -158,9 +159,11 @@ For convenience, a web front-end on top of the command-line tool is available at ## ChangeLog ### v2.4.0-dev (???) + - Added multi-threaded scanning support. - Added version check for OpenSSH user enumeration (CVE-2018-15473). - Fixed crash when receiving unexpected response during host key test. - Fixed hang against older Cisco devices during host key test & gex test. + - Fixed improper termination while scanning multiple targets when one target returns an error. - Dropped support for Python 3.5 (which reached EOL in Sept. 2020). ### v2.3.1 (2020-10-28) diff --git a/src/ssh_audit/auditconf.py b/src/ssh_audit/auditconf.py index f55141c..599f8c2 100644 --- a/src/ssh_audit/auditconf.py +++ b/src/ssh_audit/auditconf.py @@ -53,6 +53,7 @@ class AuditConf: self.timeout_set = False # Set to True when the user explicitly sets it. self.target_file: Optional[str] = None self.target_list: List[str] = [] + self.threads = 32 self.list_policies = False self.lookup = '' @@ -98,6 +99,11 @@ class AuditConf: valid = True elif name in ['policy_file', 'policy', 'target_file', 'target_list', 'lookup']: valid = True + elif name == "threads": + valid, num_threads = True, Utils.parse_int(value) + if num_threads < 1: + raise ValueError('invalid number of threads: {}'.format(value)) + value = num_threads if valid: object.__setattr__(self, name, value) diff --git a/src/ssh_audit/output.py b/src/ssh_audit/output.py deleted file mode 100644 index 15e0d03..0000000 --- a/src/ssh_audit/output.py +++ /dev/null @@ -1,87 +0,0 @@ -""" - The MIT License (MIT) - - Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in - all copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - THE SOFTWARE. -""" -import os -import sys - -# pylint: disable=unused-import -from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401 -from typing import Callable, Optional, Union, Any # noqa: F401 - -from ssh_audit.utils import Utils - - -class Output: - LEVELS: Sequence[str] = ('info', 'warn', 'fail') - COLORS = {'head': 36, 'good': 32, 'warn': 33, 'fail': 31} - - # Use brighter colors on Windows for better readability. - if Utils.is_windows(): - COLORS = {'head': 96, 'good': 92, 'warn': 93, 'fail': 91} - - def __init__(self) -> None: - self.batch = False - self.verbose = False - self.use_colors = True - self.json = False - self.__level = 0 - self.__colsupport = 'colorama' in sys.modules or os.name == 'posix' - - @property - def level(self) -> str: - if self.__level < len(self.LEVELS): - return self.LEVELS[self.__level] - return 'unknown' - - @level.setter - def level(self, name: str) -> None: - self.__level = self.get_level(name) - - def get_level(self, name: str) -> int: - cname = 'info' if name == 'good' else name - if cname not in self.LEVELS: - return sys.maxsize - return self.LEVELS.index(cname) - - def sep(self) -> None: - if not self.batch: - print() - - @property - def colors_supported(self) -> bool: - return self.__colsupport - - @staticmethod - def _colorized(color: str) -> Callable[[str], None]: - return lambda x: print(u'{}{}\033[0m'.format(color, x)) - - def __getattr__(self, name: str) -> Callable[[str], None]: - if name == 'head' and self.batch: - return lambda x: None - if not self.get_level(name) >= self.__level: - return lambda x: None - if self.use_colors and self.colors_supported and name in self.COLORS: - color = '\033[0;{}m'.format(self.COLORS[name]) - return self._colorized(color) - else: - return lambda x: print(u'{}'.format(x)) diff --git a/src/ssh_audit/outputbuffer.py b/src/ssh_audit/outputbuffer.py index 7276754..f82223c 100644 --- a/src/ssh_audit/outputbuffer.py +++ b/src/ssh_audit/outputbuffer.py @@ -1,7 +1,7 @@ """ The MIT License (MIT) - Copyright (C) 2017-2020 Joe Testa (jtesta@positronsecurity.com) + Copyright (C) 2021 Joe Testa (jtesta@positronsecurity.com) Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) Permission is hereby granted, free of charge, to any person obtaining a copy @@ -22,29 +22,154 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ -import io +import os import sys # pylint: disable=unused-import from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401 from typing import Callable, Optional, Union, Any # noqa: F401 +from ssh_audit.utils import Utils -class OutputBuffer(List[str]): + +class OutputBuffer: + LEVELS: Sequence[str] = ('info', 'warn', 'fail') + COLORS = {'head': 36, 'good': 32, 'warn': 33, 'fail': 31} + + # Use brighter colors on Windows for better readability. + if Utils.is_windows(): + COLORS = {'head': 96, 'good': 92, 'warn': 93, 'fail': 91} + + def __init__(self, buffer_output: bool = True) -> None: + self.buffer_output = buffer_output + self.buffer: List[str] = [] + self.in_section = False + self.section: List[str] = [] + self.batch = False + self.verbose = False + self.use_colors = True + self.json = False + self.__level = 0 + self.__is_color_supported = ('colorama' in sys.modules) or (os.name == 'posix') + self.line_ended = True + + def _print(self, level: str, s: str = '', line_ended: bool = True) -> None: + '''Saves output to buffer (if in buffered mode), or immediately prints to stdout otherwise.''' + + # If we're logging only 'warn' or above, and this is an 'info', ignore message. + if self.get_level(level) < self.__level: + return + + if self.use_colors and self.colors_supported and len(s) > 0 and level != 'info': + s = "\033[0;%dm%s\033[0m" % (self.COLORS[level], s) + + if self.buffer_output: + # Select which list to add to. If we are in a 'with' statement, then this goes in the section buffer, otherwise the general buffer. + buf = self.section if self.in_section else self.buffer + + # Determine if a new line should be added, or if the last line should be appended. + if not self.line_ended: + last_entry = -1 if len(buf) > 0 else 0 + buf[last_entry] = buf[last_entry] + s + else: + buf.append(s) + + # When False, this tells the next call to append to the last line we just added. + self.line_ended = line_ended + else: + print(s) + + def get_buffer(self) -> str: + '''Returns all buffered output, then clears the buffer.''' + self.flush_section() + + buffer_str = "\n".join(self.buffer) + self.buffer = [] + return buffer_str + + def write(self) -> None: + '''Writes the output to stdout.''' + self.flush_section() + print(self.get_buffer(), flush=True) + + def reset(self) -> None: + self.flush_section() + self.get_buffer() + + @property + def level(self) -> str: + '''Returns the minimum level for output.''' + if self.__level < len(self.LEVELS): + return self.LEVELS[self.__level] + return 'unknown' + + @level.setter + def level(self, name: str) -> None: + '''Sets the minimum level for output (one of: 'info', 'warn', 'fail').''' + self.__level = self.get_level(name) + + def get_level(self, name: str) -> int: + cname = 'info' if name == 'good' else name + if cname not in self.LEVELS: + return sys.maxsize + return self.LEVELS.index(cname) + + @property + def colors_supported(self) -> bool: + '''Returns True if the system supports color output.''' + return self.__is_color_supported + + # When used in a 'with' block, the output to goes into a section; this can be sorted separately when add_section_to_buffer() is later called. def __enter__(self) -> 'OutputBuffer': - # pylint: disable=attribute-defined-outside-init - self.__buf = io.StringIO() - self.__stdout = sys.stdout - sys.stdout = self.__buf + self.in_section = True return self - def flush(self, sort_lines: bool = False) -> None: - # Lines must be sorted in some cases to ensure consistent testing. - if sort_lines: - self.sort() # pylint: disable=no-member - for line in self: # pylint: disable=not-an-iterable - print(line) - def __exit__(self, *args: Any) -> None: - self.extend(self.__buf.getvalue().splitlines()) # pylint: disable=no-member - sys.stdout = self.__stdout + self.in_section = False + + def flush_section(self, sort_section: bool = False) -> None: + '''Appends section output (optionally sorting it first) to the end of the buffer, then clears the section output.''' + if sort_section: + self.section.sort() + + self.buffer.extend(self.section) + self.section = [] + + def is_section_empty(self) -> bool: + '''Returns True if the section buffer is empty, otherwise False.''' + return len(self.section) == 0 + + def head(self, s: str, line_ended: bool = True) -> 'OutputBuffer': + if not self.batch: + self._print('head', s, line_ended) + return self + + def fail(self, s: str, line_ended: bool = True) -> 'OutputBuffer': + self._print('fail', s, line_ended) + return self + + def warn(self, s: str, line_ended: bool = True) -> 'OutputBuffer': + self._print('warn', s, line_ended) + return self + + def info(self, s: str, line_ended: bool = True) -> 'OutputBuffer': + self._print('info', s, line_ended) + return self + + def good(self, s: str, line_ended: bool = True) -> 'OutputBuffer': + self._print('good', s, line_ended) + return self + + def sep(self) -> 'OutputBuffer': + if not self.batch: + self._print('info') + return self + + def v(self, s: str, write_now: bool = False) -> 'OutputBuffer': + '''Prints a message if verbose output is enabled.''' + if self.verbose: + self.info(s) + if write_now: + self.write() + + return self diff --git a/src/ssh_audit/ssh_audit.py b/src/ssh_audit/ssh_audit.py index 7bc77d6..9dc04c0 100755 --- a/src/ssh_audit/ssh_audit.py +++ b/src/ssh_audit/ssh_audit.py @@ -2,7 +2,7 @@ """ The MIT License (MIT) - Copyright (C) 2017-2020 Joe Testa (jtesta@positronsecurity.com) + Copyright (C) 2017-2021 Joe Testa (jtesta@positronsecurity.com) Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) Permission is hereby granted, free of charge, to any person obtaining a copy @@ -23,6 +23,8 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ +import concurrent.futures +import copy import getopt import json import os @@ -42,7 +44,6 @@ from ssh_audit import exitcodes from ssh_audit.fingerprint import Fingerprint from ssh_audit.gextest import GEXTest from ssh_audit.hostkeytest import HostKeyTest -from ssh_audit.output import Output from ssh_audit.outputbuffer import OutputBuffer from ssh_audit.policy import Policy from ssh_audit.product import Product @@ -66,7 +67,7 @@ except ImportError: # pragma: nocover def usage(err: Optional[str] = None) -> None: retval = exitcodes.GOOD - uout = Output() + uout = OutputBuffer() p = os.path.basename(sys.argv[0]) uout.head('# {} {}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION)) if err is not None and len(err) > 0: @@ -89,25 +90,27 @@ def usage(err: Optional[str] = None) -> None: uout.info(' -p, --port= port to connect') uout.info(' -P, --policy= run a policy test using the specified policy') uout.info(' -t, --timeout= timeout (in seconds) for connection and reading\n (default: 5)') - uout.info(' -T, --targets= a file containing a list of target hosts (one\n per line, format HOST[:PORT])') + uout.info(' -T, --targets= a file containing a list of target hosts (one\n per line, format HOST[:PORT]). Use --threads\n to control concurrent scans.') + uout.info(' --threads= number of threads to use when scanning multiple\n targets (-T/--targets) (default: 32)') uout.info(' -v, --verbose verbose output') uout.sep() + uout.write() sys.exit(retval) -def output_algorithms(title: str, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, algorithms: List[str], unknown_algs: List[str], is_json_output: bool, program_retval: int, maxlen: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> int: # pylint: disable=too-many-arguments - with OutputBuffer() as obuf: +def output_algorithms(out: OutputBuffer, title: str, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, algorithms: List[str], unknown_algs: List[str], is_json_output: bool, program_retval: int, maxlen: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> int: # pylint: disable=too-many-arguments + with out: for algorithm in algorithms: - program_retval = output_algorithm(alg_db, alg_type, algorithm, unknown_algs, program_retval, maxlen, alg_sizes) - if len(obuf) > 0 and not is_json_output: + program_retval = output_algorithm(out, alg_db, alg_type, algorithm, unknown_algs, program_retval, maxlen, alg_sizes) + if not out.is_section_empty() and not is_json_output: out.head('# ' + title) - obuf.flush() + out.flush_section() out.sep() return program_retval -def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, alg_name: str, unknown_algs: List[str], program_retval: int, alg_max_len: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> int: +def output_algorithm(out: OutputBuffer, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, alg_name: str, unknown_algs: List[str], program_retval: int, alg_max_len: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> int: prefix = '(' + alg_type + ') ' if alg_max_len == 0: alg_max_len = len(alg_name) @@ -175,7 +178,7 @@ def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], al return program_retval -def output_compatibility(algs: Algorithms, client_audit: bool, for_server: bool = True) -> None: +def output_compatibility(out: OutputBuffer, algs: Algorithms, client_audit: bool, for_server: bool = True) -> None: # Don't output any compatibility info if we're doing a client audit. if client_audit: @@ -205,7 +208,7 @@ def output_compatibility(algs: Algorithms, client_audit: bool, for_server: bool out.good('(gen) compatibility: ' + ', '.join(comp_text)) -def output_security_sub(sub: str, software: Optional[Software], client_audit: bool, padlen: int) -> None: +def output_security_sub(out: OutputBuffer, sub: str, software: Optional[Software], client_audit: bool, padlen: int) -> None: secdb = VersionVulnerabilityDB.CVE if sub == 'cve' else VersionVulnerabilityDB.TXT if software is None or software.product not in secdb: return @@ -241,20 +244,20 @@ def output_security_sub(sub: str, software: Optional[Software], client_audit: bo out.fail('(sec) {}{} -- {}'.format(name, p, descr)) -def output_security(banner: Optional[Banner], client_audit: bool, padlen: int, is_json_output: bool) -> None: - with OutputBuffer() as obuf: +def output_security(out: OutputBuffer, banner: Optional[Banner], client_audit: bool, padlen: int, is_json_output: bool) -> None: + with out: if banner is not None: software = Software.parse(banner) - output_security_sub('cve', software, client_audit, padlen) - output_security_sub('txt', software, client_audit, padlen) - if len(obuf) > 0 and not is_json_output: + output_security_sub(out, 'cve', software, client_audit, padlen) + output_security_sub(out, 'txt', software, client_audit, padlen) + if not out.is_section_empty() and not is_json_output: out.head('# security') - obuf.flush() + out.flush_section() out.sep() -def output_fingerprints(algs: Algorithms, is_json_output: bool, sha256: bool = True) -> None: - with OutputBuffer() as obuf: +def output_fingerprints(out: OutputBuffer, algs: Algorithms, is_json_output: bool, sha256: bool = True) -> None: + with out: fps = [] if algs.ssh1kex is not None: name = 'ssh-rsa1' @@ -284,14 +287,14 @@ def output_fingerprints(algs: Algorithms, is_json_output: bool, sha256: bool = T # p = '' if out.batch else ' ' * (padlen - len(name)) # out.good('(fin) {0}{1} -- {2} {3}'.format(name, p, bits, fpo)) out.good('(fin) {}: {}'.format(name, fpo)) - if len(obuf) > 0 and not is_json_output: + if not out.is_section_empty() and not is_json_output: out.head('# fingerprints') - obuf.flush() + out.flush_section() out.sep() # Returns True if no warnings or failures encountered in configuration. -def output_recommendations(algs: Algorithms, software: Optional[Software], is_json_output: bool, padlen: int = 0) -> bool: +def output_recommendations(out: OutputBuffer, algs: Algorithms, software: Optional[Software], is_json_output: bool, padlen: int = 0) -> bool: ret = True # PuTTY's algorithms cannot be modified, so there's no point in issuing recommendations. @@ -323,7 +326,7 @@ def output_recommendations(algs: Algorithms, software: Optional[Software], is_js return ret for_server = True - with OutputBuffer() as obuf: + with out: software, alg_rec = algs.get_recommendations(software, for_server) for sshv in range(2, 0, -1): if sshv not in alg_rec: @@ -351,20 +354,20 @@ def output_recommendations(algs: Algorithms, software: Optional[Software], is_js b = '(SSH{})'.format(sshv) if sshv == 1 else '' fm = '(rec) {0}{1}{2}-- {3} algorithm to {4}{5} {6}' fn(fm.format(sg, name, p, alg_type, an, chg_additional_info, b)) - if len(obuf) > 0 and not is_json_output: + if not out.is_section_empty() and not is_json_output: if software is not None: title = '(for {})'.format(software.display(False)) else: title = '' out.head('# algorithm recommendations {}'.format(title)) - obuf.flush(True) # Sort the output so that it is always stable (needed for repeatable testing). + out.flush_section(sort_section=True) # Sort the output so that it is always stable (needed for repeatable testing). out.sep() return ret # Output additional information & notes. -def output_info(software: Optional['Software'], client_audit: bool, any_problems: bool, is_json_output: bool) -> None: - with OutputBuffer() as obuf: +def output_info(out: OutputBuffer, software: Optional['Software'], client_audit: bool, any_problems: bool, is_json_output: bool) -> None: + with out: # Tell user that PuTTY cannot be hardened at the protocol-level. if client_audit and (software is not None) and (software.product == Product.PuTTY): out.warn('(nfo) PuTTY does not have the option of restricting any algorithms during the SSH handshake.') @@ -373,20 +376,20 @@ def output_info(software: Optional['Software'], client_audit: bool, any_problems if any_problems: out.warn('(nfo) For hardening guides on common OSes, please see: ') - if len(obuf) > 0 and not is_json_output: + if not out.is_section_empty() and not is_json_output: out.head('# additional info') - obuf.flush() + out.flush_section() out.sep() # Returns a exitcodes.* flag to denote if any failures or warnings were encountered. -def output(aconf: AuditConf, banner: Optional[Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2_Kex] = None, pkm: Optional[SSH1_PublicKeyMessage] = None, print_target: bool = False) -> int: +def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2_Kex] = None, pkm: Optional[SSH1_PublicKeyMessage] = None, print_target: bool = False) -> int: program_retval = exitcodes.GOOD client_audit = client_host is not None # If set, this is a client audit. sshv = 1 if pkm is not None else 2 algs = Algorithms(pkm, kex) - with OutputBuffer() as obuf: + with out: if print_target: host = aconf.host @@ -416,7 +419,7 @@ def output(aconf: AuditConf, banner: Optional[Banner], header: List[str], client out.good('(gen) software: {}'.format(software)) else: software = None - output_compatibility(algs, client_audit) + output_compatibility(out, algs, client_audit) if kex is not None: compressions = [x for x in kex.server.compression if x != 'none'] if len(compressions) > 0: @@ -424,12 +427,12 @@ def output(aconf: AuditConf, banner: Optional[Banner], header: List[str], client else: cmptxt = 'disabled' out.good('(gen) compression: {}'.format(cmptxt)) - if len(obuf) > 0 and not aconf.json: # Print output when it exists and JSON output isn't requested. + if not out.is_section_empty() and not aconf.json: # Print output when it exists and JSON output isn't requested. out.head('# general') - obuf.flush() + out.flush_section() out.sep() maxlen = algs.maxlen + 1 - output_security(banner, client_audit, maxlen, aconf.json) + output_security(out, banner, client_audit, maxlen, aconf.json) # Filled in by output_algorithms() with unidentified algs. unknown_algorithms: List[str] = [] if pkm is not None: @@ -437,34 +440,36 @@ def output(aconf: AuditConf, banner: Optional[Banner], header: List[str], client ciphers = pkm.supported_ciphers auths = pkm.supported_authentications title, atype = 'SSH1 host-key algorithms', 'key' - program_retval = output_algorithms(title, adb, atype, ['ssh-rsa1'], unknown_algorithms, aconf.json, program_retval, maxlen) + program_retval = output_algorithms(out, title, adb, atype, ['ssh-rsa1'], unknown_algorithms, aconf.json, program_retval, maxlen) title, atype = 'SSH1 encryption algorithms (ciphers)', 'enc' - program_retval = output_algorithms(title, adb, atype, ciphers, unknown_algorithms, aconf.json, program_retval, maxlen) + program_retval = output_algorithms(out, title, adb, atype, ciphers, unknown_algorithms, aconf.json, program_retval, maxlen) title, atype = 'SSH1 authentication types', 'aut' - program_retval = output_algorithms(title, adb, atype, auths, unknown_algorithms, aconf.json, program_retval, maxlen) + program_retval = output_algorithms(out, title, adb, atype, auths, unknown_algorithms, aconf.json, program_retval, maxlen) if kex is not None: adb = SSH2_KexDB.ALGORITHMS title, atype = 'key exchange algorithms', 'kex' - program_retval = output_algorithms(title, adb, atype, kex.kex_algorithms, unknown_algorithms, aconf.json, program_retval, maxlen, kex.dh_modulus_sizes()) + program_retval = output_algorithms(out, title, adb, atype, kex.kex_algorithms, unknown_algorithms, aconf.json, program_retval, maxlen, kex.dh_modulus_sizes()) title, atype = 'host-key algorithms', 'key' - program_retval = output_algorithms(title, adb, atype, kex.key_algorithms, unknown_algorithms, aconf.json, program_retval, maxlen, kex.rsa_key_sizes()) + program_retval = output_algorithms(out, title, adb, atype, kex.key_algorithms, unknown_algorithms, aconf.json, program_retval, maxlen, kex.rsa_key_sizes()) title, atype = 'encryption algorithms (ciphers)', 'enc' - program_retval = output_algorithms(title, adb, atype, kex.server.encryption, unknown_algorithms, aconf.json, program_retval, maxlen) + program_retval = output_algorithms(out, title, adb, atype, kex.server.encryption, unknown_algorithms, aconf.json, program_retval, maxlen) title, atype = 'message authentication code algorithms', 'mac' - program_retval = output_algorithms(title, adb, atype, kex.server.mac, unknown_algorithms, aconf.json, program_retval, maxlen) - output_fingerprints(algs, aconf.json, True) - perfect_config = output_recommendations(algs, software, aconf.json, maxlen) - output_info(software, client_audit, not perfect_config, aconf.json) + program_retval = output_algorithms(out, title, adb, atype, kex.server.mac, unknown_algorithms, aconf.json, program_retval, maxlen) + output_fingerprints(out, algs, aconf.json, True) + perfect_config = output_recommendations(out, algs, software, aconf.json, maxlen) + output_info(out, software, client_audit, not perfect_config, aconf.json) if aconf.json: - print(json.dumps(build_struct(banner, kex=kex, client_host=client_host), sort_keys=True), end='' if len(aconf.target_list) > 0 else "\n") # Print the JSON of the audit info. Skip the newline at the end if multiple targets were given (since each audit dump will go into its own list entry). + out.reset() + # Build & write the JSON struct. + out.info(json.dumps(build_struct(aconf.host, banner, kex=kex, client_host=client_host), sort_keys=True)) elif len(unknown_algorithms) > 0: # If we encountered any unknown algorithms, ask the user to report them. out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s. Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at .\n" % ','.join(unknown_algorithms)) return program_retval -def evaluate_policy(aconf: AuditConf, banner: Optional['Banner'], client_host: Optional[str], kex: Optional['SSH2_Kex'] = None) -> bool: +def evaluate_policy(out: OutputBuffer, aconf: AuditConf, banner: Optional['Banner'], client_host: Optional[str], kex: Optional['SSH2_Kex'] = None) -> bool: if aconf.policy is None: raise RuntimeError('Internal error: cannot evaluate against null Policy!') @@ -472,11 +477,11 @@ def evaluate_policy(aconf: AuditConf, banner: Optional['Banner'], client_host: O passed, error_struct, error_str = aconf.policy.evaluate(banner, kex) if aconf.json: json_struct = {'host': aconf.host, 'policy': aconf.policy.get_name_and_version(), 'passed': passed, 'errors': error_struct} - print(json.dumps(json_struct, sort_keys=True)) + out.info(json.dumps(json_struct, sort_keys=True)) else: spacing = '' if aconf.client_audit: - print("Client IP: %s" % client_host) + out.info("Client IP: %s" % client_host) spacing = " " # So the fields below line up with 'Client IP: '. else: host = aconf.host @@ -487,9 +492,9 @@ def evaluate_policy(aconf: AuditConf, banner: Optional['Banner'], client_host: O else: host = '%s:%d' % (aconf.host, aconf.port) - print("Host: %s" % host) - print("Policy: %s%s" % (spacing, aconf.policy.get_name_and_version())) - print("Result: %s" % spacing, end='') + out.info("Host: %s" % host) + out.info("Policy: %s%s" % (spacing, aconf.policy.get_name_and_version())) + out.info("Result: %s" % spacing, line_ended=False) # Use these nice unicode characters in the result message, unless we're on Windows (the cmd.exe terminal doesn't display them properly). icon_good = "✔ " @@ -507,23 +512,25 @@ def evaluate_policy(aconf: AuditConf, banner: Optional['Banner'], client_host: O return passed -def list_policies() -> None: +def list_policies(out: OutputBuffer) -> None: '''Prints a list of server & client policies.''' server_policy_names, client_policy_names = Policy.list_builtin_policies() if len(server_policy_names) > 0: out.head('\nServer policies:\n') - print(" * \"%s\"" % "\"\n * \"".join(server_policy_names)) + out.info(" * \"%s\"" % "\"\n * \"".join(server_policy_names)) if len(client_policy_names) > 0: out.head('\nClient policies:\n') - print(" * \"%s\"" % "\"\n * \"".join(client_policy_names)) + out.info(" * \"%s\"" % "\"\n * \"".join(client_policy_names)) + out.sep() if len(server_policy_names) == 0 and len(client_policy_names) == 0: - print("Error: no built-in policies found!") + out.fail("Error: no built-in policies found!") else: - print("\nHint: Use -P and provide the full name of a policy to run a policy scan with.\n") + out.info("\nHint: Use -P and provide the full name of a policy to run a policy scan with.\n") + out.write() def make_policy(aconf: AuditConf, banner: Optional['Banner'], kex: Optional['SSH2_Kex'], client_host: Optional[str]) -> None: @@ -552,12 +559,12 @@ def make_policy(aconf: AuditConf, banner: Optional['Banner'], kex: Optional['SSH print("Error: file already exists: %s" % aconf.policy_file) -def process_commandline(args: List[str], usage_cb: Callable[..., None]) -> 'AuditConf': # pylint: disable=too-many-statements +def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[..., None]) -> 'AuditConf': # pylint: disable=too-many-statements # pylint: disable=too-many-branches aconf = AuditConf() try: sopts = 'h1246M:p:P:jbcnvl:t:T:L' - lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies', 'lookup='] + lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies', 'lookup=', 'threads='] opts, args = getopt.gnu_getopt(args, sopts, lopts) except getopt.GetoptError as err: usage_cb(str(err)) @@ -589,6 +596,7 @@ def process_commandline(args: List[str], usage_cb: Callable[..., None]) -> 'Audi aconf.json = True elif o in ('-v', '--verbose'): aconf.verbose = True + out.verbose = True elif o in ('-l', '--level'): if a not in ('info', 'warn', 'fail'): usage_cb('level {} is not valid'.format(a)) @@ -603,6 +611,8 @@ def process_commandline(args: List[str], usage_cb: Callable[..., None]) -> 'Audi aconf.policy_file = a elif o in ('-T', '--targets'): aconf.target_file = a + elif o == '--threads': + aconf.threads = int(a) elif o in ('-L', '--list-policies'): aconf.list_policies = True elif o == '--lookup': @@ -615,7 +625,7 @@ def process_commandline(args: List[str], usage_cb: Callable[..., None]) -> 'Audi return aconf if aconf.list_policies: - list_policies() + list_policies(out) sys.exit(exitcodes.GOOD) if aconf.client_audit is False and aconf.target_file is None: @@ -659,23 +669,26 @@ def process_commandline(args: List[str], usage_cb: Callable[..., None]) -> 'Audi try: aconf.policy = Policy(policy_file=aconf.policy_file) except Exception as e: - print("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc())) + out.fail("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc())) + out.write() sys.exit(exitcodes.UNKNOWN_ERROR) # If the user wants to do a client audit, but provided a server policy, terminate. if aconf.client_audit and aconf.policy.is_server_policy(): - print("Error: client audit selected, but server policy provided.") + out.fail("Error: client audit selected, but server policy provided.") + out.write() sys.exit(exitcodes.UNKNOWN_ERROR) # If the user wants to do a server audit, but provided a client policy, terminate. if aconf.client_audit is False and aconf.policy.is_server_policy() is False: - print("Error: server audit selected, but client policy provided.") + out.fail("Error: server audit selected, but client policy provided.") + out.write() sys.exit(exitcodes.UNKNOWN_ERROR) return aconf -def build_struct(banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, pkm: Optional['SSH1_PublicKeyMessage'] = None, client_host: Optional[str] = None) -> Any: +def build_struct(target_host: str, banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, pkm: Optional['SSH1_PublicKeyMessage'] = None, client_host: Optional[str] = None) -> Any: banner_str = '' banner_protocol = None @@ -695,8 +708,13 @@ def build_struct(banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, p "comments": banner_comments, }, } + + # If we're scanning a client host, put the client's IP into the results. Otherwise, include the target host. if client_host is not None: res['client_ip'] = client_host + else: + res['target'] = target_host + if kex is not None: res['compression'] = kex.server.compression @@ -773,7 +791,7 @@ def build_struct(banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, p # Returns one of the exitcodes.* flags. -def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = False) -> int: +def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = False) -> int: program_retval = exitcodes.GOOD out.batch = aconf.batch out.verbose = aconf.verbose @@ -781,12 +799,20 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal out.use_colors = aconf.colors s = SSH_Socket(aconf.host, aconf.port, aconf.ipvo, aconf.timeout, aconf.timeout_set) if aconf.client_audit: + out.v("Listening for client connection on port %d..." % aconf.port, write_now=True) s.listen_and_accept() else: + out.v("Connecting to %s:%d..." % ('[%s]' % aconf.host if Utils.is_ipv6_address(aconf.host) else aconf.host, aconf.port), write_now=True) err = s.connect() if err is not None: out.fail(err) - sys.exit(exitcodes.CONNECTION_ERROR) + + # If we're running against multiple targets, return a connection error to the calling worker thread. Otherwise, write the error message to the console and exit. + if len(aconf.target_list) > 0: + return exitcodes.CONNECTION_ERROR + else: + out.write() + sys.exit(exitcodes.CONNECTION_ERROR) if sshv is None: sshv = 2 if aconf.ssh2 else 1 @@ -811,7 +837,9 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal payload_txt = u'"{}"'.format(repr(payload).lstrip('b')[1:-1]) if payload_txt == u'Protocol major versions differ.': if sshv == 2 and aconf.ssh1: - return audit(aconf, 1) + ret = audit(out, aconf, 1) + out.write() + return ret err = '[exception] error reading packet ({})'.format(payload_txt) else: err_pair = None @@ -824,11 +852,11 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal 'instead received unknown message ({2})' err = fmt.format(err_pair[0], err_pair[1], packet_type) if err is not None: - output(aconf, banner, header) + output(out, aconf, banner, header) out.fail(err) return exitcodes.CONNECTION_ERROR if sshv == 1: - program_retval = output(aconf, banner, header, pkm=SSH1_PublicKeyMessage.parse(payload)) + program_retval = output(out, aconf, banner, header, pkm=SSH1_PublicKeyMessage.parse(payload)) elif sshv == 2: kex = SSH2_Kex.parse(payload) if aconf.client_audit is False: @@ -837,11 +865,11 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal # This is a standard audit scan. if (aconf.policy is None) and (aconf.make_policy is False): - program_retval = output(aconf, banner, header, client_host=s.client_host, kex=kex, print_target=print_target) + program_retval = output(out, aconf, banner, header, client_host=s.client_host, kex=kex, print_target=print_target) # This is a policy test. elif (aconf.policy is not None) and (aconf.make_policy is False): - program_retval = exitcodes.GOOD if evaluate_policy(aconf, banner, s.client_host, kex=kex) else exitcodes.FAILURE + program_retval = exitcodes.GOOD if evaluate_policy(out, aconf, banner, s.client_host, kex=kex) else exitcodes.FAILURE # A new policy should be made from this scan. elif (aconf.policy is None) and (aconf.make_policy is True): @@ -853,7 +881,7 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal return program_retval -def algorithm_lookup(alg_names: str) -> int: +def algorithm_lookup(out: OutputBuffer, alg_names: str) -> int: '''Looks up a comma-separated list of algorithms and outputs their security properties. Returns an exitcodes.* flag.''' retval = exitcodes.GOOD alg_types = { @@ -885,7 +913,7 @@ def algorithm_lookup(alg_names: str) -> int: for alg_type in alg_types: if len(algorithms_dict[alg_type]) > 0: title = str(alg_types.get(alg_type)) - retval = output_algorithms(title, adb, alg_type, list(algorithms_dict[alg_type]), unknown_algorithms, False, retval, padding) + retval = output_algorithms(out, title, adb, alg_type, list(algorithms_dict[alg_type]), unknown_algorithms, False, retval, padding) algorithms_dict_flattened = [ alg_name @@ -915,7 +943,7 @@ def algorithm_lookup(alg_names: str) -> int: for algorithm_not_found in algorithms_not_found: out.fail(algorithm_not_found) - print() + out.sep() if len(similar_algorithms) > 0: retval = exitcodes.FAILURE @@ -926,14 +954,45 @@ def algorithm_lookup(alg_names: str) -> int: return retval -out = Output() +# Worker thread for scanning multiple targets concurrently. +def target_worker_thread(host: str, port: int, shared_aconf: AuditConf) -> Tuple[int, str]: + ret = -1 + string_output = '' + + out = OutputBuffer() + out.verbose = shared_aconf.verbose + my_aconf = copy.deepcopy(shared_aconf) + my_aconf.host = host + my_aconf.port = port + + # If we're outputting JSON, turn off colors and ensure 'info' level messages go through. + if my_aconf.json: + out.json = True + out.use_colors = False + + out.v("Running against: %s:%d..." % (my_aconf.host, my_aconf.port), write_now=True) + try: + ret = audit(out, my_aconf, print_target=True) + string_output = out.get_buffer() + except Exception: + ret = -1 + string_output = "An exception occurred while scanning %s:%d:\n%s" % (host, port, str(traceback.format_exc())) + + return ret, string_output def main() -> int: - aconf = process_commandline(sys.argv[1:], usage) + out = OutputBuffer() + aconf = process_commandline(out, sys.argv[1:], usage) + + # If we're outputting JSON, turn off colors and ensure 'info' level messages go through. + if aconf.json: + out.json = True + out.use_colors = False if aconf.lookup != '': - retval = algorithm_lookup(aconf.lookup) + retval = algorithm_lookup(out, aconf.lookup) + out.write() sys.exit(retval) # If multiple targets were specified... @@ -945,31 +1004,46 @@ def main() -> int: print('[', end='') # Loop through each target in the list. - for i, target in enumerate(aconf.target_list): - aconf.host, port = Utils.parse_host_and_port(target) - if port == 0: - port = 22 - aconf.port = port + target_servers = [] + for _, target in enumerate(aconf.target_list): + host, port = Utils.parse_host_and_port(target, default_port=22) + target_servers.append((host, port)) - new_ret = audit(aconf, print_target=True) + # A ranked list of return codes. Those with higher indices will take precendence over lower ones. For example, if three servers are scanned, yielding WARNING, GOOD, and UNKNOWN_ERROR, the overall result will be UNKNOWN_ERROR, since its index is the highest. Errors have highest priority, followed by failures, then warnings. + ranked_return_codes = [exitcodes.GOOD, exitcodes.WARNING, exitcodes.FAILURE, exitcodes.CONNECTION_ERROR, exitcodes.UNKNOWN_ERROR] - # Set the return value only if an unknown error occurred, a failure occurred, or if a warning occurred and the previous value was good. - if (new_ret == exitcodes.UNKNOWN_ERROR) or (new_ret == exitcodes.FAILURE) or ((new_ret == exitcodes.WARNING) and (ret == exitcodes.GOOD)): - ret = new_ret + # Queue all worker threads. + num_target_servers = len(target_servers) + num_processed = 0 + out.v("Scanning %u targets with %s%u threads..." % (num_target_servers, '(at most) ' if aconf.threads > num_target_servers else '', aconf.threads), write_now=True) + with concurrent.futures.ThreadPoolExecutor(max_workers=aconf.threads) as executor: + future_to_server = {executor.submit(target_worker_thread, target_server[0], target_server[1], aconf): target_server for target_server in target_servers} + for future in concurrent.futures.as_completed(future_to_server): + worker_ret, worker_output = future.result() - # Don't print a delimiter after the last target was handled. - if i + 1 != len(aconf.target_list): - if aconf.json: - print(", ", end='') - else: - print(("-" * 80) + "\n") + # If this worker's return code is ranked higher that what we've cached so far, update our cache. + if ranked_return_codes.index(worker_ret) > ranked_return_codes.index(ret): + ret = worker_ret + + # print("Worker for %s:%d returned %d: [%s]" % (target_server[0], target_server[1], worker_ret, worker_output)) + print(worker_output, end='' if aconf.json else "\n") + + # Don't print a delimiter after the last target was handled. + num_processed += 1 + if num_processed < num_target_servers: + if aconf.json: + print(", ", end='') + else: + print(("-" * 80) + "\n") if aconf.json: print(']') - return ret - else: - return audit(aconf) + else: # Just a scan against a single target. + ret = audit(out, aconf) + out.write() + + return ret if __name__ == '__main__': # pragma: nocover diff --git a/src/ssh_audit/ssh_socket.py b/src/ssh_audit/ssh_socket.py index 4d36c17..0960537 100644 --- a/src/ssh_audit/ssh_socket.py +++ b/src/ssh_audit/ssh_socket.py @@ -36,7 +36,7 @@ from typing import Callable, Optional, Union, Any # noqa: F401 from ssh_audit import exitcodes from ssh_audit.banner import Banner from ssh_audit.globals import SSH_HEADER -from ssh_audit.output import Output +from ssh_audit.outputbuffer import OutputBuffer from ssh_audit.protocol import Protocol from ssh_audit.readbuf import ReadBuf from ssh_audit.ssh1 import SSH1 @@ -95,7 +95,7 @@ class SSH_Socket(ReadBuf, WriteBuf): if not check or socktype == socket.SOCK_STREAM: yield af, addr except socket.error as e: - Output().fail('[exception] {}'.format(e)) + OutputBuffer().fail('[exception] {}'.format(e)).write() sys.exit(exitcodes.CONNECTION_ERROR) # Listens on a server socket and accepts one connection (used for @@ -273,7 +273,7 @@ class SSH_Socket(ReadBuf, WriteBuf): payload_length = packet_length - padding_length - 1 check_size = 4 + 1 + payload_length + padding_length if check_size % self.__block_size != 0: - Output().fail('[exception] invalid ssh packet (block size)') + OutputBuffer().fail('[exception] invalid ssh packet (block size)').write() sys.exit(exitcodes.CONNECTION_ERROR) self.ensure_read(payload_length) if sshv == 1: @@ -288,7 +288,7 @@ class SSH_Socket(ReadBuf, WriteBuf): if sshv == 1: rcrc = SSH1.crc32(padding + payload) if crc != rcrc: - Output().fail('[exception] packet checksum CRC32 mismatch.') + OutputBuffer().fail('[exception] packet checksum CRC32 mismatch.').write() sys.exit(exitcodes.CONNECTION_ERROR) else: self.ensure_read(padding_length) diff --git a/src/ssh_audit/utils.py b/src/ssh_audit/utils.py index 3469dd0..a17ecb6 100644 --- a/src/ssh_audit/utils.py +++ b/src/ssh_audit/utils.py @@ -129,10 +129,10 @@ class Utils: return -1.0 @staticmethod - def parse_host_and_port(host_and_port: str) -> Tuple[str, int]: + def parse_host_and_port(host_and_port: str, default_port: int = 0) -> Tuple[str, int]: '''Parses a string into a tuple of its host and port. The port is 0 if not specified.''' host = host_and_port - port = 0 + port = default_port mx = re.match(r'^\[([^\]]+)\](?::(\d+))?$', host_and_port) if mx is not None: diff --git a/ssh-audit.1 b/ssh-audit.1 index 7d8f2e5..c1ec017 100644 --- a/ssh-audit.1 +++ b/ssh-audit.1 @@ -94,7 +94,12 @@ The timeout, in seconds, for creating connections and reading data from the sock .TP .B -T, \-\-targets= .br -A file containing a list of target hosts. Each line must have one host, in the format of HOST[:PORT]. +A file containing a list of target hosts. Each line must have one host, in the format of HOST[:PORT]. Use --threads to control concurrent scans. + +.TP +.B \-\-threads= +.br +The number of threads to use when scanning multiple targets (with -T/--targets). Default is 32. .TP .B -v, \-\-verbose diff --git a/test/docker/expected_results/dropbear_2019.78_test1.json b/test/docker/expected_results/dropbear_2019.78_test1.json index 7804783..c019eb9 100644 --- a/test/docker/expected_results/dropbear_2019.78_test1.json +++ b/test/docker/expected_results/dropbear_2019.78_test1.json @@ -1 +1 @@ -{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-dropbear_2019.78", "software": "dropbear_2019.78"}, "compression": ["zlib@openssh.com", "none"], "enc": ["aes128-ctr", "aes256-ctr", "aes128-cbc", "aes256-cbc", "3des-ctr", "3des-cbc"], "fingerprints": [{"fp": "SHA256:CDfAU12pjQS7/91kg7gYacza0U/6PDbE04Ic3IpYxkM", "type": "ssh-rsa"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "ecdh-sha2-nistp521"}, {"algorithm": "ecdh-sha2-nistp384"}, {"algorithm": "ecdh-sha2-nistp256"}, {"algorithm": "diffie-hellman-group14-sha256"}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "kexguess2@matt.ucc.asn.au"}], "key": [{"algorithm": "ecdsa-sha2-nistp256"}, {"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-dss"}], "mac": ["hmac-sha1-96", "hmac-sha1", "hmac-sha2-256"]} +{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-dropbear_2019.78", "software": "dropbear_2019.78"}, "compression": ["zlib@openssh.com", "none"], "enc": ["aes128-ctr", "aes256-ctr", "aes128-cbc", "aes256-cbc", "3des-ctr", "3des-cbc"], "fingerprints": [{"fp": "SHA256:CDfAU12pjQS7/91kg7gYacza0U/6PDbE04Ic3IpYxkM", "type": "ssh-rsa"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "ecdh-sha2-nistp521"}, {"algorithm": "ecdh-sha2-nistp384"}, {"algorithm": "ecdh-sha2-nistp256"}, {"algorithm": "diffie-hellman-group14-sha256"}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "kexguess2@matt.ucc.asn.au"}], "key": [{"algorithm": "ecdsa-sha2-nistp256"}, {"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-dss"}], "mac": ["hmac-sha1-96", "hmac-sha1", "hmac-sha2-256"], "target": "localhost"} diff --git a/test/docker/expected_results/openssh_4.0p1_test1.json b/test/docker/expected_results/openssh_4.0p1_test1.json index 75e11e9..f17720d 100644 --- a/test/docker/expected_results/openssh_4.0p1_test1.json +++ b/test/docker/expected_results/openssh_4.0p1_test1.json @@ -1 +1 @@ -{"banner": {"comments": null, "protocol": [1, 99], "raw": "SSH-1.99-OpenSSH_4.0", "software": "OpenSSH_4.0"}, "compression": ["none", "zlib"], "enc": ["aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "arcfour", "aes192-cbc", "aes256-cbc", "rijndael-cbc@lysator.liu.se", "aes128-ctr", "aes192-ctr", "aes256-ctr"], "fingerprints": [{"fp": "SHA256:YZ457EBcJTSxRKI3yXRgtAj3PBf5B9/F36b1SVooml4", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-dss"}], "mac": ["hmac-md5", "hmac-sha1", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"]} +{"banner": {"comments": null, "protocol": [1, 99], "raw": "SSH-1.99-OpenSSH_4.0", "software": "OpenSSH_4.0"}, "compression": ["none", "zlib"], "enc": ["aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "arcfour", "aes192-cbc", "aes256-cbc", "rijndael-cbc@lysator.liu.se", "aes128-ctr", "aes192-ctr", "aes256-ctr"], "fingerprints": [{"fp": "SHA256:YZ457EBcJTSxRKI3yXRgtAj3PBf5B9/F36b1SVooml4", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-dss"}], "mac": ["hmac-md5", "hmac-sha1", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"], "target": "localhost"} diff --git a/test/docker/expected_results/openssh_5.6p1_test1.json b/test/docker/expected_results/openssh_5.6p1_test1.json index d5ba217..5b710a3 100644 --- a/test/docker/expected_results/openssh_5.6p1_test1.json +++ b/test/docker/expected_results/openssh_5.6p1_test1.json @@ -1 +1 @@ -{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:YZ457EBcJTSxRKI3yXRgtAj3PBf5B9/F36b1SVooml4", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-dss"}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"]} +{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:YZ457EBcJTSxRKI3yXRgtAj3PBf5B9/F36b1SVooml4", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-dss"}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"], "target": "localhost"} diff --git a/test/docker/expected_results/openssh_5.6p1_test2.json b/test/docker/expected_results/openssh_5.6p1_test2.json index 903fbf3..50337ef 100644 --- a/test/docker/expected_results/openssh_5.6p1_test2.json +++ b/test/docker/expected_results/openssh_5.6p1_test2.json @@ -1 +1 @@ -{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:YZ457EBcJTSxRKI3yXRgtAj3PBf5B9/F36b1SVooml4", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-rsa-cert-v01@openssh.com", "casize": 1024, "keysize": 1024}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"]} +{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:YZ457EBcJTSxRKI3yXRgtAj3PBf5B9/F36b1SVooml4", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-rsa-cert-v01@openssh.com", "casize": 1024, "keysize": 1024}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"], "target": "localhost"} diff --git a/test/docker/expected_results/openssh_5.6p1_test3.json b/test/docker/expected_results/openssh_5.6p1_test3.json index 993fd0c..ad3fd57 100644 --- a/test/docker/expected_results/openssh_5.6p1_test3.json +++ b/test/docker/expected_results/openssh_5.6p1_test3.json @@ -1 +1 @@ -{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:YZ457EBcJTSxRKI3yXRgtAj3PBf5B9/F36b1SVooml4", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-rsa-cert-v01@openssh.com", "casize": 3072, "keysize": 1024}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"]} +{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:YZ457EBcJTSxRKI3yXRgtAj3PBf5B9/F36b1SVooml4", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-rsa-cert-v01@openssh.com", "casize": 3072, "keysize": 1024}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"], "target": "localhost"} diff --git a/test/docker/expected_results/openssh_5.6p1_test4.json b/test/docker/expected_results/openssh_5.6p1_test4.json index 7273bd0..1415921 100644 --- a/test/docker/expected_results/openssh_5.6p1_test4.json +++ b/test/docker/expected_results/openssh_5.6p1_test4.json @@ -1 +1 @@ -{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:nsWtdJ9Z67Vrf7OsUzQov7esXhsWAfVppArGh25u244", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 3072}, {"algorithm": "ssh-rsa-cert-v01@openssh.com", "casize": 1024, "keysize": 3072}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"]} +{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:nsWtdJ9Z67Vrf7OsUzQov7esXhsWAfVppArGh25u244", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 3072}, {"algorithm": "ssh-rsa-cert-v01@openssh.com", "casize": 1024, "keysize": 3072}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"], "target": "localhost"} diff --git a/test/docker/expected_results/openssh_5.6p1_test5.json b/test/docker/expected_results/openssh_5.6p1_test5.json index 30ebced..0dafc95 100644 --- a/test/docker/expected_results/openssh_5.6p1_test5.json +++ b/test/docker/expected_results/openssh_5.6p1_test5.json @@ -1 +1 @@ -{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:nsWtdJ9Z67Vrf7OsUzQov7esXhsWAfVppArGh25u244", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 3072}, {"algorithm": "ssh-rsa-cert-v01@openssh.com", "casize": 3072, "keysize": 3072}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"]} +{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:nsWtdJ9Z67Vrf7OsUzQov7esXhsWAfVppArGh25u244", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 3072}, {"algorithm": "ssh-rsa-cert-v01@openssh.com", "casize": 3072, "keysize": 3072}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"], "target": "localhost"} diff --git a/test/docker/expected_results/openssh_8.0p1_test1.json b/test/docker/expected_results/openssh_8.0p1_test1.json index 97e47f1..9185f5d 100644 --- a/test/docker/expected_results/openssh_8.0p1_test1.json +++ b/test/docker/expected_results/openssh_8.0p1_test1.json @@ -1 +1 @@ -{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_8.0", "software": "OpenSSH_8.0"}, "compression": ["none", "zlib@openssh.com"], "enc": ["chacha20-poly1305@openssh.com", "aes128-ctr", "aes192-ctr", "aes256-ctr", "aes128-gcm@openssh.com", "aes256-gcm@openssh.com"], "fingerprints": [{"fp": "SHA256:UrnXIVH+7dlw8UqYocl48yUEcKrthGDQG2CPCgp7MxU", "type": "ssh-ed25519"}, {"fp": "SHA256:nsWtdJ9Z67Vrf7OsUzQov7esXhsWAfVppArGh25u244", "type": "ssh-rsa"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "ecdh-sha2-nistp256"}, {"algorithm": "ecdh-sha2-nistp384"}, {"algorithm": "ecdh-sha2-nistp521"}, {"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 2048}, {"algorithm": "diffie-hellman-group16-sha512"}, {"algorithm": "diffie-hellman-group18-sha512"}, {"algorithm": "diffie-hellman-group14-sha256"}, {"algorithm": "diffie-hellman-group14-sha1"}], "key": [{"algorithm": "rsa-sha2-512", "keysize": 3072}, {"algorithm": "rsa-sha2-256", "keysize": 3072}, {"algorithm": "ssh-rsa", "keysize": 3072}, {"algorithm": "ecdsa-sha2-nistp256"}, {"algorithm": "ssh-ed25519"}], "mac": ["umac-64-etm@openssh.com", "umac-128-etm@openssh.com", "hmac-sha2-256-etm@openssh.com", "hmac-sha2-512-etm@openssh.com", "hmac-sha1-etm@openssh.com", "umac-64@openssh.com", "umac-128@openssh.com", "hmac-sha2-256", "hmac-sha2-512", "hmac-sha1"]} +{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_8.0", "software": "OpenSSH_8.0"}, "compression": ["none", "zlib@openssh.com"], "enc": ["chacha20-poly1305@openssh.com", "aes128-ctr", "aes192-ctr", "aes256-ctr", "aes128-gcm@openssh.com", "aes256-gcm@openssh.com"], "fingerprints": [{"fp": "SHA256:UrnXIVH+7dlw8UqYocl48yUEcKrthGDQG2CPCgp7MxU", "type": "ssh-ed25519"}, {"fp": "SHA256:nsWtdJ9Z67Vrf7OsUzQov7esXhsWAfVppArGh25u244", "type": "ssh-rsa"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "ecdh-sha2-nistp256"}, {"algorithm": "ecdh-sha2-nistp384"}, {"algorithm": "ecdh-sha2-nistp521"}, {"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 2048}, {"algorithm": "diffie-hellman-group16-sha512"}, {"algorithm": "diffie-hellman-group18-sha512"}, {"algorithm": "diffie-hellman-group14-sha256"}, {"algorithm": "diffie-hellman-group14-sha1"}], "key": [{"algorithm": "rsa-sha2-512", "keysize": 3072}, {"algorithm": "rsa-sha2-256", "keysize": 3072}, {"algorithm": "ssh-rsa", "keysize": 3072}, {"algorithm": "ecdsa-sha2-nistp256"}, {"algorithm": "ssh-ed25519"}], "mac": ["umac-64-etm@openssh.com", "umac-128-etm@openssh.com", "hmac-sha2-256-etm@openssh.com", "hmac-sha2-512-etm@openssh.com", "hmac-sha1-etm@openssh.com", "umac-64@openssh.com", "umac-128@openssh.com", "hmac-sha2-256", "hmac-sha2-512", "hmac-sha1"], "target": "localhost"} diff --git a/test/docker/expected_results/openssh_8.0p1_test2.json b/test/docker/expected_results/openssh_8.0p1_test2.json index 1277d29..d6626fe 100644 --- a/test/docker/expected_results/openssh_8.0p1_test2.json +++ b/test/docker/expected_results/openssh_8.0p1_test2.json @@ -1 +1 @@ -{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_8.0", "software": "OpenSSH_8.0"}, "compression": ["none", "zlib@openssh.com"], "enc": ["chacha20-poly1305@openssh.com", "aes128-ctr", "aes192-ctr", "aes256-ctr", "aes128-gcm@openssh.com", "aes256-gcm@openssh.com"], "fingerprints": [{"fp": "SHA256:UrnXIVH+7dlw8UqYocl48yUEcKrthGDQG2CPCgp7MxU", "type": "ssh-ed25519"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "ecdh-sha2-nistp256"}, {"algorithm": "ecdh-sha2-nistp384"}, {"algorithm": "ecdh-sha2-nistp521"}, {"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 2048}, {"algorithm": "diffie-hellman-group16-sha512"}, {"algorithm": "diffie-hellman-group18-sha512"}, {"algorithm": "diffie-hellman-group14-sha256"}, {"algorithm": "diffie-hellman-group14-sha1"}], "key": [{"algorithm": "ssh-ed25519"}, {"algorithm": "ssh-ed25519-cert-v01@openssh.com"}], "mac": ["umac-64-etm@openssh.com", "umac-128-etm@openssh.com", "hmac-sha2-256-etm@openssh.com", "hmac-sha2-512-etm@openssh.com", "hmac-sha1-etm@openssh.com", "umac-64@openssh.com", "umac-128@openssh.com", "hmac-sha2-256", "hmac-sha2-512", "hmac-sha1"]} +{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_8.0", "software": "OpenSSH_8.0"}, "compression": ["none", "zlib@openssh.com"], "enc": ["chacha20-poly1305@openssh.com", "aes128-ctr", "aes192-ctr", "aes256-ctr", "aes128-gcm@openssh.com", "aes256-gcm@openssh.com"], "fingerprints": [{"fp": "SHA256:UrnXIVH+7dlw8UqYocl48yUEcKrthGDQG2CPCgp7MxU", "type": "ssh-ed25519"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "ecdh-sha2-nistp256"}, {"algorithm": "ecdh-sha2-nistp384"}, {"algorithm": "ecdh-sha2-nistp521"}, {"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 2048}, {"algorithm": "diffie-hellman-group16-sha512"}, {"algorithm": "diffie-hellman-group18-sha512"}, {"algorithm": "diffie-hellman-group14-sha256"}, {"algorithm": "diffie-hellman-group14-sha1"}], "key": [{"algorithm": "ssh-ed25519"}, {"algorithm": "ssh-ed25519-cert-v01@openssh.com"}], "mac": ["umac-64-etm@openssh.com", "umac-128-etm@openssh.com", "hmac-sha2-256-etm@openssh.com", "hmac-sha2-512-etm@openssh.com", "hmac-sha1-etm@openssh.com", "umac-64@openssh.com", "umac-128@openssh.com", "hmac-sha2-256", "hmac-sha2-512", "hmac-sha1"], "target": "localhost"} diff --git a/test/docker/expected_results/openssh_8.0p1_test3.json b/test/docker/expected_results/openssh_8.0p1_test3.json index b0fa04a..88163e2 100644 --- a/test/docker/expected_results/openssh_8.0p1_test3.json +++ b/test/docker/expected_results/openssh_8.0p1_test3.json @@ -1 +1 @@ -{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_8.0", "software": "OpenSSH_8.0"}, "compression": ["none", "zlib@openssh.com"], "enc": ["chacha20-poly1305@openssh.com", "aes256-gcm@openssh.com", "aes128-gcm@openssh.com", "aes256-ctr", "aes192-ctr", "aes128-ctr"], "fingerprints": [{"fp": "SHA256:UrnXIVH+7dlw8UqYocl48yUEcKrthGDQG2CPCgp7MxU", "type": "ssh-ed25519"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 2048}], "key": [{"algorithm": "ssh-ed25519"}], "mac": ["hmac-sha2-256-etm@openssh.com", "hmac-sha2-512-etm@openssh.com", "umac-128-etm@openssh.com"]} +{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_8.0", "software": "OpenSSH_8.0"}, "compression": ["none", "zlib@openssh.com"], "enc": ["chacha20-poly1305@openssh.com", "aes256-gcm@openssh.com", "aes128-gcm@openssh.com", "aes256-ctr", "aes192-ctr", "aes128-ctr"], "fingerprints": [{"fp": "SHA256:UrnXIVH+7dlw8UqYocl48yUEcKrthGDQG2CPCgp7MxU", "type": "ssh-ed25519"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 2048}], "key": [{"algorithm": "ssh-ed25519"}], "mac": ["hmac-sha2-256-etm@openssh.com", "hmac-sha2-512-etm@openssh.com", "umac-128-etm@openssh.com"], "target": "localhost"} diff --git a/test/docker/expected_results/tinyssh_20190101_test1.json b/test/docker/expected_results/tinyssh_20190101_test1.json index d2dd3f0..c0820bf 100644 --- a/test/docker/expected_results/tinyssh_20190101_test1.json +++ b/test/docker/expected_results/tinyssh_20190101_test1.json @@ -1 +1 @@ -{"banner": {"comments": "", "protocol": [2, 0], "raw": "", "software": "tinyssh_noversion"}, "compression": ["none"], "enc": ["chacha20-poly1305@openssh.com"], "fingerprints": [{"fp": "SHA256:89ocln1x7KNqnMgWffGoYtD70ksJ4FrH7BMJHa7SrwU", "type": "ssh-ed25519"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "sntrup4591761x25519-sha512@tinyssh.org"}], "key": [{"algorithm": "ssh-ed25519"}], "mac": ["hmac-sha2-256"]} +{"banner": {"comments": "", "protocol": [2, 0], "raw": "", "software": "tinyssh_noversion"}, "compression": ["none"], "enc": ["chacha20-poly1305@openssh.com"], "fingerprints": [{"fp": "SHA256:89ocln1x7KNqnMgWffGoYtD70ksJ4FrH7BMJHa7SrwU", "type": "ssh-ed25519"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "sntrup4591761x25519-sha512@tinyssh.org"}], "key": [{"algorithm": "ssh-ed25519"}], "mac": ["hmac-sha2-256"], "target": "localhost"} diff --git a/test/test_auditconf.py b/test/test_auditconf.py index de3a0ef..064af54 100644 --- a/test/test_auditconf.py +++ b/test/test_auditconf.py @@ -7,6 +7,7 @@ class TestAuditConf: @pytest.fixture(autouse=True) def init(self, ssh_audit): self.AuditConf = ssh_audit.AuditConf + self.OutputBuffer = ssh_audit.OutputBuffer self.usage = ssh_audit.usage self.process_commandline = process_commandline @@ -127,7 +128,7 @@ class TestAuditConf: def test_audit_conf_process_commandline(self): # pylint: disable=too-many-statements - c = lambda x: self.process_commandline(x.split(), self.usage) # noqa + c = lambda x: self.process_commandline(self.OutputBuffer, x.split(), self.usage) # noqa with pytest.raises(SystemExit): conf = c('') with pytest.raises(SystemExit): diff --git a/test/test_build_struct.py b/test/test_build_struct.py index ef47758..99ead0b 100644 --- a/test/test_build_struct.py +++ b/test/test_build_struct.py @@ -35,7 +35,7 @@ def test_prevent_runtime_error_regression(ssh_audit, kex): kex.set_host_key("ssh-rsa7", b"\x00\x00\x00\x07ssh-rsa\x00\x00\x00") kex.set_host_key("ssh-rsa8", b"\x00\x00\x00\x07ssh-rsa\x00\x00\x00") - rv = ssh_audit.build_struct(banner=None, kex=kex) + rv = ssh_audit.build_struct('localhost', banner=None, kex=kex) assert len(rv["fingerprints"]) == 9 diff --git a/test/test_errors.py b/test/test_errors.py index 44526f5..0e06d05 100644 --- a/test/test_errors.py +++ b/test/test_errors.py @@ -2,12 +2,15 @@ import socket import errno import pytest +from ssh_audit.outputbuffer import OutputBuffer + # pylint: disable=attribute-defined-outside-init class TestErrors: @pytest.fixture(autouse=True) def init(self, ssh_audit): self.AuditConf = ssh_audit.AuditConf + self.OutputBuffer = ssh_audit.OutputBuffer self.audit = ssh_audit.audit def _conf(self): @@ -21,14 +24,21 @@ class TestErrors: conf = self._conf() spy.begin() + out = OutputBuffer() if exit_expected: with pytest.raises(SystemExit): - self.audit(conf) + self.audit(out, conf) else: - ret = self.audit(conf) + ret = self.audit(out, conf) assert ret != 0 + out.write() lines = spy.flush() + + # If the last line is empty, delete it. + if len(lines) > 1 and lines[-1] == '': + del lines[-1] + return lines def test_connection_unresolved(self, output_spy, virtual_socket): diff --git a/test/test_output.py b/test/test_outputbuffer.py similarity index 67% rename from test/test_output.py rename to test/test_outputbuffer.py index ac08a52..d92d7bd 100644 --- a/test/test_output.py +++ b/test/test_outputbuffer.py @@ -2,102 +2,107 @@ import pytest # pylint: disable=attribute-defined-outside-init -class TestOutput: +class TestOutputBuffer: @pytest.fixture(autouse=True) def init(self, ssh_audit): - self.Output = ssh_audit.Output self.OutputBuffer = ssh_audit.OutputBuffer - def test_output_buffer_no_lines(self, output_spy): + def test_outputbuffer_no_lines(self, output_spy): output_spy.begin() - with self.OutputBuffer() as obuf: - pass - assert output_spy.flush() == [] + obuf = self.OutputBuffer() + obuf.write() + assert output_spy.flush() == [''] output_spy.begin() - with self.OutputBuffer() as obuf: - pass - obuf.flush() - assert output_spy.flush() == [] - def test_output_buffer_no_flush(self, output_spy): - output_spy.begin() - with self.OutputBuffer(): - print('abc') - assert output_spy.flush() == [] - - def test_output_buffer_flush(self, output_spy): - output_spy.begin() - with self.OutputBuffer() as obuf: - print('abc') - print() - print('def') - obuf.flush() - assert output_spy.flush() == ['abc', '', 'def'] - - def test_output_defaults(self): - out = self.Output() + def test_outputbuffer_defaults(self): + obuf = self.OutputBuffer() # default: on - assert out.batch is False - assert out.use_colors is True - assert out.level == 'info' + assert obuf.batch is False + assert obuf.use_colors is True + assert obuf.level == 'info' - def test_output_colors(self, output_spy): - out = self.Output() - # test without colors + def test_outputbuffer_colors(self, output_spy): + out = self.OutputBuffer() + + # Test without colors. out.use_colors = False + output_spy.begin() out.info('info color') + out.write() assert output_spy.flush() == ['info color'] + output_spy.begin() out.head('head color') + out.write() assert output_spy.flush() == ['head color'] + output_spy.begin() out.good('good color') + out.write() assert output_spy.flush() == ['good color'] + output_spy.begin() out.warn('warn color') + out.write() assert output_spy.flush() == ['warn color'] + output_spy.begin() out.fail('fail color') + out.write() assert output_spy.flush() == ['fail color'] + + # If colors aren't supported by this system, skip the color tests. if not out.colors_supported: return - # test with colors + + # Test with colors. out.use_colors = True + output_spy.begin() out.info('info color') + out.write() assert output_spy.flush() == ['info color'] + output_spy.begin() out.head('head color') - assert output_spy.flush() == ['\x1b[0;36mhead color\x1b[0m'] + out.write() + assert output_spy.flush() in [['\x1b[0;36mhead color\x1b[0m'], ['\x1b[0;96mhead color\x1b[0m']] + output_spy.begin() out.good('good color') - assert output_spy.flush() == ['\x1b[0;32mgood color\x1b[0m'] + out.write() + assert output_spy.flush() in [['\x1b[0;32mgood color\x1b[0m'], ['\x1b[0;92mgood color\x1b[0m']] + output_spy.begin() out.warn('warn color') - assert output_spy.flush() == ['\x1b[0;33mwarn color\x1b[0m'] + out.write() + assert output_spy.flush() in [['\x1b[0;33mwarn color\x1b[0m'], ['\x1b[0;93mwarn color\x1b[0m']] + output_spy.begin() out.fail('fail color') - assert output_spy.flush() == ['\x1b[0;31mfail color\x1b[0m'] + out.write() + assert output_spy.flush() in [['\x1b[0;31mfail color\x1b[0m'], ['\x1b[0;91mfail color\x1b[0m']] - def test_output_sep(self, output_spy): - out = self.Output() + def test_outputbuffer_sep(self, output_spy): + out = self.OutputBuffer() output_spy.begin() out.sep() out.sep() out.sep() + out.write() assert output_spy.flush() == ['', '', ''] - def test_output_levels(self): - out = self.Output() + def test_outputbuffer_levels(self): + out = self.OutputBuffer() assert out.get_level('info') == 0 assert out.get_level('good') == 0 assert out.get_level('warn') == 1 assert out.get_level('fail') == 2 assert out.get_level('unknown') > 2 - def test_output_level_property(self): - out = self.Output() + def test_outputbuffer_level_property(self): + out = self.OutputBuffer() out.level = 'info' assert out.level == 'info' out.level = 'good' @@ -109,8 +114,8 @@ class TestOutput: out.level = 'invalid level' assert out.level == 'unknown' - def test_output_level(self, output_spy): - out = self.Output() + def test_outputbuffer_level(self, output_spy): + out = self.OutputBuffer() # visible: all out.level = 'info' output_spy.begin() @@ -119,6 +124,7 @@ class TestOutput: out.good('good color') out.warn('warn color') out.fail('fail color') + out.write() assert len(output_spy.flush()) == 5 # visible: head, warn, fail out.level = 'warn' @@ -128,6 +134,7 @@ class TestOutput: out.good('good color') out.warn('warn color') out.fail('fail color') + out.write() assert len(output_spy.flush()) == 3 # visible: head, fail out.level = 'fail' @@ -137,6 +144,7 @@ class TestOutput: out.good('good color') out.warn('warn color') out.fail('fail color') + out.write() assert len(output_spy.flush()) == 2 # visible: head out.level = 'invalid level' @@ -146,10 +154,11 @@ class TestOutput: out.good('good color') out.warn('warn color') out.fail('fail color') + out.write() assert len(output_spy.flush()) == 1 - def test_output_batch(self, output_spy): - out = self.Output() + def test_outputbuffer_batch(self, output_spy): + out = self.OutputBuffer() # visible: all output_spy.begin() out.level = 'info' @@ -159,6 +168,7 @@ class TestOutput: out.good('good color') out.warn('warn color') out.fail('fail color') + out.write() assert len(output_spy.flush()) == 5 # visible: all except head output_spy.begin() @@ -169,4 +179,5 @@ class TestOutput: out.good('good color') out.warn('warn color') out.fail('fail color') + out.write() assert len(output_spy.flush()) == 4 diff --git a/test/test_ssh1.py b/test/test_ssh1.py index 820af5f..1ccbb7c 100644 --- a/test/test_ssh1.py +++ b/test/test_ssh1.py @@ -3,6 +3,7 @@ import pytest from ssh_audit.auditconf import AuditConf from ssh_audit.fingerprint import Fingerprint +from ssh_audit.outputbuffer import OutputBuffer from ssh_audit.protocol import Protocol from ssh_audit.readbuf import ReadBuf from ssh_audit.ssh1 import SSH1 @@ -15,6 +16,7 @@ from ssh_audit.writebuf import WriteBuf class TestSSH1: @pytest.fixture(autouse=True) def init(self, ssh_audit): + self.OutputBuffer = OutputBuffer self.protocol = Protocol self.ssh1 = SSH1 self.PublicKeyMessage = SSH1_PublicKeyMessage @@ -132,9 +134,11 @@ class TestSSH1: vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n') vsocket.rdata.append(self._create_ssh1_packet(w.write_flush())) output_spy.begin() - self.audit(self._conf()) + out = self.OutputBuffer() + self.audit(out, self._conf()) + out.write() lines = output_spy.flush() - assert len(lines) == 14 + assert len(lines) == 15 def test_ssh1_server_invalid_first_packet(self, output_spy, virtual_socket): vsocket = virtual_socket @@ -144,10 +148,12 @@ class TestSSH1: vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n') vsocket.rdata.append(self._create_ssh1_packet(w.write_flush())) output_spy.begin() - ret = self.audit(self._conf()) + out = self.OutputBuffer() + ret = self.audit(out, self._conf()) + out.write() assert ret != 0 lines = output_spy.flush() - assert len(lines) == 8 + assert len(lines) == 9 assert 'unknown message' in lines[-1] def test_ssh1_server_invalid_checksum(self, output_spy, virtual_socket): @@ -158,8 +164,10 @@ class TestSSH1: vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n') vsocket.rdata.append(self._create_ssh1_packet(w.write_flush(), False)) output_spy.begin() + out = self.OutputBuffer() with pytest.raises(SystemExit): - self.audit(self._conf()) + self.audit(out, self._conf()) + out.write() lines = output_spy.flush() - assert len(lines) == 1 - assert 'checksum' in lines[-1] + assert len(lines) == 3 + assert ('checksum' in lines[0]) or ('checksum' in lines[1]) or ('checksum' in lines[2]) diff --git a/test/test_ssh2.py b/test/test_ssh2.py index 1cdfd91..d38cadd 100644 --- a/test/test_ssh2.py +++ b/test/test_ssh2.py @@ -3,6 +3,7 @@ import struct import pytest from ssh_audit.auditconf import AuditConf +from ssh_audit.outputbuffer import OutputBuffer from ssh_audit.protocol import Protocol from ssh_audit.readbuf import ReadBuf from ssh_audit.ssh2_kex import SSH2_Kex @@ -15,6 +16,7 @@ from ssh_audit.writebuf import WriteBuf class TestSSH2: @pytest.fixture(autouse=True) def init(self, ssh_audit): + self.OutputBuffer = OutputBuffer self.protocol = Protocol self.ssh2_kex = SSH2_Kex self.ssh2_kexparty = SSH2_KexParty @@ -141,9 +143,11 @@ class TestSSH2: vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n') vsocket.rdata.append(self._create_ssh2_packet(w.write_flush())) output_spy.begin() - self.audit(self._conf()) + out = self.OutputBuffer() + self.audit(out, self._conf()) + out.write() lines = output_spy.flush() - assert len(lines) == 68 + assert len(lines) == 69 def test_ssh2_server_invalid_first_packet(self, output_spy, virtual_socket): vsocket = virtual_socket @@ -152,8 +156,10 @@ class TestSSH2: vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n') vsocket.rdata.append(self._create_ssh2_packet(w.write_flush())) output_spy.begin() - ret = self.audit(self._conf()) + out = self.OutputBuffer() + ret = self.audit(out, self._conf()) + out.write() assert ret != 0 lines = output_spy.flush() - assert len(lines) == 4 + assert len(lines) == 5 assert 'unknown message' in lines[-1]