Code

c9226d22eb945e1b99fb0f844c04aa75ce4d6c2b
[roundup.git] / test / test_db.py
1 #
2 # Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
3 # This module is free software, and you may redistribute it and/or modify
4 # under the same terms as Python, so long as this copyright message and
5 # disclaimer are retained in their original form.
6 #
7 # IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
8 # DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
9 # OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
10 # POSSIBILITY OF SUCH DAMAGE.
11 #
12 # BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
13 # BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
14 # FOR A PARTICULAR PURPOSE.  THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
15 # BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
16 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
17
18 # $Id: test_db.py,v 1.50 2002-09-19 02:37:41 richard Exp $ 
20 import unittest, os, shutil, time
22 from roundup.hyperdb import String, Password, Link, Multilink, Date, \
23     Interval, DatabaseError, Boolean, Number
24 from roundup import date, password
25 from roundup.indexer import Indexer
def setupSchema(db, create, module):
    '''Declare the test schema on db using the given backend module.

    Five classes are declared: status, user, file, issue and session
    (the last one with journalling disabled).  When create is true the
    four status entries used by the tests are added as well.  The
    database is committed before returning.
    '''
    status_cls = module.Class(db, "status", name=String())
    status_cls.setkey("name")

    user_cls = module.Class(db, "user", username=String(),
        password=Password(), assignable=Boolean(), age=Number(),
        roles=String())
    user_cls.setkey("username")

    # the objects returned for file and issue are not needed any further
    module.FileClass(db, "file", name=String(), type=String(),
        comment=String(indexme="yes"))
    module.IssueClass(db, "issue", title=String(indexme="yes"),
        status=Link("status"), nosy=Multilink("user"), deadline=Date(),
        foo=Interval(), files=Multilink("file"), assignedto=Link('user'))

    session_cls = module.Class(db, 'session', title=String())
    session_cls.disableJournalling()
    db.post_init()

    if create:
        for status_name in ("unread", "in-progress", "testing", "resolved"):
            status_cls.create(name=status_name)
    db.commit()
class MyTestCase(unittest.TestCase):
    '''Base class for the database tests: cleans up after every test.'''
    def tearDown(self):
        '''Close the open database(s) and remove the test directory.

        Every step is attempted even when an earlier one raises, so a
        failing close() neither leaves the second connection open nor
        leaves '_test_dir' behind.  The first exception still propagates.
        '''
        try:
            self.db.close()
        finally:
            try:
                # not every setUp opens a second connection
                if hasattr(self, 'db2'):
                    self.db2.close()
            finally:
                if os.path.exists('_test_dir'):
                    shutil.rmtree('_test_dir')
class config:
    '''Minimal tracker configuration handed to the backends under test.'''
    # directory the test databases are created in
    DATABASE = '_test_dir'

    # mail settings
    MAILHOST = 'localhost'
    MAIL_DOMAIN = 'fill.me.in.'
    ADMIN_EMAIL = 'roundup-admin@' + MAIL_DOMAIN
    MESSAGES_TO_AUTHOR = 'no'       # either 'yes' or 'no'
    EMAIL_SIGNATURE_POSITION = 'bottom'

    # tracker identity
    TRACKER_NAME = 'Roundup issue tracker'
    TRACKER_EMAIL = 'issue_tracker@' + MAIL_DOMAIN
    TRACKER_WEB = 'http://some.useful.url/'

    # web interface settings
    FILTER_POSITION = 'bottom'      # one of 'top', 'bottom', 'top and bottom'
    ANONYMOUS_ACCESS = 'deny'       # either 'deny' or 'allow'
    ANONYMOUS_REGISTER = 'deny'     # either 'deny' or 'allow'
70 class anydbmDBTestCase(MyTestCase):
71     def setUp(self):
72         from roundup.backends import anydbm
73         # remove previous test, ignore errors
74         if os.path.exists(config.DATABASE):
75             shutil.rmtree(config.DATABASE)
76         os.makedirs(config.DATABASE + '/files')
77         self.db = anydbm.Database(config, 'test')
78         setupSchema(self.db, 1, anydbm)
79         self.db2 = anydbm.Database(config, 'test')
80         setupSchema(self.db2, 0, anydbm)
82     def testStringChange(self):
83         # test set & retrieve
84         self.db.issue.create(title="spam", status='1')
85         self.assertEqual(self.db.issue.get('1', 'title'), 'spam')
87         # change and make sure we retrieve the correct value
88         self.db.issue.set('1', title='eggs')
89         self.assertEqual(self.db.issue.get('1', 'title'), 'eggs')
91         # do some commit stuff
92         self.db.commit()
93         self.assertEqual(self.db.issue.get('1', 'title'), 'eggs')
94         self.db.issue.create(title="spam", status='1')
95         self.db.commit()
96         self.assertEqual(self.db.issue.get('2', 'title'), 'spam')
97         self.db.issue.set('2', title='ham')
98         self.assertEqual(self.db.issue.get('2', 'title'), 'ham')
99         self.db.commit()
100         self.assertEqual(self.db.issue.get('2', 'title'), 'ham')
102         # make sure we can unset
103         self.db.issue.set('1', title=None)
104         self.assertEqual(self.db.issue.get('1', "title"), None)
106     def testLinkChange(self):
107         self.db.issue.create(title="spam", status='1')
108         self.assertEqual(self.db.issue.get('1', "status"), '1')
109         self.db.issue.set('1', status='2')
110         self.assertEqual(self.db.issue.get('1', "status"), '2')
111         self.db.issue.set('1', status=None)
112         self.assertEqual(self.db.issue.get('1', "status"), None)
114     def testMultilinkChange(self):
115         u1 = self.db.user.create(username='foo')
116         u2 = self.db.user.create(username='bar')
117         self.db.issue.create(title="spam", nosy=[u1])
118         self.assertEqual(self.db.issue.get('1', "nosy"), [u1])
119         self.db.issue.set('1', nosy=[])
120         self.assertEqual(self.db.issue.get('1', "nosy"), [])
121         self.db.issue.set('1', nosy=[u1,u2])
122         self.assertEqual(self.db.issue.get('1', "nosy"), [u1,u2])
124     def testDateChange(self):
125         self.db.issue.create(title="spam", status='1')
126         a = self.db.issue.get('1', "deadline")
127         self.db.issue.set('1', deadline=date.Date())
128         b = self.db.issue.get('1', "deadline")
129         self.db.commit()
130         self.assertNotEqual(a, b)
131         self.assertNotEqual(b, date.Date('1970-1-1 00:00:00'))
132         self.db.issue.set('1', deadline=date.Date())
133         self.db.issue.set('1', deadline=None)
134         self.assertEqual(self.db.issue.get('1', "deadline"), None)
136     def testIntervalChange(self):
137         self.db.issue.create(title="spam", status='1')
138         a = self.db.issue.get('1', "foo")
139         self.db.issue.set('1', foo=date.Interval('-1d'))
140         self.assertNotEqual(self.db.issue.get('1', "foo"), a)
141         self.db.issue.set('1', foo=None)
142         self.assertEqual(self.db.issue.get('1', "foo"), None)
144     def testBooleanChange(self):
145         userid = self.db.user.create(username='foo', assignable=1)
146         self.assertEqual(1, self.db.user.get(userid, 'assignable'))
147         self.db.user.set(userid, assignable=0)
148         self.assertEqual(self.db.user.get(userid, 'assignable'), 0)
149         self.db.user.set(userid, assignable=None)
150         self.assertEqual(self.db.user.get('1', "assignable"), None)
152     def testNumberChange(self):
153         self.db.user.create(username='foo', age=1)
154         self.assertEqual(1, self.db.user.get('1', 'age'))
155         self.db.user.set('1', age=3)
156         self.assertNotEqual(self.db.user.get('1', 'age'), 1)
157         self.db.user.set('1', age=1.0)
158         self.db.user.set('1', age=None)
159         self.assertEqual(self.db.user.get('1', "age"), None)
161     def testNewProperty(self):
162         self.db.issue.create(title="spam", status='1')
163         self.db.issue.addprop(fixer=Link("user"))
164         # force any post-init stuff to happen
165         self.db.post_init()
166         props = self.db.issue.getprops()
167         keys = props.keys()
168         keys.sort()
169         self.assertEqual(keys, ['activity', 'assignedto', 'creation',
170             'creator', 'deadline', 'files', 'fixer', 'foo', 'id', 'messages',
171             'nosy', 'status', 'superseder', 'title'])
172         self.assertEqual(self.db.issue.get('1', "fixer"), None)
174     def testRetire(self):
175         self.db.issue.create(title="spam", status='1')
176         b = self.db.status.get('1', 'name')
177         a = self.db.status.list()
178         self.db.status.retire('1')
179         # make sure the list is different 
180         self.assertNotEqual(a, self.db.status.list())
181         # can still access the node if necessary
182         self.assertEqual(self.db.status.get('1', 'name'), b)
183         self.db.commit()
184         self.assertEqual(self.db.status.get('1', 'name'), b)
185         self.assertNotEqual(a, self.db.status.list())
187     def testSerialisation(self):
188         self.db.issue.create(title="spam", status='1',
189             deadline=date.Date(), foo=date.Interval('-1d'))
190         self.db.commit()
191         assert isinstance(self.db.issue.get('1', 'deadline'), date.Date)
192         assert isinstance(self.db.issue.get('1', 'foo'), date.Interval)
193         self.db.user.create(username="fozzy",
194             password=password.Password('t. bear'))
195         self.db.commit()
196         assert isinstance(self.db.user.get('1', 'password'), password.Password)
198     def testTransactions(self):
199         # remember the number of items we started
200         num_issues = len(self.db.issue.list())
201         num_files = self.db.numfiles()
202         self.db.issue.create(title="don't commit me!", status='1')
203         self.assertNotEqual(num_issues, len(self.db.issue.list()))
204         self.db.rollback()
205         self.assertEqual(num_issues, len(self.db.issue.list()))
206         self.db.issue.create(title="please commit me!", status='1')
207         self.assertNotEqual(num_issues, len(self.db.issue.list()))
208         self.db.commit()
209         self.assertNotEqual(num_issues, len(self.db.issue.list()))
210         self.db.rollback()
211         self.assertNotEqual(num_issues, len(self.db.issue.list()))
212         self.db.file.create(name="test", type="text/plain", content="hi")
213         self.db.rollback()
214         self.assertEqual(num_files, self.db.numfiles())
215         for i in range(10):
216             self.db.file.create(name="test", type="text/plain", 
217                     content="hi %d"%(i))
218             self.db.commit()
219         num_files2 = self.db.numfiles()
220         self.assertNotEqual(num_files, num_files2)
221         self.db.file.create(name="test", type="text/plain", content="hi")
222         self.db.rollback()
223         self.assertNotEqual(num_files, self.db.numfiles())
224         self.assertEqual(num_files2, self.db.numfiles())
226     def testDestroyNoJournalling(self):
227         self.innerTestDestroy(klass=self.db.session)
229     def testDestroyJournalling(self):
230         self.innerTestDestroy(klass=self.db.issue)
232     def innerTestDestroy(self, klass):
233         newid = klass.create(title='Mr Friendly')
234         n = len(klass.list())
235         self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
236         klass.destroy(newid)
237         self.assertRaises(IndexError, klass.get, newid, 'title')
238         self.assertNotEqual(len(klass.list()), n)
239         if klass.do_journal:
240             self.assertRaises(IndexError, klass.history, newid)
242         # now with a commit
243         newid = klass.create(title='Mr Friendly')
244         n = len(klass.list())
245         self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
246         self.db.commit()
247         klass.destroy(newid)
248         self.assertRaises(IndexError, klass.get, newid, 'title')
249         self.db.commit()
250         self.assertRaises(IndexError, klass.get, newid, 'title')
251         self.assertNotEqual(len(klass.list()), n)
252         if klass.do_journal:
253             self.assertRaises(IndexError, klass.history, newid)
255         # now with a rollback
256         newid = klass.create(title='Mr Friendly')
257         n = len(klass.list())
258         self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
259         self.db.commit()
260         klass.destroy(newid)
261         self.assertNotEqual(len(klass.list()), n)
262         self.assertRaises(IndexError, klass.get, newid, 'title')
263         self.db.rollback()
264         self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
265         self.assertEqual(len(klass.list()), n)
266         if klass.do_journal:
267             self.assertNotEqual(klass.history(newid), [])
269     def testExceptions(self):
270         # this tests the exceptions that should be raised
271         ar = self.assertRaises
273         #
274         # class create
275         #
276         # string property
277         ar(TypeError, self.db.status.create, name=1)
278         # invalid property name
279         ar(KeyError, self.db.status.create, foo='foo')
280         # key name clash
281         ar(ValueError, self.db.status.create, name='unread')
282         # invalid link index
283         ar(IndexError, self.db.issue.create, title='foo', status='bar')
284         # invalid link value
285         ar(ValueError, self.db.issue.create, title='foo', status=1)
286         # invalid multilink type
287         ar(TypeError, self.db.issue.create, title='foo', status='1',
288             nosy='hello')
289         # invalid multilink index type
290         ar(ValueError, self.db.issue.create, title='foo', status='1',
291             nosy=[1])
292         # invalid multilink index
293         ar(IndexError, self.db.issue.create, title='foo', status='1',
294             nosy=['10'])
296         #
297         # key property
298         # 
299         # key must be a String
300         ar(TypeError, self.db.user.setkey, 'password')
301         # key must exist
302         ar(KeyError, self.db.user.setkey, 'fubar')
304         #
305         # class get
306         #
307         # invalid node id
308         ar(IndexError, self.db.issue.get, '1', 'title')
309         # invalid property name
310         ar(KeyError, self.db.status.get, '2', 'foo')
312         #
313         # class set
314         #
315         # invalid node id
316         ar(IndexError, self.db.issue.set, '1', title='foo')
317         # invalid property name
318         ar(KeyError, self.db.status.set, '1', foo='foo')
319         # string property
320         ar(TypeError, self.db.status.set, '1', name=1)
321         # key name clash
322         ar(ValueError, self.db.status.set, '2', name='unread')
323         # set up a valid issue for me to work on
324         self.db.issue.create(title="spam", status='1')
325         # invalid link index
326         ar(IndexError, self.db.issue.set, '6', title='foo', status='bar')
327         # invalid link value
328         ar(ValueError, self.db.issue.set, '6', title='foo', status=1)
329         # invalid multilink type
330         ar(TypeError, self.db.issue.set, '6', title='foo', status='1',
331             nosy='hello')
332         # invalid multilink index type
333         ar(ValueError, self.db.issue.set, '6', title='foo', status='1',
334             nosy=[1])
335         # invalid multilink index
336         ar(IndexError, self.db.issue.set, '6', title='foo', status='1',
337             nosy=['10'])
338         # invalid number value
339         ar(TypeError, self.db.user.create, username='foo', age='a')
340         # invalid boolean value
341         ar(TypeError, self.db.user.create, username='foo', assignable='true')
342         self.db.user.create(username='foo')
343         # invalid number value
344         ar(TypeError, self.db.user.set, '3', username='foo', age='a')
345         # invalid boolean value
346         ar(TypeError, self.db.user.set, '3', username='foo', assignable='true')
348     def testJournals(self):
349         self.db.user.create(username="mary")
350         self.db.user.create(username="pete")
351         self.db.issue.create(title="spam", status='1')
352         self.db.commit()
354         # journal entry for issue create
355         journal = self.db.getjournal('issue', '1')
356         self.assertEqual(1, len(journal))
357         (nodeid, date_stamp, journaltag, action, params) = journal[0]
358         self.assertEqual(nodeid, '1')
359         self.assertEqual(journaltag, 'test')
360         self.assertEqual(action, 'create')
361         keys = params.keys()
362         keys.sort()
363         self.assertEqual(keys, ['assignedto', 'deadline', 'files',
364             'foo', 'messages', 'nosy', 'status', 'superseder', 'title'])
365         self.assertEqual(None,params['deadline'])
366         self.assertEqual(None,params['foo'])
367         self.assertEqual([],params['nosy'])
368         self.assertEqual('1',params['status'])
369         self.assertEqual('spam',params['title'])
371         # journal entry for link
372         journal = self.db.getjournal('user', '1')
373         self.assertEqual(1, len(journal))
374         self.db.issue.set('1', assignedto='1')
375         self.db.commit()
376         journal = self.db.getjournal('user', '1')
377         self.assertEqual(2, len(journal))
378         (nodeid, date_stamp, journaltag, action, params) = journal[1]
379         self.assertEqual('1', nodeid)
380         self.assertEqual('test', journaltag)
381         self.assertEqual('link', action)
382         self.assertEqual(('issue', '1', 'assignedto'), params)
384         # journal entry for unlink
385         self.db.issue.set('1', assignedto='2')
386         self.db.commit()
387         journal = self.db.getjournal('user', '1')
388         self.assertEqual(3, len(journal))
389         (nodeid, date_stamp, journaltag, action, params) = journal[2]
390         self.assertEqual('1', nodeid)
391         self.assertEqual('test', journaltag)
392         self.assertEqual('unlink', action)
393         self.assertEqual(('issue', '1', 'assignedto'), params)
395         # test disabling journalling
396         # ... get the last entry
397         time.sleep(1)
398         entry = self.db.getjournal('issue', '1')[-1]
399         (x, date_stamp, x, x, x) = entry
400         self.db.issue.disableJournalling()
401         self.db.issue.set('1', title='hello world')
402         self.db.commit()
403         entry = self.db.getjournal('issue', '1')[-1]
404         (x, date_stamp2, x, x, x) = entry
405         # see if the change was journalled when it shouldn't have been
406         self.assertEqual(date_stamp, date_stamp2)
407         self.db.issue.enableJournalling()
408         self.db.issue.set('1', title='hello world 2')
409         self.db.commit()
410         entry = self.db.getjournal('issue', '1')[-1]
411         (x, date_stamp2, x, x, x) = entry
412         # see if the change was journalled
413         self.assertNotEqual(date_stamp, date_stamp2)
415     def testPack(self):
416         self.db.issue.create(title="spam", status='1')
417         self.db.commit()
418         self.db.issue.set('1', status='2')
419         self.db.commit()
421         # sleep for at least a second, then get a date to pack at
422         time.sleep(1)
423         pack_before = date.Date('.')
425         # one more entry
426         self.db.issue.set('1', status='3')
427         self.db.commit()
429         # pack
430         self.db.pack(pack_before)
431         journal = self.db.getjournal('issue', '1')
433         # we should have the create and last set entries now
434         self.assertEqual(2, len(journal))
436     def testIDGeneration(self):
437         id1 = self.db.issue.create(title="spam", status='1')
438         id2 = self.db2.issue.create(title="eggs", status='2')
439         self.assertNotEqual(id1, id2)
441     def testSearching(self):
442         self.db.file.create(content='hello', type="text/plain")
443         self.db.file.create(content='world', type="text/frozz",
444             comment='blah blah')
445         self.db.issue.create(files=['1', '2'], title="flebble plop")
446         self.db.issue.create(title="flebble frooz")
447         self.db.commit()
448         self.assertEquals(self.db.indexer.search(['hello'], self.db.issue),
449             {'1': {'files': ['1']}})
450         self.assertEquals(self.db.indexer.search(['world'], self.db.issue), {})
451         self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue),
452             {'2': {}})
453         self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
454             {'2': {}, '1': {}})
456     def testReindexing(self):
457         self.db.issue.create(title="frooz")
458         self.db.commit()
459         self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue),
460             {'1': {}})
461         self.db.issue.set('1', title="dooble")
462         self.db.commit()
463         self.assertEquals(self.db.indexer.search(['dooble'], self.db.issue),
464             {'1': {}})
465         self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue), {})
467     def testForcedReindexing(self):
468         self.db.issue.create(title="flebble frooz")
469         self.db.commit()
470         self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
471             {'1': {}})
472         self.db.indexer.quiet = 1
473         self.db.indexer.force_reindex()
474         self.db.post_init()
475         self.db.indexer.quiet = 9
476         self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
477             {'1': {}})
479     def filteringSetup(self):
480         for user in (
481                 {'username': 'bleep'},
482                 {'username': 'blop'},
483                 {'username': 'blorp'}):
484             self.db.user.create(**user)
485         iss = self.db.issue
486         for issue in (
487                 {'title': 'issue one', 'status': '2'},
488                 {'title': 'issue two', 'status': '1'},
489                 {'title': 'issue three', 'status': '1', 'nosy': ['1','2']}):
490             self.db.issue.create(**issue)
491         self.db.commit()
492         return self.assertEqual, self.db.issue.filter
494     def testFilteringString(self):
495         ae, filt = self.filteringSetup()
496         ae(filt(None, {'title': 'issue one'}, ('+','id'), (None,None)), ['1'])
497         ae(filt(None, {'title': 'issue'}, ('+','id'), (None,None)),
498             ['1','2','3'])
500     def testFilteringLink(self):
501         ae, filt = self.filteringSetup()
502         ae(filt(None, {'status': '1'}, ('+','id'), (None,None)), ['2','3'])
504     def testFilteringMultilink(self):
505         ae, filt = self.filteringSetup()
506         ae(filt(None, {'nosy': '2'}, ('+','id'), (None,None)), ['3'])
508     def testFilteringMany(self):
509         ae, filt = self.filteringSetup()
510         ae(filt(None, {'nosy': '2', 'status': '1'}, ('+','id'), (None,None)),
511             ['3'])
class anydbmReadOnlyDBTestCase(MyTestCase):
    '''Write attempts through a connection opened without the 'test'
    argument must raise DatabaseError (anydbm backend).
    '''
    def setUp(self):
        from roundup.backends import anydbm as backend
        # remove previous test, ignore errors
        home = config.DATABASE
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # populate the database through a 'test' connection first
        writer = backend.Database(config, 'test')
        setupSchema(writer, 1, backend)
        # the connection under test gets no second argument
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)
        self.db2 = backend.Database(config, 'test')
        setupSchema(self.db2, 0, backend)

    def testExceptions(self):
        # this tests the exceptions that should be raised
        ar = self.assertRaises
        status = self.db.status
        ar(DatabaseError, status.create, name="foo")
        ar(DatabaseError, status.set, '1', name="foo")
        ar(DatabaseError, status.retire, '1')
class bsddbDBTestCase(anydbmDBTestCase):
    '''The anydbmDBTestCase tests, run against the bsddb backend.'''
    def setUp(self):
        from roundup.backends import bsddb as backend
        # remove previous test, ignore errors
        home = config.DATABASE
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # only the first connection creates the status entries
        self.db = backend.Database(config, 'test')
        setupSchema(self.db, 1, backend)
        self.db2 = backend.Database(config, 'test')
        setupSchema(self.db2, 0, backend)
class bsddbReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    '''The anydbmReadOnlyDBTestCase tests, run against the bsddb backend.'''
    def setUp(self):
        from roundup.backends import bsddb as backend
        # remove previous test, ignore errors
        home = config.DATABASE
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # populate the database through a 'test' connection first
        writer = backend.Database(config, 'test')
        setupSchema(writer, 1, backend)
        # the connection under test gets no second argument
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)
        self.db2 = backend.Database(config, 'test')
        setupSchema(self.db2, 0, backend)
class bsddb3DBTestCase(anydbmDBTestCase):
    '''The anydbmDBTestCase tests, run against the bsddb3 backend.'''
    def setUp(self):
        from roundup.backends import bsddb3 as backend
        # remove previous test, ignore errors
        home = config.DATABASE
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # only the first connection creates the status entries
        self.db = backend.Database(config, 'test')
        setupSchema(self.db, 1, backend)
        self.db2 = backend.Database(config, 'test')
        setupSchema(self.db2, 0, backend)
class bsddb3ReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    '''The anydbmReadOnlyDBTestCase tests, run against the bsddb3 backend.'''
    def setUp(self):
        from roundup.backends import bsddb3 as backend
        # remove previous test, ignore errors
        home = config.DATABASE
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # populate the database through a 'test' connection first
        writer = backend.Database(config, 'test')
        setupSchema(writer, 1, backend)
        # the connection under test gets no second argument
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)
        self.db2 = backend.Database(config, 'test')
        setupSchema(self.db2, 0, backend)
class gadflyDBTestCase(anydbmDBTestCase):
    ''' Gadfly doesn't support multiple connections to the one local
        database, so only a single connection (self.db) is opened.
    '''
    def setUp(self):
        from roundup.backends import gadfly as backend
        # remove previous test, ignore errors
        home = config.DATABASE
        if os.path.exists(home):
            shutil.rmtree(home)
        config.GADFLY_DATABASE = ('test', home)
        os.makedirs(home + '/files')
        self.db = backend.Database(config, 'test')
        setupSchema(self.db, 1, backend)

    def testIDGeneration(self):
        # both issues go through the one connection: there is no self.db2
        issue = self.db.issue
        first = issue.create(title="spam", status='1')
        second = issue.create(title="eggs", status='2')
        self.assertNotEqual(first, second)
class gadflyReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    '''The anydbmReadOnlyDBTestCase tests, run against the gadfly backend.

    No self.db2 connection is opened here.
    '''
    def setUp(self):
        from roundup.backends import gadfly as backend
        # remove previous test, ignore errors
        home = config.DATABASE
        if os.path.exists(home):
            shutil.rmtree(home)
        config.GADFLY_DATABASE = ('test', home)
        os.makedirs(home + '/files')
        # populate the database through a 'test' connection first
        writer = backend.Database(config, 'test')
        setupSchema(writer, 1, backend)
        # the connection under test gets no second argument
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)
class sqliteDBTestCase(anydbmDBTestCase):
    '''The anydbmDBTestCase tests, run against the sqlite backend with a
    single connection.
    '''
    def setUp(self):
        from roundup.backends import sqlite as backend
        # remove previous test, ignore errors
        home = config.DATABASE
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        self.db = backend.Database(config, 'test')
        setupSchema(self.db, 1, backend)

    def testIDGeneration(self):
        # the inherited test needs self.db2, which this setUp does not
        # open, so it is overridden with a no-op
        pass
class sqliteReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    '''The anydbmReadOnlyDBTestCase tests, run against the sqlite backend.

    No self.db2 connection is opened here.
    '''
    def setUp(self):
        from roundup.backends import sqlite as backend
        # remove previous test, ignore errors
        home = config.DATABASE
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # populate the database through a 'test' connection first
        writer = backend.Database(config, 'test')
        setupSchema(writer, 1, backend)
        # the connection under test gets no second argument
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)
class metakitDBTestCase(anydbmDBTestCase):
    '''The anydbmDBTestCase tests, run against the metakit backend with
    a single connection (no self.db2 is opened).
    '''
    def setUp(self):
        from roundup.backends import metakit
        import weakref
        # replace the backend's module-level _instances mapping with an
        # empty one before each test
        metakit._instances = weakref.WeakValueDictionary()
        # remove previous test, ignore errors
        if os.path.exists(config.DATABASE):
            shutil.rmtree(config.DATABASE)
        os.makedirs(config.DATABASE + '/files')
        self.db = metakit.Database(config, 'test')
        setupSchema(self.db, 1, metakit)

    def testIDGeneration(self):
        # both issues go through the one connection: there is no self.db2
        id1 = self.db.issue.create(title="spam", status='1')
        id2 = self.db.issue.create(title="eggs", status='2')
        self.assertNotEqual(id1, id2)

    def testTransactions(self):
        '''Rollback discards an uncommitted issue; a committed one stays.

        Unlike the inherited version this one never calls
        self.db.numfiles(), so the file creates, rollbacks and commits
        are exercised but no file counts are asserted.
        '''
        # remember the number of items we started
        num_issues = len(self.db.issue.list())
        self.db.issue.create(title="don't commit me!", status='1')
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        self.db.rollback()
        self.assertEqual(num_issues, len(self.db.issue.list()))
        self.db.issue.create(title="please commit me!", status='1')
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        self.db.commit()
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        self.db.rollback()
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        self.db.file.create(name="test", type="text/plain", content="hi")
        self.db.rollback()
        for i in range(10):
            self.db.file.create(name="test", type="text/plain",
                    content="hi %d"%(i))
            self.db.commit()
        # TODO: would be good to be able to ensure the file is not on disk after
        # a rollback...
        # (the num_files/num_files2 comparison that used to follow referred
        # to names never assigned in this method and raised NameError)
        self.db.file.create(name="test", type="text/plain", content="hi")
        self.db.rollback()
class metakitReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    '''The anydbmReadOnlyDBTestCase tests, run against the metakit backend.

    No self.db2 connection is opened here.
    '''
    def setUp(self):
        from roundup.backends import metakit as backend
        import weakref
        # replace the backend's module-level _instances mapping with an
        # empty one before each test
        backend._instances = weakref.WeakValueDictionary()
        # remove previous test, ignore errors
        home = config.DATABASE
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # populate the database through a 'test' connection first
        writer = backend.Database(config, 'test')
        setupSchema(writer, 1, backend)
        # the connection under test gets no second argument
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)
def suite():
    '''Build the suite of database tests for every available backend.

    The anydbm test cases are always included.  Each other backend is
    added only if the module of the same name can be imported;
    otherwise a notice is printed and that backend is skipped.

    Returns a unittest.TestSuite.
    '''
    tests = [
        unittest.makeSuite(anydbmDBTestCase, 'test'),
        unittest.makeSuite(anydbmReadOnlyDBTestCase, 'test'),
    ]

    # (module to probe for, read/write test case, read-only test case)
    optional_backends = [
        ('sqlite', sqliteDBTestCase, sqliteReadOnlyDBTestCase),
        ('gadfly', gadflyDBTestCase, gadflyReadOnlyDBTestCase),
        ('bsddb', bsddbDBTestCase, bsddbReadOnlyDBTestCase),
        ('bsddb3', bsddb3DBTestCase, bsddb3ReadOnlyDBTestCase),
        ('metakit', metakitDBTestCase, metakitReadOnlyDBTestCase),
    ]
    for module_name, rw_case, ro_case in optional_backends:
        # only a failed import means "backend not installed"; any other
        # error is a real problem and is allowed to propagate
        try:
            __import__(module_name)
        except ImportError:
            print('%s module not found, skipping %s DBTestCase' % (
                module_name, module_name))
            continue
        tests.append(unittest.makeSuite(rw_case, 'test'))
        tests.append(unittest.makeSuite(ro_case, 'test'))

    return unittest.TestSuite(tests)
754 # vim: set filetype=python ts=4 sw=4 et si