Mercurial > kallithea
comparison kallithea/model/permission.py @ 8685:dff9658bdd98
model: don't import Session from db - import meta and get it from the real source
author | Mads Kiilerich <mads@kiilerich.com> |
---|---|
date | Mon, 12 Oct 2020 01:44:10 +0200 |
parents | 2ce710e81e61 |
children | 5e46f73f0d1c |
comparison
equal
deleted
inserted
replaced
8684:89f11587b2dc | 8685:dff9658bdd98 |
---|---|
30 import traceback | 30 import traceback |
31 | 31 |
32 from sqlalchemy.exc import DatabaseError | 32 from sqlalchemy.exc import DatabaseError |
33 | 33 |
34 from kallithea.lib.utils2 import asbool | 34 from kallithea.lib.utils2 import asbool |
35 from kallithea.model.db import Permission, Session, User, UserRepoGroupToPerm, UserRepoToPerm, UserToPerm, UserUserGroupToPerm | 35 from kallithea.model import meta |
 | 36 from kallithea.model.db import Permission, User, UserRepoGroupToPerm, UserRepoToPerm, UserToPerm, UserUserGroupToPerm |
36 | 37 |
37 | 38 |
38 log = logging.getLogger(__name__) | 39 log = logging.getLogger(__name__) |
39 | 40 |
40 | 41 |
49 """ | 50 """ |
50 for p in Permission.PERMS: | 51 for p in Permission.PERMS: |
51 if not Permission.get_by_key(p[0]): | 52 if not Permission.get_by_key(p[0]): |
52 new_perm = Permission() | 53 new_perm = Permission() |
53 new_perm.permission_name = p[0] | 54 new_perm.permission_name = p[0] |
54 Session().add(new_perm) | 55 meta.Session().add(new_perm) |
55 | 56 |
56 def create_default_permissions(self, user, force=False): | 57 def create_default_permissions(self, user, force=False): |
57 """ | 58 """ |
58 Create missing default permissions for user. If force is set, the default | 59 Create missing default permissions for user. If force is set, the default |
59 permissions for the user are reset, otherwise only missing permissions are | 60 permissions for the user are reset, otherwise only missing permissions are |
76 defined_perms_groups = set(_get_group(x.permission.permission_name) for x in perms) | 77 defined_perms_groups = set(_get_group(x.permission.permission_name) for x in perms) |
77 log.debug('GOT ALREADY DEFINED:%s', perms) | 78 log.debug('GOT ALREADY DEFINED:%s', perms) |
78 | 79 |
79 if force: | 80 if force: |
80 for perm in perms: | 81 for perm in perms: |
81 Session().delete(perm) | 82 meta.Session().delete(perm) |
82 Session().commit() | 83 meta.Session().commit() |
83 defined_perms_groups = [] | 84 defined_perms_groups = [] |
84 # For every default permission that needs to be created, we check if | 85 # For every default permission that needs to be created, we check if |
85 # its group is already defined. If it's not, we create default permission. | 86 # its group is already defined. If it's not, we create default permission. |
86 for perm_name in Permission.DEFAULT_USER_PERMISSIONS: | 87 for perm_name in Permission.DEFAULT_USER_PERMISSIONS: |
87 gr = _get_group(perm_name) | 88 gr = _get_group(perm_name) |
88 if gr not in defined_perms_groups: | 89 if gr not in defined_perms_groups: |
89 log.debug('GR:%s not found, creating permission %s', | 90 log.debug('GR:%s not found, creating permission %s', |
90 gr, perm_name) | 91 gr, perm_name) |
91 new_perm = _make_perm(perm_name) | 92 new_perm = _make_perm(perm_name) |
92 Session().add(new_perm) | 93 meta.Session().add(new_perm) |
93 | 94 |
94 def update(self, form_result): | 95 def update(self, form_result): |
95 perm_user = User.get_by_username(username=form_result['perm_user_name']) | 96 perm_user = User.get_by_username(username=form_result['perm_user_name']) |
96 | 97 |
97 try: | 98 try: |
111 # are somehow missing | 112 # are somehow missing |
112 u2p = UserToPerm.query() \ | 113 u2p = UserToPerm.query() \ |
113 .filter(UserToPerm.user == perm_user) \ | 114 .filter(UserToPerm.user == perm_user) \ |
114 .all() | 115 .all() |
115 for p in u2p: | 116 for p in u2p: |
116 Session().delete(p) | 117 meta.Session().delete(p) |
117 # create fresh set of permissions | 118 # create fresh set of permissions |
118 for def_perm_key in ['default_repo_perm', | 119 for def_perm_key in ['default_repo_perm', |
119 'default_group_perm', | 120 'default_group_perm', |
120 'default_user_group_perm', | 121 'default_user_group_perm', |
121 'default_repo_create', | 122 'default_repo_create', |
122 'default_user_group_create', | 123 'default_user_group_create', |
123 'default_fork', | 124 'default_fork', |
124 'default_register', | 125 'default_register', |
125 'default_extern_activate']: | 126 'default_extern_activate']: |
126 p = _make_new(perm_user, form_result[def_perm_key]) | 127 p = _make_new(perm_user, form_result[def_perm_key]) |
127 Session().add(p) | 128 meta.Session().add(p) |
128 | 129 |
129 # stage 3 update all default permissions for repos if checked | 130 # stage 3 update all default permissions for repos if checked |
130 if form_result['overwrite_default_repo']: | 131 if form_result['overwrite_default_repo']: |
131 _def_name = form_result['default_repo_perm'].split('repository.')[-1] | 132 _def_name = form_result['default_repo_perm'].split('repository.')[-1] |
132 _def = Permission.get_by_key('repository.' + _def_name) | 133 _def = Permission.get_by_key('repository.' + _def_name) |
155 for g2p in UserUserGroupToPerm.query() \ | 156 for g2p in UserUserGroupToPerm.query() \ |
156 .filter(UserUserGroupToPerm.user == perm_user) \ | 157 .filter(UserUserGroupToPerm.user == perm_user) \ |
157 .all(): | 158 .all(): |
158 g2p.permission = _def | 159 g2p.permission = _def |
159 | 160 |
160 Session().commit() | 161 meta.Session().commit() |
161 except (DatabaseError,): | 162 except (DatabaseError,): |
162 log.error(traceback.format_exc()) | 163 log.error(traceback.format_exc()) |
163 Session().rollback() | 164 meta.Session().rollback() |
164 raise | 165 raise |