changeset 491:fefffd6fd5f4 celery

Added some more tests, rewrite testing schema, to autogenerate fresh db, new index. cleaned up some codes that involves testing.
author Marcin Kuzminski <marcin@python-works.com>
date Tue, 21 Sep 2010 01:08:01 +0200
parents 74b9bed279ae
children a5a17000e45b
files pylons_app/config/environment.py pylons_app/lib/db_manage.py pylons_app/lib/indexers/daemon.py pylons_app/lib/utils.py pylons_app/tests/__init__.py pylons_app/tests/functional/test_admin_settings.py pylons_app/websetup.py
diffstat 7 files changed, 159 insertions(+), 108 deletions(-) [+]
line wrap: on
line diff
--- a/pylons_app/config/environment.py	Mon Sep 20 22:55:55 2010 +0200
+++ b/pylons_app/config/environment.py	Tue Sep 21 01:08:01 2010 +0200
@@ -51,8 +51,10 @@
     config['pylons.strict_tmpl_context'] = True
     test = os.path.split(config['__file__'])[-1] == 'test.ini'
     if test:
-        from pylons_app.lib.utils import make_test_env
-        make_test_env()
+        from pylons_app.lib.utils import create_test_env, create_test_index
+        create_test_env('/tmp', config)
+        create_test_index('/tmp/*', True)
+        
     #MULTIPLE DB configs
     # Setup the SQLAlchemy database engine
     if config['debug'] and not test:
--- a/pylons_app/lib/db_manage.py	Mon Sep 20 22:55:55 2010 +0200
+++ b/pylons_app/lib/db_manage.py	Tue Sep 21 01:08:01 2010 +0200
@@ -43,7 +43,7 @@
 log = logging.getLogger(__name__)
 
 class DbManage(object):
-    def __init__(self, log_sql, dbname,tests=False):
+    def __init__(self, log_sql, dbname, tests=False):
         self.dbname = dbname
         self.tests = tests
         dburi = 'sqlite:////%s' % jn(ROOT, self.dbname)
@@ -68,7 +68,7 @@
         if override:
             log.info("database exisist and it's going to be destroyed")
             if self.tests:
-                destroy=True
+                destroy = True
             else:
                 destroy = ask_ok('Are you sure to destroy old database ? [y/n]')
             if not destroy:
@@ -84,15 +84,17 @@
             import getpass
             username = raw_input('Specify admin username:')
             password = getpass.getpass('Specify admin password:')
-            self.create_user(username, password, True)
+            email = raw_input('Specify admin email:')
+            self.create_user(username, password, email, True)
         else:
             log.info('creating admin and regular test users')
-            self.create_user('test_admin', 'test', True)
-            self.create_user('test_regular', 'test', False)
+            self.create_user('test_admin', 'test', 'test_admin@mail.com', True)
+            self.create_user('test_regular', 'test', 'test_regular@mail.com', False)
+            self.create_user('test_regular2', 'test', 'test_regular2@mail.com', False)
             
         
     
-    def config_prompt(self,test_repo_path=''):
+    def config_prompt(self, test_repo_path=''):
         log.info('Setting up repositories config')
         
         if not self.tests and not test_repo_path:
@@ -102,7 +104,7 @@
             path = test_repo_path
             
         if not os.path.isdir(path):
-            log.error('You entered wrong path: %s',path)
+            log.error('You entered wrong path: %s', path)
             sys.exit()
         
         hooks1 = HgAppUi()
@@ -166,14 +168,14 @@
             raise        
         log.info('created ui config')
                     
-    def create_user(self, username, password, admin=False):
+    def create_user(self, username, password, email='', admin=False):
         log.info('creating administrator user %s', username)
         new_user = User()
         new_user.username = username
         new_user.password = get_crypt_password(password)
         new_user.name = 'Hg'
         new_user.lastname = 'Admin'
-        new_user.email = 'admin@localhost.com'
+        new_user.email = email
         new_user.admin = admin
         new_user.active = True
         
--- a/pylons_app/lib/indexers/daemon.py	Mon Sep 20 22:55:55 2010 +0200
+++ b/pylons_app/lib/indexers/daemon.py	Tue Sep 21 01:08:01 2010 +0200
@@ -44,7 +44,7 @@
 log = logging.getLogger('whooshIndexer')
 # create logger
 log.setLevel(logging.DEBUG)
-
+log.propagate = False
 # create console handler and set level to debug
 ch = logging.StreamHandler()
 ch.setLevel(logging.DEBUG)
--- a/pylons_app/lib/utils.py	Mon Sep 20 22:55:55 2010 +0200
+++ b/pylons_app/lib/utils.py	Tue Sep 21 01:08:01 2010 +0200
@@ -16,7 +16,6 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 # MA  02110-1301, USA.
-import shutil
 
 """
 Created on April 18, 2010
@@ -32,8 +31,7 @@
 from vcs.utils.lazy import LazyProperty
 import logging
 import os
-from os.path import dirname as dn, join as jn
-import tarfile
+
 log = logging.getLogger(__name__)
 
 
@@ -367,67 +365,75 @@
     def __ne__(self, other):
         return not self == other
 
-def make_test_env():
-    """Makes a fresh database from base64+zlib dump and 
+
+#===============================================================================
+# TEST FUNCTIONS
+#===============================================================================
+def create_test_index(repo_location, full_index):
+    """Makes default test index
+    @param repo_location:
+    @param full_index:
+    """
+    from pylons_app.lib.indexers import daemon
+    from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon
+    from pylons_app.lib.indexers.pidlock import DaemonLock, LockHeld
+    from pylons_app.lib.indexers import IDX_LOCATION
+    import shutil
+    
+    if os.path.exists(IDX_LOCATION):
+        shutil.rmtree(IDX_LOCATION)
+         
+    try:
+        l = DaemonLock()
+        WhooshIndexingDaemon(repo_location=repo_location)\
+            .run(full_index=full_index)
+        l.release()
+    except LockHeld:
+        pass    
+    
+def create_test_env(repos_test_path, config):
+    """Makes a fresh database and 
     install test repository into tmp dir
     """
-    new_db_dump = """
-eJztXN1vE8sVn9nxR+wAIXDpFiiXjSAXfEOc2ElwQkVLPjYf5NNOAklUydrYG3tv1t5ldx0nuUJV\noL
-cPrVr1X7jSfUJ96nMfK1Xty23VqlWlPlRIlahUXbXqFUL0pTNjx5614xAoKEDmJ3t2zpkzM2fO\neHe+
-zno+PqU5qrRmWDnFkXqAB0AIbkkSAKANf8+BKprwFzI0G28ECXQ+PufFEYT+Tehz6L/oaSnK\nwcFxGP
-igFQfHjuMg4CehH7UA9Af0Y2ShWdSPLmOSg+N9x7U9eKf9PiC2nIWm4mTtri4nZ3Z9DE/5\nfOD0+RZY
-VFdXFVstWHoXPOPFvDbKU3TdKCbNgp39GLZ5MPtKW5WtWKmstqFmtqVtzZRWt6NQRFjk\ngkhESJ6kbe
-trim6rcFTAdcfuwqxhrNuprJLPqBnLKJhhSzWNpK1tq+aWkzXyN8wt3cjbScU0w7q2\nGqbyVSHYAXE5
-kSv15RTMtOKo2YxUikjf+SgKg4Dc/38C6Dn6Gn2FnqDH6K+Y5ODgeGfhRRD6/ST0\n+Ujo9ZLQ4yEhQi
-QUBBJCeFy4BLywHaCfCEXM+AJHOWpx39sMrux4IbzQ3gMc1XaSlpop6IoVvRxV\nLke6L4/cmx7vjedG
-4qmVmXvTW5nl7PDaSmFEXR6ejC+YVrpnsNi1fn17fHldj06p6YH84tzaGKBF\n5ZWcSq66Uorn8Iih5W
-/ZBolqejhl5O57mkEPqf6sOFCq3lRsu2hYaayHrTplJeJD/Uu3p7u3Er19\nS4sb26PmemQiE54vLKfn
-I8Wx2/Nd+XurmbH4TOpupHdk25I/sYbmCgDQstK0oHLdpWGmc1U3MqR6\nbICF123RHb/QDNpIm1rFnk
-HaJiWd0/Llpgzq41lzIJMrjMXi2/JmdyGxMDKnjs1FR9WMcduMb3TZ\nfZuZTXVs1uiS53NxY9yan4Vw
-PDNICqEl3dKNlKJnDdshbYh2R7o7uwc6I1EpGr3RHbvREwn3D/T3\nd/fuBFAzaHdpUu7csi6Tw4ou94
-zOLt3JxTNZo7g8muvV1Lg6sNj/SX4dD7srqenpfCJ6d3g5vKRM\njq/Ob3VHIXgJXaKx8PWBvoHrvfdg
-MzhPVDl/vgek1TWloO927tbUdsqeNzfurK5Frq+v5NbHZ1bG\nCnZxdnxxbGStmOudnwub6+rQYNxZku
-Wh28Ph9Nos2C3EfblVvhJlyPjvRY+Z8f91dzUHB8fhYf/x\nv3T/PwL47v87+iX6I45ycHC8dWhFV6Br
-7ukVUQ/cYzroOYnaXZLoBGqD1TmW0IzOw/IUAJL9v6Dg\nA+jP6Ofo+yiBelFA+IvwC2EFMzmOCBJBD/
-huMZsJ41+MZjuqFVYKjpFUUo62oThqosyV8mpRKtg4\nUtScrJTNdCqmSeNGwZFIFqmcRTPydwIeMPwp
-W2ZOyRcU/SVLLWViym1v8oDOLrbcvJGvFpbWbGVV\nV9NhvweEZCyWslRcWVnINGzNMawtiXJxaRX5kM
-8D+rqq8lZFtjaX+i2vB1zoxKL0dhrPSHSmj6u3\nFCzV4cH6fbuavSTFFEJp3KCUatsdqEa4aGkOqyel
-y8IhwQM6BhhhrE2akSVkWfQKxKJ9jGhN8/NG\nWZCM/0H0q5r9P/Q79FvM5ODgeOtBZvLBIAkDARI2Nb
-3E/h/O7wdDAAzBj+Cy8IXwpfAc/eZlat9R\noF+8eBE+bHXIgzSbIQcTyYJWiQjDCXlwQZYWBoemZKnC
-lq4GAwUtqaWliZkFeUxOSDOzC9LM4tTU\nNYmm2GqKPqEX5KWFMmtd3WLJDUUvqCyDjhKqNDQ7OyUPzh
-DmXGJiejCxLE3Ky9JVWl2IsBdnJuKL\nMssZHpeHJymjXMjEjHS1+5oUCYWCoYjgE+WLEGj5tLpp39Px
-MzlJhjtKJytNSkYqUfRgHPlFUYQ/\nMKhZyPhm08DjMgdlUVPgSENj4DSyN1hp6u6Er8Kob3hplGEYrg
-J2dxsrDLrZ6EpO6kYGlzCCdV2Y\nmJbrjVlS2G1Ohlc2aJ012TSqozuJLYpoiK0f8vjEm2Ij61MLJiP0
-4g15XywapRffzpTPL166BB8k\naQeZqpXTbBv/4Gwm6nd1FpNAuqxKNuo4RsLdf1W+buQzrjSXkV1VuO
-zjTgmG+vw+ceJSo5Yzmicj\nDNFE7n8BfQnQ33DAwcHxLqMFLxHEs47mkIGYrKM+xAsBMYZXBnquvLDC
-D4Wsmne0FF3/kPm/gL6m\n8//DVp6Dg+PNo3b+7wOPAHgEH8F/CFfRT9GvD1u/vbFzv8kvdnTAhxF2nW
-GrjqPlM3YNGdxrzbGb\nSOZuLN1o9uaScc3RXCnuVYhr+lZTi2sCd+C08iz4ZsAnxjtesAapZIrUMJpv
-Bl8me7SGcfxBqtkv\ntrfDzwLU+pWdJU212fgJl93ZFGJ06qPWwNg0rWLkuuVPwxm2RfcS2YVOWrVTlm
-a61o6uXimr4bJ4\npfp67r6So7MJeWJshhRcWf1ICXlUTsgzw/L87vpuj4XRrubsOjN2zCdOtjfqJNac
-yQhLtcSOHzhj\nlKVOlsb/fwL0FAccHBzvLQJIhHRpIJAYXRPQ8R+i3wP84eDgeNfRCX3gAoRjGyk7Sc
-78BUDPZdlJ\n0ZphSbvJZPyH6D8Afzg4ON5/HEMX4O7tD0v3/3OAPxwcHEcG1f0/hJ4A9Az9C184ODje
-Q/gQ+WcP\nKPgEevX5IL0GyPiP0Fdl/7/D1pKDg+PNYe/3f+j4/wSP/88OWz8ODo43Ab+H3O0CKl19Qu
-kaoPN/\nD/gcgM+FD4W7ws8OW886PNg+UTp4jlX8aJOOQR0a2XhrnVftbkrFubZM7+dkewA/zgYS9a6x
-1erq\nXWRr0thDZLdfJ3uU7PI+rXcMfYWT6Bq33WtSrVNprGW/Y2VXUyIsdSp28sAZoyx1+kGulXqTfx
-aq\ndrduZOxK5Ex9RxN2pZcx8So9XEozKw4D1Vdn6v0RFLdfeolM0r/U2d9buqRbvekZ/iv0IpulqrYr
-\nl9sRo+rBEAyR+x8/ADg4OI4gyPyf3/8cHEcTJf+fpwB/ODg4jgSaoBfQ/QB+/s/BcSRR3f+H6Bng\n
-e/8cHEcHpf1/CI+jHwEP3AToLtx8e9/9e//w8Hun6bHGDz+tvE+3uwfOxsW69+nYYw2WfjPHGtX9\n5A
-MdfNQo9P+eS7youNdyVuJq4ot2zRsdnLgLCYYip/b7w5jKqUX51IREv4F/FJ7YBy96ja963sJS\n34yd
-OXDGKEud/R8efZUt\n
-"""    
-    newdb = open('test.db', 'wb')
-    newdb.write(new_db_dump.decode('base64').decode('zlib'))
-    newdb.close()
+    from pylons_app.lib.db_manage import DbManage
+    import tarfile
+    import shutil
+    from os.path import dirname as dn, join as jn, abspath
+    
+    log = logging.getLogger('TestEnvCreator')
+    # create logger
+    log.setLevel(logging.DEBUG)
+    log.propagate = True
+    # create console handler and set level to debug
+    ch = logging.StreamHandler()
+    ch.setLevel(logging.DEBUG)
+    
+    # create formatter
+    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
     
+    # add formatter to ch
+    ch.setFormatter(formatter)
+    
+    # add ch to logger
+    log.addHandler(ch)
+    
+    #PART ONE create db
+    log.debug('making test db')
+    dbname = config['sqlalchemy.db1.url'].split('/')[-1]
+    dbmanage = DbManage(log_sql=True, dbname=dbname, tests=True)
+    dbmanage.create_tables(override=True)
+    dbmanage.config_prompt(repos_test_path)
+    dbmanage.create_default_user()
+    dbmanage.admin_prompt()
+    dbmanage.create_permissions()
+    dbmanage.populate_default_permissions()
     
     #PART TWO make test repo
+    log.debug('making test vcs repo')
     if os.path.isdir('/tmp/vcs_test'):
         shutil.rmtree('/tmp/vcs_test')
         
-    cur_dir = dn(dn(os.path.abspath(__file__)))
+    cur_dir = dn(dn(abspath(__file__)))
     tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test.tar.gz"))
     tar.extractall('/tmp')
     tar.close()
-    
-    
-    
--- a/pylons_app/tests/__init__.py	Mon Sep 20 22:55:55 2010 +0200
+++ b/pylons_app/tests/__init__.py	Tue Sep 21 01:08:01 2010 +0200
@@ -16,9 +16,9 @@
 from webtest import TestApp
 import os
 from pylons_app.model import meta
-from pylons_app.lib.indexers import IDX_LOCATION
 import logging
-import shutil
+
+
 log = logging.getLogger(__name__) 
 
 import pylons.test
@@ -26,24 +26,8 @@
 __all__ = ['environ', 'url', 'TestController']
 
 # Invoke websetup with the current config file
-#SetupCommand('setup-app').run([pylons.test.pylonsapp.config['__file__']])
-def create_index(repo_location, full_index):
-    from pylons_app.lib.indexers import daemon
-    from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon
-    from pylons_app.lib.indexers.pidlock import DaemonLock, LockHeld
-    
-    try:
-        l = DaemonLock()
-        WhooshIndexingDaemon(repo_location=repo_location)\
-            .run(full_index=full_index)
-        l.release()
-    except LockHeld:
-        pass    
-    
-if os.path.exists(IDX_LOCATION):
-    shutil.rmtree(IDX_LOCATION)
-    
-create_index('/tmp/*', True)    
+#SetupCommand('setup-app').run([config_file])
+
 
 environ = {}
 
@@ -57,14 +41,11 @@
         self.sa = meta.Session
 
         TestCase.__init__(self, *args, **kwargs)
-
     
-    def log_user(self):
+    def log_user(self, username='test_admin', password='test'):
         response = self.app.post(url(controller='login', action='index'),
-                                 {'username':'test_admin',
-                                  'password':'test'})
+                                 {'username':username,
+                                  'password':password})
         assert response.status == '302 Found', 'Wrong response code from login got %s' % response.status
         assert response.session['hg_app_user'].username == 'test_admin', 'wrong logged in user'
         return response.follow()
-
- 
--- a/pylons_app/tests/functional/test_admin_settings.py	Mon Sep 20 22:55:55 2010 +0200
+++ b/pylons_app/tests/functional/test_admin_settings.py	Tue Sep 21 01:08:01 2010 +0200
@@ -1,4 +1,5 @@
 from pylons_app.tests import *
+from pylons_app.model.db import User
 
 class TestSettingsController(TestController):
 
@@ -41,3 +42,75 @@
 
     def test_edit_as_xml(self):
         response = self.app.get(url('formatted_admin_edit_setting', setting_id=1, format='xml'))
+
+    def test_my_account(self):
+        self.log_user()
+        response = self.app.get(url('admin_settings_my_account'))
+        print response
+        assert 'value="test_admin' in response.body
+        
+        
+            
+    def test_my_account_update(self):
+        self.log_user()
+        new_email = 'new@mail.pl'
+        response = self.app.post(url('admin_settings_my_account_update'), params=dict(
+                                                            _method='put',
+                                                            username='test_admin',
+                                                            new_password='test',
+                                                            password='',
+                                                            name='NewName',
+                                                            lastname='NewLastname',
+                                                            email=new_email,))
+        response.follow()
+        print response
+    
+        print 'x' * 100
+        print response.session
+        assert 'Your account was updated succesfully' in response.session['flash'][0][1], 'no flash message about success of change'
+        user = self.sa.query(User).filter(User.username == 'test_admin').one()
+        assert user.email == new_email , 'incorrect user email after update got %s vs %s' % (user.email, new_email)
+    
+    def test_my_account_update_own_email_ok(self):
+        self.log_user()
+                
+        new_email = 'new@mail.pl'
+        response = self.app.post(url('admin_settings_my_account_update'), params=dict(
+                                                            _method='put',
+                                                            username='test_admin',
+                                                            new_password='test',
+                                                            name='NewName',
+                                                            lastname='NewLastname',
+                                                            email=new_email,))
+        print response
+                
+    def test_my_account_update_err_email_exists(self):
+        self.log_user()
+                
+        new_email = 'test_regular@mail.com'#already exisitn email
+        response = self.app.post(url('admin_settings_my_account_update'), params=dict(
+                                                            _method='put',
+                                                            username='test_admin',
+                                                            new_password='test',
+                                                            name='NewName',
+                                                            lastname='NewLastname',
+                                                            email=new_email,))
+        print response
+        
+        assert 'That e-mail address is already taken' in response.body, 'Missing error message about existing email'
+        
+        
+    def test_my_account_update_err(self):
+        self.log_user()
+                
+        new_email = 'newmail.pl'
+        response = self.app.post(url('admin_settings_my_account_update'), params=dict(
+                                                            _method='put',
+                                                            username='test_regular2',
+                                                            new_password='test',
+                                                            name='NewName',
+                                                            lastname='NewLastname',
+                                                            email=new_email,))
+        print response
+        assert 'An email address must contain a single @' in response.body, 'Missing error message about wrong email'
+        assert 'This username already exists' in response.body, 'Missing error message about existing user'
--- a/pylons_app/websetup.py	Mon Sep 20 22:55:55 2010 +0200
+++ b/pylons_app/websetup.py	Tue Sep 21 01:08:01 2010 +0200
@@ -19,20 +19,7 @@
     tests = False
     REPO_TEST_PATH = None
     
-    dbname = os.path.split(conf['sqlalchemy.db1.url'])[-1]
-#    filename = os.path.split(conf.filename)[-1]
-#    if filename == 'test.ini':
-#        uniq_suffix = str(int(mktime(datetime.datetime.now().timetuple())))
-#        REPO_TEST_PATH = '/tmp/hg_app_test_%s' % uniq_suffix
-#        
-#        if not os.path.isdir(REPO_TEST_PATH):
-#            os.mkdir(REPO_TEST_PATH)
-#            cur_dir = dn(os.path.abspath(__file__))
-#            tar = tarfile.open(jn(cur_dir,'tests',"vcs_test.tar.gz"))
-#            tar.extractall(REPO_TEST_PATH)
-#            tar.close()
-#            
-#        tests = True    
+    dbname = os.path.split(conf['sqlalchemy.db1.url'])[-1] 
     
     dbmanage = DbManage(log_sql, dbname, tests)
     dbmanage.create_tables(override=True)