Mercurial > kallithea
changeset 8658:4791487dbec1
api: stop using 'Optional', 'OAttr'/'OptionalAttr' classes
There does not seem to be a good reason to use the 'Optional' and
'OptionalAttr' classes.
It makes the code harder to understand. And worse, the 'default value'
specified is not always used, which can thus give false information to
users.
The way Optional was used in the API calls is twofold:
1.either by effectively extracting a value, via Optional.extract(param).
If 'param' was indeed specified by the user, then this would yield that
user-specified value. Otherwise, it would yield the value declared in the
parameter declaration, e.g. param=Optional(defaultvalue).
2.or by checking if a variable is an instance of the Optional class. In case
a user effectively passed a value, this value will not be of the
Optional class. So if a parameter is an object of class Optional, we know
the user did not pass a value, and we can apply some default.
In the declaration of the parameter, the specified default value will only
be used if the 'extract' method is used, i.e. method 1 above.
A simpler way to address this problem of default values is just with Python
default values, using 'None' as magic value if the default will be
calculated inside the method.
The docstrings still specify something like:
type: Optional(bool)
which is humanly readable and does not necessarily refer to a class called
'Optional', so such strings are kept.
author | Thomas De Schampheleire <thomas.de_schampheleire@nokia.com> |
---|---|
date | Fri, 09 Oct 2020 19:43:18 +0200 |
parents | 26fcb8c16c2c |
children | 0f3fbd5fb9ea |
files | kallithea/controllers/api/api.py |
diffstat | 1 files changed, 103 insertions(+), 128 deletions(-) [+] |
line wrap: on
line diff
--- a/kallithea/controllers/api/api.py Fri Oct 09 19:31:44 2020 +0200 +++ b/kallithea/controllers/api/api.py Fri Oct 09 19:43:18 2020 +0200 @@ -36,7 +36,6 @@ HasUserGroupPermissionLevel) from kallithea.lib.exceptions import DefaultUserException, UserGroupsAssignedException from kallithea.lib.utils import action_logger, repo2db_mapper -from kallithea.lib.utils2 import OAttr, Optional from kallithea.lib.vcs.backends.base import EmptyChangeset from kallithea.lib.vcs.exceptions import EmptyRepositoryError from kallithea.model.changeset_status import ChangesetStatusModel @@ -57,10 +56,10 @@ def store_update(updates, attr, name): """ - Stores param in updates dict if it's not instance of Optional - allows easy updates of passed in params + Stores param in updates dict if it's not None (i.e. if user explicitly set + a parameter). This allows easy updates of passed in params. """ - if not isinstance(attr, Optional): + if attr is not None: updates[name] = attr @@ -161,7 +160,7 @@ return args @HasPermissionAnyDecorator('hg.admin') - def pull(self, repoid, clone_uri=Optional(None)): + def pull(self, repoid, clone_uri=None): """ Triggers a pull from remote location on given repo. Can be used to automatically keep remote repos up to date. This command can be executed @@ -197,7 +196,7 @@ ScmModel().pull_changes(repo.repo_name, request.authuser.username, request.ip_addr, - clone_uri=Optional.extract(clone_uri)) + clone_uri=clone_uri) return dict( msg='Pulled from `%s`' % repo.repo_name, repository=repo.repo_name @@ -209,7 +208,7 @@ ) @HasPermissionAnyDecorator('hg.admin') - def rescan_repos(self, remove_obsolete=Optional(False)): + def rescan_repos(self, remove_obsolete=False): """ Triggers rescan repositories action. If remove_obsolete is set than also delete repos that are in database but not in the filesystem. @@ -240,7 +239,7 @@ """ try: - rm_obsolete = Optional.extract(remove_obsolete) + rm_obsolete = remove_obsolete added, removed = repo2db_mapper(ScmModel().repo_scan(), remove_obsolete=rm_obsolete) return {'added': added, 'removed': removed} @@ -295,7 +294,7 @@ ) @HasPermissionAnyDecorator('hg.admin') - def get_ip(self, userid=Optional(OAttr('apiuser'))): + def get_ip(self, userid=None): """ Shows IP address as seen from Kallithea server, together with all defined IP addresses for given user. If userid is not passed data is @@ -321,7 +320,7 @@ } """ - if isinstance(userid, Optional): + if userid is None: userid = request.authuser.user_id user = get_user_or_error(userid) ips = UserIpMap.query().filter(UserIpMap.user == user).all() @@ -352,7 +351,7 @@ """ return Setting.get_server_info() - def get_user(self, userid=Optional(OAttr('apiuser'))): + def get_user(self, userid=None): """ Gets a user by username or user_id, Returns empty result if user is not found. If userid param is skipped it is set to id of user who is @@ -397,12 +396,12 @@ if not HasPermissionAny('hg.admin')(): # make sure normal user does not pass someone else userid, # he is not allowed to do that - if not isinstance(userid, Optional) and userid != request.authuser.user_id: + if userid is not None and userid != request.authuser.user_id: raise JSONRPCError( 'userid is not the same as your user' ) - if isinstance(userid, Optional): + if userid is None: userid = request.authuser.user_id user = get_user_or_error(userid) @@ -432,11 +431,11 @@ ] @HasPermissionAnyDecorator('hg.admin') - def create_user(self, username, email, password=Optional(''), - firstname=Optional(''), lastname=Optional(''), - active=Optional(True), admin=Optional(False), - extern_type=Optional(User.DEFAULT_AUTH_TYPE), - extern_name=Optional('')): + def create_user(self, username, email, password='', + firstname='', lastname='', + active=True, admin=False, + extern_type=User.DEFAULT_AUTH_TYPE, + extern_name=''): """ Creates new user. Returns new user object. This command can be executed only using api_key belonging to user with admin rights. @@ -492,15 +491,15 @@ try: user = UserModel().create_or_update( - username=Optional.extract(username), - password=Optional.extract(password), - email=Optional.extract(email), - firstname=Optional.extract(firstname), - lastname=Optional.extract(lastname), - active=Optional.extract(active), - admin=Optional.extract(admin), - extern_type=Optional.extract(extern_type), - extern_name=Optional.extract(extern_name) + username=username, + password=password, + email=email, + firstname=firstname, + lastname=lastname, + active=active, + admin=admin, + extern_type=extern_type, + extern_name=extern_name ) Session().commit() return dict( @@ -512,11 +511,11 @@ raise JSONRPCError('failed to create user `%s`' % (username,)) @HasPermissionAnyDecorator('hg.admin') - def update_user(self, userid, username=Optional(None), - email=Optional(None), password=Optional(None), - firstname=Optional(None), lastname=Optional(None), - active=Optional(None), admin=Optional(None), - extern_type=Optional(None), extern_name=Optional(None)): + def update_user(self, userid, username=None, + email=None, password=None, + firstname=None, lastname=None, + active=None, admin=None, + extern_type=None, extern_name=None): """ updates given user if such user exists. This command can be executed only using api_key belonging to user with admin rights. @@ -686,8 +685,8 @@ ] @HasPermissionAnyDecorator('hg.admin', 'hg.usergroup.create.true') - def create_user_group(self, group_name, description=Optional(''), - owner=Optional(OAttr('apiuser')), active=Optional(True)): + def create_user_group(self, group_name, description='', + owner=None, active=True): """ Creates new user group. This command can be executed only using api_key belonging to user with admin rights or an user who has create user group @@ -727,12 +726,10 @@ raise JSONRPCError("user group `%s` already exist" % (group_name,)) try: - if isinstance(owner, Optional): + if owner is None: owner = request.authuser.user_id owner = get_user_or_error(owner) - active = Optional.extract(active) - description = Optional.extract(description) ug = UserGroupModel().create(name=group_name, description=description, owner=owner, active=active) Session().commit() @@ -745,9 +742,9 @@ raise JSONRPCError('failed to create group `%s`' % (group_name,)) # permission check inside - def update_user_group(self, usergroupid, group_name=Optional(''), - description=Optional(''), owner=Optional(None), - active=Optional(True)): + def update_user_group(self, usergroupid, group_name=None, + description=None, owner=None, + active=None): """ Updates given usergroup. This command can be executed only using api_key belonging to user with admin rights or an admin of given user group @@ -786,7 +783,7 @@ if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name): raise JSONRPCError('user group `%s` does not exist' % (usergroupid,)) - if not isinstance(owner, Optional): + if owner is not None: owner = get_user_or_error(owner) updates = {} @@ -963,8 +960,8 @@ # permission check inside def get_repo(self, repoid, - with_revision_names=Optional(False), - with_pullrequests=Optional(False)): + with_revision_names=False, + with_pullrequests=False): """ Gets an existing repository by it's name or repository_id. Members will return either users_group or user associated to that repository. This command can be @@ -1064,8 +1061,8 @@ for uf in repo.followers ] - data = repo.get_api_data(with_revision_names=Optional.extract(with_revision_names), - with_pullrequests=Optional.extract(with_pullrequests)) + data = repo.get_api_data(with_revision_names=with_revision_names, + with_pullrequests=with_pullrequests) data['members'] = members data['followers'] = followers return data @@ -1112,7 +1109,7 @@ # permission check inside def get_repo_nodes(self, repoid, revision, root_path, - ret_type=Optional('all')): + ret_type='all'): """ returns a list of nodes and it's children in a flat list for a given path at given revision. It's possible to specify ret_type to show only `files` or @@ -1147,7 +1144,7 @@ if not HasRepoPermissionLevel('read')(repo.repo_name): raise JSONRPCError('repository `%s` does not exist' % (repoid,)) - ret_type = Optional.extract(ret_type) + ret_type = ret_type _map = {} try: _d, _f = ScmModel().get_nodes(repo, revision, root_path, @@ -1168,13 +1165,13 @@ ) @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository') - def create_repo(self, repo_name, owner=Optional(OAttr('apiuser')), - repo_type=Optional('hg'), description=Optional(''), - private=Optional(False), clone_uri=Optional(None), - landing_rev=Optional('rev:tip'), - enable_statistics=Optional(False), - enable_downloads=Optional(False), - copy_permissions=Optional(False)): + def create_repo(self, repo_name, owner=None, + repo_type=None, description='', + private=False, clone_uri=None, + landing_rev='rev:tip', + enable_statistics=None, + enable_downloads=None, + copy_permissions=False): """ Creates a repository. The repository name contains the full path, but the parent repository group must exist. For example "foo/bar/baz" require the groups @@ -1225,12 +1222,12 @@ """ if not HasPermissionAny('hg.admin')(): - if not isinstance(owner, Optional): + if owner is not None: # forbid setting owner for non-admins raise JSONRPCError( 'Only Kallithea admin can specify `owner` param' ) - if isinstance(owner, Optional): + if owner is None: owner = request.authuser.user_id owner = get_user_or_error(owner) @@ -1239,20 +1236,15 @@ raise JSONRPCError("repo `%s` already exist" % repo_name) defs = Setting.get_default_repo_settings(strip_prefix=True) - if isinstance(private, Optional): - private = defs.get('repo_private') or Optional.extract(private) - if isinstance(repo_type, Optional): + if private is None: + private = defs.get('repo_private') or False + if repo_type is None: repo_type = defs.get('repo_type') - if isinstance(enable_statistics, Optional): + if enable_statistics is None: enable_statistics = defs.get('repo_enable_statistics') - if isinstance(enable_downloads, Optional): + if enable_downloads is None: enable_downloads = defs.get('repo_enable_downloads') - clone_uri = Optional.extract(clone_uri) - description = Optional.extract(description) - landing_rev = Optional.extract(landing_rev) - copy_permissions = Optional.extract(copy_permissions) - try: repo_name_parts = repo_name.split('/') repo_group = None @@ -1291,13 +1283,13 @@ 'failed to create repository `%s`' % (repo_name,)) # permission check inside - def update_repo(self, repoid, name=Optional(None), - owner=Optional(OAttr('apiuser')), - group=Optional(None), - description=Optional(''), private=Optional(False), - clone_uri=Optional(None), landing_rev=Optional('rev:tip'), - enable_statistics=Optional(False), - enable_downloads=Optional(False)): + def update_repo(self, repoid, name=None, + owner=None, + group=None, + description=None, private=None, + clone_uri=None, landing_rev=None, + enable_statistics=None, + enable_downloads=None): """ Updates repo @@ -1324,7 +1316,7 @@ ): raise JSONRPCError('no permission to create (or move) repositories') - if not isinstance(owner, Optional): + if owner is not None: # forbid setting owner for non-admins raise JSONRPCError( 'Only Kallithea admin can specify `owner` param' @@ -1332,7 +1324,7 @@ updates = {} repo_group = group - if not isinstance(repo_group, Optional): + if repo_group is not None: repo_group = get_repo_group_or_error(repo_group) repo_group = repo_group.group_id try: @@ -1358,9 +1350,9 @@ @HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository') def fork_repo(self, repoid, fork_name, - owner=Optional(OAttr('apiuser')), - description=Optional(''), copy_permissions=Optional(False), - private=Optional(False), landing_rev=Optional('rev:tip')): + owner=None, + description='', copy_permissions=False, + private=False, landing_rev='rev:tip'): """ Creates a fork of given repo. In case of using celery this will immediately return success message, while fork is going to be created @@ -1413,7 +1405,7 @@ if HasPermissionAny('hg.admin')(): pass elif HasRepoPermissionLevel('read')(repo.repo_name): - if not isinstance(owner, Optional): + if owner is not None: # forbid setting owner for non-admins raise JSONRPCError( 'Only Kallithea admin can specify `owner` param' @@ -1424,7 +1416,7 @@ else: raise JSONRPCError('repository `%s` does not exist' % (repoid,)) - if isinstance(owner, Optional): + if owner is None: owner = request.authuser.user_id owner = get_user_or_error(owner) @@ -1443,10 +1435,10 @@ repo_name_full=fork_name, repo_group=repo_group, repo_type=repo.repo_type, - description=Optional.extract(description), - private=Optional.extract(private), - copy_permissions=Optional.extract(copy_permissions), - landing_rev=Optional.extract(landing_rev), + description=description, + private=private, + copy_permissions=copy_permissions, + landing_rev=landing_rev, update_after_clone=False, fork_parent_id=repo.repo_id, ) @@ -1468,7 +1460,7 @@ ) # permission check inside - def delete_repo(self, repoid, forks=Optional('')): + def delete_repo(self, repoid, forks=''): """ Deletes a repository. This command can be executed only using api_key belonging to user with admin rights or regular user that have admin access to repository. @@ -1497,7 +1489,7 @@ raise JSONRPCError('repository `%s` does not exist' % (repoid,)) try: - handle_forks = Optional.extract(forks) + handle_forks = forks _forks_msg = '' _forks = [f for f in repo.forks] if handle_forks == 'detach': @@ -1768,10 +1760,10 @@ ] @HasPermissionAnyDecorator('hg.admin') - def create_repo_group(self, group_name, description=Optional(''), - owner=Optional(OAttr('apiuser')), - parent=Optional(None), - copy_permissions=Optional(False)): + def create_repo_group(self, group_name, description='', + owner=None, + parent=None, + copy_permissions=False): """ Creates a repository group. This command can be executed only using api_key belonging to user with admin rights. @@ -1808,14 +1800,13 @@ if RepoGroup.get_by_group_name(group_name): raise JSONRPCError("repo group `%s` already exist" % (group_name,)) - if isinstance(owner, Optional): + if owner is None: owner = request.authuser.user_id - group_description = Optional.extract(description) - parent_group = Optional.extract(parent) - if not isinstance(parent, Optional): - parent_group = get_repo_group_or_error(parent_group) + group_description = description + parent_group = None + if parent is not None: + parent_group = get_repo_group_or_error(parent) - copy_permissions = Optional.extract(copy_permissions) try: repo_group = RepoGroupModel().create( group_name=group_name, @@ -1835,10 +1826,10 @@ raise JSONRPCError('failed to create repo group `%s`' % (group_name,)) @HasPermissionAnyDecorator('hg.admin') - def update_repo_group(self, repogroupid, group_name=Optional(''), - description=Optional(''), - owner=Optional(OAttr('apiuser')), - parent=Optional(None)): + def update_repo_group(self, repogroupid, group_name=None, + description=None, + owner=None, + parent=None): repo_group = get_repo_group_or_error(repogroupid) updates = {} @@ -1902,7 +1893,7 @@ # permission check inside def grant_user_permission_to_repo_group(self, repogroupid, userid, - perm, apply_to_children=Optional('none')): + perm, apply_to_children='none'): """ Grant permission for user on given repository group, or update existing one if found. This command can be executed only using api_key belonging @@ -1944,7 +1935,6 @@ user = get_user_or_error(userid) perm = get_perm_or_error(perm, prefix='group.') - apply_to_children = Optional.extract(apply_to_children) try: RepoGroupModel().add_permission(repo_group=repo_group, @@ -1967,7 +1957,7 @@ # permission check inside def revoke_user_permission_from_repo_group(self, repogroupid, userid, - apply_to_children=Optional('none')): + apply_to_children='none'): """ Revoke permission for user on given repository group. This command can be executed only using api_key belonging to user with admin rights, or @@ -2006,7 +1996,6 @@ raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,)) user = get_user_or_error(userid) - apply_to_children = Optional.extract(apply_to_children) try: RepoGroupModel().delete_permission(repo_group=repo_group, @@ -2030,7 +2019,7 @@ # permission check inside def grant_user_group_permission_to_repo_group( self, repogroupid, usergroupid, perm, - apply_to_children=Optional('none')): + apply_to_children='none'): """ Grant permission for user group on given repository group, or update existing one if found. This command can be executed only using @@ -2077,8 +2066,6 @@ raise JSONRPCError( 'user group `%s` does not exist' % (usergroupid,)) - apply_to_children = Optional.extract(apply_to_children) - try: RepoGroupModel().add_permission(repo_group=repo_group, obj=user_group, @@ -2105,7 +2092,7 @@ # permission check inside def revoke_user_group_permission_from_repo_group( self, repogroupid, usergroupid, - apply_to_children=Optional('none')): + apply_to_children='none'): """ Revoke permission for user group on given repository. This command can be executed only using api_key belonging to user with admin rights, or @@ -2147,8 +2134,6 @@ raise JSONRPCError( 'user group `%s` does not exist' % (usergroupid,)) - apply_to_children = Optional.extract(apply_to_children) - try: RepoGroupModel().delete_permission(repo_group=repo_group, obj=user_group, @@ -2182,7 +2167,7 @@ raise JSONRPCError('gist `%s` does not exist' % (gistid,)) return gist.get_api_data() - def get_gists(self, userid=Optional(OAttr('apiuser'))): + def get_gists(self, userid=None): """ Get all gists for given user. If userid is empty returned gists are for user who called the api @@ -2193,12 +2178,12 @@ if not HasPermissionAny('hg.admin')(): # make sure normal user does not pass someone else userid, # he is not allowed to do that - if not isinstance(userid, Optional) and userid != request.authuser.user_id: + if userid is not None and userid != request.authuser.user_id: raise JSONRPCError( 'userid is not the same as your user' ) - if isinstance(userid, Optional): + if userid is None: user_id = request.authuser.user_id else: user_id = get_user_or_error(userid).user_id @@ -2211,9 +2196,9 @@ .order_by(Gist.created_on.desc()) ] - def create_gist(self, files, owner=Optional(OAttr('apiuser')), - gist_type=Optional(Gist.GIST_PUBLIC), lifetime=Optional(-1), - description=Optional('')): + def create_gist(self, files, owner=None, + gist_type=Gist.GIST_PUBLIC, lifetime=-1, + description=''): """ Creates new Gist @@ -2250,13 +2235,10 @@ """ try: - if isinstance(owner, Optional): + if owner is None: owner = request.authuser.user_id owner = get_user_or_error(owner) - description = Optional.extract(description) - gist_type = Optional.extract(gist_type) - lifetime = Optional.extract(lifetime) gist = GistModel().create(description=description, owner=owner, @@ -2273,12 +2255,6 @@ log.error(traceback.format_exc()) raise JSONRPCError('failed to create gist') - # def update_gist(self, gistid, files, owner=Optional(OAttr('apiuser')), - # gist_type=Optional(Gist.GIST_PUBLIC), - # gist_lifetime=Optional(-1), gist_description=Optional('')): - # gist = get_gist_or_error(gistid) - # updates = {} - # permission check inside def delete_gist(self, gistid): """ @@ -2342,7 +2318,7 @@ raise JSONRPCError('Repository is empty') # permission check inside - def get_changeset(self, repoid, raw_id, with_reviews=Optional(False)): + def get_changeset(self, repoid, raw_id, with_reviews=False): repo = get_repo_or_error(repoid) if not HasRepoPermissionLevel('read')(repo.repo_name): raise JSONRPCError('Access denied to repo %s' % repo.repo_name) @@ -2352,7 +2328,6 @@ info = dict(changeset.as_dict()) - with_reviews = Optional.extract(with_reviews) if with_reviews: reviews = ChangesetStatusModel().get_statuses( repo.repo_name, raw_id)