comparison rhodecode/model/validators.py @ 2466:7010dc12f10c codereview

Added rewritten validators module + tests
author Marcin Kuzminski <marcin@python-works.com>
date Sun, 17 Jun 2012 21:31:31 +0200
parents
children 9225597688f4
comparison
equal deleted inserted replaced
2465:8a8805478312 2466:7010dc12f10c
1 """
2 Set of generic validators
3 """
4 import os
5 import re
6 import formencode
7 import logging
8 from pylons.i18n.translation import _
9 from webhelpers.pylonslib.secure_form import authentication_token
10
11 from formencode.validators import (
12 UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set
13 )
14
15 from rhodecode.lib.utils import repo_name_slug
16 from rhodecode.model.db import RepoGroup, Repository, UsersGroup, User
17 from rhodecode.lib.auth import authenticate
18 from rhodecode.lib.exceptions import LdapImportError
19 from rhodecode.config.routing import ADMIN_PREFIX
20 # silence warnings and pylint
21 UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set
22
23 log = logging.getLogger(__name__)
24
25
class StateObj(object):
    """
    Bare state object handed to a validator's ``message()`` when the caller
    did not supply one; it only carries the translation function ``_`` that
    is needed to translate the validator messages
    """
    # staticmethod keeps ``_`` a plain function on attribute access instead
    # of turning it into a method bound to the instance
    _ = staticmethod(_)
31
32
def M(self, key, state=None, **kwargs):
    """
    Return the message stored under `key` on the validator `self`. The
    passed keyword params are forwarded to ``self.message()`` and are used
    to substitute the %(named)s params inside the translated string

    :param self: validator instance, its ``message()`` method is called
    :param key: key of the message to return
    :param state: optional state object, a fresh :class:`StateObj` is used
        when it is not given. A passed object gets the translation function
        stored on it as ``_``
    """
    if state is None:
        state = StateObj()
    else:
        # inject the translator into the caller's state object. It has to be
        # the plain function: a staticmethod object stored on an *instance*
        # is handed back unchanged by attribute lookup and is not callable
        # before Python 3.10, so ``state._(...)`` would not work
        state._ = _
    return self.message(key, state, **kwargs)
48
49
def ValidUsername(edit=False, old_data={}):
    """
    Build a validator class that rejects reserved, already taken and
    malformed usernames

    :param edit: True when an existing user is edited; the username stored
        for ``old_data['user_id']`` is then not reported as a duplicate
    :param old_data: data of the edited user, only ``user_id`` is read and
        only when `edit` is set
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'username_exists': _(u'Username "%(username)s" already exists'),
            'system_invalid_username':
                _(u'Username "%(username)s" is forbidden'),
            'invalid_username':
                _(u'Username may only contain alphanumeric characters '
                  'underscores, periods or dashes and must begin with '
                  'alphanumeric character')
        }

        def validate_python(self, value, state):
            """
            Raise :class:`formencode.Invalid` when `value` is a reserved
            name, is used by another user or is not well formed
            """
            # names reserved by the system
            if value in ['default', 'new_user']:
                msg = M(self, 'system_invalid_username', state, username=value)
                raise formencode.Invalid(msg, value, state)

            # check if user is unique
            old_un = None
            if edit:
                old_un = User.get(old_data.get('user_id')).username

            if old_un != value or not edit:
                if User.get_by_username(value, case_insensitive=True):
                    msg = M(self, 'username_exists', state, username=value)
                    raise formencode.Invalid(msg, value, state)

            # one leading alphanumeric character followed by any number of
            # allowed characters. `*` (not `+`) so that a single character
            # name, which satisfies the rule stated in the message, passes
            if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]*$', value) is None:
                msg = M(self, 'invalid_username', state)
                raise formencode.Invalid(msg, value, state)
    return _validator
80
81
def ValidRepoUser():
    """
    Build a validator class that accepts only the username of an existing,
    active user
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_username': _(u'Username %(username)s is not valid')
        }

        def validate_python(self, value, state):
            """
            Raise :class:`formencode.Invalid` (reported on `username`) when
            the lookup of exactly one active user named `value` fails
            """
            try:
                # .one() raises unless exactly one row matches
                query = User.query().filter(User.active == True)
                query.filter(User.username == value).one()
            except Exception:
                error = M(self, 'invalid_username', state, username=value)
                raise formencode.Invalid(error, value, state,
                                         error_dict={'username': error})

    return _validator
99
100
def ValidUsersGroup(edit=False, old_data={}):
    """
    Build a validator class for users group names: the name must not be
    reserved, must be unique and must be well formed

    :param edit: True when an existing users group is edited; its current
        name is then not reported as a duplicate
    :param old_data: data of the edited group, only ``users_group_id`` is
        read and only when `edit` is set
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_group': _(u'Invalid users group name'),
            'group_exist': _(u'Users group "%(usersgroup)s" already exists'),
            'invalid_usersgroup_name':
                _(u'users group name may only contain  alphanumeric '
                  'characters underscores, periods or dashes and must begin '
                  'with alphanumeric character')
        }

        def validate_python(self, value, state):
            """
            Raise :class:`formencode.Invalid`, always reported on
            `users_group_name`, when `value` is not acceptable
            """
            def invalid(key, **params):
                # all failures of this validator share the same error field
                text = M(self, key, state, **params)
                return formencode.Invalid(
                    text, value, state,
                    error_dict={'users_group_name': text}
                )

            # reserved name
            if value in ['default']:
                raise invalid('invalid_group')

            # uniqueness, skipped when an edit keeps the current name
            current_name = None
            if edit:
                group_id = old_data.get('users_group_id')
                current_name = UsersGroup.get(group_id).users_group_name

            if current_name != value or not edit:
                if UsersGroup.get_by_group_name(value, case_insensitive=True):
                    raise invalid('group_exist', usersgroup=value)

            # allowed characters
            pattern = r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$'
            if re.match(pattern, value) is None:
                raise invalid('invalid_usersgroup_name')

    return _validator
140
141
def ValidReposGroup(edit=False, old_data={}):
    """
    Build a validator class for the repositories group form

    :param edit: True when an existing group is edited
    :param old_data: data of the edited group, ``group_id`` is read when
        `edit` is set
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'group_parent_id': _(u'Cannot assign this group as parent'),
            'group_exists': _(u'Group "%(group_name)s" already exists'),
            'repo_exists':
                _(u'Repository with name "%(group_name)s" already exists')
        }

        def validate_python(self, value, state):
            """
            `value` is the form dict, ``group_name`` and ``group_parent_id``
            are read from it. Raises :class:`formencode.Invalid` when the
            group would become its own parent, or when a group (under the
            same parent) or a repository with the slugified name exists
            """
            # TODO WRITE VALIDATIONS
            name = value.get('group_name')
            parent_id = value.get('group_parent_id')

            # slugify repo group just in case :)
            slug = repo_name_slug(name)

            # a group cannot be the parent of itself
            if edit and parent_id and old_data['group_id'] == int(parent_id):
                error = M(self, 'group_parent_id', state)
                raise formencode.Invalid(error, value, state,
                                         error_dict={'group_parent_id': error})

            if edit:
                stored_name = RepoGroup.get(old_data.get('group_id')).group_name
                if stored_name == name:
                    # name unchanged on edit, no need for uniqueness checks
                    return

            # check group
            same_group = RepoGroup.query()
            same_group = same_group.filter(RepoGroup.group_name == slug)
            same_group = same_group.filter(
                RepoGroup.group_parent_id == parent_id
            )
            if same_group.scalar():
                error = M(self, 'group_exists', state, group_name=slug)
                raise formencode.Invalid(error, value, state,
                                         error_dict={'group_name': error})

            # check for same repo
            same_repo = Repository.query().filter(Repository.repo_name == slug)
            if same_repo.scalar():
                error = M(self, 'repo_exists', state, group_name=slug)
                raise formencode.Invalid(error, value, state,
                                         error_dict={'group_name': error})

    return _validator
200
201
def ValidPassword():
    """
    Build a validator class that accepts only passwords which can be
    decoded as ascii
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_password':
                _(u'Invalid characters (non-ascii) in password')
        }

        def validate_python(self, value, state):
            """
            Raise :class:`formencode.Invalid` when decoding `value` as ascii
            fails; a false `value` is treated like an empty string
            """
            password = value or ''
            try:
                password.decode('ascii')
            except UnicodeError:
                error = M(self, 'invalid_password', state)
                raise formencode.Invalid(error, value, state)
    return _validator
216
217
def ValidPasswordsMatch():
    """
    Build a validator class checking that the password and its
    confirmation, both read from the form dict, are the same
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'password_mismatch': _(u'Passwords do not match'),
        }

        def validate_python(self, value, state):
            """
            Compare ``password`` (or ``new_password`` when the former is
            missing or empty) with ``password_confirmation``; a mismatch is
            reported on `password_confirmation`
            """
            entered = value.get('password') or value.get('new_password')
            confirmation = value['password_confirmation']
            if entered != confirmation:
                error = M(self, 'password_mismatch', state)
                raise formencode.Invalid(
                    error, value, state,
                    error_dict={'password_confirmation': error}
                )
    return _validator
233
234
def ValidAuth():
    """
    Build a validator class that authenticates the ``username`` /
    ``password`` pair of the form dict
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_password': _(u'invalid password'),
            'invalid_username': _(u'invalid user name'),
            'disabled_account': _(u'Your account is disabled')
        }

        def validate_python(self, value, state):
            """
            Raise :class:`formencode.Invalid` when ``authenticate()`` rejects
            the credentials; a disabled account gets its own message
            """
            password = value['password']
            username = value['username']

            if authenticate(username, password):
                return

            # find out why it failed to pick the right message
            account = User.get_by_username(username)
            if account and account.active is False:
                log.warning('user %s is disabled' % username)
                error = M(self, 'disabled_account', state)
                raise formencode.Invalid(error, value, state,
                                         error_dict={'username': error})

            log.warning('user %s failed to authenticate' % username)
            user_error = M(self, 'invalid_username', state)
            pass_error = M(self, 'invalid_password', state)
            raise formencode.Invalid(
                user_error, value, state,
                error_dict={'username': user_error, 'password': pass_error}
            )
    return _validator
263
264
def ValidAuthToken():
    """
    Build a validator class that compares the submitted value with the
    value returned by ``authentication_token()``
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_token': _(u'Token mismatch')
        }

        def validate_python(self, value, state):
            """
            Raise :class:`formencode.Invalid` when `value` differs from the
            expected token
            """
            expected = authentication_token()
            if value != expected:
                error = M(self, 'invalid_token', state)
                raise formencode.Invalid(error, value, state)
    return _validator
276
277
def ValidRepoName(edit=False, old_data={}):
    """
    Build a validator class for the repository name of a repository form

    :param edit: True when an existing repository is edited
    :param old_data: data of the edited repository, only ``repo_name`` is
        read and it is compared with the new full name
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_repo_name':
                _(u'Repository name %(repo)s is disallowed'),
            'repository_exists':
                _(u'Repository named %(repo)s already exists'),
            'repository_in_group_exists': _(u'Repository "%(repo)s" already '
                                            'exists in group "%(group)s"'),
            'same_group_exists': _(u'Repositories group with name "%(repo)s" '
                                   'already exists')
        }

        def _to_python(self, value, state):
            """
            Slugify ``repo_name`` and store ``repo_name``,
            ``repo_name_full``, ``group_path`` and ``group_name`` in the form
            dict, which is returned
            """
            slug = repo_name_slug(value.get('repo_name', ''))
            group_id = value.get('repo_group')

            # defaults for a repository outside of any group
            group_path = group_name = ''
            full_name = slug
            if group_id:
                group = RepoGroup.get(group_id)
                group_path = group.full_path
                group_name = group.group_name
                # the full name is the group path joined with the slug, it is
                # the name the database lookups below are done with
                full_name = group_path + RepoGroup.url_sep() + slug

            value['repo_name'] = slug
            value['repo_name_full'] = full_name
            value['group_path'] = group_path
            value['group_name'] = group_name
            return value

        def validate_python(self, value, state):
            """
            Raise :class:`formencode.Invalid`, always reported on
            `repo_name`, when the name is disallowed or clashes with an
            existing repository or repositories group. Returns `value`
            """
            def invalid(key, **params):
                # all failures of this validator share the same error field
                text = M(self, key, state, **params)
                return formencode.Invalid(text, value, state,
                                          error_dict={'repo_name': text})

            repo_name = value.get('repo_name')
            repo_name_full = value.get('repo_name_full')
            group_path = value.get('group_path')
            group_name = value.get('group_name')

            if repo_name in [ADMIN_PREFIX, '']:
                raise invalid('invalid_repo_name', repo=repo_name)

            renamed = old_data.get('repo_name') != repo_name_full
            if edit and not renamed:
                # edit that keeps the full name, nothing can clash
                return value

            if group_path != '':
                # repository inside of a group
                if Repository.get_by_repo_name(repo_name_full):
                    raise invalid('repository_in_group_exists',
                                  repo=repo_name, group=group_name)
                return value

            # top level repository
            if RepoGroup.get_by_group_name(repo_name_full):
                raise invalid('same_group_exists', repo=repo_name)
            if Repository.get_by_repo_name(repo_name_full):
                raise invalid('repository_exists', repo=repo_name)
            return value
    return _validator
351
352
def ValidForkName(*args, **kwargs):
    """
    Fork names are validated exactly like repository names: all arguments
    are handed over to :func:`ValidRepoName` and its result is returned
    """
    validator = ValidRepoName(*args, **kwargs)
    return validator
355
356
def SlugifyName():
    """
    Build a validator class that converts the value with
    ``repo_name_slug()`` and never rejects it
    """
    class _validator(formencode.validators.FancyValidator):

        def _to_python(self, value, state):
            """Return the slugified `value`"""
            slug = repo_name_slug(value)
            return slug

        def validate_python(self, value, state):
            """Nothing to check, every value is accepted"""
            return None

    return _validator
367
368
def ValidCloneUri():
    """
    Build a validator class for the ``clone_uri`` of a repository form.
    An empty url is accepted, any other url has to start with ``http`` and,
    for repositories of type ``hg``, the remote has to answer a
    capabilities request
    """
    from rhodecode.lib.utils import make_ui

    def url_handler(repo_type, url, proto, ui=None):
        """
        Try to reach the remote repository, any failure surfaces as an
        exception raised by the mercurial code

        :param repo_type: ``hg`` or ``git``, nothing is checked for ``git``
        :param url: url of the remote repository
        :param proto: ``http`` or ``https``
        :param ui: ui object to use, built with ``make_ui('db')`` when it is
            not given
        """
        if repo_type == 'hg':
            from mercurial.httprepo import httprepository, httpsrepository
            if ui is None:
                ui = make_ui('db')
            # the bare attribute access is intentional, reading
            # `capabilities` is what talks to the remote
            if proto == 'https':
                httpsrepository(ui, url).capabilities
            elif proto == 'http':
                httprepository(ui, url).capabilities
        elif repo_type == 'git':
            #TODO: write a git url validator
            pass

    class _validator(formencode.validators.FancyValidator):
        messages = {
            'clone_uri': _(u'invalid clone url'),
            'invalid_clone_uri': _(u'Invalid clone url, provide a '
                                    'valid clone http(s) url')
        }

        def validate_python(self, value, state):
            """
            `value` is the form dict, ``repo_type`` and ``clone_uri`` are
            read from it. Raises :class:`formencode.Invalid`, reported on
            `clone_uri`, for an url that does not start with ``http`` or
            that fails the check done by ``url_handler()``
            """
            repo_type = value.get('repo_type')
            url = value.get('clone_uri')

            if not url:
                # clone uri is optional
                return

            # 'https' also starts with 'http', one test covers both
            if not url.startswith('http'):
                msg = M(self, 'invalid_clone_uri', state)
                raise formencode.Invalid(msg, value, state,
                    error_dict=dict(clone_uri=msg)
                )

            proto = 'https' if url.startswith('https') else 'http'
            try:
                url_handler(repo_type, url, proto, make_ui('db'))
            except Exception:
                log.exception('Url validation failed')
                # pass the state on, like every other validator does
                msg = M(self, 'clone_uri', state)
                raise formencode.Invalid(msg, value, state,
                    error_dict=dict(clone_uri=msg)
                )
    return _validator
412
413
def ValidForkType(old_data={}):
    """
    Build a validator class that accepts only the repository type stored in
    ``old_data['repo_type']``

    :param old_data: data of the forked repository, ``repo_type`` is read
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_fork_type': _(u'Fork have to be the same type as parent')
        }

        def validate_python(self, value, state):
            """
            Raise :class:`formencode.Invalid`, reported on `repo_type`, when
            `value` differs from the type of the parent
            """
            parent_type = old_data['repo_type']
            if parent_type != value:
                error = M(self, 'invalid_fork_type', state)
                raise formencode.Invalid(error, value, state,
                                         error_dict={'repo_type': error})
    return _validator
427
428
def ValidPerms(type_='repo'):
    """
    Build a validator class that collects the permission changes of a
    permissions form and checks newly added members

    :param type_: ``repo`` or ``group``, selects the permission that the
        ``default`` member gets when the form has ``private`` set
    """
    # NOTE: for any other `type_` EMPTY_PERM stays undefined and using it
    # below fails with a NameError
    if type_ == 'group':
        EMPTY_PERM = 'group.none'
    elif type_ == 'repo':
        EMPTY_PERM = 'repository.none'

    class _validator(formencode.validators.FancyValidator):
        messages = {
            'perm_new_member_name':
                _(u'This username or users group name is not valid')
        }

        def to_python(self, value, state):
            """
            Store two lists in the form dict `value` and return it:

            * ``perms_updates`` - (member, permission, type) for each
              ``u_perm_<member>`` / ``g_perm_<member>`` key
            * ``perms_new`` - (member, permission, type) of the newly added
              member, taken from the ``perm_new_member*`` keys

            type is ``user`` or ``users_group`` for updates. Raises
            :class:`formencode.Invalid`, reported on
            `perm_new_member_name`, when the lookup of a new member fails
            """
            perms_update = []
            perms_new = []
            # build a list of permission to update and new permission to create
            for k, v in value.items():
                # means new added member to permissions
                if k.startswith('perm_new_member'):
                    new_perm = value.get('perm_new_member', False)
                    new_member = value.get('perm_new_member_name', False)
                    new_type = value.get('perm_new_member_type')

                    if new_member and new_perm:
                        # several keys share the prefix, add the entry once
                        if (new_member, new_perm, new_type) not in perms_new:
                            perms_new.append((new_member, new_perm, new_type))
                elif k.startswith(('u_perm_', 'g_perm_')):
                    # both prefixes are 7 characters long
                    member = k[7:]
                    t = {'u': 'user',
                         'g': 'users_group'
                    }[k[0]]
                    if member == 'default':
                        if value.get('private'):
                            # set none for default when updating to
                            # private repo
                            v = EMPTY_PERM
                    perms_update.append((member, v, t))

            value['perms_updates'] = perms_update
            value['perms_new'] = perms_new

            # update permissions
            for k, v, t in perms_new:
                try:
                    # compare by value: `t` comes from the submitted form and
                    # is in general not the same object as the literal, so an
                    # identity test (`is`) would skip the lookups
                    if t == 'user':
                        self.user_db = User.query()\
                            .filter(User.active == True)\
                            .filter(User.username == k).one()
                    if t == 'users_group':
                        self.user_db = UsersGroup.query()\
                            .filter(UsersGroup.users_group_active == True)\
                            .filter(UsersGroup.users_group_name == k).one()

                except Exception:
                    log.exception('Updated permission failed')
                    # the key has to be one defined in `messages` above
                    msg = M(self, 'perm_new_member_name', state)
                    raise formencode.Invalid(msg, value, state,
                        error_dict=dict(perm_new_member_name=msg)
                    )
            return value
    return _validator
490
491
def ValidSettings():
    """
    Build a validator class for the settings form: it drops the ``user``
    key from the form dict and never rejects anything
    """
    class _validator(formencode.validators.FancyValidator):
        def _to_python(self, value, state):
            """Return `value` with the ``user`` key removed"""
            # settings form can't edit user
            value.pop('user', None)
            return value

        def validate_python(self, value, state):
            """Nothing to check, every value is accepted"""
            return None
    return _validator
503
504
def ValidPath():
    """
    Build a validator class that accepts only paths of existing
    directories
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_path': _(u'This is not a valid path')
        }

        def validate_python(self, value, state):
            """
            Raise :class:`formencode.Invalid`, reported on
            `paths_root_path`, when `value` is not a directory
            """
            if os.path.isdir(value):
                return
            error = M(self, 'invalid_path', state)
            raise formencode.Invalid(error, value, state,
                                     error_dict={'paths_root_path': error})
    return _validator
518
519
def UniqSystemEmail(old_data={}):
    """
    Build a validator class that rejects an e-mail address which is
    already used by a user

    :param old_data: data of the edited user, only ``email`` is read; an
        address equal to it (ignoring case) is accepted without a lookup
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'email_taken': _(u'This e-mail address is already taken')
        }

        def _to_python(self, value, state):
            """Return `value` lowercased"""
            lowered = value.lower()
            return lowered

        def validate_python(self, value, state):
            """
            Raise :class:`formencode.Invalid`, reported on `email`, when a
            user with the address `value` is found
            """
            current = (old_data.get('email') or '').lower()
            # the lookup runs only when the address has changed
            if current != value and User.get_by_email(value,
                                                      case_insensitive=True):
                error = M(self, 'email_taken', state)
                raise formencode.Invalid(error, value, state,
                                         error_dict={'email': error})
    return _validator
538
539
def ValidSystemEmail():
    """
    Build a validator class that accepts only an e-mail address which
    belongs to a user
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'non_existing_email': _(u'e-mail "%(email)s" does not exist.')
        }

        def _to_python(self, value, state):
            """Return `value` lowercased"""
            lowered = value.lower()
            return lowered

        def validate_python(self, value, state):
            """
            Raise :class:`formencode.Invalid`, reported on `email`, when no
            user with the address `value` is found
            """
            owner = User.get_by_email(value, case_insensitive=True)
            if owner is not None:
                return
            error = M(self, 'non_existing_email', state, email=value)
            raise formencode.Invalid(error, value, state,
                                     error_dict={'email': error})

    return _validator
558
559
def LdapLibValidator():
    """
    Build a validator class that only checks that the ``ldap`` module can
    be imported
    """
    class _validator(formencode.validators.FancyValidator):
        # this validator has no messages of its own
        messages = {}

        def validate_python(self, value, state):
            """
            Raise :class:`LdapImportError` when importing ``ldap`` fails,
            `value` itself is not looked at
            """
            try:
                import ldap
            except ImportError:
                raise LdapImportError()
            else:
                ldap  # pyflakes silence !

    return _validator
574
575
def AttrLoginValidator():
    """
    Build a validator class that requires a non empty string as the LDAP
    login attribute
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_cn':
                  _(u'The LDAP Login attribute of the CN must be specified - '
                    'this is the name of the attribute that is equivalent '
                    'to "username"')
        }

        def validate_python(self, value, state):
            """
            Raise :class:`formencode.Invalid`, reported on
            `ldap_attr_login`, when `value` is empty or not a string
            """
            is_filled_string = value and isinstance(value, (str, unicode))
            if not is_filled_string:
                error = M(self, 'invalid_cn', state)
                raise formencode.Invalid(error, value, state,
                                         error_dict={'ldap_attr_login': error})

    return _validator