Mercurial > kallithea
comparison rhodecode/model/validators.py @ 2466:7010dc12f10c codereview
Added rewritten validators module + tests
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Sun, 17 Jun 2012 21:31:31 +0200 |
parents | |
children | 9225597688f4 |
comparison
equal
deleted
inserted
replaced
2465:8a8805478312 | 2466:7010dc12f10c |
---|---|
1 """ | |
2 Set of generic validators | |
3 """ | |
4 import os | |
5 import re | |
6 import formencode | |
7 import logging | |
8 from pylons.i18n.translation import _ | |
9 from webhelpers.pylonslib.secure_form import authentication_token | |
10 | |
11 from formencode.validators import ( | |
12 UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set | |
13 ) | |
14 | |
15 from rhodecode.lib.utils import repo_name_slug | |
16 from rhodecode.model.db import RepoGroup, Repository, UsersGroup, User | |
17 from rhodecode.lib.auth import authenticate | |
18 from rhodecode.lib.exceptions import LdapImportError | |
19 from rhodecode.config.routing import ADMIN_PREFIX | |
20 # silence warnings and pylint | |
21 UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set | |
22 | |
23 log = logging.getLogger(__name__) | |
24 | |
25 | |
class StateObj(object):
    """
    Default formencode state object; it only exposes the ``_`` translation
    function, which validators need to translate their messages
    """
    # wrapped so that access through an instance yields the plain function
    _ = staticmethod(_)
31 | |
32 | |
def M(self, key, state=None, **kwargs):
    """
    returns string from self.message based on given key,
    passed kw params are used to substitute %(named)s params inside
    translated strings

    :param self: validator instance whose ``message`` method is called
    :param key: name of the message to look up
    :param state: state object handed to ``self.message``; when ``None``
        a fresh ``StateObj`` is used, otherwise the translation function
        is injected into the given object as ``state._``
    """
    if state is None:
        state = StateObj()
    else:
        # Store the plain function: an attribute set on an instance is not
        # processed by the descriptor protocol, so a ``staticmethod``
        # wrapper assigned here would be handed back as-is, and such a
        # wrapper object is not callable on Python 2.
        state._ = _
    return self.message(key, state, **kwargs)
48 | |
49 | |
def ValidUsername(edit=False, old_data={}):
    """
    Build a validator class for user names.

    :param edit: set when an existing user is edited; the uniqueness check
        is then skipped as long as the name did not change
    :param old_data: data of the edited user, ``user_id`` is read from it
        when ``edit`` is set
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'username_exists': _(u'Username "%(username)s" already exists'),
            'system_invalid_username':
                _(u'Username "%(username)s" is forbidden'),
            'invalid_username':
                _(u'Username may only contain alphanumeric characters '
                    'underscores, periods or dashes and must begin with '
                    'alphanumeric character')
        }

        def validate_python(self, value, state):
            def reject(key, **params):
                # translate the message and abort validation
                raise formencode.Invalid(M(self, key, state, **params),
                                         value, state)

            # names that are never accepted
            if value in ('default', 'new_user'):
                reject('system_invalid_username', username=value)

            # uniqueness matters on create and whenever the name changed
            current_name = None
            if edit:
                current_name = User.get(old_data.get('user_id')).username
            if not edit or current_name != value:
                if User.get_by_username(value, case_insensitive=True):
                    reject('username_exists', username=value)

            name_pattern = r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$'
            if re.match(name_pattern, value) is None:
                reject('invalid_username')
    return _validator
80 | |
81 | |
def ValidRepoUser():
    """
    Build a validator class that accepts a name only when exactly one
    active user with that username is found.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_username': _(u'Username %(username)s is not valid')
        }

        def validate_python(self, value, state):
            try:
                lookup = User.query()
                lookup = lookup.filter(User.active == True)
                lookup = lookup.filter(User.username == value)
                # .one() raises unless there is exactly one match
                lookup.one()
            except Exception:
                error = M(self, 'invalid_username', state, username=value)
                raise formencode.Invalid(error, value, state,
                                         error_dict={'username': error})

    return _validator
99 | |
100 | |
def ValidUsersGroup(edit=False, old_data={}):
    """
    Build a validator class for users group names.

    :param edit: set when an existing users group is edited; the
        uniqueness check is then skipped as long as the name did not change
    :param old_data: data of the edited group, ``users_group_id`` is read
        from it when ``edit`` is set
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_group': _(u'Invalid users group name'),
            'group_exist': _(u'Users group "%(usersgroup)s" already exists'),
            'invalid_usersgroup_name':
                _(u'users group name may only contain  alphanumeric '
                    'characters underscores, periods or dashes and must begin '
                    'with alphanumeric character')
        }

        def validate_python(self, value, state):
            def reject(key, **params):
                # every failure is reported on the users_group_name field
                error = M(self, key, state, **params)
                raise formencode.Invalid(
                    error, value, state,
                    error_dict={'users_group_name': error})

            if value in ('default',):
                reject('invalid_group')

            # uniqueness matters on create and whenever the name changed
            current_name = None
            if edit:
                group_id = old_data.get('users_group_id')
                current_name = UsersGroup.get(group_id).users_group_name
            if not edit or current_name != value:
                existing = UsersGroup.get_by_group_name(
                    value, case_insensitive=True)
                if existing:
                    reject('group_exist', usersgroup=value)

            name_pattern = r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$'
            if re.match(name_pattern, value) is None:
                reject('invalid_usersgroup_name')

    return _validator
140 | |
141 | |
def ValidReposGroup(edit=False, old_data={}):
    """
    Build a validator class for the repositories group form.

    :param edit: set when an existing group is edited
    :param old_data: data of the edited group, ``group_id`` is read from
        it when ``edit`` is set
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'group_parent_id': _(u'Cannot assign this group as parent'),
            'group_exists': _(u'Group "%(group_name)s" already exists'),
            'repo_exists':
                _(u'Repository with name "%(group_name)s" already exists')
        }

        def validate_python(self, value, state):
            def reject(key, field, **params):
                # report the translated message on the given form field
                error = M(self, key, state, **params)
                raise formencode.Invalid(error, value, state,
                                         error_dict={field: error})

            # TODO WRITE VALIDATIONS
            group_name = value.get('group_name')
            parent_id = value.get('group_parent_id')

            # slugify repo group just in case :)
            slug = repo_name_slug(group_name)

            # an edited group must not be made its own parent
            if edit and parent_id:
                if old_data['group_id'] == int(parent_id):
                    reject('group_parent_id', 'group_parent_id')

            previous_name = None
            if edit:
                edited = RepoGroup.get(old_data.get('group_id'))
                previous_name = edited.group_name

            # an edit that keeps the name needs no clash checks
            if edit and previous_name == group_name:
                return

            # check group
            same_group = RepoGroup.query()\
                .filter(RepoGroup.group_name == slug)\
                .filter(RepoGroup.group_parent_id == parent_id)\
                .scalar()
            if same_group:
                reject('group_exists', 'group_name', group_name=slug)

            # check for same repo
            same_repo = Repository.query()\
                .filter(Repository.repo_name == slug)\
                .scalar()
            if same_repo:
                reject('repo_exists', 'group_name', group_name=slug)

    return _validator
200 | |
201 | |
def ValidPassword():
    """
    Build a validator class that rejects passwords which cannot be
    decoded as ascii; an empty value is treated as an empty string.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_password':
                _(u'Invalid characters (non-ascii) in password')
        }

        def validate_python(self, value, state):
            candidate = value or ''
            try:
                candidate.decode('ascii')
            except UnicodeError:
                error = M(self, 'invalid_password', state)
                raise formencode.Invalid(error, value, state)
    return _validator
216 | |
217 | |
def ValidPasswordsMatch():
    """
    Build a validator class that compares ``password`` (or, when that is
    empty, ``new_password``) with ``password_confirmation``.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'password_mismatch': _(u'Passwords do not match'),
        }

        def validate_python(self, value, state):
            entered = value.get('password') or value.get('new_password')
            confirmation = value['password_confirmation']
            if entered != confirmation:
                error = M(self, 'password_mismatch', state)
                raise formencode.Invalid(
                    error, value, state,
                    error_dict={'password_confirmation': error})
    return _validator
233 | |
234 | |
def ValidAuth():
    """
    Build a validator class for the login form; it authenticates the
    submitted ``username`` / ``password`` pair.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_password': _(u'invalid password'),
            'invalid_username': _(u'invalid user name'),
            'disabled_account': _(u'Your account is disabled')
        }

        def validate_python(self, value, state):
            password = value['password']
            username = value['username']

            if authenticate(username, password):
                return

            # tell a disabled account apart from wrong credentials
            account = User.get_by_username(username)
            if account and account.active is False:
                log.warning('user %s is disabled' % username)
                error = M(self, 'disabled_account', state)
                raise formencode.Invalid(error, value, state,
                                         error_dict={'username': error})

            log.warning('user %s failed to authenticate' % username)
            name_error = M(self, 'invalid_username', state)
            password_error = M(self, 'invalid_password', state)
            raise formencode.Invalid(
                name_error, value, state,
                error_dict={'username': name_error,
                            'password': password_error})
    return _validator
263 | |
264 | |
def ValidAuthToken():
    """
    Build a validator class that compares the submitted value with the
    result of ``authentication_token()``.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_token': _(u'Token mismatch')
        }

        def validate_python(self, value, state):
            expected = authentication_token()
            if value != expected:
                error = M(self, 'invalid_token', state)
                raise formencode.Invalid(error, value, state)
    return _validator
276 | |
277 | |
def ValidRepoName(edit=False, old_data={}):
    """
    Build a validator class for the repository name of a repository form.

    :param edit: set when an existing repository is edited
    :param old_data: data of the edited repository; its ``repo_name`` is
        compared with the new full name to detect a rename
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_repo_name':
                _(u'Repository name %(repo)s is disallowed'),
            'repository_exists':
                _(u'Repository named %(repo)s already exists'),
            'repository_in_group_exists': _(u'Repository "%(repo)s" already '
                                            'exists in group "%(group)s"'),
            'same_group_exists': _(u'Repositories group with name "%(repo)s" '
                                   'already exists')
        }

        def _to_python(self, value, state):
            # slugify the name and derive the group related entries that
            # validate_python reads back from the value dict
            slug = repo_name_slug(value.get('repo_name', ''))
            group_id = value.get('repo_group')
            if not group_id:
                group_name = group_path = ''
                full_name = slug
            else:
                group = RepoGroup.get(group_id)
                group_path = group.full_path
                group_name = group.group_name
                # the full name is the group path joined with the slug
                full_name = group_path + RepoGroup.url_sep() + slug

            value['repo_name'] = slug
            value['repo_name_full'] = full_name
            value['group_path'] = group_path
            value['group_name'] = group_name
            return value

        def validate_python(self, value, state):
            def reject(key, **params):
                # every failure is reported on the repo_name field
                error = M(self, key, state, **params)
                raise formencode.Invalid(error, value, state,
                                         error_dict={'repo_name': error})

            short_name = value.get('repo_name')
            full_name = value.get('repo_name_full')
            group_path = value.get('group_path')
            group_name = value.get('group_name')

            if short_name in (ADMIN_PREFIX, ''):
                reject('invalid_repo_name', repo=short_name)

            renamed = old_data.get('repo_name') != full_name
            # an edit that keeps the full name needs no clash checks
            if edit and not renamed:
                return value

            if group_path != '':
                if Repository.get_by_repo_name(full_name):
                    reject('repository_in_group_exists',
                           repo=short_name, group=group_name)
            elif RepoGroup.get_by_group_name(full_name):
                reject('same_group_exists', repo=short_name)
            elif Repository.get_by_repo_name(full_name):
                reject('repository_exists', repo=short_name)
            return value
    return _validator
351 | |
352 | |
def ValidForkName(*args, **kwargs):
    """
    Fork names are validated exactly like repository names; all arguments
    are passed through to ``ValidRepoName``.
    """
    fork_validator = ValidRepoName(*args, **kwargs)
    return fork_validator
355 | |
356 | |
def SlugifyName():
    """
    Build a validator class that converts its value with
    ``repo_name_slug`` and performs no further checks.
    """
    class _validator(formencode.validators.FancyValidator):

        def _to_python(self, value, state):
            slug = repo_name_slug(value)
            return slug

        def validate_python(self, value, state):
            # nothing to validate, the conversion is all this does
            return None

    return _validator
367 | |
368 | |
def ValidCloneUri():
    """
    Build a validator class for the ``clone_uri`` entry of a repository
    form. An empty url is accepted; any other url has to start with
    ``http`` / ``https`` and, for ``hg`` repositories, has to pass the
    check done by ``url_handler``.
    """
    from rhodecode.lib.utils import make_ui

    def url_handler(repo_type, url, proto, ui=None):
        """
        Check ``url`` for the given repository type; an exception raised
        here is treated by the caller as an invalid url.

        :param repo_type: ``'hg'`` or ``'git'`` (git is not checked yet)
        :param url: the clone url to check
        :param proto: ``'http'`` or ``'https'``
        :param ui: ui object to use; built with ``make_ui('db')`` when
            not given
        """
        if repo_type == 'hg':
            from mercurial.httprepo import httprepository, httpsrepository
            # reuse the ui handed in by the caller instead of building a
            # second one
            if ui is None:
                ui = make_ui('db')
            # presumably reading ``capabilities`` contacts the remote and
            # raises for an unusable url -- TODO confirm
            if proto == 'https':
                httpsrepository(ui, url).capabilities
            elif proto == 'http':
                httprepository(ui, url).capabilities
        elif repo_type == 'git':
            #TODO: write a git url validator
            pass

    class _validator(formencode.validators.FancyValidator):
        messages = {
            'clone_uri': _(u'invalid clone url'),
            # was 'http\s': an invalid escape sequence that showed up as
            # a literal backslash in the message
            'invalid_clone_uri': _(u'Invalid clone url, provide a '
                                    'valid clone http(s) url')
        }

        def validate_python(self, value, state):
            repo_type = value.get('repo_type')
            url = value.get('clone_uri')

            if not url:
                # clone url is optional
                return

            # 'https...' also starts with 'http', one test covers both
            if url.startswith('http'):
                proto = 'https' if url.startswith('https') else 'http'
                try:
                    url_handler(repo_type, url, proto, make_ui('db'))
                except Exception:
                    log.exception('Url validation failed')
                    # pass state along like every other validator does
                    msg = M(self, 'clone_uri', state)
                    raise formencode.Invalid(msg, value, state,
                        error_dict=dict(clone_uri=msg)
                    )
            else:
                msg = M(self, 'invalid_clone_uri', state)
                raise formencode.Invalid(msg, value, state,
                    error_dict=dict(clone_uri=msg)
                )
    return _validator
412 | |
413 | |
def ValidForkType(old_data={}):
    """
    Build a validator class that requires the value to equal the
    ``repo_type`` stored in ``old_data``.

    :param old_data: data of the parent repository, ``repo_type`` is read
        from it
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_fork_type': _(u'Fork have to be the same type as parent')
        }

        def validate_python(self, value, state):
            parent_type = old_data['repo_type']
            if parent_type != value:
                error = M(self, 'invalid_fork_type', state)
                raise formencode.Invalid(error, value, state,
                                         error_dict={'repo_type': error})
    return _validator
427 | |
428 | |
def ValidPerms(type_='repo'):
    """
    Build a validator class that extracts permission changes from the
    submitted form data and stores them in the value dict as
    ``perms_updates`` and ``perms_new``.

    :param type_: ``'repo'`` or ``'group'``; selects the permission that
        is set for the ``default`` member when ``private`` is set.
        NOTE: for any other value ``EMPTY_PERM`` stays undefined and is
        only hit when that permission is actually needed.
    """
    if type_ == 'group':
        EMPTY_PERM = 'group.none'
    elif type_ == 'repo':
        EMPTY_PERM = 'repository.none'

    class _validator(formencode.validators.FancyValidator):
        messages = {
            'perm_new_member_name':
                _(u'This username or users group name is not valid')
        }

        def to_python(self, value, state):
            perms_update = []
            perms_new = []

            # a newly added member is described by three fixed keys, so
            # it is collected once instead of once per matching key
            new_perm = value.get('perm_new_member', False)
            new_member = value.get('perm_new_member_name', False)
            new_type = value.get('perm_new_member_type')
            if new_member and new_perm:
                perms_new.append((new_member, new_perm, new_type))

            # build a list of permissions to update
            for k, v in value.items():
                if k.startswith('perm_new_member'):
                    # handled above
                    continue
                if k.startswith('u_perm_') or k.startswith('g_perm_'):
                    member = k[7:]
                    t = {'u': 'user',
                         'g': 'users_group'
                    }[k[0]]
                    if member == 'default':
                        if value.get('private'):
                            # set none for default when updating to
                            # private repo
                            v = EMPTY_PERM
                    perms_update.append((member, v, t))

            value['perms_updates'] = perms_update
            value['perms_new'] = perms_new

            # make sure every new member exists and is active
            for member, perm, member_type in perms_new:
                try:
                    # compare by value: ``is`` tested object identity,
                    # which is not guaranteed to hold for equal strings
                    # coming from form data, so the lookup could be
                    # skipped silently
                    if member_type == 'user':
                        self.user_db = User.query()\
                            .filter(User.active == True)\
                            .filter(User.username == member).one()
                    elif member_type == 'users_group':
                        self.user_db = UsersGroup.query()\
                            .filter(UsersGroup.users_group_active == True)\
                            .filter(UsersGroup.users_group_name == member)\
                            .one()

                except Exception:
                    log.exception('Updated permission failed')
                    # use the key that is actually defined in ``messages``
                    msg = M(self, 'perm_new_member_name', state)
                    raise formencode.Invalid(msg, value, state,
                        error_dict=dict(perm_new_member_name=msg)
                    )
            return value
    return _validator
490 | |
491 | |
def ValidSettings():
    """
    Build a validator class that drops the ``user`` entry from the
    submitted settings and performs no further checks.
    """
    class _validator(formencode.validators.FancyValidator):
        def _to_python(self, value, state):
            # settings form can't edit user
            value.pop('user', None)
            return value

        def validate_python(self, value, state):
            # nothing to validate
            return None
    return _validator
503 | |
504 | |
def ValidPath():
    """
    Build a validator class that accepts only paths of existing
    directories.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_path': _(u'This is not a valid path')
        }

        def validate_python(self, value, state):
            if os.path.isdir(value):
                return
            error = M(self, 'invalid_path', state)
            raise formencode.Invalid(error, value, state,
                                     error_dict={'paths_root_path': error})
    return _validator
518 | |
519 | |
def UniqSystemEmail(old_data={}):
    """
    Build a validator class that lower-cases an e-mail address and rejects
    it when it belongs to an existing user.

    :param old_data: previous data; an address equal to its lower-cased
        ``email`` entry is accepted without a lookup
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'email_taken': _(u'This e-mail address is already taken')
        }

        def _to_python(self, value, state):
            lowered = value.lower()
            return lowered

        def validate_python(self, value, state):
            previous = (old_data.get('email') or '').lower()
            if previous != value:
                owner = User.get_by_email(value, case_insensitive=True)
                if owner:
                    error = M(self, 'email_taken', state)
                    raise formencode.Invalid(error, value, state,
                                             error_dict={'email': error})
    return _validator
538 | |
539 | |
def ValidSystemEmail():
    """
    Build a validator class that lower-cases an e-mail address and
    requires it to belong to an existing user.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'non_existing_email': _(u'e-mail "%(email)s" does not exist.')
        }

        def _to_python(self, value, state):
            lowered = value.lower()
            return lowered

        def validate_python(self, value, state):
            owner = User.get_by_email(value, case_insensitive=True)
            if owner is not None:
                return
            error = M(self, 'non_existing_email', state, email=value)
            raise formencode.Invalid(error, value, state,
                                     error_dict={'email': error})

    return _validator
558 | |
559 | |
def LdapLibValidator():
    """
    Build a validator class that raises ``LdapImportError`` when the
    ``ldap`` module cannot be imported.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {}

        def validate_python(self, value, state):
            # only the availability of the library is checked here
            try:
                import ldap  # imported purely as an availability probe
            except ImportError:
                raise LdapImportError()

    return _validator
574 | |
575 | |
def AttrLoginValidator():
    """
    Build a validator class that requires a non-empty string value for
    the LDAP login attribute.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_cn':
                _(u'The LDAP Login attribute of the CN must be specified - '
                    'this is the name of the attribute that is equivalent '
                    'to "username"')
        }

        def validate_python(self, value, state):
            # basestring covers both str and unicode on Python 2
            usable = bool(value) and isinstance(value, basestring)
            if not usable:
                error = M(self, 'invalid_cn', state)
                raise formencode.Invalid(
                    error, value, state,
                    error_dict={'ldap_attr_login': error})

    return _validator