diff --git a/ssh-audit.py b/ssh-audit.py index 51b9569..3884996 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -456,6 +456,140 @@ macs = %s return "Name: %s\nVersion: %s\nBanner: %s\nCompressions: %s\nHost Keys: %s\nKey Exchanges: %s\nCiphers: %s\nMACs: %s" % (name, version, banner, compressions_str, host_keys_str, kex_str, ciphers_str, macs_str) +class Utils: + @classmethod + def _type_err(cls, v: Any, target: str) -> TypeError: + return TypeError('cannot convert {} to {}'.format(type(v), target)) + + @classmethod + def to_bytes(cls, v: Union[bytes, str], enc: str = 'utf-8') -> bytes: + if isinstance(v, bytes): + return v + elif isinstance(v, str): + return v.encode(enc) + raise cls._type_err(v, 'bytes') + + @classmethod + def to_text(cls, v: Union[str, bytes], enc: str = 'utf-8') -> str: + if isinstance(v, str): + return v + elif isinstance(v, bytes): + return v.decode(enc) + raise cls._type_err(v, 'unicode text') + + @classmethod + def _is_ascii(cls, v: str, char_filter: Callable[[int], bool] = lambda x: x <= 127) -> bool: + r = False + if isinstance(v, str): + for c in v: + i = cls.ctoi(c) + if not char_filter(i): + return r + r = True + return r + + @classmethod + def _to_ascii(cls, v: str, char_filter: Callable[[int], bool] = lambda x: x <= 127, errors: str = 'replace') -> str: + if isinstance(v, str): + r = bytearray() + for c in v: + i = cls.ctoi(c) + if char_filter(i): + r.append(i) + else: + if errors == 'ignore': + continue + r.append(63) + return cls.to_text(r.decode('ascii')) + raise cls._type_err(v, 'ascii') + + @classmethod + def is_ascii(cls, v: str) -> bool: + return cls._is_ascii(v) + + @classmethod + def to_ascii(cls, v: str, errors: str = 'replace') -> str: + return cls._to_ascii(v, errors=errors) + + @classmethod + def is_print_ascii(cls, v: str) -> bool: + return cls._is_ascii(v, lambda x: 126 >= x >= 32) + + @classmethod + def to_print_ascii(cls, v: str, errors: str = 'replace') -> str: + return cls._to_ascii(v, lambda x: 126 >= x >= 32, errors) + + @classmethod + def unique_seq(cls, seq: Sequence[Any]) -> Sequence[Any]: + seen = set() # type: Set[Any] + + def _seen_add(x: Any) -> bool: + seen.add(x) + return False + + if isinstance(seq, tuple): + return tuple(x for x in seq if x not in seen and not _seen_add(x)) + else: + return [x for x in seq if x not in seen and not _seen_add(x)] + + @classmethod + def ctoi(cls, c: Union[str, int]) -> int: + if isinstance(c, str): + return ord(c[0]) + else: + return c + + @staticmethod + def parse_int(v: Any) -> int: + try: + return int(v) + except Exception: # pylint: disable=bare-except + return 0 + + @staticmethod + def parse_float(v: Any) -> float: + try: + return float(v) + except Exception: # pylint: disable=bare-except + return -1.0 + + @staticmethod + def parse_host_and_port(host_and_port: str) -> Tuple[str, int]: + '''Parses a string into a tuple of its host and port. The port is 0 if not specified.''' + host = host_and_port + port = 0 + + mx = re.match(r'^\[([^\]]+)\](?::(\d+))?$', host_and_port) + if mx is not None: + host = mx.group(1) + port_str = mx.group(2) + if port_str is not None: + port = int(port_str) + else: + s = host_and_port.split(':') + if len(s) == 2: + host = s[0] + if len(s[1]) > 0: + port = int(s[1]) + + return host, port + + @staticmethod + def is_ipv6_address(address: str) -> bool: + '''Returns True if address is an IPv6 address, otherwise False.''' + is_ipv6 = True + try: + ipaddress.IPv6Address(address) + except ipaddress.AddressValueError: + is_ipv6 = False + + return is_ipv6 + + @staticmethod + def is_windows() -> bool: + return sys.platform in ['win32', 'cygwin'] + + class AuditConf: # pylint: disable=too-many-instance-attributes def __init__(self, host: str = '', port: int = 22) -> None: @@ -652,6 +786,10 @@ class Output: LEVELS = ('info', 'warn', 'fail') # type: Sequence[str] COLORS = {'head': 36, 'good': 32, 'warn': 33, 'fail': 31} + # Use brighter colors on Windows for better readability. + if Utils.is_windows(): + COLORS = {'head': 96, 'good': 92, 'warn': 93, 'fail': 91} + def __init__(self) -> None: self.batch = False self.verbose = False @@ -3388,10 +3526,18 @@ def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], client_hos print("Host: %s" % host) print("Policy: %s%s" % (spacing, aconf.policy.get_name_and_version())) print("Result: %s" % spacing, end='') + + # Use these nice unicode characters in the result message, unless we're on Windows (the cmd.exe terminal doesn't display them properly). + icon_good = "✔ " + icon_fail = "❌ " + if Utils.is_windows(): + icon_good = "" + icon_fail = "" + if passed: - out.good("✔ Passed") + out.good("%sPassed" % icon_good) else: - out.fail("❌ Failed!") + out.fail("%sFailed!" % icon_fail) out.warn("\nErrors:\n%s" % error_str) return passed @@ -3406,7 +3552,8 @@ def list_policies() -> None: # If the path is not a directory, print a useful error and exit. if not os.path.isdir(policies_dir): print("Error: could not find policies directory. Please report this full output to <%s>:" % GITHUB_ISSUES_URL) - print("\n__file__: %s" % __file__) + print("\nsys.argv[0]: %s" % sys.argv[0]) + print("__file__: %s" % __file__) print("policies_dir: %s" % policies_dir) sys.exit(PROGRAM_RETVAL_UNKNOWN_ERROR) @@ -3474,136 +3621,6 @@ def make_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], kex: Optional[ print("Error: file already exists: %s" % aconf.policy_file) -class Utils: - @classmethod - def _type_err(cls, v: Any, target: str) -> TypeError: - return TypeError('cannot convert {} to {}'.format(type(v), target)) - - @classmethod - def to_bytes(cls, v: Union[bytes, str], enc: str = 'utf-8') -> bytes: - if isinstance(v, bytes): - return v - elif isinstance(v, str): - return v.encode(enc) - raise cls._type_err(v, 'bytes') - - @classmethod - def to_text(cls, v: Union[str, bytes], enc: str = 'utf-8') -> str: - if isinstance(v, str): - return v - elif isinstance(v, bytes): - return v.decode(enc) - raise cls._type_err(v, 'unicode text') - - @classmethod - def _is_ascii(cls, v: str, char_filter: Callable[[int], bool] = lambda x: x <= 127) -> bool: - r = False - if isinstance(v, str): - for c in v: - i = cls.ctoi(c) - if not char_filter(i): - return r - r = True - return r - - @classmethod - def _to_ascii(cls, v: str, char_filter: Callable[[int], bool] = lambda x: x <= 127, errors: str = 'replace') -> str: - if isinstance(v, str): - r = bytearray() - for c in v: - i = cls.ctoi(c) - if char_filter(i): - r.append(i) - else: - if errors == 'ignore': - continue - r.append(63) - return cls.to_text(r.decode('ascii')) - raise cls._type_err(v, 'ascii') - - @classmethod - def is_ascii(cls, v: str) -> bool: - return cls._is_ascii(v) - - @classmethod - def to_ascii(cls, v: str, errors: str = 'replace') -> str: - return cls._to_ascii(v, errors=errors) - - @classmethod - def is_print_ascii(cls, v: str) -> bool: - return cls._is_ascii(v, lambda x: 126 >= x >= 32) - - @classmethod - def to_print_ascii(cls, v: str, errors: str = 'replace') -> str: - return cls._to_ascii(v, lambda x: 126 >= x >= 32, errors) - - @classmethod - def unique_seq(cls, seq: Sequence[Any]) -> Sequence[Any]: - seen = set() # type: Set[Any] - - def _seen_add(x: Any) -> bool: - seen.add(x) - return False - - if isinstance(seq, tuple): - return tuple(x for x in seq if x not in seen and not _seen_add(x)) - else: - return [x for x in seq if x not in seen and not _seen_add(x)] - - @classmethod - def ctoi(cls, c: Union[str, int]) -> int: - if isinstance(c, str): - return ord(c[0]) - else: - return c - - @staticmethod - def parse_int(v: Any) -> int: - try: - return int(v) - except Exception: # pylint: disable=bare-except - return 0 - - @staticmethod - def parse_float(v: Any) -> float: - try: - return float(v) - except Exception: # pylint: disable=bare-except - return -1.0 - - @staticmethod - def parse_host_and_port(host_and_port: str) -> Tuple[str, int]: - '''Parses a string into a tuple of its host and port. The port is 0 if not specified.''' - host = host_and_port - port = 0 - - mx = re.match(r'^\[([^\]]+)\](?::(\d+))?$', host_and_port) - if mx is not None: - host = mx.group(1) - port_str = mx.group(2) - if port_str is not None: - port = int(port_str) - else: - s = host_and_port.split(':') - if len(s) == 2: - host = s[0] - if len(s[1]) > 0: - port = int(s[1]) - - return host, port - - @staticmethod - def is_ipv6_address(address: str) -> bool: - '''Returns True if address is an IPv6 address, otherwise False.''' - is_ipv6 = True - try: - ipaddress.IPv6Address(address) - except ipaddress.AddressValueError: - is_ipv6 = False - - return is_ipv6 - - def build_struct(banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = None, pkm: Optional['SSH1.PublicKeyMessage'] = None, client_host: Optional[str] = None) -> Any: banner_str = ''