changeset 2329:20e307d5250f codereview

Added email-map for alternative email addresses for users
author Marcin Kuzminski <marcin@python-works.com>
date Wed, 23 May 2012 00:11:45 +0200
parents d3f1b71099ab
children b0fef8a77568
files rhodecode/model/db.py rhodecode/tests/test_models.py
diffstat 2 files changed, 98 insertions(+), 5 deletions(-) [+]
line wrap: on
line diff
--- a/rhodecode/model/db.py	Tue May 22 22:15:29 2012 +0200
+++ b/rhodecode/model/db.py	Wed May 23 00:11:45 2012 +0200
@@ -48,6 +48,7 @@
 
 from rhodecode.model.meta import Base, Session
 import hashlib
+from sqlalchemy.exc import DatabaseError
 
 
 log = logging.getLogger(__name__)
@@ -381,8 +382,23 @@
 
         if cache:
             q = q.options(FromCache("sql_cache_short",
-                                    "get_api_key_%s" % email))
-        return q.scalar()
+                                    "get_email_key_%s" % email))
+
+        ret = q.scalar()
+        if ret is None:
+            q = UserEmailMap.query()
+            # try fetching in alternate email map
+            if case_insensitive:
+                q = q.filter(UserEmailMap.email.ilike(email))
+            else:
+                q = q.filter(UserEmailMap.email == email)
+            q = q.options(joinedload(UserEmailMap.user))
+            if cache:
+                q = q.options(FromCache("sql_cache_short",
+                                        "get_email_map_key_%s" % email))
+            ret = getattr(q.scalar(), 'user', None)
+
+        return ret
 
     def update_lastlogin(self):
         """Update user lastlogin"""
@@ -403,6 +419,38 @@
         )
 
 
+class UserEmailMap(Base, BaseModel):
+    __tablename__ = 'user_email_map'
+    __table_args__ = (
+        UniqueConstraint('email'),
+        {'extend_existing': True, 'mysql_engine':'InnoDB',
+         'mysql_charset': 'utf8'}
+    )
+    __mapper_args__ = {}
+
+    email_id = Column("email_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
+    user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=True, unique=None, default=None)
+    _email = Column("email", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=False, default=None)
+
+    user = relationship('User')
+
+    @validates('_email')
+    def validate_email(self, key, email):
+        # check if this email is not main one
+        main_email = Session.query(User).filter(User.email == email).scalar()
+        if main_email is not None:
+            raise AttributeError('email %s is present is user table' % email)
+        return email
+
+    @hybrid_property
+    def email(self):
+        return self._email
+
+    @email.setter
+    def email(self, val):
+        self._email = val.lower() if val else None
+
+
 class UserLog(Base, BaseModel):
     __tablename__ = 'user_logs'
     __table_args__ = (
--- a/rhodecode/tests/test_models.py	Tue May 22 22:15:29 2012 +0200
+++ b/rhodecode/tests/test_models.py	Wed May 23 00:11:45 2012 +0200
@@ -6,8 +6,8 @@
 from rhodecode.model.repo import RepoModel
 from rhodecode.model.db import RepoGroup, User, Notification, UserNotification, \
     UsersGroup, UsersGroupMember, Permission, UsersGroupRepoGroupToPerm,\
-    Repository
-from sqlalchemy.exc import IntegrityError
+    Repository, UserEmailMap
+from sqlalchemy.exc import IntegrityError, DatabaseError
 from rhodecode.model.user import UserModel
 
 from rhodecode.model.meta import Session
@@ -181,7 +181,8 @@
         super(TestUser, self).__init__(methodName=methodName)
 
     def test_create_and_remove(self):
-        usr = UserModel().create_or_update(username=u'test_user', password=u'qweqwe',
+        usr = UserModel().create_or_update(username=u'test_user', 
+                                           password=u'qweqwe',
                                      email=u'u232@rhodecode.org',
                                      name=u'u1', lastname=u'u1')
         Session.commit()
@@ -201,6 +202,50 @@
 
         self.assertEqual(UsersGroupMember.query().all(), [])
 
+    def test_additonal_email_as_main(self):
+        usr = UserModel().create_or_update(username=u'test_user', 
+                                           password=u'qweqwe',
+                                     email=u'main_email@rhodecode.org',
+                                     name=u'u1', lastname=u'u1')
+        Session.commit()
+
+        def do():
+            m = UserEmailMap()
+            m.email = u'main_email@rhodecode.org'
+            m.user = usr
+            Session.add(m)
+            Session.commit()
+        self.assertRaises(AttributeError, do)
+
+        UserModel().delete(usr.user_id)
+        Session.commit()
+
+    def test_extra_email_map(self):
+        usr = UserModel().create_or_update(username=u'test_user', 
+                                           password=u'qweqwe',
+                                     email=u'main_email@rhodecode.org',
+                                     name=u'u1', lastname=u'u1')
+        Session.commit()
+
+        m = UserEmailMap()
+        m.email = u'main_email2@rhodecode.org'
+        m.user = usr
+        Session.add(m)
+        Session.commit()
+
+        u = User.get_by_email(email='main_email@rhodecode.org')
+        self.assertEqual(usr.user_id, u.user_id)
+        self.assertEqual(usr.username, u.username)
+
+        u = User.get_by_email(email='main_email2@rhodecode.org')
+        self.assertEqual(usr.user_id, u.user_id)
+        self.assertEqual(usr.username, u.username)
+        u = User.get_by_email(email='main_email3@rhodecode.org')
+        self.assertEqual(None, u)
+
+        UserModel().delete(usr.user_id)
+        Session.commit()
+
 
 class TestNotifications(unittest.TestCase):