forked from snowflakedb/snowflake-connector-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathauth_webbrowser.py
176 lines (153 loc) · 6.34 KB
/
auth_webbrowser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2018 Snowflake Computing Inc. All rights reserved.
#
import json
import logging
import socket
import webbrowser
from .auth import Auth, AuthByExternalService, EXTERNAL_BROWSER_AUTHENTICATOR
from .compat import (unquote)
from .errorcode import (ER_UNABLE_TO_OPEN_BROWSER, ER_IDP_CONNECTION_ERROR)
from .network import (CONTENT_TYPE_APPLICATION_JSON,
PYTHON_CONNECTOR_USER_AGENT)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Maximum number of bytes read from the browser's callback connection in the
# single recv() call made by AuthByWebBrowser._receive_saml_token.
BUF_SIZE = 16384
# NOTE(review): this comment used to announce "global state of web server
# that receives the SAML assertion from Snowflake server", but no
# module-level state is defined here; the listening socket is created per
# call inside AuthByWebBrowser.authenticate.
class AuthByWebBrowser(AuthByExternalService):
    """
    Authenticate user by web browser. Only used for SAML based
    authentication.

    ``authenticate`` listens on a localhost port chosen by the OS, asks
    Snowflake for an SSO URL that is told about that port, opens the URL in
    a browser and then reads the token from the HTTP GET request received
    on the local port.
    """

    # Path + query prefix of the callback request that carries the token.
    _TOKEN_URL_PREFIX = '/?token='

    def __init__(self, rest, application,
                 webbrowser_pkg=None, socket_pkg=None):
        """
        Args:
            rest: REST client; its ``_post_request`` and ``_connection``
                are used to request the SSO URL
            application: application name interpolated into the
                confirmation page shown in the browser
            webbrowser_pkg: object providing ``open_new(url)``; the stdlib
                ``webbrowser`` module is used when None
            socket_pkg: callable used to create the listening socket; the
                stdlib ``socket.socket`` is used when None
        """
        self._rest = rest
        self._token = None
        self._application = application
        self._proof_key = None
        self._webbrowser = (
            webbrowser if webbrowser_pkg is None else webbrowser_pkg)
        self._socket = socket.socket if socket_pkg is None else socket_pkg

    @property
    def assertion_content(self):
        """ Returns the token."""
        return self._token

    def update_body(self, body):
        """ Used by Auth to update the request that gets sent to
        /v1/login-request.

        Args:
            body: existing request dictionary
        """
        body[u'data'][u'AUTHENTICATOR'] = EXTERNAL_BROWSER_AUTHENTICATOR
        body[u'data'][u'TOKEN'] = self._token
        body[u'data'][u'PROOF_KEY'] = self._proof_key

    def authenticate(self, authenticator, account, user, password):
        """
        Web Browser based Authentication.

        On success the received token is stored on the instance and is
        exposed through ``assertion_content``.

        Args:
            authenticator: authenticator name sent to Snowflake
            account: account name sent to Snowflake
            user: user name; still needed by GS to verify the assertion
            password: ignored
        """
        logger.info(u'authenticating by Web Browser')

        # ignore password. user is still needed by GS to verify
        # the assertion.
        _ = password  # noqa: F841

        socket_connection = self._socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Bind to port 0 and read the actual port back, so the callback
            # port is whatever the OS assigned.
            socket_connection.bind(('localhost', 0))
            socket_connection.listen(0)  # no backlog
            callback_port = socket_connection.getsockname()[1]

            print("Initiating login request with your identity provider. A "
                  "browser window should have opened for you to complete the "
                  "login. If you can't see it, check existing browser windows, "
                  "or your OS settings. Press CTRL+C to abort and try again...")

            logger.debug(u'step 1: query GS to obtain SSO url')
            sso_url = self._get_sso_url(
                account, authenticator, callback_port, user)

            logger.debug(u'step 2: open a browser')
            if not self._webbrowser.open_new(sso_url):
                # No exception is being handled here, so exc_info is not
                # requested (it would only log an empty traceback).
                logger.error(
                    u'Unable to open a browser in this environment.')
                self.handle_failure({
                    u'code': ER_UNABLE_TO_OPEN_BROWSER,
                    u'message': u"Unable to open a browser in this environment."
                })
                return  # required for test case

            logger.debug(u'step 3: accept SAML token')
            self._receive_saml_token(socket_connection)
        finally:
            socket_connection.close()

    def _report_invalid_browser_request(self):
        """
        Reports a callback request that does not carry a usable token.
        """
        self.handle_failure({
            u'code': ER_IDP_CONNECTION_ERROR,
            u'message': u"Invalid HTTP request from web browser. Idp "
                        u"authentication could have failed."
        })

    def _receive_saml_token(self, socket_connection):
        """
        Receives SAML token from web browser

        Accepts one connection, reads at most BUF_SIZE bytes from it, takes
        the token from the first ``GET /?token=...`` request line, stores it
        in ``self._token`` and answers with an HTML confirmation page. A
        request without such a line is reported through ``handle_failure``
        and no response is sent.

        Args:
            socket_connection: listening socket the browser connects to
        """
        socket_client, _ = socket_connection.accept()
        try:
            # A single recv() is issued; the request is assumed to fit
            # into BUF_SIZE bytes.
            data = socket_client.recv(BUF_SIZE).decode('utf-8').split("\r\n")

            target_lines = \
                [line for line in data if line.startswith("GET ")]
            if not target_lines:
                self._report_invalid_browser_request()
                return  # required for test case

            user_agent = [line for line in data if line.lower().startswith(
                'user-agent')]
            if user_agent:
                logger.debug(user_agent[0])
            else:
                logger.debug("No User-Agent")

            # Request line is "GET <url> <http-version>". Anything else, or
            # a URL that is not the token callback, is rejected instead of
            # being sliced into a bogus token.
            request_parts = target_lines[0].split()
            if len(request_parts) != 3 or \
                    not request_parts[1].startswith(self._TOKEN_URL_PREFIX):
                self._report_invalid_browser_request()
                return  # required for test case
            url = request_parts[1]
            self._token = unquote(url[len(self._TOKEN_URL_PREFIX):])

            msg = """
<!DOCTYPE html><html><head><meta charset="UTF-8"/>
<title>SAML Response for Snowflake</title></head>
<body>
Your identity was confirmed and propagated to Snowflake {0}.
You can close this window now and go back where you started from.
</body></html>""".format(self._application)

            # Content-Length counts bytes on the wire, so measure the
            # encoded body rather than the number of characters.
            payload = msg.encode('utf-8')
            header = '\r\n'.join([
                "HTTP/1.0 200 OK",
                "Content-Type: text/html",
                "Content-Length: {0}".format(len(payload)),
                "",
                ""
            ])
            socket_client.sendall(header.encode('utf-8') + payload)
        finally:
            try:
                socket_client.shutdown(socket.SHUT_RDWR)
            except socket.error:
                # The peer may already have dropped the connection; this
                # must neither hide an earlier error nor skip close().
                logger.debug(u'failed to shut down the browser connection',
                             exc_info=True)
            finally:
                socket_client.close()

    def _get_sso_url(self, account, authenticator, callback_port, user):
        """
        Gets SSO URL from Snowflake

        Posts an authenticator request carrying the callback port and
        stores the returned ``proofKey`` in ``self._proof_key``.

        Args:
            account: account name sent to Snowflake
            authenticator: authenticator name sent to Snowflake
            callback_port: local port the browser is redirected to
            user: user name sent to Snowflake

        Returns:
            the ``ssoUrl`` value of the response
        """
        headers = {
            u'Content-Type': CONTENT_TYPE_APPLICATION_JSON,
            u"accept": CONTENT_TYPE_APPLICATION_JSON,
            u"User-Agent": PYTHON_CONNECTOR_USER_AGENT,
        }

        url = u"/session/authenticator-request"
        body = Auth.base_auth_data(
            user, account,
            self._rest._connection.application,
            self._rest._connection._internal_application_name,
            self._rest._connection._internal_application_version)

        body[u'data'][u'AUTHENTICATOR'] = authenticator
        body[u'data'][u"BROWSER_MODE_REDIRECT_PORT"] = str(callback_port)
        logger.debug(u'account=%s, authenticator=%s, user=%s',
                     account, authenticator, user)
        ret = self._rest._post_request(
            url,
            headers,
            json.dumps(body),
            timeout=self._rest._connection._login_timeout,
            socket_timeout=self._rest._connection._login_timeout)
        if not ret[u'success']:
            self.handle_failure(ret)
        data = ret[u'data']
        sso_url = data[u'ssoUrl']
        self._proof_key = data[u'proofKey']
        return sso_url