Mercurial > kallithea
diff rhodecode/model/forms.py @ 547:1e757ac98988
renamed project to rhodecode
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Wed, 06 Oct 2010 03:18:16 +0200 |
parents | pylons_app/model/forms.py@a08f610e545e |
children | b75b77ef649d |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rhodecode/model/forms.py Wed Oct 06 03:18:16 2010 +0200 @@ -0,0 +1,361 @@ +""" this is forms validation classes +http://formencode.org/module-formencode.validators.html +for list off all availible validators + +we can create our own validators + +The table below outlines the options which can be used in a schema in addition to the validators themselves +pre_validators [] These validators will be applied before the schema +chained_validators [] These validators will be applied after the schema +allow_extra_fields False If True, then it is not an error when keys that aren't associated with a validator are present +filter_extra_fields False If True, then keys that aren't associated with a validator are removed +if_key_missing NoDefault If this is given, then any keys that aren't available but are expected will be replaced with this value (and then validated). This does not override a present .if_missing attribute on validators. NoDefault is a special FormEncode class to mean that no default values has been specified and therefore missing keys shouldn't take a default value. +ignore_key_missing False If True, then missing keys will be missing in the result, if the validator doesn't have .if_missing on it already + + +<name> = formencode.validators.<name of validator> +<name> must equal form name +list=[1,2,3,4,5] +for SELECT use formencode.All(OneOf(list), Int()) + +""" +from formencode import All +from formencode.validators import UnicodeString, OneOf, Int, Number, Regex, \ + Email, Bool, StringBoolean +from pylons import session +from pylons.i18n.translation import _ +from rhodecode.lib.auth import check_password, get_crypt_password +from rhodecode.model import meta +from rhodecode.model.user_model import UserModel +from rhodecode.model.db import User, Repository +from sqlalchemy.exc import OperationalError +from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound +from webhelpers.pylonslib.secure_form import authentication_token +import formencode +import logging +import os +import rhodecode.lib.helpers as h +log = logging.getLogger(__name__) + + +#this is needed to translate the messages using _() in validators +class State_obj(object): + _ = staticmethod(_) + +#=============================================================================== +# VALIDATORS +#=============================================================================== +class ValidAuthToken(formencode.validators.FancyValidator): + messages = {'invalid_token':_('Token mismatch')} + + def validate_python(self, value, state): + + if value != authentication_token(): + raise formencode.Invalid(self.message('invalid_token', state, + search_number=value), value, state) + +def ValidUsername(edit, old_data): + class _ValidUsername(formencode.validators.FancyValidator): + + def validate_python(self, value, state): + if value in ['default', 'new_user']: + raise formencode.Invalid(_('Invalid username'), value, state) + #check if user is uniq + sa = meta.Session + old_un = None + if edit: + old_un = sa.query(User).get(old_data.get('user_id')).username + + if old_un != value or not edit: + if sa.query(User).filter(User.username == value).scalar(): + raise formencode.Invalid(_('This username already exists') , + value, state) + meta.Session.remove() + + return _ValidUsername + +class ValidPassword(formencode.validators.FancyValidator): + + def to_python(self, value, state): + if value: + return get_crypt_password(value) + +class ValidAuth(formencode.validators.FancyValidator): + messages = { + 'invalid_password':_('invalid password'), + 'invalid_login':_('invalid user name'), + 'disabled_account':_('Your acccount is disabled') + + } + #error mapping + e_dict = {'username':messages['invalid_login'], + 'password':messages['invalid_password']} + e_dict_disable = {'username':messages['disabled_account']} + + def validate_python(self, value, state): + password = value['password'] + username = value['username'] + user = UserModel().get_user_by_name(username) + if user is None: + raise formencode.Invalid(self.message('invalid_password', + state=State_obj), value, state, + error_dict=self.e_dict) + if user: + if user.active: + if user.username == username and check_password(password, + user.password): + return value + else: + log.warning('user %s not authenticated', username) + raise formencode.Invalid(self.message('invalid_password', + state=State_obj), value, state, + error_dict=self.e_dict) + else: + log.warning('user %s is disabled', username) + raise formencode.Invalid(self.message('disabled_account', + state=State_obj), + value, state, + error_dict=self.e_dict_disable) + +class ValidRepoUser(formencode.validators.FancyValidator): + + def to_python(self, value, state): + try: + self.user_db = meta.Session.query(User)\ + .filter(User.active == True)\ + .filter(User.username == value).one() + except Exception: + raise formencode.Invalid(_('This username is not valid'), + value, state) + finally: + meta.Session.remove() + + return self.user_db.user_id + +def ValidRepoName(edit, old_data): + class _ValidRepoName(formencode.validators.FancyValidator): + + def to_python(self, value, state): + slug = h.repo_name_slug(value) + if slug in ['_admin']: + raise formencode.Invalid(_('This repository name is disallowed'), + value, state) + if old_data.get('repo_name') != value or not edit: + sa = meta.Session + if sa.query(Repository).filter(Repository.repo_name == slug).scalar(): + raise formencode.Invalid(_('This repository already exists') , + value, state) + meta.Session.remove() + return slug + + + return _ValidRepoName + +class ValidPerms(formencode.validators.FancyValidator): + messages = {'perm_new_user_name':_('This username is not valid')} + + def to_python(self, value, state): + perms_update = [] + perms_new = [] + #build a list of permission to update and new permission to create + for k, v in value.items(): + if k.startswith('perm_'): + if k.startswith('perm_new_user'): + new_perm = value.get('perm_new_user', False) + new_user = value.get('perm_new_user_name', False) + if new_user and new_perm: + if (new_user, new_perm) not in perms_new: + perms_new.append((new_user, new_perm)) + else: + usr = k[5:] + if usr == 'default': + if value['private']: + #set none for default when updating to private repo + v = 'repository.none' + perms_update.append((usr, v)) + value['perms_updates'] = perms_update + value['perms_new'] = perms_new + sa = meta.Session + for k, v in perms_new: + try: + self.user_db = sa.query(User)\ + .filter(User.active == True)\ + .filter(User.username == k).one() + except Exception: + msg = self.message('perm_new_user_name', + state=State_obj) + raise formencode.Invalid(msg, value, state, error_dict={'perm_new_user_name':msg}) + return value + +class ValidSettings(formencode.validators.FancyValidator): + + def to_python(self, value, state): + #settings form can't edit user + if value.has_key('user'): + del['value']['user'] + + return value + +class ValidPath(formencode.validators.FancyValidator): + def to_python(self, value, state): + isdir = os.path.isdir(value.replace('*', '')) + if (value.endswith('/*') or value.endswith('/**')) and isdir: + return value + elif not isdir: + msg = _('This is not a valid path') + else: + msg = _('You need to specify * or ** at the end of path (ie. /tmp/*)') + + raise formencode.Invalid(msg, value, state, + error_dict={'paths_root_path':msg}) + +def UniqSystemEmail(old_data): + class _UniqSystemEmail(formencode.validators.FancyValidator): + def to_python(self, value, state): + if old_data.get('email') != value: + sa = meta.Session + try: + user = sa.query(User).filter(User.email == value).scalar() + if user: + raise formencode.Invalid(_("That e-mail address is already taken") , + value, state) + finally: + meta.Session.remove() + + return value + + return _UniqSystemEmail + +class ValidSystemEmail(formencode.validators.FancyValidator): + def to_python(self, value, state): + sa = meta.Session + try: + user = sa.query(User).filter(User.email == value).scalar() + if user is None: + raise formencode.Invalid(_("That e-mail address doesn't exist.") , + value, state) + finally: + meta.Session.remove() + + return value + +#=============================================================================== +# FORMS +#=============================================================================== +class LoginForm(formencode.Schema): + allow_extra_fields = True + filter_extra_fields = True + username = UnicodeString( + strip=True, + min=1, + not_empty=True, + messages={ + 'empty':_('Please enter a login'), + 'tooShort':_('Enter a value %(min)i characters long or more')} + ) + + password = UnicodeString( + strip=True, + min=6, + not_empty=True, + messages={ + 'empty':_('Please enter a password'), + 'tooShort':_('Enter a value %(min)i characters long or more')} + ) + + + #chained validators have access to all data + chained_validators = [ValidAuth] + +def UserForm(edit=False, old_data={}): + class _UserForm(formencode.Schema): + allow_extra_fields = True + filter_extra_fields = True + username = All(UnicodeString(strip=True, min=1, not_empty=True), ValidUsername(edit, old_data)) + if edit: + new_password = All(UnicodeString(strip=True, min=6, not_empty=False), ValidPassword) + admin = StringBoolean(if_missing=False) + else: + password = All(UnicodeString(strip=True, min=6, not_empty=True), ValidPassword) + active = StringBoolean(if_missing=False) + name = UnicodeString(strip=True, min=1, not_empty=True) + lastname = UnicodeString(strip=True, min=1, not_empty=True) + email = All(Email(not_empty=True), UniqSystemEmail(old_data)) + + return _UserForm + +RegisterForm = UserForm + +def PasswordResetForm(): + class _PasswordResetForm(formencode.Schema): + allow_extra_fields = True + filter_extra_fields = True + email = All(ValidSystemEmail(), Email(not_empty=True)) + return _PasswordResetForm + +def RepoForm(edit=False, old_data={}): + class _RepoForm(formencode.Schema): + allow_extra_fields = True + filter_extra_fields = False + repo_name = All(UnicodeString(strip=True, min=1, not_empty=True), ValidRepoName(edit, old_data)) + description = UnicodeString(strip=True, min=1, not_empty=True) + private = StringBoolean(if_missing=False) + + if edit: + user = All(Int(not_empty=True), ValidRepoUser) + + chained_validators = [ValidPerms] + return _RepoForm + +def RepoForkForm(edit=False, old_data={}): + class _RepoForkForm(formencode.Schema): + allow_extra_fields = True + filter_extra_fields = False + fork_name = All(UnicodeString(strip=True, min=1, not_empty=True), ValidRepoName(edit, old_data)) + description = UnicodeString(strip=True, min=1, not_empty=True) + private = StringBoolean(if_missing=False) + + return _RepoForkForm + +def RepoSettingsForm(edit=False, old_data={}): + class _RepoForm(formencode.Schema): + allow_extra_fields = True + filter_extra_fields = False + repo_name = All(UnicodeString(strip=True, min=1, not_empty=True), ValidRepoName(edit, old_data)) + description = UnicodeString(strip=True, min=1, not_empty=True) + private = StringBoolean(if_missing=False) + + chained_validators = [ValidPerms, ValidSettings] + return _RepoForm + + +def ApplicationSettingsForm(): + class _ApplicationSettingsForm(formencode.Schema): + allow_extra_fields = True + filter_extra_fields = False + hg_app_title = UnicodeString(strip=True, min=1, not_empty=True) + hg_app_realm = UnicodeString(strip=True, min=1, not_empty=True) + + return _ApplicationSettingsForm + +def ApplicationUiSettingsForm(): + class _ApplicationUiSettingsForm(formencode.Schema): + allow_extra_fields = True + filter_extra_fields = False + web_push_ssl = OneOf(['true', 'false'], if_missing='false') + paths_root_path = All(ValidPath(), UnicodeString(strip=True, min=1, not_empty=True)) + hooks_changegroup_update = OneOf(['True', 'False'], if_missing=False) + hooks_changegroup_repo_size = OneOf(['True', 'False'], if_missing=False) + + return _ApplicationUiSettingsForm + +def DefaultPermissionsForm(perms_choices, register_choices, create_choices): + class _DefaultPermissionsForm(formencode.Schema): + allow_extra_fields = True + filter_extra_fields = True + overwrite_default = OneOf(['true', 'false'], if_missing='false') + default_perm = OneOf(perms_choices) + default_register = OneOf(register_choices) + default_create = OneOf(create_choices) + + return _DefaultPermissionsForm