changeset 8658:4791487dbec1

api: stop using 'Optional', 'OAttr'/'OptionalAttr' classes There does not seem to be a good reason to use the 'Optional' and 'OptionalAttr' classes. It makes the code harder to understand. And worse, the 'default value' specified is not always used, which can thus give false information to users. The way Optional was used in the API calls is twofold: 1.either by effectively extracting a value, via Optional.extract(param). If 'param' was indeed specified by the user, then this would yield that user-specified value. Otherwise, it would yield the value declared in the parameter declaration, e.g. param=Optional(defaultvalue). 2.or by checking if a variable is an instance of the Optional class. In case a user effectively passed a value, this value will not be of the Optional class. So if a parameter is an object of class Optional, we know the user did not pass a value, and we can apply some default. In the declaration of the parameter, the specified default value will only be used if the 'extract' method is used, i.e. method 1 above. A simpler way to address this problem of default values is just with Python default values, using 'None' as magic value if the default will be calculated inside the method. The docstrings still specify something like: type: Optional(bool) which is humanly readable and does not necessarily refer to a class called 'Optional', so such strings are kept.
author Thomas De Schampheleire <thomas.de_schampheleire@nokia.com>
date Fri, 09 Oct 2020 19:43:18 +0200
parents 26fcb8c16c2c
children 0f3fbd5fb9ea
files kallithea/controllers/api/api.py
diffstat 1 files changed, 103 insertions(+), 128 deletions(-) [+]
line wrap: on
line diff
--- a/kallithea/controllers/api/api.py	Fri Oct 09 19:31:44 2020 +0200
+++ b/kallithea/controllers/api/api.py	Fri Oct 09 19:43:18 2020 +0200
@@ -36,7 +36,6 @@
                                 HasUserGroupPermissionLevel)
 from kallithea.lib.exceptions import DefaultUserException, UserGroupsAssignedException
 from kallithea.lib.utils import action_logger, repo2db_mapper
-from kallithea.lib.utils2 import OAttr, Optional
 from kallithea.lib.vcs.backends.base import EmptyChangeset
 from kallithea.lib.vcs.exceptions import EmptyRepositoryError
 from kallithea.model.changeset_status import ChangesetStatusModel
@@ -57,10 +56,10 @@
 
 def store_update(updates, attr, name):
     """
-    Stores param in updates dict if it's not instance of Optional
-    allows easy updates of passed in params
+    Stores param in updates dict if it's not None (i.e. if user explicitly set
+    a parameter). This allows easy updates of passed in params.
     """
-    if not isinstance(attr, Optional):
+    if attr is not None:
         updates[name] = attr
 
 
@@ -161,7 +160,7 @@
         return args
 
     @HasPermissionAnyDecorator('hg.admin')
-    def pull(self, repoid, clone_uri=Optional(None)):
+    def pull(self, repoid, clone_uri=None):
         """
         Triggers a pull from remote location on given repo. Can be used to
         automatically keep remote repos up to date. This command can be executed
@@ -197,7 +196,7 @@
             ScmModel().pull_changes(repo.repo_name,
                                     request.authuser.username,
                                     request.ip_addr,
-                                    clone_uri=Optional.extract(clone_uri))
+                                    clone_uri=clone_uri)
             return dict(
                 msg='Pulled from `%s`' % repo.repo_name,
                 repository=repo.repo_name
@@ -209,7 +208,7 @@
             )
 
     @HasPermissionAnyDecorator('hg.admin')
-    def rescan_repos(self, remove_obsolete=Optional(False)):
+    def rescan_repos(self, remove_obsolete=False):
         """
         Triggers rescan repositories action. If remove_obsolete is set
         than also delete repos that are in database but not in the filesystem.
@@ -240,7 +239,7 @@
         """
 
         try:
-            rm_obsolete = Optional.extract(remove_obsolete)
+            rm_obsolete = remove_obsolete
             added, removed = repo2db_mapper(ScmModel().repo_scan(),
                                             remove_obsolete=rm_obsolete)
             return {'added': added, 'removed': removed}
@@ -295,7 +294,7 @@
             )
 
     @HasPermissionAnyDecorator('hg.admin')
-    def get_ip(self, userid=Optional(OAttr('apiuser'))):
+    def get_ip(self, userid=None):
         """
         Shows IP address as seen from Kallithea server, together with all
         defined IP addresses for given user. If userid is not passed data is
@@ -321,7 +320,7 @@
             }
 
         """
-        if isinstance(userid, Optional):
+        if userid is None:
             userid = request.authuser.user_id
         user = get_user_or_error(userid)
         ips = UserIpMap.query().filter(UserIpMap.user == user).all()
@@ -352,7 +351,7 @@
         """
         return Setting.get_server_info()
 
-    def get_user(self, userid=Optional(OAttr('apiuser'))):
+    def get_user(self, userid=None):
         """
         Gets a user by username or user_id, Returns empty result if user is
         not found. If userid param is skipped it is set to id of user who is
@@ -397,12 +396,12 @@
         if not HasPermissionAny('hg.admin')():
             # make sure normal user does not pass someone else userid,
             # he is not allowed to do that
-            if not isinstance(userid, Optional) and userid != request.authuser.user_id:
+            if userid is not None and userid != request.authuser.user_id:
                 raise JSONRPCError(
                     'userid is not the same as your user'
                 )
 
-        if isinstance(userid, Optional):
+        if userid is None:
             userid = request.authuser.user_id
 
         user = get_user_or_error(userid)
@@ -432,11 +431,11 @@
         ]
 
     @HasPermissionAnyDecorator('hg.admin')
-    def create_user(self, username, email, password=Optional(''),
-                    firstname=Optional(''), lastname=Optional(''),
-                    active=Optional(True), admin=Optional(False),
-                    extern_type=Optional(User.DEFAULT_AUTH_TYPE),
-                    extern_name=Optional('')):
+    def create_user(self, username, email, password='',
+                    firstname='', lastname='',
+                    active=True, admin=False,
+                    extern_type=User.DEFAULT_AUTH_TYPE,
+                    extern_name=''):
         """
         Creates new user. Returns new user object. This command can
         be executed only using api_key belonging to user with admin rights.
@@ -492,15 +491,15 @@
 
         try:
             user = UserModel().create_or_update(
-                username=Optional.extract(username),
-                password=Optional.extract(password),
-                email=Optional.extract(email),
-                firstname=Optional.extract(firstname),
-                lastname=Optional.extract(lastname),
-                active=Optional.extract(active),
-                admin=Optional.extract(admin),
-                extern_type=Optional.extract(extern_type),
-                extern_name=Optional.extract(extern_name)
+                username=username,
+                password=password,
+                email=email,
+                firstname=firstname,
+                lastname=lastname,
+                active=active,
+                admin=admin,
+                extern_type=extern_type,
+                extern_name=extern_name
             )
             Session().commit()
             return dict(
@@ -512,11 +511,11 @@
             raise JSONRPCError('failed to create user `%s`' % (username,))
 
     @HasPermissionAnyDecorator('hg.admin')
-    def update_user(self, userid, username=Optional(None),
-                    email=Optional(None), password=Optional(None),
-                    firstname=Optional(None), lastname=Optional(None),
-                    active=Optional(None), admin=Optional(None),
-                    extern_type=Optional(None), extern_name=Optional(None)):
+    def update_user(self, userid, username=None,
+                    email=None, password=None,
+                    firstname=None, lastname=None,
+                    active=None, admin=None,
+                    extern_type=None, extern_name=None):
         """
         updates given user if such user exists. This command can
         be executed only using api_key belonging to user with admin rights.
@@ -686,8 +685,8 @@
         ]
 
     @HasPermissionAnyDecorator('hg.admin', 'hg.usergroup.create.true')
-    def create_user_group(self, group_name, description=Optional(''),
-                          owner=Optional(OAttr('apiuser')), active=Optional(True)):
+    def create_user_group(self, group_name, description='',
+                          owner=None, active=True):
         """
         Creates new user group. This command can be executed only using api_key
         belonging to user with admin rights or an user who has create user group
@@ -727,12 +726,10 @@
             raise JSONRPCError("user group `%s` already exist" % (group_name,))
 
         try:
-            if isinstance(owner, Optional):
+            if owner is None:
                 owner = request.authuser.user_id
 
             owner = get_user_or_error(owner)
-            active = Optional.extract(active)
-            description = Optional.extract(description)
             ug = UserGroupModel().create(name=group_name, description=description,
                                          owner=owner, active=active)
             Session().commit()
@@ -745,9 +742,9 @@
             raise JSONRPCError('failed to create group `%s`' % (group_name,))
 
     # permission check inside
-    def update_user_group(self, usergroupid, group_name=Optional(''),
-                          description=Optional(''), owner=Optional(None),
-                          active=Optional(True)):
+    def update_user_group(self, usergroupid, group_name=None,
+                          description=None, owner=None,
+                          active=None):
         """
         Updates given usergroup.  This command can be executed only using api_key
         belonging to user with admin rights or an admin of given user group
@@ -786,7 +783,7 @@
             if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name):
                 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 
-        if not isinstance(owner, Optional):
+        if owner is not None:
             owner = get_user_or_error(owner)
 
         updates = {}
@@ -963,8 +960,8 @@
 
     # permission check inside
     def get_repo(self, repoid,
-                 with_revision_names=Optional(False),
-                 with_pullrequests=Optional(False)):
+                 with_revision_names=False,
+                 with_pullrequests=False):
         """
         Gets an existing repository by it's name or repository_id. Members will return
         either users_group or user associated to that repository. This command can be
@@ -1064,8 +1061,8 @@
             for uf in repo.followers
         ]
 
-        data = repo.get_api_data(with_revision_names=Optional.extract(with_revision_names),
-                                 with_pullrequests=Optional.extract(with_pullrequests))
+        data = repo.get_api_data(with_revision_names=with_revision_names,
+                                 with_pullrequests=with_pullrequests)
         data['members'] = members
         data['followers'] = followers
         return data
@@ -1112,7 +1109,7 @@
 
     # permission check inside
     def get_repo_nodes(self, repoid, revision, root_path,
-                       ret_type=Optional('all')):
+                       ret_type='all'):
         """
         returns a list of nodes and it's children in a flat list for a given path
         at given revision. It's possible to specify ret_type to show only `files` or
@@ -1147,7 +1144,7 @@
             if not HasRepoPermissionLevel('read')(repo.repo_name):
                 raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 
-        ret_type = Optional.extract(ret_type)
+        ret_type = ret_type
         _map = {}
         try:
             _d, _f = ScmModel().get_nodes(repo, revision, root_path,
@@ -1168,13 +1165,13 @@
             )
 
     @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
-    def create_repo(self, repo_name, owner=Optional(OAttr('apiuser')),
-                    repo_type=Optional('hg'), description=Optional(''),
-                    private=Optional(False), clone_uri=Optional(None),
-                    landing_rev=Optional('rev:tip'),
-                    enable_statistics=Optional(False),
-                    enable_downloads=Optional(False),
-                    copy_permissions=Optional(False)):
+    def create_repo(self, repo_name, owner=None,
+                    repo_type=None, description='',
+                    private=False, clone_uri=None,
+                    landing_rev='rev:tip',
+                    enable_statistics=None,
+                    enable_downloads=None,
+                    copy_permissions=False):
         """
         Creates a repository. The repository name contains the full path, but the
         parent repository group must exist. For example "foo/bar/baz" require the groups
@@ -1225,12 +1222,12 @@
 
         """
         if not HasPermissionAny('hg.admin')():
-            if not isinstance(owner, Optional):
+            if owner is not None:
                 # forbid setting owner for non-admins
                 raise JSONRPCError(
                     'Only Kallithea admin can specify `owner` param'
                 )
-        if isinstance(owner, Optional):
+        if owner is None:
             owner = request.authuser.user_id
 
         owner = get_user_or_error(owner)
@@ -1239,20 +1236,15 @@
             raise JSONRPCError("repo `%s` already exist" % repo_name)
 
         defs = Setting.get_default_repo_settings(strip_prefix=True)
-        if isinstance(private, Optional):
-            private = defs.get('repo_private') or Optional.extract(private)
-        if isinstance(repo_type, Optional):
+        if private is None:
+            private = defs.get('repo_private') or False
+        if repo_type is None:
             repo_type = defs.get('repo_type')
-        if isinstance(enable_statistics, Optional):
+        if enable_statistics is None:
             enable_statistics = defs.get('repo_enable_statistics')
-        if isinstance(enable_downloads, Optional):
+        if enable_downloads is None:
             enable_downloads = defs.get('repo_enable_downloads')
 
-        clone_uri = Optional.extract(clone_uri)
-        description = Optional.extract(description)
-        landing_rev = Optional.extract(landing_rev)
-        copy_permissions = Optional.extract(copy_permissions)
-
         try:
             repo_name_parts = repo_name.split('/')
             repo_group = None
@@ -1291,13 +1283,13 @@
                 'failed to create repository `%s`' % (repo_name,))
 
     # permission check inside
-    def update_repo(self, repoid, name=Optional(None),
-                    owner=Optional(OAttr('apiuser')),
-                    group=Optional(None),
-                    description=Optional(''), private=Optional(False),
-                    clone_uri=Optional(None), landing_rev=Optional('rev:tip'),
-                    enable_statistics=Optional(False),
-                    enable_downloads=Optional(False)):
+    def update_repo(self, repoid, name=None,
+                    owner=None,
+                    group=None,
+                    description=None, private=None,
+                    clone_uri=None, landing_rev=None,
+                    enable_statistics=None,
+                    enable_downloads=None):
 
         """
         Updates repo
@@ -1324,7 +1316,7 @@
             ):
                 raise JSONRPCError('no permission to create (or move) repositories')
 
-            if not isinstance(owner, Optional):
+            if owner is not None:
                 # forbid setting owner for non-admins
                 raise JSONRPCError(
                     'Only Kallithea admin can specify `owner` param'
@@ -1332,7 +1324,7 @@
 
         updates = {}
         repo_group = group
-        if not isinstance(repo_group, Optional):
+        if repo_group is not None:
             repo_group = get_repo_group_or_error(repo_group)
             repo_group = repo_group.group_id
         try:
@@ -1358,9 +1350,9 @@
 
     @HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
     def fork_repo(self, repoid, fork_name,
-                  owner=Optional(OAttr('apiuser')),
-                  description=Optional(''), copy_permissions=Optional(False),
-                  private=Optional(False), landing_rev=Optional('rev:tip')):
+                  owner=None,
+                  description='', copy_permissions=False,
+                  private=False, landing_rev='rev:tip'):
         """
         Creates a fork of given repo. In case of using celery this will
         immediately return success message, while fork is going to be created
@@ -1413,7 +1405,7 @@
         if HasPermissionAny('hg.admin')():
             pass
         elif HasRepoPermissionLevel('read')(repo.repo_name):
-            if not isinstance(owner, Optional):
+            if owner is not None:
                 # forbid setting owner for non-admins
                 raise JSONRPCError(
                     'Only Kallithea admin can specify `owner` param'
@@ -1424,7 +1416,7 @@
         else:
             raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 
-        if isinstance(owner, Optional):
+        if owner is None:
             owner = request.authuser.user_id
 
         owner = get_user_or_error(owner)
@@ -1443,10 +1435,10 @@
                 repo_name_full=fork_name,
                 repo_group=repo_group,
                 repo_type=repo.repo_type,
-                description=Optional.extract(description),
-                private=Optional.extract(private),
-                copy_permissions=Optional.extract(copy_permissions),
-                landing_rev=Optional.extract(landing_rev),
+                description=description,
+                private=private,
+                copy_permissions=copy_permissions,
+                landing_rev=landing_rev,
                 update_after_clone=False,
                 fork_parent_id=repo.repo_id,
             )
@@ -1468,7 +1460,7 @@
             )
 
     # permission check inside
-    def delete_repo(self, repoid, forks=Optional('')):
+    def delete_repo(self, repoid, forks=''):
         """
         Deletes a repository. This command can be executed only using api_key belonging
         to user with admin rights or regular user that have admin access to repository.
@@ -1497,7 +1489,7 @@
                 raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 
         try:
-            handle_forks = Optional.extract(forks)
+            handle_forks = forks
             _forks_msg = ''
             _forks = [f for f in repo.forks]
             if handle_forks == 'detach':
@@ -1768,10 +1760,10 @@
         ]
 
     @HasPermissionAnyDecorator('hg.admin')
-    def create_repo_group(self, group_name, description=Optional(''),
-                          owner=Optional(OAttr('apiuser')),
-                          parent=Optional(None),
-                          copy_permissions=Optional(False)):
+    def create_repo_group(self, group_name, description='',
+                          owner=None,
+                          parent=None,
+                          copy_permissions=False):
         """
         Creates a repository group. This command can be executed only using
         api_key belonging to user with admin rights.
@@ -1808,14 +1800,13 @@
         if RepoGroup.get_by_group_name(group_name):
             raise JSONRPCError("repo group `%s` already exist" % (group_name,))
 
-        if isinstance(owner, Optional):
+        if owner is None:
             owner = request.authuser.user_id
-        group_description = Optional.extract(description)
-        parent_group = Optional.extract(parent)
-        if not isinstance(parent, Optional):
-            parent_group = get_repo_group_or_error(parent_group)
+        group_description = description
+        parent_group = None
+        if parent is not None:
+            parent_group = get_repo_group_or_error(parent)
 
-        copy_permissions = Optional.extract(copy_permissions)
         try:
             repo_group = RepoGroupModel().create(
                 group_name=group_name,
@@ -1835,10 +1826,10 @@
             raise JSONRPCError('failed to create repo group `%s`' % (group_name,))
 
     @HasPermissionAnyDecorator('hg.admin')
-    def update_repo_group(self, repogroupid, group_name=Optional(''),
-                          description=Optional(''),
-                          owner=Optional(OAttr('apiuser')),
-                          parent=Optional(None)):
+    def update_repo_group(self, repogroupid, group_name=None,
+                          description=None,
+                          owner=None,
+                          parent=None):
         repo_group = get_repo_group_or_error(repogroupid)
 
         updates = {}
@@ -1902,7 +1893,7 @@
 
     # permission check inside
     def grant_user_permission_to_repo_group(self, repogroupid, userid,
-                                            perm, apply_to_children=Optional('none')):
+                                            perm, apply_to_children='none'):
         """
         Grant permission for user on given repository group, or update existing
         one if found. This command can be executed only using api_key belonging
@@ -1944,7 +1935,6 @@
 
         user = get_user_or_error(userid)
         perm = get_perm_or_error(perm, prefix='group.')
-        apply_to_children = Optional.extract(apply_to_children)
 
         try:
             RepoGroupModel().add_permission(repo_group=repo_group,
@@ -1967,7 +1957,7 @@
 
     # permission check inside
     def revoke_user_permission_from_repo_group(self, repogroupid, userid,
-                                               apply_to_children=Optional('none')):
+                                               apply_to_children='none'):
         """
         Revoke permission for user on given repository group. This command can
         be executed only using api_key belonging to user with admin rights, or
@@ -2006,7 +1996,6 @@
                 raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,))
 
         user = get_user_or_error(userid)
-        apply_to_children = Optional.extract(apply_to_children)
 
         try:
             RepoGroupModel().delete_permission(repo_group=repo_group,
@@ -2030,7 +2019,7 @@
     # permission check inside
     def grant_user_group_permission_to_repo_group(
             self, repogroupid, usergroupid, perm,
-            apply_to_children=Optional('none')):
+            apply_to_children='none'):
         """
         Grant permission for user group on given repository group, or update
         existing one if found. This command can be executed only using
@@ -2077,8 +2066,6 @@
                 raise JSONRPCError(
                     'user group `%s` does not exist' % (usergroupid,))
 
-        apply_to_children = Optional.extract(apply_to_children)
-
         try:
             RepoGroupModel().add_permission(repo_group=repo_group,
                                             obj=user_group,
@@ -2105,7 +2092,7 @@
     # permission check inside
     def revoke_user_group_permission_from_repo_group(
             self, repogroupid, usergroupid,
-            apply_to_children=Optional('none')):
+            apply_to_children='none'):
         """
         Revoke permission for user group on given repository. This command can be
         executed only using api_key belonging to user with admin rights, or
@@ -2147,8 +2134,6 @@
                 raise JSONRPCError(
                     'user group `%s` does not exist' % (usergroupid,))
 
-        apply_to_children = Optional.extract(apply_to_children)
-
         try:
             RepoGroupModel().delete_permission(repo_group=repo_group,
                                                obj=user_group,
@@ -2182,7 +2167,7 @@
                 raise JSONRPCError('gist `%s` does not exist' % (gistid,))
         return gist.get_api_data()
 
-    def get_gists(self, userid=Optional(OAttr('apiuser'))):
+    def get_gists(self, userid=None):
         """
         Get all gists for given user. If userid is empty returned gists
         are for user who called the api
@@ -2193,12 +2178,12 @@
         if not HasPermissionAny('hg.admin')():
             # make sure normal user does not pass someone else userid,
             # he is not allowed to do that
-            if not isinstance(userid, Optional) and userid != request.authuser.user_id:
+            if userid is not None and userid != request.authuser.user_id:
                 raise JSONRPCError(
                     'userid is not the same as your user'
                 )
 
-        if isinstance(userid, Optional):
+        if userid is None:
             user_id = request.authuser.user_id
         else:
             user_id = get_user_or_error(userid).user_id
@@ -2211,9 +2196,9 @@
                 .order_by(Gist.created_on.desc())
         ]
 
-    def create_gist(self, files, owner=Optional(OAttr('apiuser')),
-                    gist_type=Optional(Gist.GIST_PUBLIC), lifetime=Optional(-1),
-                    description=Optional('')):
+    def create_gist(self, files, owner=None,
+                    gist_type=Gist.GIST_PUBLIC, lifetime=-1,
+                    description=''):
 
         """
         Creates new Gist
@@ -2250,13 +2235,10 @@
 
         """
         try:
-            if isinstance(owner, Optional):
+            if owner is None:
                 owner = request.authuser.user_id
 
             owner = get_user_or_error(owner)
-            description = Optional.extract(description)
-            gist_type = Optional.extract(gist_type)
-            lifetime = Optional.extract(lifetime)
 
             gist = GistModel().create(description=description,
                                       owner=owner,
@@ -2273,12 +2255,6 @@
             log.error(traceback.format_exc())
             raise JSONRPCError('failed to create gist')
 
-    # def update_gist(self, gistid, files, owner=Optional(OAttr('apiuser')),
-    #                 gist_type=Optional(Gist.GIST_PUBLIC),
-    #                 gist_lifetime=Optional(-1), gist_description=Optional('')):
-    #     gist = get_gist_or_error(gistid)
-    #     updates = {}
-
     # permission check inside
     def delete_gist(self, gistid):
         """
@@ -2342,7 +2318,7 @@
             raise JSONRPCError('Repository is empty')
 
     # permission check inside
-    def get_changeset(self, repoid, raw_id, with_reviews=Optional(False)):
+    def get_changeset(self, repoid, raw_id, with_reviews=False):
         repo = get_repo_or_error(repoid)
         if not HasRepoPermissionLevel('read')(repo.repo_name):
             raise JSONRPCError('Access denied to repo %s' % repo.repo_name)
@@ -2352,7 +2328,6 @@
 
         info = dict(changeset.as_dict())
 
-        with_reviews = Optional.extract(with_reviews)
         if with_reviews:
             reviews = ChangesetStatusModel().get_statuses(
                                 repo.repo_name, raw_id)