1 '''
2 Created on Apr 30, 2010
3
4 @author: jnaous
5 '''
6
7 from django.db import models
8 import os
9 import binascii
10 from django.conf import settings
11 from datetime import timedelta, datetime
12 import time
13 from expedient.common.utils.transport import TestClientTransport
14 from urlparse import urlparse
15 from django.contrib.auth.models import User
16 import xmlrpclib
17 from expedient.common.tests.utils import test_to_http
18
23
26
# Build a copy of `uri` that carries HTTP basic-auth credentials in its
# authority part: scheme://username[:password]@netloc/path.
# NOTE(review): the URI is lower-cased before parsing, so the rebuilt URL has
# a lower-cased path as well, while the no-username branch below returns `uri`
# untouched -- confirm this asymmetry is intended.
# NOTE(review): only scheme, netloc and path are copied over; params, query
# and fragment of the original URI are not carried into the rebuilt URL.
28 parsed = urlparse(uri.lower())
# Credentials are only added when a username is given; a password supplied
# without a username is ignored.
29 if username:
# NOTE(review): username/password are interpolated verbatim (no
# percent-encoding); values containing '@', ':' or '/' would yield an
# ambiguous URL -- verify callers never pass such values.
30 if password:
31 new_url = "%s://%s:%s@%s%s" % (parsed.scheme,
32 username,
33 password,
34 parsed.netloc,
35 parsed.path)
# Username only: emit "user@host" without the ":password" part.
36 else:
37 new_url = "%s://%s@%s%s" % (parsed.scheme,
38 username,
39 parsed.netloc,
40 parsed.path)
# No username: hand back the caller's URI exactly as received.
41 else:
42 new_url = uri
43 return new_url
44
46 - def __init__(self, uri, username=None, password=None, **kwargs):
51
# Render the proxy as "<ServerProxy for URL+handler>", where URL is rebuilt
# from the scheme, netloc and path of the stored host value.
# `_ServerProxy__host` / `_ServerProxy__handler` are the name-mangled forms of
# `__host` / `__handler` attributes owned by a class named ServerProxy
# (presumably xmlrpclib.ServerProxy -- confirm against the class header).
# NOTE(review): what ends up in the output depends on how __init__ (not
# visible here) populates the host attribute; verify that embedded
# credentials cannot appear in the repr.
53 parsed = urlparse(self._ServerProxy__host)
54 new_url = "%s://%s%s" % (parsed.scheme,
55 parsed.netloc,
56 parsed.path)
57 return (
58 "<ServerProxy for %s%s>" %
59 (new_url, self._ServerProxy__handler)
60 )
61
# str() deliberately shares the repr rendering.
62 __str__ = __repr__
63
65 '''
66 Implements a server proxy for XML-RPC calls that checks SSL certs and
67 uses a password to talk to the server. The password is automatically renewed
68 whenever it expires.
69 '''
70
# --- Credentials presented to the remote server -------------------------
# Login name, at most 100 characters.
71 username = models.CharField(
72 max_length=100,
73 help_text="Username to use to access the remote server.")
# Password; the column width comes from get_max_password_len(), which is
# defined elsewhere in this module.
74 password = models.CharField(
75 max_length=get_max_password_len(),
76 help_text="Password to use to access the remote server.")
# Maximum password age in days. The default of 0 means "no automatic change"
# according to the help text; the rotation logic itself is not in this span.
77 max_password_age = models.PositiveIntegerField(
78 'Max Password age (days)', default=0,
79 help_text="If this is set to non-zero, the password "\
80 "will automatically be changed once the password ages past the "\
81 "maximum. The new password is then randomly generated.")
# Filled in automatically when the row is first created (auto_now_add).
82 password_timestamp = models.DateTimeField(auto_now_add=True)
# Address of the remote server, up to 1024 characters.
83 url = models.CharField("Server URL", max_length=1024)
84
# Whether the peer's certificate is verified against the trusted CAs;
# off by default.
85 verify_certs = models.BooleanField(
86 "Verify Certificates?", default=False,
87 help_text="Disabling this check will still verify that the "\
88 "certificate itself is well-formed. In particular, the "\
89 "server's hostname must match the certificate's Common Name.")
90
# Create self.proxy (a BasicAuthServerProxy) according to the URL scheme.
# The lower-cased copy is only used to compare the scheme; the proxies below
# receive self.url unchanged.
92 parsed = urlparse(self.url.lower())
# Stays None unless the https branch installs an SSL transport.
93 self.transport = None
94
# "test" scheme: the URL is converted with test_to_http() and requests go
# through a TestClientTransport instead of a real connection.
95 if parsed.scheme == "test":
96 self.proxy = BasicAuthServerProxy(
97 test_to_http(self.url),
98 username=self.username,
99 password=self.password,
100 transport=TestClientTransport())
# "https" scheme: M2Crypto is imported lazily so it is only required when an
# SSL connection is actually needed.
101 elif parsed.scheme == "https":
102 from M2Crypto.m2xmlrpclib import SSL_Transport
103 from M2Crypto.SSL import Context
# NOTE(review): the context is created with protocol='tlsv1'; presumably this
# pins the connection to TLS 1.0 -- confirm against the M2Crypto docs and the
# servers this must talk to.
104 self.transport = SSL_Transport(Context(protocol='tlsv1'))
# verbose follows settings.DEBUG and falls back to False when it is unset.
105 self.proxy = BasicAuthServerProxy(self.url,
106 username=self.username,
107 password=self.password,
108 transport=self.transport,
109 verbose=getattr(settings, "DEBUG", False))
# Apply the verify_certs setting to the freshly created SSL context.
110 self.set_verify_certs()
# Any other scheme: plain proxy with no custom transport.
111 else:
112 self.proxy = BasicAuthServerProxy(self.url,
113 username=self.username,
114 password=self.password)
115
126
132
133
134
135
136
137
138 - def save(self, *args, **kwargs):
143
# Configures peer verification on the SSL context of self.transport.
# Does nothing when self.transport is unset/falsy.
145 '''Enable/disable SSL certificate verification.'''
146 if self.transport:
147 from M2Crypto import SSL
148 if self.verify_certs:
# Require a peer certificate and fail the handshake when none is presented.
# The second argument (16) is presumably the maximum chain depth -- confirm
# against M2Crypto's Context.set_verify.
149 self.transport.ssl_ctx.set_verify(
150 SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 16)
# Trusted CAs are loaded from the directory named by
# settings.XMLRPC_TRUSTED_CA_PATH.
151 self.transport.ssl_ctx.load_verify_locations(
152 capath=settings.XMLRPC_TRUSTED_CA_PATH)
# Verification switched off: no peer certificate check is requested.
153 else:
154 self.transport.ssl_ctx.set_verify(
155 SSL.verify_none, 1)
156
170
def ping(self, data):
    '''Forward data to the remote server's ping method and return its reply.'''
    remote = self.proxy
    reply = remote.ping(data)
    return reply
173
# Availability probe built on self.ping("PING").
# Returns a bare bool, or a (bool, message-or-None) tuple when get_info is
# truthy. The message is the exception text or the unexpected-data text.
175 '''Call the server's ping method, and see if we get a pong'''
176 try:
177 ret = self.ping("PING")
# Any exception from the call counts as "not available": it is printed with
# its traceback and reported to the caller instead of propagating.
# NOTE(review): output goes through Python 2 print statements; consider the
# logging module if this code is ever ported.
178 except Exception as e:
179 import traceback
180 print "Exception while pinging server: %s" % e
181 traceback.print_exc()
182 if get_info:
183 return (False, str(e))
184 else:
185 return False
186
# The reply must contain both "PING" and "PONG" to count as a valid pong.
# NOTE(review): `in` raises TypeError if the server returns a non-container
# value (e.g. an int); that case is outside the try block above.
187 if "PING" in ret and "PONG" in ret:
188 if get_info:
189 return (True, None)
190 else:
191 return True
# Reachable, but the reply did not look like a pong.
192 else:
193 msg = "Server at %s returned unexpected data %s" % (self.url, ret)
194 print msg
195 if get_info:
196 return (False, msg)
197 else:
198 return False
199
# Fetches the certificate presented at self.url, stores it in the trusted-CA
# directory and runs `make` there. Returns None in all cases.
# NOTE(review): ssl.get_server_certificate returns the certificate the server
# itself presents, so that is what gets written to "<hostname>-ca.crt" --
# confirm this matches the intent stated in the docstring below.
201 '''
202 Add the CA that signed the certificate for self.url as trusted.
203 '''
204 import ssl
205 import subprocess
206
207
# Only https URLs have a certificate to fetch; anything else is silently
# ignored.
208 res = urlparse(self.url)
209 if res.scheme.lower() != "https":
210 return
211
# Fall back to the standard HTTPS port when the URL names none.
212 port = res.port or 443
213
214
# Retrieve the server's certificate as a PEM-encoded string.
215 cert = ssl.get_server_certificate((res.hostname, port))
216
217
# Make sure the END marker sits on its own line and is followed by a
# newline; the returned PEM text does not always end that way.
218 if not cert.endswith("\n-----END CERTIFICATE-----\n"):
219 cert = cert.replace("-----END CERTIFICATE-----",
220 "\n-----END CERTIFICATE-----\n")
221
222
# Store the certificate as <XMLRPC_TRUSTED_CA_PATH>/<hostname>-ca.crt,
# overwriting any existing file of that name.
223 with open(os.path.join(settings.XMLRPC_TRUSTED_CA_PATH,
224 res.hostname+"-ca.crt"),
225 'w') as cert_file:
226 cert_file.write(cert)
227
228
# Run `make` inside the trusted-CA directory (presumably to rebuild the hash
# links that capath lookups rely on -- confirm against that Makefile).
# NOTE(review): the Popen object is discarded: the child is never waited on,
# its exit status is never checked, and the stdout/stderr pipes are never
# read, so this returns before make finishes and a chatty make could block on
# a full pipe. Consider communicate() and checking returncode.
229 subprocess.Popen(['make', '-C', settings.XMLRPC_TRUSTED_CA_PATH],
230 stdin=subprocess.PIPE,
231 stdout=subprocess.PIPE,
232 stderr=subprocess.PIPE,
233 )
234