CVS commit (wiking): cms.py certificates.py appl.py
Tomas Cerha
cerha at devel.brailcom.org
Thu May 7 19:19:58 CEST 2009
Update of /var/lib/cvs/wiking/lib/wiking/cms
In directory devel:/tmp/cvs-serv24677/lib/wiking/cms
Modified Files:
cms.py appl.py
Added Files:
certificates.py
Log Message:
Support for certificates moved to a separate module and disabled for now.
/var/www/hosts/cvs.freebsoft.org/src/wiking:
Index: lib/wiking/configuration.py
===================================================================
RCS file: /var/lib/cvs/wiking/lib/wiking/configuration.py,v
retrieving revision 1.42
retrieving revision 1.41
diff -u -r1.42 -r1.41
--- lib/wiking/configuration.py 2 Mar 2009 21:35:44 -0000 1.41
+++ lib/wiking/configuration.py 7 May 2009 17:19:56 -0000 1.42
@@ -335,37 +335,16 @@
"and 'md5' (passwords are stored in the form of MD5 hashes).")
_DEFAULT = 'plain'
+ class _Option_login_is_email(pc.BooleanOption, pc.HiddenOption):
+ _DESCR = _("Whether to use e-mails as login names")
+ _DOC = _("Iff true, users must use e-mail addresses as their login names.")
+ _DEFAULT = False
+
class _Option_registration_expiry_days(pc.NumericOption, pc.HiddenOption):
_DESCR = "Number of days after unanswered user registration expires"
_DOC = ("When registration by e-mail is enabled, each newly registered user is required "
"to answer the registration e-mail within the limit given here.")
_DEFAULT = 2
- class _Option_certificate_expiration_days(pc.NumericOption, pc.HiddenOption):
- _DESCR = "Number of days to make signed certificates valid."
- _DOC = ("User authentication certificates signed by our local certificate authority "
- "will be made valid for that number of days.")
- _DEFAULT = 5*365
-
- class _Option_ca_certificate_file(pc.StringOption, pc.HiddenOption):
- _DESCR = "Name of the file containing the local certification authority certificate."
- _DOC = ("This certificate is used to sign users' certificates used for authentication to the application.")
- _DEFAULT = '/etc/wiking/ca-cert.pem'
-
- class _Option_ca_key_file(pc.StringOption, pc.HiddenOption):
- _DESCR = "Name of the file containing the key corresponding to the local certification authority certificate."
- _DOC = ("This is the secret certificate private key.")
- _DEFAULT = '/etc/wiking/ca-key.pem'
-
- class _Option_certificate_authentication(pc.BooleanOption, pc.HiddenOption):
- _DESCR = "Whether certificate authentication is enabled."
- def default(self):
- ca_certificate_file = self._configuration.ca_certificate_file
- return ca_certificate_file and os.path.exists(ca_certificate_file)
-
- class _Option_login_is_email(pc.BooleanOption, pc.HiddenOption):
- _DESCR = _("Whether to use e-mails as login names")
- _DOC = _("Iff true, users must use e-mail addresses as their login names.")
- _DEFAULT = False
Index: lib/wiking/cms/appl.py
===================================================================
RCS file: /var/lib/cvs/wiking/lib/wiking/cms/appl.py,v
retrieving revision 1.33
retrieving revision 1.32
diff -u -r1.33 -r1.32
--- lib/wiking/cms/appl.py 6 Mar 2009 15:57:10 -0000 1.32
+++ lib/wiking/cms/appl.py 7 May 2009 17:19:56 -0000 1.33
@@ -133,18 +133,6 @@
raise Exception("Invalid password storage option", password_storage)
return password == record['password'].value()
- def authenticate(self, req):
- user = None
- if cfg.certificate_authentication:
- certificate = req.certificate()
- if certificate is not None:
- user = self._module('UserCertificates').certificate_user(req, certificate)
- if user is not None:
- user.set_authentication_parameters(method='certificate', auto=True)
- if user is None:
- user = super(Application, self).authenticate(req)
- return user
-
def authorize(self, req, module, action=None, record=None, **kwargs):
if req.path[0] == '_registration' and module.name() == 'Users':
# This hack redirects action authorization back to Registration after redirection to
Index: lib/wiking/cms/certificates.py
===================================================================
RCS file: lib/wiking/cms/certificates.py
diff -N lib/wiking/cms/certificates.py
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ lib/wiking/cms/certificates.py 7 May 2009 17:19:55 -0000 1.1
@@ -0,0 +1,398 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2006-2009 Brailcom, o.p.s.
+# Author: Milan Zamazal
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""Support for certificate administration and authentication.
+
+This code is currently unused. If it is ever needed again, it is necessary to re-integrate it into
+Wiking CMS. It was taken out from cms.py on 2009-05-07, so the other related changes can be found
+in the version control system history.
+
+"""
+
+from wiking.cms import *
+
+import cStringIO
+import os
+import subprocess
+import mx.DateTime
+
+import pytis.data
+from pytis.presentation import computer, Computer, Fields, Field
+from lcg import log as debug
+
+ONCE = pp.Editable.ONCE
+NEVER = pp.Editable.NEVER
+ALWAYS = pp.Editable.ALWAYS
+ASC = pd.ASCENDENT
+DESC = pd.DESCENDANT
+
+# Strings for this domain will not be translated until this module is unused.
+_ = lcg.TranslatableTextFactory('wiking-cms-certificates')
+
+
+def _certificate_mail_info(record):
+ text = _("To generate the request, you can use the OpenSSL package "
+ "and the attached OpenSSL configuration file.\n"
+ "In such a case use the following command to generate the certificate "
+ "request, assuming your private key is stored in a file named `key.pem':")
+ text += ("\n\n"
+ " openssl req -utf8 -new -key key.pem -out request.pem -config openssl.cnf\n\n")
+ text += ("If you don't have a private key, you can generate one together with the "
+ "certificate request using the following command:\n\n"
+ " openssl req -utf8 -newkey rsa:2048 -keyout key.pem -out request.pem"
+ " -config openssl.cnf\n\n")
+ attachment = "openssl.cnf"
+ user_name = '%s %s' % (record['firstname'].value(), record['surname'].value(),)
+ user_email = record['email'].value()
+ attachment_stream = cStringIO.StringIO(str (
+ '''[ req ]
+distinguished_name = req_distinguished_name
+attributes = req_attributes
+x509_extensions = v3_ca
+prompt = no
+string_mask = utf8only
+[ req_distinguished_name ]
+CN = %s
+emailAddress = %s
+[ req_attributes ]
+[ usr_cert ]
+[ v3_req ]
+basicConstraints = CA:FALSE
+nsCertType = client
+keyUsage = keyEncipherment
+[ v3_ca ]
+
+[ crl_ext ]
+''' % (user_name, user_email,)))
+ return text, attachment, attachment_stream
+
+
+class Certificates(CMSModule):
+ """Base class of classes handling various kinds of certificates."""
+
+ class UniType(pytis.data.Type):
+ """Universal type able to contain any value.
+
+ This is just a utility type for internal use within 'Certificates'
+ class, without implementing any of the 'Type' methods.
+
+ """
+
+ class Spec(Specification):
+
+ def __init__(self, *args, **kwargs):
+ Specification.__init__(self, *args, **kwargs)
+ self._ca_x509 = None
+
+ _ID_COLUMN = 'certificates_id'
+
+ def fields(self): return (
+ Field(self._ID_COLUMN, width=8, editable=NEVER),
+ Field('file', _("PEM file"), virtual=True, editable=ALWAYS,
+ type=pytis.data.Binary(not_null=True, maxlen=10000),
+ descr=_("Upload a PEM file containing the certificate")),
+ Field('certificate', _("Certificate"), width=60, height=20, editable=NEVER,
+ computer=Computer(self._certificate_computer, depends=('file',))),
+ Field('x509', _("X509 structure"), virtual=True, editable=NEVER, type=Certificates.UniType(),
+ computer=Computer(self._x509_computer, depends=('certificate',))),
+ Field('serial_number', _("Serial number"), editable=NEVER,
+ computer=self._make_x509_computer(self._serial_number_computer)),
+ Field('text', _("Certificate"), width=60, height=20, editable=NEVER,
+ computer=self._make_x509_computer(self._text_computer)),
+ Field('issuer', _("Certification Authority"), width=32, editable=NEVER,
+ computer=self._make_x509_computer(self._issuer_computer)),
+ Field('valid_from', _("Valid from"), editable=NEVER,
+ computer=self._make_x509_computer(self._valid_from_computer)),
+ Field('valid_until', _("Valid until"), editable=NEVER,
+ computer=self._make_x509_computer(self._valid_until_computer)),
+ Field('trusted', _("Trusted"), default=False,
+ descr=_("When this is checked, certificates signed by this root certificate are considered valid.")),
+ )
+
+ columns = ('issuer', 'valid_from', 'valid_until', 'trusted',)
+ sorting = (('issuer', ASC,), ('valid_until', ASC,))
+ layout = ('trusted', 'issuer', 'valid_from', 'valid_until', 'text',)
+
+ def _certificate_computation(self, buffer):
+ return str(buffer)
+ def _certificate_computer(self, record):
+ file_value = record['file'].value()
+ if file_value is None: # new record form
+ return None
+ certificate = self._certificate_computation(file_value.buffer())
+ return certificate
+ def _x509_computer(self, record):
+ import gnutls.crypto
+ if self._ca_x509 is None:
+ self._ca_x509 = gnutls.crypto.X509Certificate(open(cfg.ca_certificate_file).read())
+ certificate = record['certificate'].value()
+ if certificate is None: # new record form
+ return None
+ try:
+ x509 = gnutls.crypto.X509Certificate(certificate)
+ except Exception, e:
+ raise Exception(_("Invalid certificate"), e)
+ if not x509.has_issuer(x509) and not x509.has_issuer(self._ca_x509):
+ x509 = None
+ return x509
+ def _make_x509_computer(self, function):
+ def func(record):
+ x509 = record['x509'].value()
+ if x509 is None:
+ return None
+ return function(x509)
+ return Computer(func, depends=('x509',))
+ def _serial_number_computer(self, x509):
+ number = int(x509.serial_number)
+ if not isinstance(number, int): # it may be long
+ raise Exception(_("Unprocessable serial number"))
+ return number
+ def _issuer_computer(self, x509):
+ return unicode(x509.issuer)
+ def _valid_from_computer(self, x509):
+ return self._convert_x509_timestamp(x509.activation_time)
+ def _valid_until_computer(self, x509):
+ return self._convert_x509_timestamp(x509.expiration_time)
+ def _text_computer(self, x509):
+ return ('Subject: %s\nIssuer: %s\nSerial number: %s\nVersion: %s\nValid from: %s\nValid until: %s\n' %
+ (x509.subject, x509.issuer, x509.serial_number, x509.version, time.ctime(x509.activation_time), time.ctime(x509.expiration_time),))
+ def _convert_x509_timestamp(self, timestamp):
+ time_tuple = time.gmtime(timestamp)
+ mx_time = mx.DateTime.DateTime(*time_tuple[:6])
+ return mx_time
+
+ def check(self, record):
+ x509 = record['x509'].value()
+ if x509 is None:
+ return ('file', _("The certificate is not valid"),)
+
+ RIGHTS_view = (Roles.ADMIN,)
+ RIGHTS_list = (Roles.ADMIN,)
+ RIGHTS_rss = (Roles.ADMIN,)
+ RIGHTS_insert = (Roles.ADMIN,)
+ RIGHTS_update = (Roles.ADMIN,)
+ RIGHTS_delete = (Roles.ADMIN,)
+
+ _LAYOUT = {'insert': ('file',)}
+
+class CACertificates(Certificates):
+ """Management of root certificates."""
+
+ class Spec(Certificates.Spec):
+
+ _ID_COLUMN = 'cacertificates_id'
+
+ table = 'cacertificates'
+ title = _("CA Certificates")
+ help = _("Manage trusted root certificates.")
+
+ def check(self, record):
+ error = Certificates.Spec.check(self, record)
+ if error is not None:
+ return error
+ x509 = record['x509'].value()
+ if x509.check_ca() != 1:
+ return ('file', _("This is not a CA certificate."))
+
+ _LAYOUT = {'insert': ('file',)}
+
+ #WMI_SECTION = WikingManagementInterface.SECTION_CERTIFICATES
+ WMI_ORDER = 100
+
+class UserCertificates(Certificates):
+ """Management of user certificates, especially for the purpose of authentication."""
+
+ class Spec(Certificates.Spec):
+
+ title = _("User Certificates")
+ help = _("Manage user and other kinds of certificates.")
+ table = 'certificates'
+
+ _PURPOSE_AUTHENTICATION = 1
+
+ def fields(self):
+ fields = Certificates.Spec.fields(self)
+ fields = fields + (Field('subject', _("Subject"), virtual=True, editable=NEVER, type=Certificates.UniType(),
+ computer=self._make_x509_computer(self._subject_computer)),
+ Field('common_name', _("Name"), editable=NEVER,
+ computer=Computer(self._common_name_computer, depends=('subject',))),
+ Field('email', _("E-mail"), editable=NEVER,
+ computer=Computer(self._email_computer, depends=('subject',))),
+ Field('uid', not_null=True),
+ Field('purpose', not_null=True),
+ )
+ return fields
+
+ _OWNER_COLUMN = 'uid'
+ columns = ('common_name', 'valid_from', 'valid_until', 'trusted',)
+ layout = ('trusted', 'common_name', 'email', 'issuer', 'valid_from', 'valid_until', 'text',)
+
+ def _subject_computer(self, x509):
+ return x509.subject
+ def _common_name_computer(self, record):
+ subject = record['subject'].value()
+ if subject is None:
+ return ''
+ return subject.common_name
+ def _email_computer(self, record):
+ subject = record['subject'].value()
+ if subject is None:
+ return ''
+ return subject.email
+
+ #WMI_SECTION = WikingManagementInterface.SECTION_CERTIFICATES
+ WMI_ORDER = 200
+
+ def authentication_certificate(self, uid):
+ """Return authentication certificate row of the given user.
+
+ The return value is the corresponding 'pytis.data.Row' instance. If
+ the user doesn't have authentication certificate assigned, return
+ 'None'.
+
+ This method considers only authentication certificates. Certificates
+ present for other purposes are ignored.
+
+ Arguments:
+
+ uid -- user id as an integer
+
+ """
+ data = self._data
+ uid_value = pd.Value(data.find_column('uid').type(), uid)
+ purpose_value = pd.Value(data.find_column('purpose').type(), self.Spec._PURPOSE_AUTHENTICATION)
+ self._data.select(pd.AND(pd.EQ('uid', uid_value), pd.EQ('purpose', purpose_value)))
+ row = self._data.fetchone()
+ self._data.close()
+ return row
+
+ def certificate_user(self, req, certificate):
+ """Return user corresponding to 'certificate' and request 'req'.
+
+ If there is no such user, return 'None'.
+
+ The method assumes the certificate has already been verified by site CA
+ certificate, so no verification is performed.
+
+ Arguments:
+
+ req -- 'Request' instance to provide for construction of the user object
+ certificate -- PEM encoded certificate verified against the site's CA
+ certificate, a string
+
+ """
+ user = None
+ import gnutls.crypto
+ x509 = gnutls.crypto.X509Certificate(certificate)
+ serial_number = int(x509.serial_number)
+ row = self._data.get_row(serial_number=serial_number)
+ if row is not None:
+ uid = row['uid'].value()
+ user_module = self._module('Users')
+ user_record = user_module.find_user(req, uid)
+ user = user_module.user(req, user_record['login'].value())
+ return user
+
+class CertificateRequest(UserCertificates):
+
+ class Spec(UserCertificates.Spec):
+ # This is a *public* method. But it has to begin with underscore
+ # otherwise it would be handled in a special way by Wiking, causing
+ # failure.
+ # Comment by TC: This is not a good practice. Specification shouldn't care about DB
+ # connection. The code that needs it should be defined at the module level.
+ def _set_dbconnection(self, dbconnection):
+ self._serial_number_counter = pd.DBCounterDefault('certificate_serial_number', dbconnection)
+
+ def fields(self):
+ fields = pp.Fields(UserCertificates.Spec.fields(self))
+ overridden = [Field(inherit=fields['file'], descr=_("Upload a PEM file containing the certificate request")),
+ Field(inherit=fields['purpose'], default=self._PURPOSE_AUTHENTICATION)]
+ # We add some fields to propagate last form values to the new request
+ extra = [Field('regcode', type=pytis.data.String(), virtual=True)]
+ return fields.fields(override=overridden) + extra
+
+ def _certificate_computation(self, buffer):
+ serial_number = self._serial_number_counter.next()
+ working_dir = os.path.join(cfg.storage, 'certificate-%d' % (serial_number,))
+ request_file = os.path.join(working_dir, 'request')
+ certificate_file = os.path.join(working_dir, 'certificate.pem')
+ log_file = os.path.join(working_dir, 'log')
+ template_file = os.path.join(working_dir, 'certtool.cfg')
+ os.mkdir(working_dir)
+ try:
+ stdout = open(log_file, 'w')
+ open(request_file, 'w').write(str(buffer))
+ open(template_file, 'w').write('serial = %s\nexpiration_days = %s\ntls_www_client\n' %
+ (serial_number, cfg.certificate_expiration_days,))
+ return_code = subprocess.call(('/usr/bin/certtool', '--generate-certificate',
+ '--load-request', request_file,
+ '--outfile', certificate_file,
+ '--load-ca-certificate', cfg.ca_certificate_file, '--load-ca-privkey', cfg.ca_key_file,
+ '--template', template_file,),
+ stdout=stdout, stderr=stdout)
+ if return_code != 0:
+ raise Exception(_("Certificate request could not be processed"), open(log_file).read())
+ certificate = open(certificate_file).read()
+ finally:
+ for file_name in os.listdir(working_dir):
+ os.remove(os.path.join(working_dir, file_name))
+ os.rmdir(working_dir)
+ return certificate
+
+ def _spec(self, resolver):
+ spec = super(CertificateRequest, self)._spec(resolver)
+ spec._set_dbconnection(self._dbconnection)
+ return spec
+
+ def _layout(self, req, action, record=None):
+ # This is necessary to propagate `uid' given in the form to the actual
+ # data row
+ if action == 'insert' and req.param('submit'):
+ result = pp.GroupSpec(('uid', 'file',), orientation=pp.Orientation.VERTICAL)
+ else:
+ result = super(CertificateRequest, self)._layout(req, action, record)
+ return result
+
+ def _document_title(self, req, record):
+ return _("Certificate upload")
+
+
+# Related configuration options:
+ #class _Option_certificate_expiration_days(pc.NumericOption, pc.HiddenOption):
+ # _DESCR = "Number of days to make signed certificates valid."
+ # _DOC = ("User authentication certificates signed by our local certificate authority "
+ # "will be made valid for that number of days.")
+ # _DEFAULT = 5*365
+ #
+ #class _Option_ca_certificate_file(pc.StringOption, pc.HiddenOption):
+ # _DESCR = "Name of the file containing the local certification authority certificate."
+ # _DOC = ("This certificate is used to sign users' certificates used for authentication to the application.")
+ # _DEFAULT = '/etc/wiking/ca-cert.pem'
+ #
+ #class _Option_ca_key_file(pc.StringOption, pc.HiddenOption):
+ # _DESCR = "Name of the file containing the key corresponding to the local certification authority certificate."
+ # _DOC = ("This is the secret certificate private key.")
+ # _DEFAULT = '/etc/wiking/ca-key.pem'
+ #
+ #class _Option_certificate_authentication(pc.BooleanOption, pc.HiddenOption):
+ # _DESCR = "Whether certificate authentication is enabled."
+ # def default(self):
+ # ca_certificate_file = self._configuration.ca_certificate_file
+ # return ca_certificate_file and os.path.exists(ca_certificate_file)
+
Index: lib/wiking/cms/cms.py
===================================================================
RCS file: /var/lib/cvs/wiking/lib/wiking/cms/cms.py,v
retrieving revision 1.160
retrieving revision 1.159
diff -u -r1.160 -r1.159
--- lib/wiking/cms/cms.py 4 May 2009 15:50:49 -0000 1.159
+++ lib/wiking/cms/cms.py 7 May 2009 17:19:55 -0000 1.160
@@ -30,8 +30,6 @@
import random
import re
import string
-import subprocess
-import tempfile
import mx.DateTime
from mx.DateTime import today, TimeDelta
@@ -90,14 +88,11 @@
SECTION_USERS = 'users'
SECTION_STYLE = 'style'
SECTION_SETUP = 'setup'
- SECTION_CERTIFICATES = 'certificates'
_SECTIONS = ((SECTION_STYLE, _("Look & Feel"),
_("Customize the appearance of your site.")),
(SECTION_USERS, _("User Management"),
_("Manage registered users and their privileges.")),
- #(SECTION_CERTIFICATES, _("Certificate Management"),
- # _("Manage trusted certificates of your site.")),
(SECTION_SETUP, _("Setup"),
_("Edit global properties of your web site.")),
)
@@ -145,41 +140,6 @@
[MenuItem('__site_menu__', '', hidden=True, variants=variants,
submenu=self._module('Pages').menu(req))]
-def _certificate_mail_info(record):
- text = _("To generate the request, you can use the OpenSSL package "
- "and the attached OpenSSL configuration file.\n"
- "In such a case use the following command to generate the certificate "
- "request, assuming your private key is stored in a file named `key.pem':")
- text += ("\n\n"
- " openssl req -utf8 -new -key key.pem -out request.pem -config openssl.cnf\n\n")
- text += ("If you don't have a private key, you can generate one together with the "
- "certificate request using the following command:\n\n"
- " openssl req -utf8 -newkey rsa:2048 -keyout key.pem -out request.pem"
- " -config openssl.cnf\n\n")
- attachment = "openssl.cnf"
- user_name = '%s %s' % (record['firstname'].value(), record['surname'].value(),)
- user_email = record['email'].value()
- attachment_stream = cStringIO.StringIO(str (
- '''[ req ]
-distinguished_name = req_distinguished_name
-attributes = req_attributes
-x509_extensions = v3_ca
-prompt = no
-string_mask = utf8only
-[ req_distinguished_name ]
-CN = %s
-emailAddress = %s
-[ req_attributes ]
-[ usr_cert ]
-[ v3_req ]
-basicConstraints = CA:FALSE
-nsCertType = client
-keyUsage = keyEncipherment
-[ v3_ca ]
-
-[ crl_ext ]
-''' % (user_name, user_email,)))
- return text, attachment, attachment_stream
class Registration(Module, ActionHandler):
"""User registration and account management.
@@ -226,88 +186,43 @@
def action_insert(self, req):
if not cfg.appl.allow_registration:
raise Forbidden()
- if req.param('form_name') == 'CertificateRequest':
- certificate_request_module = self._module('CertificateRequest')
- record, errors, layout = certificate_request_module.action_insert_perform(req)
- if errors:
- result = certificate_request_module.action_insert_document(req, layout, errors,
- record)
- else:
- result = self.action_confirm(req)
- else:
- result = self._module('Users').action_insert(req)
- return result
+ return self._module('Users').action_insert(req)
RIGHTS_insert = (Roles.ANYONE,)
def action_remind(self, req):
- certificate_authentication = cfg.certificate_authentication
- req_user = req.user()
- if certificate_authentication:
- if req_user is None:
- title = _("Password reminder and certificate change")
- else:
- title = req_user.name() + " :: " + _("Certificate renewal")
- else:
- title = _("Password reminder")
- if req.param('login') or req_user:
+ title = _("Password reminder")
+ if req.param('login'):
users = self._module('Users')
- record = users.find_user(req, req.param('login') or req_user.login())
+ record = users.find_user(req, req.param('login'))
if record:
- if req_user is not None:
- text = ""
- else:
- text = concat(
- _("A password reminder request has been made at %(server_uri)s.",
- server_uri=req.server_uri()), '',
- separator='\n')
- attachments = []
+ text = _("A password reminder request has been made at %(server_uri)s.",
+ server_uri=req.server_uri()) +'\n\n'
uid = record['uid'].value()
login = record['login'].value()
password = record['password'].value()
- if req_user is not None:
- pass
- elif password:
- if cfg.password_storage == 'md5':
- password = self._generate_password()
- password_value, password_error = \
- pytis.data.Password(md5=True).validate(password, verify=password)
- assert password_error is None, password_error
- try:
- record.update(password=password_value.value())
- except pd.DBException, e:
- req.message(unicode(e.exception()), type=req.ERROR)
- content = self.ReminderForm()
- return Document(title, content)
- intro_text = _("Your credentials were reset to:")
- else:
- intro_text = _("Your credentials are:")
- text = concat(
- text,
- intro_text,
- ' '+_("Login name") +': '+ login,
- ' '+_("Password") +': '+ password, '',
- _("We strongly recommend you change your password at nearest occassion, "
- "since it has been exposed to an unsecure channel."),
- separator='\n')
+ if cfg.password_storage == 'md5':
+ password = self._generate_password()
+ password_value, password_error = \
+ pytis.data.Password(md5=True).validate(password, verify=password)
+ assert password_error is None, password_error
+ try:
+ record.update(password=password_value.value())
+ except pd.DBException, e:
+ req.message(unicode(e.exception()), type=req.ERROR)
+ content = self.ReminderForm()
+ return Document(title, content)
+ intro_text = _("Your credentials were reset to:")
else:
- text = concat(
- text,
- _("No password authentication for your account"))
- text += "\n"
- if certificate_authentication:
- code = users.set_registration_code(uid)
- attachments = ()
- cert_text, attachment, attachment_stream = _certificate_mail_info(record)
- attachments += (MailAttachment(attachment, stream=attachment_stream),)
- uri = req.server_uri() + make_uri(self._base_uri(req), action='certload',
- uid=uid, regcode=code, reset=1)
- text = concat (
- text,
- _("Visit %(uri)s to upload your certificate request.", uri=uri),
- cert_text,
- separator='\n') + "\n"
- err = send_mail(record['email'].value(), title, text, lang=req.prefered_language(),
- attachments=attachments)
+ intro_text = _("Your credentials are:")
+ text += concat(
+ intro_text,
+ ' '+_("Login name") +': '+ login,
+ ' '+_("Password") +': '+ password, '',
+ _("We strongly recommend you change your password at nearest occassion, "
+ "since it has been exposed to an unsecure channel."),
+ separator='\n')
+ err = send_mail(record['email'].value(), title, text+"\n",
+ lang=req.prefered_language())
if err:
req.message(_("Failed sending e-mail notification:") +' '+ err, type=req.ERROR)
msg = _("Please try repeating your request later or contact the administrator!")
@@ -334,22 +249,6 @@
return self._module('Users').action_confirm(req)
RIGHTS_confirm = (Roles.ANYONE,)
- def action_certload(self, req):
- users = self._module('Users')
- try:
- record = users.check_registration_code(req)
- except BadRequest:
- raise AuthorizationError()
- result = self._module('CertificateRequest').action_insert(req)
- if req.param('submit'):
- users.delete_registration_code(req)
- return result
- RIGHTS_certload = (Roles.ANYONE,)
-
- def action_certrequest(self, req):
- return self.action_remind(req)
- RIGHTS_certrequest = (Roles.ANYONE,)
-
class CMSModule(PytisModule, RssModule, Panelizable):
"Base class for all CMS modules."""
@@ -1920,9 +1819,7 @@
descr=_("A valid login name can only contain letters, digits, underscores, "
"dashes and dots and must start with a letter.")),
Field('password', _("Password"), width=16,
- type=pd.Password(minlen=4, maxlen=32,
- not_null=(not cfg.certificate_authentication),
- md5=md5_passwords),
+ type=pd.Password(minlen=4, maxlen=32, not_null=True, md5=md5_passwords),
descr=_("Please, write the password into each of the two fields to eliminate "
"typos.")),
Field('old_password', _(u"Old password"), virtual=True, width=16,
@@ -1955,22 +1852,12 @@
Field('lang'),
Field('regexpire', computer=Computer(self._registration_expiry, depends=())),
Field('regcode', computer=Computer(self._registration_code, depends=())),
- Field('certauth', _("Certificate authentication"), type=pd.Boolean(),
- descr=_("Check this field to authenticate by a certificate rather than by "
- "a password."), ),
#Field('organization', _("Organization"), editable=ONCE,
# descr=_(("If you are a member of an organization registered in the application "
# "write the name of the organization here. "
# "Otherwise leave the field empty."))),
#Field('organization_id', _("Organization"), codebook='Organizations', not_null=False),
- Field('certfile', _("Certificat request file"), virtual=True, editable=ALWAYS,
- type=pytis.data.Binary(not_null=True, maxlen=10000),
- descr=_("Upload a PEM file containing the certificate")),
)
- def check(self, record):
- if cfg.certificate_authentication:
- if not record['password'].value() and not record['certauth'].value():
- return 'password', _("No password given")
def _check_email(self, email):
result = wiking.validate_email_address(email)
if not result[0]:
@@ -2035,8 +1922,7 @@
((not cfg.login_is_email) and ('email',) or ()) +
('phone', 'address', 'uri')),
FieldSet(_("Login information"),
- ((cfg.login_is_email and 'email' or 'login'), 'password') + \
- (cfg.certificate_authentication and ('certauth',) or ())))
+ ((cfg.login_is_email and 'email' or 'login'), 'password')))
elif action == 'passwd' and record is not None:
layout = ['new_password']
if not req.check_roles(Roles.ADMIN) or req.user().uid() == record['uid'].value():
@@ -2103,14 +1989,7 @@
actions = (Action(_("Enable"), 'enable', descr=_("Enable this account")),) + \
actions
if req.user():
- authentication_method = req.user().authentication_method()
- if authentication_method == 'password':
- actions += (Action(_("Change password"), 'passwd',
- descr=_("Change user's password")),)
- elif authentication_method == 'certificate':
- actions += (Action(_("Change certificate"), 'newcert',
- descr=_("Generate new certificate"),
- uid=req.user().uid()),)
+ actions += (Action(_("Change password"), 'passwd', descr=_("Change user's password")),)
return actions
def _actions(self, req, record):
@@ -2124,32 +2003,10 @@
def _send_admin_confirmation_mail(self, req, record):
self._module('Users').send_admin_approval_mail(req, record)
-
- def _certificate_confirmation(self, req, record, email=None):
- uid = record['uid'].value()
- certificate_row = self._module('UserCertificates').authentication_certificate(uid)
- if certificate_row is not None:
- certificate = certificate_row['certificate'].value()
- text = _("Here is your certificate to authenticate to the application")
- try:
- language = record['lang'].value()
- except KeyError:
- language = req.prefered_language()
- if email is None:
- email = record['email'].value()
- send_mail(email,
- _("Your certificate for %s", req.server_hostname()),
- text,
- lang=language,
- attachments=(MailAttachment('cert.pem',
- stream=cStringIO.StringIO(str(certificate))),))
- return certificate_row
def _confirmation_success_content(self, req, record):
content = [lcg.p(_("Registration completed successfuly. "
"Your account now awaits administrator's approval."))]
- if cfg.certificate_authentication and self._certificate_confirmation(req, record):
- content.append(lcg.p(_("The signed certificate has been sent to you by e-mail.")))
return content
def action_confirm(self, req):
@@ -2166,30 +2023,6 @@
return Document(_("Registration confirmed"), content)
RIGHTS_confirm = (Roles.ANYONE,)
- # TODO: Nutno zrušit a předělat to, kvůli čemu to tu bylo.
- def action_insert(self, req, record=None):
- if req.param('form_name') == 'CertificateRequest':
- result = self.action_newcert(req, record)
- else:
- result = super(Users, self).action_insert(req)
- return result
-
- def action_newcert(self, req, record):
- certificate_request_module = self._module('CertificateRequest')
- if req.param('submit'):
- record, errors, layout = certificate_request_module.action_insert_perform(req)
- if errors:
- result = certificate_request_module.action_insert_document(req, layout, errors,
- record)
- else:
- email = req.user().email()
- self._certificate_confirmation(req, record, email=email)
- result = Document(_("Certificate updated"), lcg.p(_("New certificate installed.")))
- else:
- result = certificate_request_module.action_insert(req)
- return result
- RIGHTS_newcert = (Roles.OWNER,)
-
def _registration_success_content(self, req, record):
content = lcg.p(_("Registration completed successfuly. "
"Your account now awaits administrator's approval."))
@@ -2199,12 +2032,7 @@
msg, err = None, None
base_uri = self._application.module_uri('Registration') or '/_wmi/'+ self.name()
server_hostname = req.server_hostname()
- certificate_authentication = (cfg.certificate_authentication and req.param('certauth'))
- if certificate_authentication:
- action = 'certload'
- else:
- action = 'confirm'
- uri = req.server_uri() + make_uri(base_uri, action=action, uid=record['uid'].value(),
+ uri = req.server_uri() + make_uri(base_uri, action='confirm', uid=record['uid'].value(),
regcode=record['regcode'].value())
text = _("The first step of your registration at %(server_hostname)s "
"was successfully completed.\n\n"
@@ -2213,11 +2041,6 @@
server_hostname=server_hostname,
uri=uri)
attachments = ()
- if certificate_authentication:
- cert_text, attachment, attachment_stream = _certificate_mail_info(record)
- text += _("\nYou will be asked to upload your certificate request.\n")
- text += cert_text
- attachments += (MailAttachment(attachment, stream=attachment_stream),)
return text, attachments
def _redirect_after_insert(self, req, record):
@@ -2387,8 +2210,7 @@
Return the generated code.
- This method can be used in registration or in password and certificate
- reminders.
+ This method can be used in registration or in password reminders.
"""
code = self.Spec._generate_registration_code()
@@ -2745,291 +2567,3 @@
return self.text(label, lang=lang, _method=Texts.parsed_text)
-class Certificates(CMSModule):
- """Base class of classes handling various kinds of certificates."""
-
- class UniType(pytis.data.Type):
- """Universal type able to contain any value.
-
- This is just a utility type for internal use within 'Certificates'
- class, without implementing any of the 'Type' methods.
-
- """
-
- class Spec(Specification):
-
- def __init__(self, *args, **kwargs):
- Specification.__init__(self, *args, **kwargs)
- self._ca_x509 = None
-
- _ID_COLUMN = 'certificates_id'
-
- def fields(self): return (
- Field(self._ID_COLUMN, width=8, editable=NEVER),
- Field('file', _("PEM file"), virtual=True, editable=ALWAYS,
- type=pytis.data.Binary(not_null=True, maxlen=10000),
- descr=_("Upload a PEM file containing the certificate")),
- Field('certificate', _("Certificate"), width=60, height=20, editable=NEVER,
- computer=Computer(self._certificate_computer, depends=('file',))),
- Field('x509', _("X509 structure"), virtual=True, editable=NEVER, type=Certificates.UniType(),
- computer=Computer(self._x509_computer, depends=('certificate',))),
- Field('serial_number', _("Serial number"), editable=NEVER,
- computer=self._make_x509_computer(self._serial_number_computer)),
- Field('text', _("Certificate"), width=60, height=20, editable=NEVER,
- computer=self._make_x509_computer(self._text_computer)),
- Field('issuer', _("Certification Authority"), width=32, editable=NEVER,
- computer=self._make_x509_computer(self._issuer_computer)),
- Field('valid_from', _("Valid from"), editable=NEVER,
- computer=self._make_x509_computer(self._valid_from_computer)),
- Field('valid_until', _("Valid until"), editable=NEVER,
- computer=self._make_x509_computer(self._valid_until_computer)),
- Field('trusted', _("Trusted"), default=False,
- descr=_("When this is checked, certificates signed by this root certificate are considered valid.")),
- )
-
- columns = ('issuer', 'valid_from', 'valid_until', 'trusted',)
- sorting = (('issuer', ASC,), ('valid_until', ASC,))
- layout = ('trusted', 'issuer', 'valid_from', 'valid_until', 'text',)
-
- def _certificate_computation(self, buffer):
- return str(buffer)
- def _certificate_computer(self, record):
- file_value = record['file'].value()
- if file_value is None: # new record form
- return None
- certificate = self._certificate_computation(file_value.buffer())
- return certificate
- def _x509_computer(self, record):
- import gnutls.crypto
- if self._ca_x509 is None:
- self._ca_x509 = gnutls.crypto.X509Certificate(open(cfg.ca_certificate_file).read())
- certificate = record['certificate'].value()
- if certificate is None: # new record form
- return None
- try:
- x509 = gnutls.crypto.X509Certificate(certificate)
- except Exception, e:
- raise Exception(_("Invalid certificate"), e)
- if not x509.has_issuer(x509) and not x509.has_issuer(self._ca_x509):
- x509 = None
- return x509
- def _make_x509_computer(self, function):
- def func(record):
- x509 = record['x509'].value()
- if x509 is None:
- return None
- return function(x509)
- return Computer(func, depends=('x509',))
- def _serial_number_computer(self, x509):
- number = int(x509.serial_number)
- if not isinstance(number, int): # it may be long
- raise Exception(_("Unprocessable serial number"))
- return number
- def _issuer_computer(self, x509):
- return unicode(x509.issuer)
- def _valid_from_computer(self, x509):
- return self._convert_x509_timestamp(x509.activation_time)
- def _valid_until_computer(self, x509):
- return self._convert_x509_timestamp(x509.expiration_time)
- def _text_computer(self, x509):
- return ('Subject: %s\nIssuer: %s\nSerial number: %s\nVersion: %s\nValid from: %s\nValid until: %s\n' %
- (x509.subject, x509.issuer, x509.serial_number, x509.version, time.ctime(x509.activation_time), time.ctime(x509.expiration_time),))
- def _convert_x509_timestamp(self, timestamp):
- time_tuple = time.gmtime(timestamp)
- mx_time = mx.DateTime.DateTime(*time_tuple[:6])
- return mx_time
-
- def check(self, record):
- x509 = record['x509'].value()
- if x509 is None:
- return ('file', _("The certificate is not valid"),)
-
- RIGHTS_view = (Roles.ADMIN,)
- RIGHTS_list = (Roles.ADMIN,)
- RIGHTS_rss = (Roles.ADMIN,)
- RIGHTS_insert = (Roles.ADMIN,)
- RIGHTS_update = (Roles.ADMIN,)
- RIGHTS_delete = (Roles.ADMIN,)
-
- _LAYOUT = {'insert': ('file',)}
-
-class CACertificates(Certificates):
- """Management of root certificates."""
-
- class Spec(Certificates.Spec):
-
- _ID_COLUMN = 'cacertificates_id'
-
- table = 'cacertificates'
- title = _("CA Certificates")
- help = _("Manage trusted root certificates.")
-
- def check(self, record):
- error = Certificates.Spec.check(self, record)
- if error is not None:
- return error
- x509 = record['x509'].value()
- if x509.check_ca() != 1:
- return ('file', _("This is not a CA certificate."))
-
- _LAYOUT = {'insert': ('file',)}
-
- WMI_SECTION = WikingManagementInterface.SECTION_CERTIFICATES
- WMI_ORDER = 100
-
-class UserCertificates(Certificates):
- """Management of user certificates, especially for the purpose of authentication."""
-
- class Spec(Certificates.Spec):
-
- title = _("User Certificates")
- help = _("Manage user and other kinds of certificates.")
- table = 'certificates'
-
- _PURPOSE_AUTHENTICATION = 1
-
- def fields(self):
- fields = Certificates.Spec.fields(self)
- fields = fields + (Field('subject', _("Subject"), virtual=True, editable=NEVER, type=Certificates.UniType(),
- computer=self._make_x509_computer(self._subject_computer)),
- Field('common_name', _("Name"), editable=NEVER,
- computer=Computer(self._common_name_computer, depends=('subject',))),
- Field('email', _("E-mail"), editable=NEVER,
- computer=Computer(self._email_computer, depends=('subject',))),
- Field('uid', not_null=True),
- Field('purpose', not_null=True),
- )
- return fields
-
- _OWNER_COLUMN = 'uid'
- columns = ('common_name', 'valid_from', 'valid_until', 'trusted',)
- layout = ('trusted', 'common_name', 'email', 'issuer', 'valid_from', 'valid_until', 'text',)
-
- def _subject_computer(self, x509):
- return x509.subject
- def _common_name_computer(self, record):
- subject = record['subject'].value()
- if subject is None:
- return ''
- return subject.common_name
- def _email_computer(self, record):
- subject = record['subject'].value()
- if subject is None:
- return ''
- return subject.email
-
- WMI_SECTION = WikingManagementInterface.SECTION_CERTIFICATES
- WMI_ORDER = 200
-
- def authentication_certificate(self, uid):
- """Return authentication certificate row of the given user.
-
- The return value is the corresponding 'pytis.data.Row' instance. If
- the user doesn't have authentication certificate assigned, return
- 'None'.
-
- This method considers only authentication certificates. Certificates
- present for other purposes are ignored.
-
- Arguments:
-
- uid -- user id as an integer
-
- """
- data = self._data
- uid_value = pd.Value(data.find_column('uid').type(), uid)
- purpose_value = pd.Value(data.find_column('purpose').type(), self.Spec._PURPOSE_AUTHENTICATION)
- self._data.select(pd.AND(pd.EQ('uid', uid_value), pd.EQ('purpose', purpose_value)))
- row = self._data.fetchone()
- self._data.close()
- return row
-
- def certificate_user(self, req, certificate):
- """Return user corresponding to 'certificate' and request 'req'.
-
- If there is no such user, return 'None'.
-
- The method assumes the certificate has already been verified by site CA
- certificate, so no verification is performed.
-
- Arguments:
-
- req -- 'Request' instance to provide for construction of the user object
- certificate -- PEM encoded certificate verified against the site's CA
- certificate, a string
-
- """
- user = None
- import gnutls.crypto
- x509 = gnutls.crypto.X509Certificate(certificate)
- serial_number = int(x509.serial_number)
- row = self._data.get_row(serial_number=serial_number)
- if row is not None:
- uid = row['uid'].value()
- user_module = self._module('Users')
- user_record = user_module.find_user(req, uid)
- user = user_module.user(req, user_record['login'].value())
- return user
-
-class CertificateRequest(UserCertificates):
-
- class Spec(UserCertificates.Spec):
-
- # This is a *public* method. But it has to begin with underscore
- # otherwise it would be handled in a special way by Wiking, causing
- # failure.
- def _set_dbconnection(self, dbconnection):
- self._serial_number_counter = pd.DBCounterDefault('certificate_serial_number', dbconnection)
-
- def fields(self):
- fields = pp.Fields(UserCertificates.Spec.fields(self))
- overridden = [Field(inherit=fields['file'], descr=_("Upload a PEM file containing the certificate request")),
- Field(inherit=fields['purpose'], default=self._PURPOSE_AUTHENTICATION)]
- # We add some fields to propagate last form values to the new request
- extra = [Field('regcode', type=pytis.data.String(), virtual=True)]
- return fields.fields(override=overridden) + extra
-
- def _certificate_computation(self, buffer):
- serial_number = self._serial_number_counter.next()
- working_dir = os.path.join(cfg.storage, 'certificate-%d' % (serial_number,))
- request_file = os.path.join(working_dir, 'request')
- certificate_file = os.path.join(working_dir, 'certificate.pem')
- log_file = os.path.join(working_dir, 'log')
- template_file = os.path.join(working_dir, 'certtool.cfg')
- os.mkdir(working_dir)
- try:
- stdout = open(log_file, 'w')
- open(request_file, 'w').write(str(buffer))
- open(template_file, 'w').write('serial = %s\nexpiration_days = %s\ntls_www_client\n' %
- (serial_number, cfg.certificate_expiration_days,))
- return_code = subprocess.call(('/usr/bin/certtool', '--generate-certificate',
- '--load-request', request_file,
- '--outfile', certificate_file,
- '--load-ca-certificate', cfg.ca_certificate_file, '--load-ca-privkey', cfg.ca_key_file,
- '--template', template_file,),
- stdout=stdout, stderr=stdout)
- if return_code != 0:
- raise Exception(_("Certificate request could not be processed"), open(log_file).read())
- certificate = open(certificate_file).read()
- finally:
- for file_name in os.listdir(working_dir):
- os.remove(os.path.join(working_dir, file_name))
- os.rmdir(working_dir)
- return certificate
-
- def _spec(self, resolver):
- spec = super(CertificateRequest, self)._spec(resolver)
- spec._set_dbconnection(self._dbconnection)
- return spec
-
- def _layout(self, req, action, record=None):
- # This is necessary to propagate `uid' given in the form to the actual
- # data row
- if action == 'insert' and req.param('submit'):
- result = pp.GroupSpec(('uid', 'file',), orientation=pp.Orientation.VERTICAL)
- else:
- result = super(CertificateRequest, self)._layout(req, action, record)
- return result
-
- def _document_title(self, req, record):
- return _("Certificate upload")
More information about the Wiking-cvs
mailing list