Code

ed5d718e9b201b11f31ad589c0562c912a90f667
[roundup.git] / test / test_db.py
1 #
2 # Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
3 # This module is free software, and you may redistribute it and/or modify
4 # under the same terms as Python, so long as this copyright message and
5 # disclaimer are retained in their original form.
6 #
7 # IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
8 # DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
9 # OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
10 # POSSIBILITY OF SUCH DAMAGE.
11 #
12 # BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
13 # BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
14 # FOR A PARTICULAR PURPOSE.  THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
15 # BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
16 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
17
18 # $Id: test_db.py,v 1.46 2002-09-13 08:20:13 richard Exp $ 
20 import unittest, os, shutil, time
22 from roundup.hyperdb import String, Password, Link, Multilink, Date, \
23     Interval, DatabaseError, Boolean, Number
24 from roundup import date, password
25 from roundup.indexer import Indexer
27 def setupSchema(db, create, module):
28     status = module.Class(db, "status", name=String())
29     status.setkey("name")
30     user = module.Class(db, "user", username=String(), password=Password(),
31         assignable=Boolean(), age=Number(), roles=String())
32     user.setkey("username")
33     file = module.FileClass(db, "file", name=String(), type=String(),
34         comment=String(indexme="yes"))
35     issue = module.IssueClass(db, "issue", title=String(indexme="yes"),
36         status=Link("status"), nosy=Multilink("user"), deadline=Date(),
37         foo=Interval(), files=Multilink("file"), assignedto=Link('user'))
38     session = module.Class(db, 'session', title=String())
39     session.disableJournalling()
40     db.post_init()
41     if create:
42         status.create(name="unread")
43         status.create(name="in-progress")
44         status.create(name="testing")
45         status.create(name="resolved")
46     db.commit()
48 class MyTestCase(unittest.TestCase):
49     def tearDown(self):
50         self.db.close()
51         if hasattr(self, 'db2'):
52             self.db2.close()
53         if os.path.exists('_test_dir'):
54             shutil.rmtree('_test_dir')
56 class config:
57     DATABASE='_test_dir'
58     MAILHOST = 'localhost'
59     MAIL_DOMAIN = 'fill.me.in.'
60     TRACKER_NAME = 'Roundup issue tracker'
61     TRACKER_EMAIL = 'issue_tracker@%s'%MAIL_DOMAIN
62     TRACKER_WEB = 'http://some.useful.url/'
63     ADMIN_EMAIL = 'roundup-admin@%s'%MAIL_DOMAIN
64     FILTER_POSITION = 'bottom'      # one of 'top', 'bottom', 'top and bottom'
65     ANONYMOUS_ACCESS = 'deny'       # either 'deny' or 'allow'
66     ANONYMOUS_REGISTER = 'deny'     # either 'deny' or 'allow'
67     MESSAGES_TO_AUTHOR = 'no'       # either 'yes' or 'no'
68     EMAIL_SIGNATURE_POSITION = 'bottom'
70 class anydbmDBTestCase(MyTestCase):
71     def setUp(self):
72         from roundup.backends import anydbm
73         # remove previous test, ignore errors
74         if os.path.exists(config.DATABASE):
75             shutil.rmtree(config.DATABASE)
76         os.makedirs(config.DATABASE + '/files')
77         self.db = anydbm.Database(config, 'test')
78         setupSchema(self.db, 1, anydbm)
79         self.db2 = anydbm.Database(config, 'test')
80         setupSchema(self.db2, 0, anydbm)
82     def testStringChange(self):
83         # test set & retrieve
84         self.db.issue.create(title="spam", status='1')
85         self.assertEqual(self.db.issue.get('1', 'title'), 'spam')
87         # change and make sure we retrieve the correct value
88         self.db.issue.set('1', title='eggs')
89         self.assertEqual(self.db.issue.get('1', 'title'), 'eggs')
91         # do some commit stuff
92         self.db.commit()
93         self.assertEqual(self.db.issue.get('1', 'title'), 'eggs')
94         self.db.issue.create(title="spam", status='1')
95         self.db.commit()
96         self.assertEqual(self.db.issue.get('2', 'title'), 'spam')
97         self.db.issue.set('2', title='ham')
98         self.assertEqual(self.db.issue.get('2', 'title'), 'ham')
99         self.db.commit()
100         self.assertEqual(self.db.issue.get('2', 'title'), 'ham')
102         # make sure we can unset
103         self.db.issue.set('1', title=None)
104         self.assertEqual(self.db.issue.get('1', "title"), None)
106     def testLinkChange(self):
107         self.db.issue.create(title="spam", status='1')
108         self.assertEqual(self.db.issue.get('1', "status"), '1')
109         self.db.issue.set('1', status='2')
110         self.assertEqual(self.db.issue.get('1', "status"), '2')
111         self.db.issue.set('1', status=None)
112         self.assertEqual(self.db.issue.get('1', "status"), None)
114     def testMultilinkChange(self):
115         u1 = self.db.user.create(username='foo')
116         u2 = self.db.user.create(username='bar')
117         self.db.issue.create(title="spam", nosy=[u1])
118         self.assertEqual(self.db.issue.get('1', "nosy"), [u1])
119         self.db.issue.set('1', nosy=[])
120         self.assertEqual(self.db.issue.get('1', "nosy"), [])
121         self.db.issue.set('1', nosy=[u1,u2])
122         self.assertEqual(self.db.issue.get('1', "nosy"), [u1,u2])
124     def testDateChange(self):
125         self.db.issue.create(title="spam", status='1')
126         a = self.db.issue.get('1', "deadline")
127         self.db.issue.set('1', deadline=date.Date())
128         b = self.db.issue.get('1', "deadline")
129         self.db.commit()
130         self.assertNotEqual(a, b)
131         self.assertNotEqual(b, date.Date('1970-1-1 00:00:00'))
132         self.db.issue.set('1', deadline=date.Date())
133         self.db.issue.set('1', deadline=None)
134         self.assertEqual(self.db.issue.get('1', "deadline"), None)
136     def testIntervalChange(self):
137         self.db.issue.create(title="spam", status='1')
138         a = self.db.issue.get('1', "foo")
139         self.db.issue.set('1', foo=date.Interval('-1d'))
140         self.assertNotEqual(self.db.issue.get('1', "foo"), a)
141         self.db.issue.set('1', foo=None)
142         self.assertEqual(self.db.issue.get('1', "foo"), None)
144     def testBooleanChange(self):
145         userid = self.db.user.create(username='foo', assignable=1)
146         self.db.user.create(username='foo2', assignable=0)
147         a = self.db.user.get(userid, 'assignable')
148         self.db.user.set(userid, assignable=0)
149         self.assertNotEqual(self.db.user.get(userid, 'assignable'), a)
150         self.db.user.set(userid, assignable=0)
151         self.db.user.set(userid, assignable=1)
152         self.db.user.set('1', assignable=None)
153         self.assertEqual(self.db.user.get('1', "assignable"), None)
155     def testNumberChange(self):
156         self.db.user.create(username='foo', age='1')
157         a = self.db.user.get('1', 'age')
158         self.db.user.set('1', age='3')
159         self.assertNotEqual(self.db.user.get('1', 'age'), a)
160         self.db.user.set('1', age='1.0')
161         self.db.user.set('1', age=None)
162         self.assertEqual(self.db.user.get('1', "age"), None)
164     def testNewProperty(self):
165         self.db.issue.create(title="spam", status='1')
166         self.db.issue.addprop(fixer=Link("user"))
167         # force any post-init stuff to happen
168         self.db.post_init()
169         props = self.db.issue.getprops()
170         keys = props.keys()
171         keys.sort()
172         self.assertEqual(keys, ['activity', 'assignedto', 'creation',
173             'creator', 'deadline', 'files', 'fixer', 'foo', 'id', 'messages',
174             'nosy', 'status', 'superseder', 'title'])
175         self.assertEqual(self.db.issue.get('1', "fixer"), None)
177     def testRetire(self):
178         self.db.issue.create(title="spam", status='1')
179         b = self.db.status.get('1', 'name')
180         a = self.db.status.list()
181         self.db.status.retire('1')
182         # make sure the list is different 
183         self.assertNotEqual(a, self.db.status.list())
184         # can still access the node if necessary
185         self.assertEqual(self.db.status.get('1', 'name'), b)
186         self.db.commit()
187         self.assertEqual(self.db.status.get('1', 'name'), b)
188         self.assertNotEqual(a, self.db.status.list())
190     def testSerialisation(self):
191         self.db.issue.create(title="spam", status='1',
192             deadline=date.Date(), foo=date.Interval('-1d'))
193         self.db.commit()
194         assert isinstance(self.db.issue.get('1', 'deadline'), date.Date)
195         assert isinstance(self.db.issue.get('1', 'foo'), date.Interval)
196         self.db.user.create(username="fozzy",
197             password=password.Password('t. bear'))
198         self.db.commit()
199         assert isinstance(self.db.user.get('1', 'password'), password.Password)
201     def testTransactions(self):
202         # remember the number of items we started
203         num_issues = len(self.db.issue.list())
204         num_files = self.db.numfiles()
205         self.db.issue.create(title="don't commit me!", status='1')
206         self.assertNotEqual(num_issues, len(self.db.issue.list()))
207         self.db.rollback()
208         self.assertEqual(num_issues, len(self.db.issue.list()))
209         self.db.issue.create(title="please commit me!", status='1')
210         self.assertNotEqual(num_issues, len(self.db.issue.list()))
211         self.db.commit()
212         self.assertNotEqual(num_issues, len(self.db.issue.list()))
213         self.db.rollback()
214         self.assertNotEqual(num_issues, len(self.db.issue.list()))
215         self.db.file.create(name="test", type="text/plain", content="hi")
216         self.db.rollback()
217         self.assertEqual(num_files, self.db.numfiles())
218         for i in range(10):
219             self.db.file.create(name="test", type="text/plain", 
220                     content="hi %d"%(i))
221             self.db.commit()
222         num_files2 = self.db.numfiles()
223         self.assertNotEqual(num_files, num_files2)
224         self.db.file.create(name="test", type="text/plain", content="hi")
225         self.db.rollback()
226         self.assertNotEqual(num_files, self.db.numfiles())
227         self.assertEqual(num_files2, self.db.numfiles())
229     def testDestroyNoJournalling(self):
230         self.innerTestDestroy(klass=self.db.session)
232     def testDestroyJournalling(self):
233         self.innerTestDestroy(klass=self.db.issue)
235     def innerTestDestroy(self, klass):
236         newid = klass.create(title='Mr Friendly')
237         n = len(klass.list())
238         self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
239         klass.destroy(newid)
240         self.assertRaises(IndexError, klass.get, newid, 'title')
241         self.assertNotEqual(len(klass.list()), n)
242         if klass.do_journal:
243             self.assertRaises(IndexError, klass.history, newid)
245         # now with a commit
246         newid = klass.create(title='Mr Friendly')
247         n = len(klass.list())
248         self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
249         self.db.commit()
250         klass.destroy(newid)
251         self.assertRaises(IndexError, klass.get, newid, 'title')
252         self.db.commit()
253         self.assertRaises(IndexError, klass.get, newid, 'title')
254         self.assertNotEqual(len(klass.list()), n)
255         if klass.do_journal:
256             self.assertRaises(IndexError, klass.history, newid)
258         # now with a rollback
259         newid = klass.create(title='Mr Friendly')
260         n = len(klass.list())
261         self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
262         self.db.commit()
263         klass.destroy(newid)
264         self.assertNotEqual(len(klass.list()), n)
265         self.assertRaises(IndexError, klass.get, newid, 'title')
266         self.db.rollback()
267         self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
268         self.assertEqual(len(klass.list()), n)
269         if klass.do_journal:
270             self.assertNotEqual(klass.history(newid), [])
272     def testExceptions(self):
273         # this tests the exceptions that should be raised
274         ar = self.assertRaises
276         #
277         # class create
278         #
279         # string property
280         ar(TypeError, self.db.status.create, name=1)
281         # invalid property name
282         ar(KeyError, self.db.status.create, foo='foo')
283         # key name clash
284         ar(ValueError, self.db.status.create, name='unread')
285         # invalid link index
286         ar(IndexError, self.db.issue.create, title='foo', status='bar')
287         # invalid link value
288         ar(ValueError, self.db.issue.create, title='foo', status=1)
289         # invalid multilink type
290         ar(TypeError, self.db.issue.create, title='foo', status='1',
291             nosy='hello')
292         # invalid multilink index type
293         ar(ValueError, self.db.issue.create, title='foo', status='1',
294             nosy=[1])
295         # invalid multilink index
296         ar(IndexError, self.db.issue.create, title='foo', status='1',
297             nosy=['10'])
299         #
300         # key property
301         # 
302         # key must be a String
303         ar(TypeError, self.db.user.setkey, 'password')
304         # key must exist
305         ar(KeyError, self.db.user.setkey, 'fubar')
307         #
308         # class get
309         #
310         # invalid node id
311         ar(IndexError, self.db.issue.get, '1', 'title')
312         # invalid property name
313         ar(KeyError, self.db.status.get, '2', 'foo')
315         #
316         # class set
317         #
318         # invalid node id
319         ar(IndexError, self.db.issue.set, '1', title='foo')
320         # invalid property name
321         ar(KeyError, self.db.status.set, '1', foo='foo')
322         # string property
323         ar(TypeError, self.db.status.set, '1', name=1)
324         # key name clash
325         ar(ValueError, self.db.status.set, '2', name='unread')
326         # set up a valid issue for me to work on
327         self.db.issue.create(title="spam", status='1')
328         # invalid link index
329         ar(IndexError, self.db.issue.set, '6', title='foo', status='bar')
330         # invalid link value
331         ar(ValueError, self.db.issue.set, '6', title='foo', status=1)
332         # invalid multilink type
333         ar(TypeError, self.db.issue.set, '6', title='foo', status='1',
334             nosy='hello')
335         # invalid multilink index type
336         ar(ValueError, self.db.issue.set, '6', title='foo', status='1',
337             nosy=[1])
338         # invalid multilink index
339         ar(IndexError, self.db.issue.set, '6', title='foo', status='1',
340             nosy=['10'])
341         # invalid number value
342         ar(TypeError, self.db.user.create, username='foo', age='a')
343         # invalid boolean value
344         ar(TypeError, self.db.user.create, username='foo', assignable='true')
345         self.db.user.create(username='foo')
346         # invalid number value
347         ar(TypeError, self.db.user.set, '3', username='foo', age='a')
348         # invalid boolean value
349         ar(TypeError, self.db.user.set, '3', username='foo', assignable='true')
351     def testJournals(self):
352         self.db.user.create(username="mary")
353         self.db.user.create(username="pete")
354         self.db.issue.create(title="spam", status='1')
355         self.db.commit()
357         # journal entry for issue create
358         journal = self.db.getjournal('issue', '1')
359         self.assertEqual(1, len(journal))
360         (nodeid, date_stamp, journaltag, action, params) = journal[0]
361         self.assertEqual(nodeid, '1')
362         self.assertEqual(journaltag, 'test')
363         self.assertEqual(action, 'create')
364         keys = params.keys()
365         keys.sort()
366         self.assertEqual(keys, ['assignedto', 'deadline', 'files',
367             'foo', 'messages', 'nosy', 'status', 'superseder', 'title'])
368         self.assertEqual(None,params['deadline'])
369         self.assertEqual(None,params['foo'])
370         self.assertEqual([],params['nosy'])
371         self.assertEqual('1',params['status'])
372         self.assertEqual('spam',params['title'])
374         # journal entry for link
375         journal = self.db.getjournal('user', '1')
376         self.assertEqual(1, len(journal))
377         self.db.issue.set('1', assignedto='1')
378         self.db.commit()
379         journal = self.db.getjournal('user', '1')
380         self.assertEqual(2, len(journal))
381         (nodeid, date_stamp, journaltag, action, params) = journal[1]
382         self.assertEqual('1', nodeid)
383         self.assertEqual('test', journaltag)
384         self.assertEqual('link', action)
385         self.assertEqual(('issue', '1', 'assignedto'), params)
387         # journal entry for unlink
388         self.db.issue.set('1', assignedto='2')
389         self.db.commit()
390         journal = self.db.getjournal('user', '1')
391         self.assertEqual(3, len(journal))
392         (nodeid, date_stamp, journaltag, action, params) = journal[2]
393         self.assertEqual('1', nodeid)
394         self.assertEqual('test', journaltag)
395         self.assertEqual('unlink', action)
396         self.assertEqual(('issue', '1', 'assignedto'), params)
398         # test disabling journalling
399         # ... get the last entry
400         time.sleep(1)
401         entry = self.db.getjournal('issue', '1')[-1]
402         (x, date_stamp, x, x, x) = entry
403         self.db.issue.disableJournalling()
404         self.db.issue.set('1', title='hello world')
405         self.db.commit()
406         entry = self.db.getjournal('issue', '1')[-1]
407         (x, date_stamp2, x, x, x) = entry
408         # see if the change was journalled when it shouldn't have been
409         self.assertEqual(date_stamp, date_stamp2)
410         self.db.issue.enableJournalling()
411         self.db.issue.set('1', title='hello world 2')
412         self.db.commit()
413         entry = self.db.getjournal('issue', '1')[-1]
414         (x, date_stamp2, x, x, x) = entry
415         # see if the change was journalled
416         self.assertNotEqual(date_stamp, date_stamp2)
418     def testPack(self):
419         self.db.issue.create(title="spam", status='1')
420         self.db.commit()
421         self.db.issue.set('1', status='2')
422         self.db.commit()
424         # sleep for at least a second, then get a date to pack at
425         time.sleep(1)
426         pack_before = date.Date('.')
428         # one more entry
429         self.db.issue.set('1', status='3')
430         self.db.commit()
432         # pack
433         self.db.pack(pack_before)
434         journal = self.db.getjournal('issue', '1')
436         # we should have the create and last set entries now
437         self.assertEqual(2, len(journal))
439     def testIDGeneration(self):
440         id1 = self.db.issue.create(title="spam", status='1')
441         id2 = self.db2.issue.create(title="eggs", status='2')
442         self.assertNotEqual(id1, id2)
444     def testSearching(self):
445         self.db.file.create(content='hello', type="text/plain")
446         self.db.file.create(content='world', type="text/frozz",
447             comment='blah blah')
448         self.db.issue.create(files=['1', '2'], title="flebble plop")
449         self.db.issue.create(title="flebble frooz")
450         self.db.commit()
451         self.assertEquals(self.db.indexer.search(['hello'], self.db.issue),
452             {'1': {'files': ['1']}})
453         self.assertEquals(self.db.indexer.search(['world'], self.db.issue), {})
454         self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue),
455             {'2': {}})
456         self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
457             {'2': {}, '1': {}})
459     def testReindexing(self):
460         self.db.issue.create(title="frooz")
461         self.db.commit()
462         self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue),
463             {'1': {}})
464         self.db.issue.set('1', title="dooble")
465         self.db.commit()
466         self.assertEquals(self.db.indexer.search(['dooble'], self.db.issue),
467             {'1': {}})
468         self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue), {})
470     def testForcedReindexing(self):
471         self.db.issue.create(title="flebble frooz")
472         self.db.commit()
473         self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
474             {'1': {}})
475         self.db.indexer.quiet = 1
476         self.db.indexer.force_reindex()
477         self.db.post_init()
478         self.db.indexer.quiet = 9
479         self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
480             {'1': {}})
482 class anydbmReadOnlyDBTestCase(MyTestCase):
483     def setUp(self):
484         from roundup.backends import anydbm
485         # remove previous test, ignore errors
486         if os.path.exists(config.DATABASE):
487             shutil.rmtree(config.DATABASE)
488         os.makedirs(config.DATABASE + '/files')
489         db = anydbm.Database(config, 'test')
490         setupSchema(db, 1, anydbm)
491         self.db = anydbm.Database(config)
492         setupSchema(self.db, 0, anydbm)
493         self.db2 = anydbm.Database(config, 'test')
494         setupSchema(self.db2, 0, anydbm)
496     def testExceptions(self):
497         # this tests the exceptions that should be raised
498         ar = self.assertRaises
500         # this tests the exceptions that should be raised
501         ar(DatabaseError, self.db.status.create, name="foo")
502         ar(DatabaseError, self.db.status.set, '1', name="foo")
503         ar(DatabaseError, self.db.status.retire, '1')
506 class bsddbDBTestCase(anydbmDBTestCase):
507     def setUp(self):
508         from roundup.backends import bsddb
509         # remove previous test, ignore errors
510         if os.path.exists(config.DATABASE):
511             shutil.rmtree(config.DATABASE)
512         os.makedirs(config.DATABASE + '/files')
513         self.db = bsddb.Database(config, 'test')
514         setupSchema(self.db, 1, bsddb)
515         self.db2 = bsddb.Database(config, 'test')
516         setupSchema(self.db2, 0, bsddb)
518 class bsddbReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
519     def setUp(self):
520         from roundup.backends import bsddb
521         # remove previous test, ignore errors
522         if os.path.exists(config.DATABASE):
523             shutil.rmtree(config.DATABASE)
524         os.makedirs(config.DATABASE + '/files')
525         db = bsddb.Database(config, 'test')
526         setupSchema(db, 1, bsddb)
527         self.db = bsddb.Database(config)
528         setupSchema(self.db, 0, bsddb)
529         self.db2 = bsddb.Database(config, 'test')
530         setupSchema(self.db2, 0, bsddb)
533 class bsddb3DBTestCase(anydbmDBTestCase):
534     def setUp(self):
535         from roundup.backends import bsddb3
536         # remove previous test, ignore errors
537         if os.path.exists(config.DATABASE):
538             shutil.rmtree(config.DATABASE)
539         os.makedirs(config.DATABASE + '/files')
540         self.db = bsddb3.Database(config, 'test')
541         setupSchema(self.db, 1, bsddb3)
542         self.db2 = bsddb3.Database(config, 'test')
543         setupSchema(self.db2, 0, bsddb3)
545 class bsddb3ReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
546     def setUp(self):
547         from roundup.backends import bsddb3
548         # remove previous test, ignore errors
549         if os.path.exists(config.DATABASE):
550             shutil.rmtree(config.DATABASE)
551         os.makedirs(config.DATABASE + '/files')
552         db = bsddb3.Database(config, 'test')
553         setupSchema(db, 1, bsddb3)
554         self.db = bsddb3.Database(config)
555         setupSchema(self.db, 0, bsddb3)
556         self.db2 = bsddb3.Database(config, 'test')
557         setupSchema(self.db2, 0, bsddb3)
560 class gadflyDBTestCase(anydbmDBTestCase):
561     ''' Gadfly doesn't support multiple connections to the one local
562         database
563     '''
564     def setUp(self):
565         from roundup.backends import gadfly
566         # remove previous test, ignore errors
567         if os.path.exists(config.DATABASE):
568             shutil.rmtree(config.DATABASE)
569         config.GADFLY_DATABASE = ('test', config.DATABASE)
570         os.makedirs(config.DATABASE + '/files')
571         self.db = gadfly.Database(config, 'test')
572         setupSchema(self.db, 1, gadfly)
574     def testIDGeneration(self):
575         id1 = self.db.issue.create(title="spam", status='1')
576         id2 = self.db.issue.create(title="eggs", status='2')
577         self.assertNotEqual(id1, id2)
579     def testNewProperty(self):
580         # gadfly doesn't have an ALTER TABLE command :(
581         pass
583 class gadflyReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
584     def setUp(self):
585         from roundup.backends import gadfly
586         # remove previous test, ignore errors
587         if os.path.exists(config.DATABASE):
588             shutil.rmtree(config.DATABASE)
589         config.GADFLY_DATABASE = ('test', config.DATABASE)
590         os.makedirs(config.DATABASE + '/files')
591         db = gadfly.Database(config, 'test')
592         setupSchema(db, 1, gadfly)
593         self.db = gadfly.Database(config)
594         setupSchema(self.db, 0, gadfly)
597 class metakitDBTestCase(anydbmDBTestCase):
598     def setUp(self):
599         from roundup.backends import metakit
600         import weakref
601         metakit._instances = weakref.WeakValueDictionary()
602         # remove previous test, ignore errors
603         if os.path.exists(config.DATABASE):
604             shutil.rmtree(config.DATABASE)
605         os.makedirs(config.DATABASE + '/files')
606         self.db = metakit.Database(config, 'test')
607         setupSchema(self.db, 1, metakit)
608         #self.db2 = metakit.Database(config, 'test')
609         #setupSchema(self.db2, 0, metakit)
611     def testIDGeneration(self):
612         id1 = self.db.issue.create(title="spam", status='1')
613         id2 = self.db.issue.create(title="eggs", status='2')
614         self.assertNotEqual(id1, id2)
616     def testTransactions(self):
617         # remember the number of items we started
618         num_issues = len(self.db.issue.list())
619         self.db.issue.create(title="don't commit me!", status='1')
620         self.assertNotEqual(num_issues, len(self.db.issue.list()))
621         self.db.rollback()
622         self.assertEqual(num_issues, len(self.db.issue.list()))
623         self.db.issue.create(title="please commit me!", status='1')
624         self.assertNotEqual(num_issues, len(self.db.issue.list()))
625         self.db.commit()
626         self.assertNotEqual(num_issues, len(self.db.issue.list()))
627         self.db.rollback()
628         self.assertNotEqual(num_issues, len(self.db.issue.list()))
629         self.db.file.create(name="test", type="text/plain", content="hi")
630         self.db.rollback()
631         for i in range(10):
632             self.db.file.create(name="test", type="text/plain", 
633                     content="hi %d"%(i))
634             self.db.commit()
635         # TODO: would be good to be able to ensure the file is not on disk after
636         # a rollback...
637         self.assertNotEqual(num_files, num_files2)
638         self.db.file.create(name="test", type="text/plain", content="hi")
639         self.db.rollback()
641 class metakitReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
642     def setUp(self):
643         from roundup.backends import metakit
644         import weakref
645         metakit._instances = weakref.WeakValueDictionary()
646         # remove previous test, ignore errors
647         if os.path.exists(config.DATABASE):
648             shutil.rmtree(config.DATABASE)
649         os.makedirs(config.DATABASE + '/files')
650         db = metakit.Database(config, 'test')
651         setupSchema(db, 1, metakit)
652         self.db = metakit.Database(config)
653         setupSchema(self.db, 0, metakit)
654 #        self.db2 = metakit.Database(config, 'test')
655 #        setupSchema(self.db2, 0, metakit)
657 def suite():
658     l = [
659          unittest.makeSuite(anydbmDBTestCase, 'test'),
660          unittest.makeSuite(anydbmReadOnlyDBTestCase, 'test')
661     ]
662     #return unittest.TestSuite(l)
664     try:
665         import bsddb
666         l.append(unittest.makeSuite(bsddbDBTestCase, 'test'))
667         l.append(unittest.makeSuite(bsddbReadOnlyDBTestCase, 'test'))
668     except:
669         print 'bsddb module not found, skipping bsddb DBTestCase'
671     try:
672         import bsddb3
673         l.append(unittest.makeSuite(bsddb3DBTestCase, 'test'))
674         l.append(unittest.makeSuite(bsddb3ReadOnlyDBTestCase, 'test'))
675     except:
676         print 'bsddb3 module not found, skipping bsddb3 DBTestCase'
678     try:
679         import gadfly
680         l.append(unittest.makeSuite(gadflyDBTestCase, 'test'))
681         l.append(unittest.makeSuite(gadflyReadOnlyDBTestCase, 'test'))
682     except:
683         print 'gadfly module not found, skipping gadfly DBTestCase'
685     try:
686         import metakit
687         l.append(unittest.makeSuite(metakitDBTestCase, 'test'))
688         l.append(unittest.makeSuite(metakitReadOnlyDBTestCase, 'test'))
689     except:
690         print 'metakit module not found, skipping metakit DBTestCase'
692     return unittest.TestSuite(l)
694 # vim: set filetype=python ts=4 sw=4 et si