Code

add in-memory hyperdb implementation to speed up testing
[roundup.git] / test / test_mailgw.py
index a3c95af95100dc1ec0638df8c894710e7295892b..e15e2928585e441f7fc3c6eb08000e3d6e056c6b 100644 (file)
@@ -26,7 +26,8 @@ from roundup.mailgw import MailGW, Unauthorized, uidFromAddress, \
 from roundup import init, instance, password, rfc2822, __version__
 from roundup.anypy.sets_ import set
 
-import db_test_base
+#import db_test_base
+import memorydb
 
 class Message(rfc822.Message):
     """String-based Message class with equivalence test."""
@@ -37,6 +38,10 @@ class Message(rfc822.Message):
         return (self.dict == other.dict and
                 self.fp.read() == other.fp.read())
 
+class Tracker(object):
+    def open(self, journaltag):
+        return self.db
+
 class DiffHelper:
     def compareMessages(self, new, old):
         """Compare messages for semantic equivalence."""
@@ -115,12 +120,14 @@ class MailgwTestCase(unittest.TestCase, DiffHelper):
     schema = 'classic'
     def setUp(self):
         MailgwTestCase.count = MailgwTestCase.count + 1
-        self.dirname = '_test_mailgw_%s'%self.count
-        # set up and open a tracker
-        self.instance = db_test_base.setupTracker(self.dirname)
 
-        # and open the database
-        self.db = self.instance.open('admin')
+        # and open the database / "instance"
+        self.db = memorydb.create('admin')
+        self.instance = Tracker()
+        self.instance.db = self.db
+        self.instance.config = self.db.config
+        self.instance.MailGW = MailGW
+
         self.chef_id = self.db.user.create(username='Chef',
             address='chef@bork.bork.bork', realname='Bork, Chef', roles='User')
         self.richard_id = self.db.user.create(username='richard',
@@ -135,22 +142,11 @@ class MailgwTestCase(unittest.TestCase, DiffHelper):
         if os.path.exists(SENDMAILDEBUG):
             os.remove(SENDMAILDEBUG)
         self.db.close()
-        try:
-            shutil.rmtree(self.dirname)
-        except OSError, error:
-            if error.errno not in (errno.ENOENT, errno.ESRCH): raise
 
     def _handle_mail(self, message):
-        # handler will open a new db handle. On single-threaded
-        # databases we'll have to close our current connection
-        self.db.commit()
-        self.db.close()
         handler = self.instance.MailGW(self.instance)
         handler.trapExceptions = 0
-        ret = handler.main(StringIO(message))
-        # handler had its own database, open new connection
-        self.db = self.instance.open('admin')
-        return ret
+        return handler.main(StringIO(message))
 
     def _get_mail(self):
         f = open(SENDMAILDEBUG)
@@ -173,6 +169,22 @@ Subject: [issue] Testing...
         assert not os.path.exists(SENDMAILDEBUG)
         self.assertEqual(self.db.issue.get(nodeid, 'title'), 'Testing...')
 
+    def testMessageWithFromInIt(self):
+        nodeid = self._handle_mail('''Content-Type: text/plain;
+  charset="iso-8859-1"
+From: Chef <chef@bork.bork.bork>
+To: issue_tracker@your.tracker.email.domain.example
+Cc: richard@test.test
+Reply-To: chef@bork.bork.bork
+Message-Id: <dummy_test_message_id>
+Subject: [issue] Testing...
+
+From here to there!
+''')
+        assert not os.path.exists(SENDMAILDEBUG)
+        msgid = self.db.issue.get(nodeid, 'msg')[0]
+        self.assertEqual(self.db.issue.get(msgid, 'content'), 'From here to there!')
+
     def doNewIssue(self):
         nodeid = self._handle_mail('''Content-Type: text/plain;
   charset="iso-8859-1"
@@ -1020,7 +1032,6 @@ Subject: [issue1] Testing... [nosy=-richard]
         assert not os.path.exists(SENDMAILDEBUG)
 
     def testNewUserAuthor(self):
-
         l = self.db.user.list()
         l.sort()
         message = '''Content-Type: text/plain;
@@ -1032,12 +1043,9 @@ Subject: [issue] Testing...
 
 This is a test submission of a new issue.
 '''
-        def hook (db, **kw):
-            ''' set up callback for db open '''
-            db.security.role['anonymous'].permissions=[]
-            anonid = db.user.lookup('anonymous')
-            db.user.set(anonid, roles='Anonymous')
-        self.instance.schema_hook = hook
+        self.db.security.role['anonymous'].permissions=[]
+        anonid = self.db.user.lookup('anonymous')
+        self.db.user.set(anonid, roles='Anonymous')
         try:
             self._handle_mail(message)
         except Unauthorized, value:
@@ -1046,23 +1054,17 @@ You are not a registered user.
 
 Unknown address: fubar@bork.bork.bork
 """)
-
             assert not body_diff, body_diff
-
         else:
             raise AssertionError, "Unathorized not raised when handling mail"
 
-
-        def hook (db, **kw):
-            ''' set up callback for db open '''
-            # Add Web Access role to anonymous, and try again to make sure
-            # we get a "please register at:" message this time.
-            p = [
-                db.security.getPermission('Register', 'user'),
-                db.security.getPermission('Web Access', None),
-            ]
-            db.security.role['anonymous'].permissions=p
-        self.instance.schema_hook = hook
+        # Add Web Access role to anonymous, and try again to make sure
+        # we get a "please register at:" message this time.
+        p = [
+            self.db.security.getPermission('Register', 'user'),
+            self.db.security.getPermission('Web Access', None),
+        ]
+        self.db.security.role['anonymous'].permissions=p
         try:
             self._handle_mail(message)
         except Unauthorized, value:
@@ -1075,9 +1077,7 @@ http://tracker.example/cgi-bin/roundup.cgi/bugs/user?template=register
 
 Unknown address: fubar@bork.bork.bork
 """)
-
             assert not body_diff, body_diff
-
         else:
             raise AssertionError, "Unathorized not raised when handling mail"
 
@@ -1086,15 +1086,12 @@ Unknown address: fubar@bork.bork.bork
         m.sort()
         self.assertEqual(l, m)
 
-        def hook (db, **kw):
-            ''' set up callback for db open '''
-            # now with the permission
-            p = [
-                db.security.getPermission('Register', 'user'),
-                db.security.getPermission('Email Access', None),
-            ]
-            db.security.role['anonymous'].permissions=p
-        self.instance.schema_hook = hook
+        # now with the permission
+        p = [
+            self.db.security.getPermission('Register', 'user'),
+            self.db.security.getPermission('Email Access', None),
+        ]
+        self.db.security.role['anonymous'].permissions=p
         self._handle_mail(message)
         m = self.db.user.list()
         m.sort()
@@ -1112,16 +1109,13 @@ Subject: [issue] Testing...
 
 This is a test submission of a new issue.
 '''
-        def hook (db, **kw):
-            ''' set up callback for db open '''
-            p = [
-                db.security.getPermission('Register', 'user'),
-                db.security.getPermission('Email Access', None),
-                db.security.getPermission('Create', 'issue'),
-                db.security.getPermission('Create', 'msg'),
-            ]
-            db.security.role['anonymous'].permissions = p
-        self.instance.schema_hook = hook
+        p = [
+            self.db.security.getPermission('Register', 'user'),
+            self.db.security.getPermission('Email Access', None),
+            self.db.security.getPermission('Create', 'issue'),
+            self.db.security.getPermission('Create', 'msg'),
+        ]
+        self.db.security.role['anonymous'].permissions = p
         self._handle_mail(message)
         m = set(self.db.user.list())
         new = list(m - l)[0]