Added multi-threaded scanning support.

This commit is contained in:
Joe Testa 2021-02-01 13:10:06 -05:00
parent bbb81e24ab
commit 13d15baa2a
25 changed files with 442 additions and 280 deletions

View File

@ -21,6 +21,7 @@
- historical information from OpenSSH, Dropbear SSH and libssh; - historical information from OpenSSH, Dropbear SSH and libssh;
- policy scans to ensure adherence to a hardened/standard configuration; - policy scans to ensure adherence to a hardened/standard configuration;
- runs on Linux and Windows; - runs on Linux and Windows;
- supports Python 3.6 - 3.9;
- no dependencies - no dependencies
## Usage ## Usage
@ -158,9 +159,11 @@ For convenience, a web front-end on top of the command-line tool is available at
## ChangeLog ## ChangeLog
### v2.4.0-dev (???) ### v2.4.0-dev (???)
- Added multi-threaded scanning support.
- Added version check for OpenSSH user enumeration (CVE-2018-15473). - Added version check for OpenSSH user enumeration (CVE-2018-15473).
- Fixed crash when receiving unexpected response during host key test. - Fixed crash when receiving unexpected response during host key test.
- Fixed hang against older Cisco devices during host key test & gex test. - Fixed hang against older Cisco devices during host key test & gex test.
- Fixed improper termination while scanning multiple targets when one target returns an error.
- Dropped support for Python 3.5 (which reached EOL in Sept. 2020). - Dropped support for Python 3.5 (which reached EOL in Sept. 2020).
### v2.3.1 (2020-10-28) ### v2.3.1 (2020-10-28)

View File

@ -53,6 +53,7 @@ class AuditConf:
self.timeout_set = False # Set to True when the user explicitly sets it. self.timeout_set = False # Set to True when the user explicitly sets it.
self.target_file: Optional[str] = None self.target_file: Optional[str] = None
self.target_list: List[str] = [] self.target_list: List[str] = []
self.threads = 32
self.list_policies = False self.list_policies = False
self.lookup = '' self.lookup = ''
@ -98,6 +99,11 @@ class AuditConf:
valid = True valid = True
elif name in ['policy_file', 'policy', 'target_file', 'target_list', 'lookup']: elif name in ['policy_file', 'policy', 'target_file', 'target_list', 'lookup']:
valid = True valid = True
elif name == "threads":
valid, num_threads = True, Utils.parse_int(value)
if num_threads < 1:
raise ValueError('invalid number of threads: {}'.format(value))
value = num_threads
if valid: if valid:
object.__setattr__(self, name, value) object.__setattr__(self, name, value)

View File

@ -1,87 +0,0 @@
"""
The MIT License (MIT)
Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import os
import sys
# pylint: disable=unused-import
from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401
from typing import Callable, Optional, Union, Any # noqa: F401
from ssh_audit.utils import Utils
class Output:
LEVELS: Sequence[str] = ('info', 'warn', 'fail')
COLORS = {'head': 36, 'good': 32, 'warn': 33, 'fail': 31}
# Use brighter colors on Windows for better readability.
if Utils.is_windows():
COLORS = {'head': 96, 'good': 92, 'warn': 93, 'fail': 91}
def __init__(self) -> None:
self.batch = False
self.verbose = False
self.use_colors = True
self.json = False
self.__level = 0
self.__colsupport = 'colorama' in sys.modules or os.name == 'posix'
@property
def level(self) -> str:
if self.__level < len(self.LEVELS):
return self.LEVELS[self.__level]
return 'unknown'
@level.setter
def level(self, name: str) -> None:
self.__level = self.get_level(name)
def get_level(self, name: str) -> int:
cname = 'info' if name == 'good' else name
if cname not in self.LEVELS:
return sys.maxsize
return self.LEVELS.index(cname)
def sep(self) -> None:
if not self.batch:
print()
@property
def colors_supported(self) -> bool:
return self.__colsupport
@staticmethod
def _colorized(color: str) -> Callable[[str], None]:
return lambda x: print(u'{}{}\033[0m'.format(color, x))
def __getattr__(self, name: str) -> Callable[[str], None]:
if name == 'head' and self.batch:
return lambda x: None
if not self.get_level(name) >= self.__level:
return lambda x: None
if self.use_colors and self.colors_supported and name in self.COLORS:
color = '\033[0;{}m'.format(self.COLORS[name])
return self._colorized(color)
else:
return lambda x: print(u'{}'.format(x))

View File

@ -1,7 +1,7 @@
""" """
The MIT License (MIT) The MIT License (MIT)
Copyright (C) 2017-2020 Joe Testa (jtesta@positronsecurity.com) Copyright (C) 2021 Joe Testa (jtesta@positronsecurity.com)
Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu)
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
@ -22,29 +22,154 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. THE SOFTWARE.
""" """
import io import os
import sys import sys
# pylint: disable=unused-import # pylint: disable=unused-import
from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401 from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401
from typing import Callable, Optional, Union, Any # noqa: F401 from typing import Callable, Optional, Union, Any # noqa: F401
from ssh_audit.utils import Utils
class OutputBuffer(List[str]):
class OutputBuffer:
LEVELS: Sequence[str] = ('info', 'warn', 'fail')
COLORS = {'head': 36, 'good': 32, 'warn': 33, 'fail': 31}
# Use brighter colors on Windows for better readability.
if Utils.is_windows():
COLORS = {'head': 96, 'good': 92, 'warn': 93, 'fail': 91}
def __init__(self, buffer_output: bool = True) -> None:
self.buffer_output = buffer_output
self.buffer: List[str] = []
self.in_section = False
self.section: List[str] = []
self.batch = False
self.verbose = False
self.use_colors = True
self.json = False
self.__level = 0
self.__is_color_supported = ('colorama' in sys.modules) or (os.name == 'posix')
self.line_ended = True
def _print(self, level: str, s: str = '', line_ended: bool = True) -> None:
'''Saves output to buffer (if in buffered mode), or immediately prints to stdout otherwise.'''
# If we're logging only 'warn' or above, and this is an 'info', ignore message.
if self.get_level(level) < self.__level:
return
if self.use_colors and self.colors_supported and len(s) > 0 and level != 'info':
s = "\033[0;%dm%s\033[0m" % (self.COLORS[level], s)
if self.buffer_output:
# Select which list to add to. If we are in a 'with' statement, then this goes in the section buffer, otherwise the general buffer.
buf = self.section if self.in_section else self.buffer
# Determine if a new line should be added, or if the last line should be appended.
if not self.line_ended:
last_entry = -1 if len(buf) > 0 else 0
buf[last_entry] = buf[last_entry] + s
else:
buf.append(s)
# When False, this tells the next call to append to the last line we just added.
self.line_ended = line_ended
else:
print(s)
def get_buffer(self) -> str:
'''Returns all buffered output, then clears the buffer.'''
self.flush_section()
buffer_str = "\n".join(self.buffer)
self.buffer = []
return buffer_str
def write(self) -> None:
'''Writes the output to stdout.'''
self.flush_section()
print(self.get_buffer(), flush=True)
def reset(self) -> None:
self.flush_section()
self.get_buffer()
@property
def level(self) -> str:
'''Returns the minimum level for output.'''
if self.__level < len(self.LEVELS):
return self.LEVELS[self.__level]
return 'unknown'
@level.setter
def level(self, name: str) -> None:
'''Sets the minimum level for output (one of: 'info', 'warn', 'fail').'''
self.__level = self.get_level(name)
def get_level(self, name: str) -> int:
cname = 'info' if name == 'good' else name
if cname not in self.LEVELS:
return sys.maxsize
return self.LEVELS.index(cname)
@property
def colors_supported(self) -> bool:
'''Returns True if the system supports color output.'''
return self.__is_color_supported
# When used in a 'with' block, the output to goes into a section; this can be sorted separately when add_section_to_buffer() is later called.
def __enter__(self) -> 'OutputBuffer': def __enter__(self) -> 'OutputBuffer':
# pylint: disable=attribute-defined-outside-init self.in_section = True
self.__buf = io.StringIO()
self.__stdout = sys.stdout
sys.stdout = self.__buf
return self return self
def flush(self, sort_lines: bool = False) -> None:
# Lines must be sorted in some cases to ensure consistent testing.
if sort_lines:
self.sort() # pylint: disable=no-member
for line in self: # pylint: disable=not-an-iterable
print(line)
def __exit__(self, *args: Any) -> None: def __exit__(self, *args: Any) -> None:
self.extend(self.__buf.getvalue().splitlines()) # pylint: disable=no-member self.in_section = False
sys.stdout = self.__stdout
def flush_section(self, sort_section: bool = False) -> None:
'''Appends section output (optionally sorting it first) to the end of the buffer, then clears the section output.'''
if sort_section:
self.section.sort()
self.buffer.extend(self.section)
self.section = []
def is_section_empty(self) -> bool:
'''Returns True if the section buffer is empty, otherwise False.'''
return len(self.section) == 0
def head(self, s: str, line_ended: bool = True) -> 'OutputBuffer':
if not self.batch:
self._print('head', s, line_ended)
return self
def fail(self, s: str, line_ended: bool = True) -> 'OutputBuffer':
self._print('fail', s, line_ended)
return self
def warn(self, s: str, line_ended: bool = True) -> 'OutputBuffer':
self._print('warn', s, line_ended)
return self
def info(self, s: str, line_ended: bool = True) -> 'OutputBuffer':
self._print('info', s, line_ended)
return self
def good(self, s: str, line_ended: bool = True) -> 'OutputBuffer':
self._print('good', s, line_ended)
return self
def sep(self) -> 'OutputBuffer':
if not self.batch:
self._print('info')
return self
def v(self, s: str, write_now: bool = False) -> 'OutputBuffer':
'''Prints a message if verbose output is enabled.'''
if self.verbose:
self.info(s)
if write_now:
self.write()
return self

View File

@ -2,7 +2,7 @@
""" """
The MIT License (MIT) The MIT License (MIT)
Copyright (C) 2017-2020 Joe Testa (jtesta@positronsecurity.com) Copyright (C) 2017-2021 Joe Testa (jtesta@positronsecurity.com)
Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu)
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
@ -23,6 +23,8 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. THE SOFTWARE.
""" """
import concurrent.futures
import copy
import getopt import getopt
import json import json
import os import os
@ -42,7 +44,6 @@ from ssh_audit import exitcodes
from ssh_audit.fingerprint import Fingerprint from ssh_audit.fingerprint import Fingerprint
from ssh_audit.gextest import GEXTest from ssh_audit.gextest import GEXTest
from ssh_audit.hostkeytest import HostKeyTest from ssh_audit.hostkeytest import HostKeyTest
from ssh_audit.output import Output
from ssh_audit.outputbuffer import OutputBuffer from ssh_audit.outputbuffer import OutputBuffer
from ssh_audit.policy import Policy from ssh_audit.policy import Policy
from ssh_audit.product import Product from ssh_audit.product import Product
@ -66,7 +67,7 @@ except ImportError: # pragma: nocover
def usage(err: Optional[str] = None) -> None: def usage(err: Optional[str] = None) -> None:
retval = exitcodes.GOOD retval = exitcodes.GOOD
uout = Output() uout = OutputBuffer()
p = os.path.basename(sys.argv[0]) p = os.path.basename(sys.argv[0])
uout.head('# {} {}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION)) uout.head('# {} {}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION))
if err is not None and len(err) > 0: if err is not None and len(err) > 0:
@ -89,25 +90,27 @@ def usage(err: Optional[str] = None) -> None:
uout.info(' -p, --port=<port> port to connect') uout.info(' -p, --port=<port> port to connect')
uout.info(' -P, --policy=<policy.txt> run a policy test using the specified policy') uout.info(' -P, --policy=<policy.txt> run a policy test using the specified policy')
uout.info(' -t, --timeout=<secs> timeout (in seconds) for connection and reading\n (default: 5)') uout.info(' -t, --timeout=<secs> timeout (in seconds) for connection and reading\n (default: 5)')
uout.info(' -T, --targets=<hosts.txt> a file containing a list of target hosts (one\n per line, format HOST[:PORT])') uout.info(' -T, --targets=<hosts.txt> a file containing a list of target hosts (one\n per line, format HOST[:PORT]). Use --threads\n to control concurrent scans.')
uout.info(' --threads=<threads> number of threads to use when scanning multiple\n targets (-T/--targets) (default: 32)')
uout.info(' -v, --verbose verbose output') uout.info(' -v, --verbose verbose output')
uout.sep() uout.sep()
uout.write()
sys.exit(retval) sys.exit(retval)
def output_algorithms(title: str, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, algorithms: List[str], unknown_algs: List[str], is_json_output: bool, program_retval: int, maxlen: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> int: # pylint: disable=too-many-arguments def output_algorithms(out: OutputBuffer, title: str, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, algorithms: List[str], unknown_algs: List[str], is_json_output: bool, program_retval: int, maxlen: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> int: # pylint: disable=too-many-arguments
with OutputBuffer() as obuf: with out:
for algorithm in algorithms: for algorithm in algorithms:
program_retval = output_algorithm(alg_db, alg_type, algorithm, unknown_algs, program_retval, maxlen, alg_sizes) program_retval = output_algorithm(out, alg_db, alg_type, algorithm, unknown_algs, program_retval, maxlen, alg_sizes)
if len(obuf) > 0 and not is_json_output: if not out.is_section_empty() and not is_json_output:
out.head('# ' + title) out.head('# ' + title)
obuf.flush() out.flush_section()
out.sep() out.sep()
return program_retval return program_retval
def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, alg_name: str, unknown_algs: List[str], program_retval: int, alg_max_len: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> int: def output_algorithm(out: OutputBuffer, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, alg_name: str, unknown_algs: List[str], program_retval: int, alg_max_len: int = 0, alg_sizes: Optional[Dict[str, Tuple[int, int]]] = None) -> int:
prefix = '(' + alg_type + ') ' prefix = '(' + alg_type + ') '
if alg_max_len == 0: if alg_max_len == 0:
alg_max_len = len(alg_name) alg_max_len = len(alg_name)
@ -175,7 +178,7 @@ def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], al
return program_retval return program_retval
def output_compatibility(algs: Algorithms, client_audit: bool, for_server: bool = True) -> None: def output_compatibility(out: OutputBuffer, algs: Algorithms, client_audit: bool, for_server: bool = True) -> None:
# Don't output any compatibility info if we're doing a client audit. # Don't output any compatibility info if we're doing a client audit.
if client_audit: if client_audit:
@ -205,7 +208,7 @@ def output_compatibility(algs: Algorithms, client_audit: bool, for_server: bool
out.good('(gen) compatibility: ' + ', '.join(comp_text)) out.good('(gen) compatibility: ' + ', '.join(comp_text))
def output_security_sub(sub: str, software: Optional[Software], client_audit: bool, padlen: int) -> None: def output_security_sub(out: OutputBuffer, sub: str, software: Optional[Software], client_audit: bool, padlen: int) -> None:
secdb = VersionVulnerabilityDB.CVE if sub == 'cve' else VersionVulnerabilityDB.TXT secdb = VersionVulnerabilityDB.CVE if sub == 'cve' else VersionVulnerabilityDB.TXT
if software is None or software.product not in secdb: if software is None or software.product not in secdb:
return return
@ -241,20 +244,20 @@ def output_security_sub(sub: str, software: Optional[Software], client_audit: bo
out.fail('(sec) {}{} -- {}'.format(name, p, descr)) out.fail('(sec) {}{} -- {}'.format(name, p, descr))
def output_security(banner: Optional[Banner], client_audit: bool, padlen: int, is_json_output: bool) -> None: def output_security(out: OutputBuffer, banner: Optional[Banner], client_audit: bool, padlen: int, is_json_output: bool) -> None:
with OutputBuffer() as obuf: with out:
if banner is not None: if banner is not None:
software = Software.parse(banner) software = Software.parse(banner)
output_security_sub('cve', software, client_audit, padlen) output_security_sub(out, 'cve', software, client_audit, padlen)
output_security_sub('txt', software, client_audit, padlen) output_security_sub(out, 'txt', software, client_audit, padlen)
if len(obuf) > 0 and not is_json_output: if not out.is_section_empty() and not is_json_output:
out.head('# security') out.head('# security')
obuf.flush() out.flush_section()
out.sep() out.sep()
def output_fingerprints(algs: Algorithms, is_json_output: bool, sha256: bool = True) -> None: def output_fingerprints(out: OutputBuffer, algs: Algorithms, is_json_output: bool, sha256: bool = True) -> None:
with OutputBuffer() as obuf: with out:
fps = [] fps = []
if algs.ssh1kex is not None: if algs.ssh1kex is not None:
name = 'ssh-rsa1' name = 'ssh-rsa1'
@ -284,14 +287,14 @@ def output_fingerprints(algs: Algorithms, is_json_output: bool, sha256: bool = T
# p = '' if out.batch else ' ' * (padlen - len(name)) # p = '' if out.batch else ' ' * (padlen - len(name))
# out.good('(fin) {0}{1} -- {2} {3}'.format(name, p, bits, fpo)) # out.good('(fin) {0}{1} -- {2} {3}'.format(name, p, bits, fpo))
out.good('(fin) {}: {}'.format(name, fpo)) out.good('(fin) {}: {}'.format(name, fpo))
if len(obuf) > 0 and not is_json_output: if not out.is_section_empty() and not is_json_output:
out.head('# fingerprints') out.head('# fingerprints')
obuf.flush() out.flush_section()
out.sep() out.sep()
# Returns True if no warnings or failures encountered in configuration. # Returns True if no warnings or failures encountered in configuration.
def output_recommendations(algs: Algorithms, software: Optional[Software], is_json_output: bool, padlen: int = 0) -> bool: def output_recommendations(out: OutputBuffer, algs: Algorithms, software: Optional[Software], is_json_output: bool, padlen: int = 0) -> bool:
ret = True ret = True
# PuTTY's algorithms cannot be modified, so there's no point in issuing recommendations. # PuTTY's algorithms cannot be modified, so there's no point in issuing recommendations.
@ -323,7 +326,7 @@ def output_recommendations(algs: Algorithms, software: Optional[Software], is_js
return ret return ret
for_server = True for_server = True
with OutputBuffer() as obuf: with out:
software, alg_rec = algs.get_recommendations(software, for_server) software, alg_rec = algs.get_recommendations(software, for_server)
for sshv in range(2, 0, -1): for sshv in range(2, 0, -1):
if sshv not in alg_rec: if sshv not in alg_rec:
@ -351,20 +354,20 @@ def output_recommendations(algs: Algorithms, software: Optional[Software], is_js
b = '(SSH{})'.format(sshv) if sshv == 1 else '' b = '(SSH{})'.format(sshv) if sshv == 1 else ''
fm = '(rec) {0}{1}{2}-- {3} algorithm to {4}{5} {6}' fm = '(rec) {0}{1}{2}-- {3} algorithm to {4}{5} {6}'
fn(fm.format(sg, name, p, alg_type, an, chg_additional_info, b)) fn(fm.format(sg, name, p, alg_type, an, chg_additional_info, b))
if len(obuf) > 0 and not is_json_output: if not out.is_section_empty() and not is_json_output:
if software is not None: if software is not None:
title = '(for {})'.format(software.display(False)) title = '(for {})'.format(software.display(False))
else: else:
title = '' title = ''
out.head('# algorithm recommendations {}'.format(title)) out.head('# algorithm recommendations {}'.format(title))
obuf.flush(True) # Sort the output so that it is always stable (needed for repeatable testing). out.flush_section(sort_section=True) # Sort the output so that it is always stable (needed for repeatable testing).
out.sep() out.sep()
return ret return ret
# Output additional information & notes. # Output additional information & notes.
def output_info(software: Optional['Software'], client_audit: bool, any_problems: bool, is_json_output: bool) -> None: def output_info(out: OutputBuffer, software: Optional['Software'], client_audit: bool, any_problems: bool, is_json_output: bool) -> None:
with OutputBuffer() as obuf: with out:
# Tell user that PuTTY cannot be hardened at the protocol-level. # Tell user that PuTTY cannot be hardened at the protocol-level.
if client_audit and (software is not None) and (software.product == Product.PuTTY): if client_audit and (software is not None) and (software.product == Product.PuTTY):
out.warn('(nfo) PuTTY does not have the option of restricting any algorithms during the SSH handshake.') out.warn('(nfo) PuTTY does not have the option of restricting any algorithms during the SSH handshake.')
@ -373,20 +376,20 @@ def output_info(software: Optional['Software'], client_audit: bool, any_problems
if any_problems: if any_problems:
out.warn('(nfo) For hardening guides on common OSes, please see: <https://www.ssh-audit.com/hardening_guides.html>') out.warn('(nfo) For hardening guides on common OSes, please see: <https://www.ssh-audit.com/hardening_guides.html>')
if len(obuf) > 0 and not is_json_output: if not out.is_section_empty() and not is_json_output:
out.head('# additional info') out.head('# additional info')
obuf.flush() out.flush_section()
out.sep() out.sep()
# Returns a exitcodes.* flag to denote if any failures or warnings were encountered. # Returns a exitcodes.* flag to denote if any failures or warnings were encountered.
def output(aconf: AuditConf, banner: Optional[Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2_Kex] = None, pkm: Optional[SSH1_PublicKeyMessage] = None, print_target: bool = False) -> int: def output(out: OutputBuffer, aconf: AuditConf, banner: Optional[Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2_Kex] = None, pkm: Optional[SSH1_PublicKeyMessage] = None, print_target: bool = False) -> int:
program_retval = exitcodes.GOOD program_retval = exitcodes.GOOD
client_audit = client_host is not None # If set, this is a client audit. client_audit = client_host is not None # If set, this is a client audit.
sshv = 1 if pkm is not None else 2 sshv = 1 if pkm is not None else 2
algs = Algorithms(pkm, kex) algs = Algorithms(pkm, kex)
with OutputBuffer() as obuf: with out:
if print_target: if print_target:
host = aconf.host host = aconf.host
@ -416,7 +419,7 @@ def output(aconf: AuditConf, banner: Optional[Banner], header: List[str], client
out.good('(gen) software: {}'.format(software)) out.good('(gen) software: {}'.format(software))
else: else:
software = None software = None
output_compatibility(algs, client_audit) output_compatibility(out, algs, client_audit)
if kex is not None: if kex is not None:
compressions = [x for x in kex.server.compression if x != 'none'] compressions = [x for x in kex.server.compression if x != 'none']
if len(compressions) > 0: if len(compressions) > 0:
@ -424,12 +427,12 @@ def output(aconf: AuditConf, banner: Optional[Banner], header: List[str], client
else: else:
cmptxt = 'disabled' cmptxt = 'disabled'
out.good('(gen) compression: {}'.format(cmptxt)) out.good('(gen) compression: {}'.format(cmptxt))
if len(obuf) > 0 and not aconf.json: # Print output when it exists and JSON output isn't requested. if not out.is_section_empty() and not aconf.json: # Print output when it exists and JSON output isn't requested.
out.head('# general') out.head('# general')
obuf.flush() out.flush_section()
out.sep() out.sep()
maxlen = algs.maxlen + 1 maxlen = algs.maxlen + 1
output_security(banner, client_audit, maxlen, aconf.json) output_security(out, banner, client_audit, maxlen, aconf.json)
# Filled in by output_algorithms() with unidentified algs. # Filled in by output_algorithms() with unidentified algs.
unknown_algorithms: List[str] = [] unknown_algorithms: List[str] = []
if pkm is not None: if pkm is not None:
@ -437,34 +440,36 @@ def output(aconf: AuditConf, banner: Optional[Banner], header: List[str], client
ciphers = pkm.supported_ciphers ciphers = pkm.supported_ciphers
auths = pkm.supported_authentications auths = pkm.supported_authentications
title, atype = 'SSH1 host-key algorithms', 'key' title, atype = 'SSH1 host-key algorithms', 'key'
program_retval = output_algorithms(title, adb, atype, ['ssh-rsa1'], unknown_algorithms, aconf.json, program_retval, maxlen) program_retval = output_algorithms(out, title, adb, atype, ['ssh-rsa1'], unknown_algorithms, aconf.json, program_retval, maxlen)
title, atype = 'SSH1 encryption algorithms (ciphers)', 'enc' title, atype = 'SSH1 encryption algorithms (ciphers)', 'enc'
program_retval = output_algorithms(title, adb, atype, ciphers, unknown_algorithms, aconf.json, program_retval, maxlen) program_retval = output_algorithms(out, title, adb, atype, ciphers, unknown_algorithms, aconf.json, program_retval, maxlen)
title, atype = 'SSH1 authentication types', 'aut' title, atype = 'SSH1 authentication types', 'aut'
program_retval = output_algorithms(title, adb, atype, auths, unknown_algorithms, aconf.json, program_retval, maxlen) program_retval = output_algorithms(out, title, adb, atype, auths, unknown_algorithms, aconf.json, program_retval, maxlen)
if kex is not None: if kex is not None:
adb = SSH2_KexDB.ALGORITHMS adb = SSH2_KexDB.ALGORITHMS
title, atype = 'key exchange algorithms', 'kex' title, atype = 'key exchange algorithms', 'kex'
program_retval = output_algorithms(title, adb, atype, kex.kex_algorithms, unknown_algorithms, aconf.json, program_retval, maxlen, kex.dh_modulus_sizes()) program_retval = output_algorithms(out, title, adb, atype, kex.kex_algorithms, unknown_algorithms, aconf.json, program_retval, maxlen, kex.dh_modulus_sizes())
title, atype = 'host-key algorithms', 'key' title, atype = 'host-key algorithms', 'key'
program_retval = output_algorithms(title, adb, atype, kex.key_algorithms, unknown_algorithms, aconf.json, program_retval, maxlen, kex.rsa_key_sizes()) program_retval = output_algorithms(out, title, adb, atype, kex.key_algorithms, unknown_algorithms, aconf.json, program_retval, maxlen, kex.rsa_key_sizes())
title, atype = 'encryption algorithms (ciphers)', 'enc' title, atype = 'encryption algorithms (ciphers)', 'enc'
program_retval = output_algorithms(title, adb, atype, kex.server.encryption, unknown_algorithms, aconf.json, program_retval, maxlen) program_retval = output_algorithms(out, title, adb, atype, kex.server.encryption, unknown_algorithms, aconf.json, program_retval, maxlen)
title, atype = 'message authentication code algorithms', 'mac' title, atype = 'message authentication code algorithms', 'mac'
program_retval = output_algorithms(title, adb, atype, kex.server.mac, unknown_algorithms, aconf.json, program_retval, maxlen) program_retval = output_algorithms(out, title, adb, atype, kex.server.mac, unknown_algorithms, aconf.json, program_retval, maxlen)
output_fingerprints(algs, aconf.json, True) output_fingerprints(out, algs, aconf.json, True)
perfect_config = output_recommendations(algs, software, aconf.json, maxlen) perfect_config = output_recommendations(out, algs, software, aconf.json, maxlen)
output_info(software, client_audit, not perfect_config, aconf.json) output_info(out, software, client_audit, not perfect_config, aconf.json)
if aconf.json: if aconf.json:
print(json.dumps(build_struct(banner, kex=kex, client_host=client_host), sort_keys=True), end='' if len(aconf.target_list) > 0 else "\n") # Print the JSON of the audit info. Skip the newline at the end if multiple targets were given (since each audit dump will go into its own list entry). out.reset()
# Build & write the JSON struct.
out.info(json.dumps(build_struct(aconf.host, banner, kex=kex, client_host=client_host), sort_keys=True))
elif len(unknown_algorithms) > 0: # If we encountered any unknown algorithms, ask the user to report them. elif len(unknown_algorithms) > 0: # If we encountered any unknown algorithms, ask the user to report them.
out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s. Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at <https://github.com/jtesta/ssh-audit/issues>.\n" % ','.join(unknown_algorithms)) out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s. Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at <https://github.com/jtesta/ssh-audit/issues>.\n" % ','.join(unknown_algorithms))
return program_retval return program_retval
def evaluate_policy(aconf: AuditConf, banner: Optional['Banner'], client_host: Optional[str], kex: Optional['SSH2_Kex'] = None) -> bool: def evaluate_policy(out: OutputBuffer, aconf: AuditConf, banner: Optional['Banner'], client_host: Optional[str], kex: Optional['SSH2_Kex'] = None) -> bool:
if aconf.policy is None: if aconf.policy is None:
raise RuntimeError('Internal error: cannot evaluate against null Policy!') raise RuntimeError('Internal error: cannot evaluate against null Policy!')
@ -472,11 +477,11 @@ def evaluate_policy(aconf: AuditConf, banner: Optional['Banner'], client_host: O
passed, error_struct, error_str = aconf.policy.evaluate(banner, kex) passed, error_struct, error_str = aconf.policy.evaluate(banner, kex)
if aconf.json: if aconf.json:
json_struct = {'host': aconf.host, 'policy': aconf.policy.get_name_and_version(), 'passed': passed, 'errors': error_struct} json_struct = {'host': aconf.host, 'policy': aconf.policy.get_name_and_version(), 'passed': passed, 'errors': error_struct}
print(json.dumps(json_struct, sort_keys=True)) out.info(json.dumps(json_struct, sort_keys=True))
else: else:
spacing = '' spacing = ''
if aconf.client_audit: if aconf.client_audit:
print("Client IP: %s" % client_host) out.info("Client IP: %s" % client_host)
spacing = " " # So the fields below line up with 'Client IP: '. spacing = " " # So the fields below line up with 'Client IP: '.
else: else:
host = aconf.host host = aconf.host
@ -487,9 +492,9 @@ def evaluate_policy(aconf: AuditConf, banner: Optional['Banner'], client_host: O
else: else:
host = '%s:%d' % (aconf.host, aconf.port) host = '%s:%d' % (aconf.host, aconf.port)
print("Host: %s" % host) out.info("Host: %s" % host)
print("Policy: %s%s" % (spacing, aconf.policy.get_name_and_version())) out.info("Policy: %s%s" % (spacing, aconf.policy.get_name_and_version()))
print("Result: %s" % spacing, end='') out.info("Result: %s" % spacing, line_ended=False)
# Use these nice unicode characters in the result message, unless we're on Windows (the cmd.exe terminal doesn't display them properly). # Use these nice unicode characters in the result message, unless we're on Windows (the cmd.exe terminal doesn't display them properly).
icon_good = "" icon_good = ""
@ -507,23 +512,25 @@ def evaluate_policy(aconf: AuditConf, banner: Optional['Banner'], client_host: O
return passed return passed
def list_policies() -> None: def list_policies(out: OutputBuffer) -> None:
'''Prints a list of server & client policies.''' '''Prints a list of server & client policies.'''
server_policy_names, client_policy_names = Policy.list_builtin_policies() server_policy_names, client_policy_names = Policy.list_builtin_policies()
if len(server_policy_names) > 0: if len(server_policy_names) > 0:
out.head('\nServer policies:\n') out.head('\nServer policies:\n')
print(" * \"%s\"" % "\"\n * \"".join(server_policy_names)) out.info(" * \"%s\"" % "\"\n * \"".join(server_policy_names))
if len(client_policy_names) > 0: if len(client_policy_names) > 0:
out.head('\nClient policies:\n') out.head('\nClient policies:\n')
print(" * \"%s\"" % "\"\n * \"".join(client_policy_names)) out.info(" * \"%s\"" % "\"\n * \"".join(client_policy_names))
out.sep()
if len(server_policy_names) == 0 and len(client_policy_names) == 0: if len(server_policy_names) == 0 and len(client_policy_names) == 0:
print("Error: no built-in policies found!") out.fail("Error: no built-in policies found!")
else: else:
print("\nHint: Use -P and provide the full name of a policy to run a policy scan with.\n") out.info("\nHint: Use -P and provide the full name of a policy to run a policy scan with.\n")
out.write()
def make_policy(aconf: AuditConf, banner: Optional['Banner'], kex: Optional['SSH2_Kex'], client_host: Optional[str]) -> None: def make_policy(aconf: AuditConf, banner: Optional['Banner'], kex: Optional['SSH2_Kex'], client_host: Optional[str]) -> None:
@ -552,12 +559,12 @@ def make_policy(aconf: AuditConf, banner: Optional['Banner'], kex: Optional['SSH
print("Error: file already exists: %s" % aconf.policy_file) print("Error: file already exists: %s" % aconf.policy_file)
def process_commandline(args: List[str], usage_cb: Callable[..., None]) -> 'AuditConf': # pylint: disable=too-many-statements def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[..., None]) -> 'AuditConf': # pylint: disable=too-many-statements
# pylint: disable=too-many-branches # pylint: disable=too-many-branches
aconf = AuditConf() aconf = AuditConf()
try: try:
sopts = 'h1246M:p:P:jbcnvl:t:T:L' sopts = 'h1246M:p:P:jbcnvl:t:T:L'
lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies', 'lookup='] lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies', 'lookup=', 'threads=']
opts, args = getopt.gnu_getopt(args, sopts, lopts) opts, args = getopt.gnu_getopt(args, sopts, lopts)
except getopt.GetoptError as err: except getopt.GetoptError as err:
usage_cb(str(err)) usage_cb(str(err))
@ -589,6 +596,7 @@ def process_commandline(args: List[str], usage_cb: Callable[..., None]) -> 'Audi
aconf.json = True aconf.json = True
elif o in ('-v', '--verbose'): elif o in ('-v', '--verbose'):
aconf.verbose = True aconf.verbose = True
out.verbose = True
elif o in ('-l', '--level'): elif o in ('-l', '--level'):
if a not in ('info', 'warn', 'fail'): if a not in ('info', 'warn', 'fail'):
usage_cb('level {} is not valid'.format(a)) usage_cb('level {} is not valid'.format(a))
@ -603,6 +611,8 @@ def process_commandline(args: List[str], usage_cb: Callable[..., None]) -> 'Audi
aconf.policy_file = a aconf.policy_file = a
elif o in ('-T', '--targets'): elif o in ('-T', '--targets'):
aconf.target_file = a aconf.target_file = a
elif o == '--threads':
aconf.threads = int(a)
elif o in ('-L', '--list-policies'): elif o in ('-L', '--list-policies'):
aconf.list_policies = True aconf.list_policies = True
elif o == '--lookup': elif o == '--lookup':
@ -615,7 +625,7 @@ def process_commandline(args: List[str], usage_cb: Callable[..., None]) -> 'Audi
return aconf return aconf
if aconf.list_policies: if aconf.list_policies:
list_policies() list_policies(out)
sys.exit(exitcodes.GOOD) sys.exit(exitcodes.GOOD)
if aconf.client_audit is False and aconf.target_file is None: if aconf.client_audit is False and aconf.target_file is None:
@ -659,23 +669,26 @@ def process_commandline(args: List[str], usage_cb: Callable[..., None]) -> 'Audi
try: try:
aconf.policy = Policy(policy_file=aconf.policy_file) aconf.policy = Policy(policy_file=aconf.policy_file)
except Exception as e: except Exception as e:
print("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc())) out.fail("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc()))
out.write()
sys.exit(exitcodes.UNKNOWN_ERROR) sys.exit(exitcodes.UNKNOWN_ERROR)
# If the user wants to do a client audit, but provided a server policy, terminate. # If the user wants to do a client audit, but provided a server policy, terminate.
if aconf.client_audit and aconf.policy.is_server_policy(): if aconf.client_audit and aconf.policy.is_server_policy():
print("Error: client audit selected, but server policy provided.") out.fail("Error: client audit selected, but server policy provided.")
out.write()
sys.exit(exitcodes.UNKNOWN_ERROR) sys.exit(exitcodes.UNKNOWN_ERROR)
# If the user wants to do a server audit, but provided a client policy, terminate. # If the user wants to do a server audit, but provided a client policy, terminate.
if aconf.client_audit is False and aconf.policy.is_server_policy() is False: if aconf.client_audit is False and aconf.policy.is_server_policy() is False:
print("Error: server audit selected, but client policy provided.") out.fail("Error: server audit selected, but client policy provided.")
out.write()
sys.exit(exitcodes.UNKNOWN_ERROR) sys.exit(exitcodes.UNKNOWN_ERROR)
return aconf return aconf
def build_struct(banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, pkm: Optional['SSH1_PublicKeyMessage'] = None, client_host: Optional[str] = None) -> Any: def build_struct(target_host: str, banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, pkm: Optional['SSH1_PublicKeyMessage'] = None, client_host: Optional[str] = None) -> Any:
banner_str = '' banner_str = ''
banner_protocol = None banner_protocol = None
@ -695,8 +708,13 @@ def build_struct(banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, p
"comments": banner_comments, "comments": banner_comments,
}, },
} }
# If we're scanning a client host, put the client's IP into the results. Otherwise, include the target host.
if client_host is not None: if client_host is not None:
res['client_ip'] = client_host res['client_ip'] = client_host
else:
res['target'] = target_host
if kex is not None: if kex is not None:
res['compression'] = kex.server.compression res['compression'] = kex.server.compression
@ -773,7 +791,7 @@ def build_struct(banner: Optional['Banner'], kex: Optional['SSH2_Kex'] = None, p
# Returns one of the exitcodes.* flags. # Returns one of the exitcodes.* flags.
def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = False) -> int: def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = False) -> int:
program_retval = exitcodes.GOOD program_retval = exitcodes.GOOD
out.batch = aconf.batch out.batch = aconf.batch
out.verbose = aconf.verbose out.verbose = aconf.verbose
@ -781,12 +799,20 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal
out.use_colors = aconf.colors out.use_colors = aconf.colors
s = SSH_Socket(aconf.host, aconf.port, aconf.ipvo, aconf.timeout, aconf.timeout_set) s = SSH_Socket(aconf.host, aconf.port, aconf.ipvo, aconf.timeout, aconf.timeout_set)
if aconf.client_audit: if aconf.client_audit:
out.v("Listening for client connection on port %d..." % aconf.port, write_now=True)
s.listen_and_accept() s.listen_and_accept()
else: else:
out.v("Connecting to %s:%d..." % ('[%s]' % aconf.host if Utils.is_ipv6_address(aconf.host) else aconf.host, aconf.port), write_now=True)
err = s.connect() err = s.connect()
if err is not None: if err is not None:
out.fail(err) out.fail(err)
sys.exit(exitcodes.CONNECTION_ERROR)
# If we're running against multiple targets, return a connection error to the calling worker thread. Otherwise, write the error message to the console and exit.
if len(aconf.target_list) > 0:
return exitcodes.CONNECTION_ERROR
else:
out.write()
sys.exit(exitcodes.CONNECTION_ERROR)
if sshv is None: if sshv is None:
sshv = 2 if aconf.ssh2 else 1 sshv = 2 if aconf.ssh2 else 1
@ -811,7 +837,9 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal
payload_txt = u'"{}"'.format(repr(payload).lstrip('b')[1:-1]) payload_txt = u'"{}"'.format(repr(payload).lstrip('b')[1:-1])
if payload_txt == u'Protocol major versions differ.': if payload_txt == u'Protocol major versions differ.':
if sshv == 2 and aconf.ssh1: if sshv == 2 and aconf.ssh1:
return audit(aconf, 1) ret = audit(out, aconf, 1)
out.write()
return ret
err = '[exception] error reading packet ({})'.format(payload_txt) err = '[exception] error reading packet ({})'.format(payload_txt)
else: else:
err_pair = None err_pair = None
@ -824,11 +852,11 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal
'instead received unknown message ({2})' 'instead received unknown message ({2})'
err = fmt.format(err_pair[0], err_pair[1], packet_type) err = fmt.format(err_pair[0], err_pair[1], packet_type)
if err is not None: if err is not None:
output(aconf, banner, header) output(out, aconf, banner, header)
out.fail(err) out.fail(err)
return exitcodes.CONNECTION_ERROR return exitcodes.CONNECTION_ERROR
if sshv == 1: if sshv == 1:
program_retval = output(aconf, banner, header, pkm=SSH1_PublicKeyMessage.parse(payload)) program_retval = output(out, aconf, banner, header, pkm=SSH1_PublicKeyMessage.parse(payload))
elif sshv == 2: elif sshv == 2:
kex = SSH2_Kex.parse(payload) kex = SSH2_Kex.parse(payload)
if aconf.client_audit is False: if aconf.client_audit is False:
@ -837,11 +865,11 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal
# This is a standard audit scan. # This is a standard audit scan.
if (aconf.policy is None) and (aconf.make_policy is False): if (aconf.policy is None) and (aconf.make_policy is False):
program_retval = output(aconf, banner, header, client_host=s.client_host, kex=kex, print_target=print_target) program_retval = output(out, aconf, banner, header, client_host=s.client_host, kex=kex, print_target=print_target)
# This is a policy test. # This is a policy test.
elif (aconf.policy is not None) and (aconf.make_policy is False): elif (aconf.policy is not None) and (aconf.make_policy is False):
program_retval = exitcodes.GOOD if evaluate_policy(aconf, banner, s.client_host, kex=kex) else exitcodes.FAILURE program_retval = exitcodes.GOOD if evaluate_policy(out, aconf, banner, s.client_host, kex=kex) else exitcodes.FAILURE
# A new policy should be made from this scan. # A new policy should be made from this scan.
elif (aconf.policy is None) and (aconf.make_policy is True): elif (aconf.policy is None) and (aconf.make_policy is True):
@ -853,7 +881,7 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal
return program_retval return program_retval
def algorithm_lookup(alg_names: str) -> int: def algorithm_lookup(out: OutputBuffer, alg_names: str) -> int:
'''Looks up a comma-separated list of algorithms and outputs their security properties. Returns an exitcodes.* flag.''' '''Looks up a comma-separated list of algorithms and outputs their security properties. Returns an exitcodes.* flag.'''
retval = exitcodes.GOOD retval = exitcodes.GOOD
alg_types = { alg_types = {
@ -885,7 +913,7 @@ def algorithm_lookup(alg_names: str) -> int:
for alg_type in alg_types: for alg_type in alg_types:
if len(algorithms_dict[alg_type]) > 0: if len(algorithms_dict[alg_type]) > 0:
title = str(alg_types.get(alg_type)) title = str(alg_types.get(alg_type))
retval = output_algorithms(title, adb, alg_type, list(algorithms_dict[alg_type]), unknown_algorithms, False, retval, padding) retval = output_algorithms(out, title, adb, alg_type, list(algorithms_dict[alg_type]), unknown_algorithms, False, retval, padding)
algorithms_dict_flattened = [ algorithms_dict_flattened = [
alg_name alg_name
@ -915,7 +943,7 @@ def algorithm_lookup(alg_names: str) -> int:
for algorithm_not_found in algorithms_not_found: for algorithm_not_found in algorithms_not_found:
out.fail(algorithm_not_found) out.fail(algorithm_not_found)
print() out.sep()
if len(similar_algorithms) > 0: if len(similar_algorithms) > 0:
retval = exitcodes.FAILURE retval = exitcodes.FAILURE
@ -926,14 +954,45 @@ def algorithm_lookup(alg_names: str) -> int:
return retval return retval
out = Output() # Worker thread for scanning multiple targets concurrently.
def target_worker_thread(host: str, port: int, shared_aconf: AuditConf) -> Tuple[int, str]:
ret = -1
string_output = ''
out = OutputBuffer()
out.verbose = shared_aconf.verbose
my_aconf = copy.deepcopy(shared_aconf)
my_aconf.host = host
my_aconf.port = port
# If we're outputting JSON, turn off colors and ensure 'info' level messages go through.
if my_aconf.json:
out.json = True
out.use_colors = False
out.v("Running against: %s:%d..." % (my_aconf.host, my_aconf.port), write_now=True)
try:
ret = audit(out, my_aconf, print_target=True)
string_output = out.get_buffer()
except Exception:
ret = -1
string_output = "An exception occurred while scanning %s:%d:\n%s" % (host, port, str(traceback.format_exc()))
return ret, string_output
def main() -> int: def main() -> int:
aconf = process_commandline(sys.argv[1:], usage) out = OutputBuffer()
aconf = process_commandline(out, sys.argv[1:], usage)
# If we're outputting JSON, turn off colors and ensure 'info' level messages go through.
if aconf.json:
out.json = True
out.use_colors = False
if aconf.lookup != '': if aconf.lookup != '':
retval = algorithm_lookup(aconf.lookup) retval = algorithm_lookup(out, aconf.lookup)
out.write()
sys.exit(retval) sys.exit(retval)
# If multiple targets were specified... # If multiple targets were specified...
@ -945,31 +1004,46 @@ def main() -> int:
print('[', end='') print('[', end='')
# Loop through each target in the list. # Loop through each target in the list.
for i, target in enumerate(aconf.target_list): target_servers = []
aconf.host, port = Utils.parse_host_and_port(target) for _, target in enumerate(aconf.target_list):
if port == 0: host, port = Utils.parse_host_and_port(target, default_port=22)
port = 22 target_servers.append((host, port))
aconf.port = port
new_ret = audit(aconf, print_target=True) # A ranked list of return codes. Those with higher indices will take precendence over lower ones. For example, if three servers are scanned, yielding WARNING, GOOD, and UNKNOWN_ERROR, the overall result will be UNKNOWN_ERROR, since its index is the highest. Errors have highest priority, followed by failures, then warnings.
ranked_return_codes = [exitcodes.GOOD, exitcodes.WARNING, exitcodes.FAILURE, exitcodes.CONNECTION_ERROR, exitcodes.UNKNOWN_ERROR]
# Set the return value only if an unknown error occurred, a failure occurred, or if a warning occurred and the previous value was good. # Queue all worker threads.
if (new_ret == exitcodes.UNKNOWN_ERROR) or (new_ret == exitcodes.FAILURE) or ((new_ret == exitcodes.WARNING) and (ret == exitcodes.GOOD)): num_target_servers = len(target_servers)
ret = new_ret num_processed = 0
out.v("Scanning %u targets with %s%u threads..." % (num_target_servers, '(at most) ' if aconf.threads > num_target_servers else '', aconf.threads), write_now=True)
with concurrent.futures.ThreadPoolExecutor(max_workers=aconf.threads) as executor:
future_to_server = {executor.submit(target_worker_thread, target_server[0], target_server[1], aconf): target_server for target_server in target_servers}
for future in concurrent.futures.as_completed(future_to_server):
worker_ret, worker_output = future.result()
# Don't print a delimiter after the last target was handled. # If this worker's return code is ranked higher that what we've cached so far, update our cache.
if i + 1 != len(aconf.target_list): if ranked_return_codes.index(worker_ret) > ranked_return_codes.index(ret):
if aconf.json: ret = worker_ret
print(", ", end='')
else: # print("Worker for %s:%d returned %d: [%s]" % (target_server[0], target_server[1], worker_ret, worker_output))
print(("-" * 80) + "\n") print(worker_output, end='' if aconf.json else "\n")
# Don't print a delimiter after the last target was handled.
num_processed += 1
if num_processed < num_target_servers:
if aconf.json:
print(", ", end='')
else:
print(("-" * 80) + "\n")
if aconf.json: if aconf.json:
print(']') print(']')
return ret else: # Just a scan against a single target.
else: ret = audit(out, aconf)
return audit(aconf) out.write()
return ret
if __name__ == '__main__': # pragma: nocover if __name__ == '__main__': # pragma: nocover

View File

@ -36,7 +36,7 @@ from typing import Callable, Optional, Union, Any # noqa: F401
from ssh_audit import exitcodes from ssh_audit import exitcodes
from ssh_audit.banner import Banner from ssh_audit.banner import Banner
from ssh_audit.globals import SSH_HEADER from ssh_audit.globals import SSH_HEADER
from ssh_audit.output import Output from ssh_audit.outputbuffer import OutputBuffer
from ssh_audit.protocol import Protocol from ssh_audit.protocol import Protocol
from ssh_audit.readbuf import ReadBuf from ssh_audit.readbuf import ReadBuf
from ssh_audit.ssh1 import SSH1 from ssh_audit.ssh1 import SSH1
@ -95,7 +95,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
if not check or socktype == socket.SOCK_STREAM: if not check or socktype == socket.SOCK_STREAM:
yield af, addr yield af, addr
except socket.error as e: except socket.error as e:
Output().fail('[exception] {}'.format(e)) OutputBuffer().fail('[exception] {}'.format(e)).write()
sys.exit(exitcodes.CONNECTION_ERROR) sys.exit(exitcodes.CONNECTION_ERROR)
# Listens on a server socket and accepts one connection (used for # Listens on a server socket and accepts one connection (used for
@ -273,7 +273,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
payload_length = packet_length - padding_length - 1 payload_length = packet_length - padding_length - 1
check_size = 4 + 1 + payload_length + padding_length check_size = 4 + 1 + payload_length + padding_length
if check_size % self.__block_size != 0: if check_size % self.__block_size != 0:
Output().fail('[exception] invalid ssh packet (block size)') OutputBuffer().fail('[exception] invalid ssh packet (block size)').write()
sys.exit(exitcodes.CONNECTION_ERROR) sys.exit(exitcodes.CONNECTION_ERROR)
self.ensure_read(payload_length) self.ensure_read(payload_length)
if sshv == 1: if sshv == 1:
@ -288,7 +288,7 @@ class SSH_Socket(ReadBuf, WriteBuf):
if sshv == 1: if sshv == 1:
rcrc = SSH1.crc32(padding + payload) rcrc = SSH1.crc32(padding + payload)
if crc != rcrc: if crc != rcrc:
Output().fail('[exception] packet checksum CRC32 mismatch.') OutputBuffer().fail('[exception] packet checksum CRC32 mismatch.').write()
sys.exit(exitcodes.CONNECTION_ERROR) sys.exit(exitcodes.CONNECTION_ERROR)
else: else:
self.ensure_read(padding_length) self.ensure_read(padding_length)

View File

@ -129,10 +129,10 @@ class Utils:
return -1.0 return -1.0
@staticmethod @staticmethod
def parse_host_and_port(host_and_port: str) -> Tuple[str, int]: def parse_host_and_port(host_and_port: str, default_port: int = 0) -> Tuple[str, int]:
'''Parses a string into a tuple of its host and port. The port is 0 if not specified.''' '''Parses a string into a tuple of its host and port. The port is 0 if not specified.'''
host = host_and_port host = host_and_port
port = 0 port = default_port
mx = re.match(r'^\[([^\]]+)\](?::(\d+))?$', host_and_port) mx = re.match(r'^\[([^\]]+)\](?::(\d+))?$', host_and_port)
if mx is not None: if mx is not None:

View File

@ -94,7 +94,12 @@ The timeout, in seconds, for creating connections and reading data from the sock
.TP .TP
.B -T, \-\-targets=<hosts.txt> .B -T, \-\-targets=<hosts.txt>
.br .br
A file containing a list of target hosts. Each line must have one host, in the format of HOST[:PORT]. A file containing a list of target hosts. Each line must have one host, in the format of HOST[:PORT]. Use --threads to control concurrent scans.
.TP
.B \-\-threads=<threads>
.br
The number of threads to use when scanning multiple targets (with -T/--targets). Default is 32.
.TP .TP
.B -v, \-\-verbose .B -v, \-\-verbose

View File

@ -1 +1 @@
{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-dropbear_2019.78", "software": "dropbear_2019.78"}, "compression": ["zlib@openssh.com", "none"], "enc": ["aes128-ctr", "aes256-ctr", "aes128-cbc", "aes256-cbc", "3des-ctr", "3des-cbc"], "fingerprints": [{"fp": "SHA256:CDfAU12pjQS7/91kg7gYacza0U/6PDbE04Ic3IpYxkM", "type": "ssh-rsa"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "ecdh-sha2-nistp521"}, {"algorithm": "ecdh-sha2-nistp384"}, {"algorithm": "ecdh-sha2-nistp256"}, {"algorithm": "diffie-hellman-group14-sha256"}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "kexguess2@matt.ucc.asn.au"}], "key": [{"algorithm": "ecdsa-sha2-nistp256"}, {"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-dss"}], "mac": ["hmac-sha1-96", "hmac-sha1", "hmac-sha2-256"]} {"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-dropbear_2019.78", "software": "dropbear_2019.78"}, "compression": ["zlib@openssh.com", "none"], "enc": ["aes128-ctr", "aes256-ctr", "aes128-cbc", "aes256-cbc", "3des-ctr", "3des-cbc"], "fingerprints": [{"fp": "SHA256:CDfAU12pjQS7/91kg7gYacza0U/6PDbE04Ic3IpYxkM", "type": "ssh-rsa"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "ecdh-sha2-nistp521"}, {"algorithm": "ecdh-sha2-nistp384"}, {"algorithm": "ecdh-sha2-nistp256"}, {"algorithm": "diffie-hellman-group14-sha256"}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "kexguess2@matt.ucc.asn.au"}], "key": [{"algorithm": "ecdsa-sha2-nistp256"}, {"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-dss"}], "mac": ["hmac-sha1-96", "hmac-sha1", "hmac-sha2-256"], "target": "localhost"}

View File

@ -1 +1 @@
{"banner": {"comments": null, "protocol": [1, 99], "raw": "SSH-1.99-OpenSSH_4.0", "software": "OpenSSH_4.0"}, "compression": ["none", "zlib"], "enc": ["aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "arcfour", "aes192-cbc", "aes256-cbc", "rijndael-cbc@lysator.liu.se", "aes128-ctr", "aes192-ctr", "aes256-ctr"], "fingerprints": [{"fp": "SHA256:YZ457EBcJTSxRKI3yXRgtAj3PBf5B9/F36b1SVooml4", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-dss"}], "mac": ["hmac-md5", "hmac-sha1", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"]} {"banner": {"comments": null, "protocol": [1, 99], "raw": "SSH-1.99-OpenSSH_4.0", "software": "OpenSSH_4.0"}, "compression": ["none", "zlib"], "enc": ["aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "arcfour", "aes192-cbc", "aes256-cbc", "rijndael-cbc@lysator.liu.se", "aes128-ctr", "aes192-ctr", "aes256-ctr"], "fingerprints": [{"fp": "SHA256:YZ457EBcJTSxRKI3yXRgtAj3PBf5B9/F36b1SVooml4", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-dss"}], "mac": ["hmac-md5", "hmac-sha1", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"], "target": "localhost"}

View File

@ -1 +1 @@
{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:YZ457EBcJTSxRKI3yXRgtAj3PBf5B9/F36b1SVooml4", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-dss"}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"]} {"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:YZ457EBcJTSxRKI3yXRgtAj3PBf5B9/F36b1SVooml4", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-dss"}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"], "target": "localhost"}

View File

@ -1 +1 @@
{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:YZ457EBcJTSxRKI3yXRgtAj3PBf5B9/F36b1SVooml4", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-rsa-cert-v01@openssh.com", "casize": 1024, "keysize": 1024}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"]} {"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:YZ457EBcJTSxRKI3yXRgtAj3PBf5B9/F36b1SVooml4", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-rsa-cert-v01@openssh.com", "casize": 1024, "keysize": 1024}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"], "target": "localhost"}

View File

@ -1 +1 @@
{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:YZ457EBcJTSxRKI3yXRgtAj3PBf5B9/F36b1SVooml4", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-rsa-cert-v01@openssh.com", "casize": 3072, "keysize": 1024}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"]} {"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:YZ457EBcJTSxRKI3yXRgtAj3PBf5B9/F36b1SVooml4", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 1024}, {"algorithm": "ssh-rsa-cert-v01@openssh.com", "casize": 3072, "keysize": 1024}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"], "target": "localhost"}

View File

@ -1 +1 @@
{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:nsWtdJ9Z67Vrf7OsUzQov7esXhsWAfVppArGh25u244", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 3072}, {"algorithm": "ssh-rsa-cert-v01@openssh.com", "casize": 1024, "keysize": 3072}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"]} {"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:nsWtdJ9Z67Vrf7OsUzQov7esXhsWAfVppArGh25u244", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 3072}, {"algorithm": "ssh-rsa-cert-v01@openssh.com", "casize": 1024, "keysize": 3072}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"], "target": "localhost"}

View File

@ -1 +1 @@
{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:nsWtdJ9Z67Vrf7OsUzQov7esXhsWAfVppArGh25u244", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 3072}, {"algorithm": "ssh-rsa-cert-v01@openssh.com", "casize": 3072, "keysize": 3072}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"]} {"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_5.6", "software": "OpenSSH_5.6"}, "compression": ["none", "zlib@openssh.com"], "enc": ["aes128-ctr", "aes192-ctr", "aes256-ctr", "arcfour256", "arcfour128", "aes128-cbc", "3des-cbc", "blowfish-cbc", "cast128-cbc", "aes192-cbc", "aes256-cbc", "arcfour", "rijndael-cbc@lysator.liu.se"], "fingerprints": [{"fp": "SHA256:nsWtdJ9Z67Vrf7OsUzQov7esXhsWAfVppArGh25u244", "type": "ssh-rsa"}], "kex": [{"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 1024}, {"algorithm": "diffie-hellman-group-exchange-sha1", "keysize": 1024}, {"algorithm": "diffie-hellman-group14-sha1"}, {"algorithm": "diffie-hellman-group1-sha1"}], "key": [{"algorithm": "ssh-rsa", "keysize": 3072}, {"algorithm": "ssh-rsa-cert-v01@openssh.com", "casize": 3072, "keysize": 3072}], "mac": ["hmac-md5", "hmac-sha1", "umac-64@openssh.com", "hmac-ripemd160", "hmac-ripemd160@openssh.com", "hmac-sha1-96", "hmac-md5-96"], "target": "localhost"}

View File

@ -1 +1 @@
{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_8.0", "software": "OpenSSH_8.0"}, "compression": ["none", "zlib@openssh.com"], "enc": ["chacha20-poly1305@openssh.com", "aes128-ctr", "aes192-ctr", "aes256-ctr", "aes128-gcm@openssh.com", "aes256-gcm@openssh.com"], "fingerprints": [{"fp": "SHA256:UrnXIVH+7dlw8UqYocl48yUEcKrthGDQG2CPCgp7MxU", "type": "ssh-ed25519"}, {"fp": "SHA256:nsWtdJ9Z67Vrf7OsUzQov7esXhsWAfVppArGh25u244", "type": "ssh-rsa"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "ecdh-sha2-nistp256"}, {"algorithm": "ecdh-sha2-nistp384"}, {"algorithm": "ecdh-sha2-nistp521"}, {"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 2048}, {"algorithm": "diffie-hellman-group16-sha512"}, {"algorithm": "diffie-hellman-group18-sha512"}, {"algorithm": "diffie-hellman-group14-sha256"}, {"algorithm": "diffie-hellman-group14-sha1"}], "key": [{"algorithm": "rsa-sha2-512", "keysize": 3072}, {"algorithm": "rsa-sha2-256", "keysize": 3072}, {"algorithm": "ssh-rsa", "keysize": 3072}, {"algorithm": "ecdsa-sha2-nistp256"}, {"algorithm": "ssh-ed25519"}], "mac": ["umac-64-etm@openssh.com", "umac-128-etm@openssh.com", "hmac-sha2-256-etm@openssh.com", "hmac-sha2-512-etm@openssh.com", "hmac-sha1-etm@openssh.com", "umac-64@openssh.com", "umac-128@openssh.com", "hmac-sha2-256", "hmac-sha2-512", "hmac-sha1"]} {"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_8.0", "software": "OpenSSH_8.0"}, "compression": ["none", "zlib@openssh.com"], "enc": ["chacha20-poly1305@openssh.com", "aes128-ctr", "aes192-ctr", "aes256-ctr", "aes128-gcm@openssh.com", "aes256-gcm@openssh.com"], "fingerprints": [{"fp": "SHA256:UrnXIVH+7dlw8UqYocl48yUEcKrthGDQG2CPCgp7MxU", "type": "ssh-ed25519"}, {"fp": "SHA256:nsWtdJ9Z67Vrf7OsUzQov7esXhsWAfVppArGh25u244", "type": "ssh-rsa"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "ecdh-sha2-nistp256"}, {"algorithm": "ecdh-sha2-nistp384"}, {"algorithm": "ecdh-sha2-nistp521"}, {"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 2048}, {"algorithm": "diffie-hellman-group16-sha512"}, {"algorithm": "diffie-hellman-group18-sha512"}, {"algorithm": "diffie-hellman-group14-sha256"}, {"algorithm": "diffie-hellman-group14-sha1"}], "key": [{"algorithm": "rsa-sha2-512", "keysize": 3072}, {"algorithm": "rsa-sha2-256", "keysize": 3072}, {"algorithm": "ssh-rsa", "keysize": 3072}, {"algorithm": "ecdsa-sha2-nistp256"}, {"algorithm": "ssh-ed25519"}], "mac": ["umac-64-etm@openssh.com", "umac-128-etm@openssh.com", "hmac-sha2-256-etm@openssh.com", "hmac-sha2-512-etm@openssh.com", "hmac-sha1-etm@openssh.com", "umac-64@openssh.com", "umac-128@openssh.com", "hmac-sha2-256", "hmac-sha2-512", "hmac-sha1"], "target": "localhost"}

View File

@ -1 +1 @@
{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_8.0", "software": "OpenSSH_8.0"}, "compression": ["none", "zlib@openssh.com"], "enc": ["chacha20-poly1305@openssh.com", "aes128-ctr", "aes192-ctr", "aes256-ctr", "aes128-gcm@openssh.com", "aes256-gcm@openssh.com"], "fingerprints": [{"fp": "SHA256:UrnXIVH+7dlw8UqYocl48yUEcKrthGDQG2CPCgp7MxU", "type": "ssh-ed25519"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "ecdh-sha2-nistp256"}, {"algorithm": "ecdh-sha2-nistp384"}, {"algorithm": "ecdh-sha2-nistp521"}, {"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 2048}, {"algorithm": "diffie-hellman-group16-sha512"}, {"algorithm": "diffie-hellman-group18-sha512"}, {"algorithm": "diffie-hellman-group14-sha256"}, {"algorithm": "diffie-hellman-group14-sha1"}], "key": [{"algorithm": "ssh-ed25519"}, {"algorithm": "ssh-ed25519-cert-v01@openssh.com"}], "mac": ["umac-64-etm@openssh.com", "umac-128-etm@openssh.com", "hmac-sha2-256-etm@openssh.com", "hmac-sha2-512-etm@openssh.com", "hmac-sha1-etm@openssh.com", "umac-64@openssh.com", "umac-128@openssh.com", "hmac-sha2-256", "hmac-sha2-512", "hmac-sha1"]} {"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_8.0", "software": "OpenSSH_8.0"}, "compression": ["none", "zlib@openssh.com"], "enc": ["chacha20-poly1305@openssh.com", "aes128-ctr", "aes192-ctr", "aes256-ctr", "aes128-gcm@openssh.com", "aes256-gcm@openssh.com"], "fingerprints": [{"fp": "SHA256:UrnXIVH+7dlw8UqYocl48yUEcKrthGDQG2CPCgp7MxU", "type": "ssh-ed25519"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "ecdh-sha2-nistp256"}, {"algorithm": "ecdh-sha2-nistp384"}, {"algorithm": "ecdh-sha2-nistp521"}, {"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 2048}, {"algorithm": "diffie-hellman-group16-sha512"}, {"algorithm": "diffie-hellman-group18-sha512"}, {"algorithm": "diffie-hellman-group14-sha256"}, {"algorithm": "diffie-hellman-group14-sha1"}], "key": [{"algorithm": "ssh-ed25519"}, {"algorithm": "ssh-ed25519-cert-v01@openssh.com"}], "mac": ["umac-64-etm@openssh.com", "umac-128-etm@openssh.com", "hmac-sha2-256-etm@openssh.com", "hmac-sha2-512-etm@openssh.com", "hmac-sha1-etm@openssh.com", "umac-64@openssh.com", "umac-128@openssh.com", "hmac-sha2-256", "hmac-sha2-512", "hmac-sha1"], "target": "localhost"}

View File

@ -1 +1 @@
{"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_8.0", "software": "OpenSSH_8.0"}, "compression": ["none", "zlib@openssh.com"], "enc": ["chacha20-poly1305@openssh.com", "aes256-gcm@openssh.com", "aes128-gcm@openssh.com", "aes256-ctr", "aes192-ctr", "aes128-ctr"], "fingerprints": [{"fp": "SHA256:UrnXIVH+7dlw8UqYocl48yUEcKrthGDQG2CPCgp7MxU", "type": "ssh-ed25519"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 2048}], "key": [{"algorithm": "ssh-ed25519"}], "mac": ["hmac-sha2-256-etm@openssh.com", "hmac-sha2-512-etm@openssh.com", "umac-128-etm@openssh.com"]} {"banner": {"comments": null, "protocol": [2, 0], "raw": "SSH-2.0-OpenSSH_8.0", "software": "OpenSSH_8.0"}, "compression": ["none", "zlib@openssh.com"], "enc": ["chacha20-poly1305@openssh.com", "aes256-gcm@openssh.com", "aes128-gcm@openssh.com", "aes256-ctr", "aes192-ctr", "aes128-ctr"], "fingerprints": [{"fp": "SHA256:UrnXIVH+7dlw8UqYocl48yUEcKrthGDQG2CPCgp7MxU", "type": "ssh-ed25519"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "diffie-hellman-group-exchange-sha256", "keysize": 2048}], "key": [{"algorithm": "ssh-ed25519"}], "mac": ["hmac-sha2-256-etm@openssh.com", "hmac-sha2-512-etm@openssh.com", "umac-128-etm@openssh.com"], "target": "localhost"}

View File

@ -1 +1 @@
{"banner": {"comments": "", "protocol": [2, 0], "raw": "", "software": "tinyssh_noversion"}, "compression": ["none"], "enc": ["chacha20-poly1305@openssh.com"], "fingerprints": [{"fp": "SHA256:89ocln1x7KNqnMgWffGoYtD70ksJ4FrH7BMJHa7SrwU", "type": "ssh-ed25519"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "sntrup4591761x25519-sha512@tinyssh.org"}], "key": [{"algorithm": "ssh-ed25519"}], "mac": ["hmac-sha2-256"]} {"banner": {"comments": "", "protocol": [2, 0], "raw": "", "software": "tinyssh_noversion"}, "compression": ["none"], "enc": ["chacha20-poly1305@openssh.com"], "fingerprints": [{"fp": "SHA256:89ocln1x7KNqnMgWffGoYtD70ksJ4FrH7BMJHa7SrwU", "type": "ssh-ed25519"}], "kex": [{"algorithm": "curve25519-sha256"}, {"algorithm": "curve25519-sha256@libssh.org"}, {"algorithm": "sntrup4591761x25519-sha512@tinyssh.org"}], "key": [{"algorithm": "ssh-ed25519"}], "mac": ["hmac-sha2-256"], "target": "localhost"}

View File

@ -7,6 +7,7 @@ class TestAuditConf:
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def init(self, ssh_audit): def init(self, ssh_audit):
self.AuditConf = ssh_audit.AuditConf self.AuditConf = ssh_audit.AuditConf
self.OutputBuffer = ssh_audit.OutputBuffer
self.usage = ssh_audit.usage self.usage = ssh_audit.usage
self.process_commandline = process_commandline self.process_commandline = process_commandline
@ -127,7 +128,7 @@ class TestAuditConf:
def test_audit_conf_process_commandline(self): def test_audit_conf_process_commandline(self):
# pylint: disable=too-many-statements # pylint: disable=too-many-statements
c = lambda x: self.process_commandline(x.split(), self.usage) # noqa c = lambda x: self.process_commandline(self.OutputBuffer, x.split(), self.usage) # noqa
with pytest.raises(SystemExit): with pytest.raises(SystemExit):
conf = c('') conf = c('')
with pytest.raises(SystemExit): with pytest.raises(SystemExit):

View File

@ -35,7 +35,7 @@ def test_prevent_runtime_error_regression(ssh_audit, kex):
kex.set_host_key("ssh-rsa7", b"\x00\x00\x00\x07ssh-rsa\x00\x00\x00") kex.set_host_key("ssh-rsa7", b"\x00\x00\x00\x07ssh-rsa\x00\x00\x00")
kex.set_host_key("ssh-rsa8", b"\x00\x00\x00\x07ssh-rsa\x00\x00\x00") kex.set_host_key("ssh-rsa8", b"\x00\x00\x00\x07ssh-rsa\x00\x00\x00")
rv = ssh_audit.build_struct(banner=None, kex=kex) rv = ssh_audit.build_struct('localhost', banner=None, kex=kex)
assert len(rv["fingerprints"]) == 9 assert len(rv["fingerprints"]) == 9

View File

@ -2,12 +2,15 @@ import socket
import errno import errno
import pytest import pytest
from ssh_audit.outputbuffer import OutputBuffer
# pylint: disable=attribute-defined-outside-init # pylint: disable=attribute-defined-outside-init
class TestErrors: class TestErrors:
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def init(self, ssh_audit): def init(self, ssh_audit):
self.AuditConf = ssh_audit.AuditConf self.AuditConf = ssh_audit.AuditConf
self.OutputBuffer = ssh_audit.OutputBuffer
self.audit = ssh_audit.audit self.audit = ssh_audit.audit
def _conf(self): def _conf(self):
@ -21,14 +24,21 @@ class TestErrors:
conf = self._conf() conf = self._conf()
spy.begin() spy.begin()
out = OutputBuffer()
if exit_expected: if exit_expected:
with pytest.raises(SystemExit): with pytest.raises(SystemExit):
self.audit(conf) self.audit(out, conf)
else: else:
ret = self.audit(conf) ret = self.audit(out, conf)
assert ret != 0 assert ret != 0
out.write()
lines = spy.flush() lines = spy.flush()
# If the last line is empty, delete it.
if len(lines) > 1 and lines[-1] == '':
del lines[-1]
return lines return lines
def test_connection_unresolved(self, output_spy, virtual_socket): def test_connection_unresolved(self, output_spy, virtual_socket):

View File

@ -2,102 +2,107 @@ import pytest
# pylint: disable=attribute-defined-outside-init # pylint: disable=attribute-defined-outside-init
class TestOutput: class TestOutputBuffer:
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def init(self, ssh_audit): def init(self, ssh_audit):
self.Output = ssh_audit.Output
self.OutputBuffer = ssh_audit.OutputBuffer self.OutputBuffer = ssh_audit.OutputBuffer
def test_output_buffer_no_lines(self, output_spy): def test_outputbuffer_no_lines(self, output_spy):
output_spy.begin() output_spy.begin()
with self.OutputBuffer() as obuf: obuf = self.OutputBuffer()
pass obuf.write()
assert output_spy.flush() == [] assert output_spy.flush() == ['']
output_spy.begin() output_spy.begin()
with self.OutputBuffer() as obuf:
pass
obuf.flush()
assert output_spy.flush() == []
def test_output_buffer_no_flush(self, output_spy): def test_outputbuffer_defaults(self):
output_spy.begin() obuf = self.OutputBuffer()
with self.OutputBuffer():
print('abc')
assert output_spy.flush() == []
def test_output_buffer_flush(self, output_spy):
output_spy.begin()
with self.OutputBuffer() as obuf:
print('abc')
print()
print('def')
obuf.flush()
assert output_spy.flush() == ['abc', '', 'def']
def test_output_defaults(self):
out = self.Output()
# default: on # default: on
assert out.batch is False assert obuf.batch is False
assert out.use_colors is True assert obuf.use_colors is True
assert out.level == 'info' assert obuf.level == 'info'
def test_output_colors(self, output_spy): def test_outputbuffer_colors(self, output_spy):
out = self.Output() out = self.OutputBuffer()
# test without colors
# Test without colors.
out.use_colors = False out.use_colors = False
output_spy.begin() output_spy.begin()
out.info('info color') out.info('info color')
out.write()
assert output_spy.flush() == ['info color'] assert output_spy.flush() == ['info color']
output_spy.begin() output_spy.begin()
out.head('head color') out.head('head color')
out.write()
assert output_spy.flush() == ['head color'] assert output_spy.flush() == ['head color']
output_spy.begin() output_spy.begin()
out.good('good color') out.good('good color')
out.write()
assert output_spy.flush() == ['good color'] assert output_spy.flush() == ['good color']
output_spy.begin() output_spy.begin()
out.warn('warn color') out.warn('warn color')
out.write()
assert output_spy.flush() == ['warn color'] assert output_spy.flush() == ['warn color']
output_spy.begin() output_spy.begin()
out.fail('fail color') out.fail('fail color')
out.write()
assert output_spy.flush() == ['fail color'] assert output_spy.flush() == ['fail color']
# If colors aren't supported by this system, skip the color tests.
if not out.colors_supported: if not out.colors_supported:
return return
# test with colors
# Test with colors.
out.use_colors = True out.use_colors = True
output_spy.begin() output_spy.begin()
out.info('info color') out.info('info color')
out.write()
assert output_spy.flush() == ['info color'] assert output_spy.flush() == ['info color']
output_spy.begin() output_spy.begin()
out.head('head color') out.head('head color')
assert output_spy.flush() == ['\x1b[0;36mhead color\x1b[0m'] out.write()
assert output_spy.flush() in [['\x1b[0;36mhead color\x1b[0m'], ['\x1b[0;96mhead color\x1b[0m']]
output_spy.begin() output_spy.begin()
out.good('good color') out.good('good color')
assert output_spy.flush() == ['\x1b[0;32mgood color\x1b[0m'] out.write()
assert output_spy.flush() in [['\x1b[0;32mgood color\x1b[0m'], ['\x1b[0;92mgood color\x1b[0m']]
output_spy.begin() output_spy.begin()
out.warn('warn color') out.warn('warn color')
assert output_spy.flush() == ['\x1b[0;33mwarn color\x1b[0m'] out.write()
assert output_spy.flush() in [['\x1b[0;33mwarn color\x1b[0m'], ['\x1b[0;93mwarn color\x1b[0m']]
output_spy.begin() output_spy.begin()
out.fail('fail color') out.fail('fail color')
assert output_spy.flush() == ['\x1b[0;31mfail color\x1b[0m'] out.write()
assert output_spy.flush() in [['\x1b[0;31mfail color\x1b[0m'], ['\x1b[0;91mfail color\x1b[0m']]
def test_output_sep(self, output_spy): def test_outputbuffer_sep(self, output_spy):
out = self.Output() out = self.OutputBuffer()
output_spy.begin() output_spy.begin()
out.sep() out.sep()
out.sep() out.sep()
out.sep() out.sep()
out.write()
assert output_spy.flush() == ['', '', ''] assert output_spy.flush() == ['', '', '']
def test_output_levels(self): def test_outputbuffer_levels(self):
out = self.Output() out = self.OutputBuffer()
assert out.get_level('info') == 0 assert out.get_level('info') == 0
assert out.get_level('good') == 0 assert out.get_level('good') == 0
assert out.get_level('warn') == 1 assert out.get_level('warn') == 1
assert out.get_level('fail') == 2 assert out.get_level('fail') == 2
assert out.get_level('unknown') > 2 assert out.get_level('unknown') > 2
def test_output_level_property(self): def test_outputbuffer_level_property(self):
out = self.Output() out = self.OutputBuffer()
out.level = 'info' out.level = 'info'
assert out.level == 'info' assert out.level == 'info'
out.level = 'good' out.level = 'good'
@ -109,8 +114,8 @@ class TestOutput:
out.level = 'invalid level' out.level = 'invalid level'
assert out.level == 'unknown' assert out.level == 'unknown'
def test_output_level(self, output_spy): def test_outputbuffer_level(self, output_spy):
out = self.Output() out = self.OutputBuffer()
# visible: all # visible: all
out.level = 'info' out.level = 'info'
output_spy.begin() output_spy.begin()
@ -119,6 +124,7 @@ class TestOutput:
out.good('good color') out.good('good color')
out.warn('warn color') out.warn('warn color')
out.fail('fail color') out.fail('fail color')
out.write()
assert len(output_spy.flush()) == 5 assert len(output_spy.flush()) == 5
# visible: head, warn, fail # visible: head, warn, fail
out.level = 'warn' out.level = 'warn'
@ -128,6 +134,7 @@ class TestOutput:
out.good('good color') out.good('good color')
out.warn('warn color') out.warn('warn color')
out.fail('fail color') out.fail('fail color')
out.write()
assert len(output_spy.flush()) == 3 assert len(output_spy.flush()) == 3
# visible: head, fail # visible: head, fail
out.level = 'fail' out.level = 'fail'
@ -137,6 +144,7 @@ class TestOutput:
out.good('good color') out.good('good color')
out.warn('warn color') out.warn('warn color')
out.fail('fail color') out.fail('fail color')
out.write()
assert len(output_spy.flush()) == 2 assert len(output_spy.flush()) == 2
# visible: head # visible: head
out.level = 'invalid level' out.level = 'invalid level'
@ -146,10 +154,11 @@ class TestOutput:
out.good('good color') out.good('good color')
out.warn('warn color') out.warn('warn color')
out.fail('fail color') out.fail('fail color')
out.write()
assert len(output_spy.flush()) == 1 assert len(output_spy.flush()) == 1
def test_output_batch(self, output_spy): def test_outputbuffer_batch(self, output_spy):
out = self.Output() out = self.OutputBuffer()
# visible: all # visible: all
output_spy.begin() output_spy.begin()
out.level = 'info' out.level = 'info'
@ -159,6 +168,7 @@ class TestOutput:
out.good('good color') out.good('good color')
out.warn('warn color') out.warn('warn color')
out.fail('fail color') out.fail('fail color')
out.write()
assert len(output_spy.flush()) == 5 assert len(output_spy.flush()) == 5
# visible: all except head # visible: all except head
output_spy.begin() output_spy.begin()
@ -169,4 +179,5 @@ class TestOutput:
out.good('good color') out.good('good color')
out.warn('warn color') out.warn('warn color')
out.fail('fail color') out.fail('fail color')
out.write()
assert len(output_spy.flush()) == 4 assert len(output_spy.flush()) == 4

View File

@ -3,6 +3,7 @@ import pytest
from ssh_audit.auditconf import AuditConf from ssh_audit.auditconf import AuditConf
from ssh_audit.fingerprint import Fingerprint from ssh_audit.fingerprint import Fingerprint
from ssh_audit.outputbuffer import OutputBuffer
from ssh_audit.protocol import Protocol from ssh_audit.protocol import Protocol
from ssh_audit.readbuf import ReadBuf from ssh_audit.readbuf import ReadBuf
from ssh_audit.ssh1 import SSH1 from ssh_audit.ssh1 import SSH1
@ -15,6 +16,7 @@ from ssh_audit.writebuf import WriteBuf
class TestSSH1: class TestSSH1:
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def init(self, ssh_audit): def init(self, ssh_audit):
self.OutputBuffer = OutputBuffer
self.protocol = Protocol self.protocol = Protocol
self.ssh1 = SSH1 self.ssh1 = SSH1
self.PublicKeyMessage = SSH1_PublicKeyMessage self.PublicKeyMessage = SSH1_PublicKeyMessage
@ -132,9 +134,11 @@ class TestSSH1:
vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n') vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n')
vsocket.rdata.append(self._create_ssh1_packet(w.write_flush())) vsocket.rdata.append(self._create_ssh1_packet(w.write_flush()))
output_spy.begin() output_spy.begin()
self.audit(self._conf()) out = self.OutputBuffer()
self.audit(out, self._conf())
out.write()
lines = output_spy.flush() lines = output_spy.flush()
assert len(lines) == 14 assert len(lines) == 15
def test_ssh1_server_invalid_first_packet(self, output_spy, virtual_socket): def test_ssh1_server_invalid_first_packet(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
@ -144,10 +148,12 @@ class TestSSH1:
vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n') vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n')
vsocket.rdata.append(self._create_ssh1_packet(w.write_flush())) vsocket.rdata.append(self._create_ssh1_packet(w.write_flush()))
output_spy.begin() output_spy.begin()
ret = self.audit(self._conf()) out = self.OutputBuffer()
ret = self.audit(out, self._conf())
out.write()
assert ret != 0 assert ret != 0
lines = output_spy.flush() lines = output_spy.flush()
assert len(lines) == 8 assert len(lines) == 9
assert 'unknown message' in lines[-1] assert 'unknown message' in lines[-1]
def test_ssh1_server_invalid_checksum(self, output_spy, virtual_socket): def test_ssh1_server_invalid_checksum(self, output_spy, virtual_socket):
@ -158,8 +164,10 @@ class TestSSH1:
vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n') vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n')
vsocket.rdata.append(self._create_ssh1_packet(w.write_flush(), False)) vsocket.rdata.append(self._create_ssh1_packet(w.write_flush(), False))
output_spy.begin() output_spy.begin()
out = self.OutputBuffer()
with pytest.raises(SystemExit): with pytest.raises(SystemExit):
self.audit(self._conf()) self.audit(out, self._conf())
out.write()
lines = output_spy.flush() lines = output_spy.flush()
assert len(lines) == 1 assert len(lines) == 3
assert 'checksum' in lines[-1] assert ('checksum' in lines[0]) or ('checksum' in lines[1]) or ('checksum' in lines[2])

View File

@ -3,6 +3,7 @@ import struct
import pytest import pytest
from ssh_audit.auditconf import AuditConf from ssh_audit.auditconf import AuditConf
from ssh_audit.outputbuffer import OutputBuffer
from ssh_audit.protocol import Protocol from ssh_audit.protocol import Protocol
from ssh_audit.readbuf import ReadBuf from ssh_audit.readbuf import ReadBuf
from ssh_audit.ssh2_kex import SSH2_Kex from ssh_audit.ssh2_kex import SSH2_Kex
@ -15,6 +16,7 @@ from ssh_audit.writebuf import WriteBuf
class TestSSH2: class TestSSH2:
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def init(self, ssh_audit): def init(self, ssh_audit):
self.OutputBuffer = OutputBuffer
self.protocol = Protocol self.protocol = Protocol
self.ssh2_kex = SSH2_Kex self.ssh2_kex = SSH2_Kex
self.ssh2_kexparty = SSH2_KexParty self.ssh2_kexparty = SSH2_KexParty
@ -141,9 +143,11 @@ class TestSSH2:
vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n') vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n')
vsocket.rdata.append(self._create_ssh2_packet(w.write_flush())) vsocket.rdata.append(self._create_ssh2_packet(w.write_flush()))
output_spy.begin() output_spy.begin()
self.audit(self._conf()) out = self.OutputBuffer()
self.audit(out, self._conf())
out.write()
lines = output_spy.flush() lines = output_spy.flush()
assert len(lines) == 68 assert len(lines) == 69
def test_ssh2_server_invalid_first_packet(self, output_spy, virtual_socket): def test_ssh2_server_invalid_first_packet(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
@ -152,8 +156,10 @@ class TestSSH2:
vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n') vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n')
vsocket.rdata.append(self._create_ssh2_packet(w.write_flush())) vsocket.rdata.append(self._create_ssh2_packet(w.write_flush()))
output_spy.begin() output_spy.begin()
ret = self.audit(self._conf()) out = self.OutputBuffer()
ret = self.audit(out, self._conf())
out.write()
assert ret != 0 assert ret != 0
lines = output_spy.flush() lines = output_spy.flush()
assert len(lines) == 4 assert len(lines) == 5
assert 'unknown message' in lines[-1] assert 'unknown message' in lines[-1]