1 #
2 # Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
3 # This module is free software, and you may redistribute it and/or modify
4 # under the same terms as Python, so long as this copyright message and
5 # disclaimer are retained in their original form.
6 #
7 # IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
8 # DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
9 # OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
10 # POSSIBILITY OF SUCH DAMAGE.
11 #
12 # BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
13 # BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
14 # FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
15 # BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
16 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
17 #
18 # $Id: test_db.py,v 1.45 2002-09-12 04:21:20 richard Exp $
20 import unittest, os, shutil, time
22 from roundup.hyperdb import String, Password, Link, Multilink, Date, \
23 Interval, DatabaseError, Boolean, Number
24 from roundup import date, password
25 from roundup.indexer import Indexer
27 def setupSchema(db, create, module):
28 status = module.Class(db, "status", name=String())
29 status.setkey("name")
30 user = module.Class(db, "user", username=String(), password=Password(),
31 assignable=Boolean(), age=Number(), roles=String())
32 user.setkey("username")
33 file = module.FileClass(db, "file", name=String(), type=String(),
34 comment=String(indexme="yes"))
35 issue = module.IssueClass(db, "issue", title=String(indexme="yes"),
36 status=Link("status"), nosy=Multilink("user"), deadline=Date(),
37 foo=Interval(), files=Multilink("file"), assignedto=Link('user'))
38 session = module.Class(db, 'session', title=String())
39 session.disableJournalling()
40 db.post_init()
41 if create:
42 status.create(name="unread")
43 status.create(name="in-progress")
44 status.create(name="testing")
45 status.create(name="resolved")
46 db.commit()
48 class MyTestCase(unittest.TestCase):
49 def tearDown(self):
50 self.db.close()
51 if hasattr(self, 'db2'):
52 self.db2.close()
53 if os.path.exists('_test_dir'):
54 shutil.rmtree('_test_dir')
56 class config:
57 DATABASE='_test_dir'
58 MAILHOST = 'localhost'
59 MAIL_DOMAIN = 'fill.me.in.'
60 TRACKER_NAME = 'Roundup issue tracker'
61 TRACKER_EMAIL = 'issue_tracker@%s'%MAIL_DOMAIN
62 TRACKER_WEB = 'http://some.useful.url/'
63 ADMIN_EMAIL = 'roundup-admin@%s'%MAIL_DOMAIN
64 FILTER_POSITION = 'bottom' # one of 'top', 'bottom', 'top and bottom'
65 ANONYMOUS_ACCESS = 'deny' # either 'deny' or 'allow'
66 ANONYMOUS_REGISTER = 'deny' # either 'deny' or 'allow'
67 MESSAGES_TO_AUTHOR = 'no' # either 'yes' or 'no'
68 EMAIL_SIGNATURE_POSITION = 'bottom'
70 class anydbmDBTestCase(MyTestCase):
71 def setUp(self):
72 from roundup.backends import anydbm
73 # remove previous test, ignore errors
74 if os.path.exists(config.DATABASE):
75 shutil.rmtree(config.DATABASE)
76 os.makedirs(config.DATABASE + '/files')
77 self.db = anydbm.Database(config, 'test')
78 setupSchema(self.db, 1, anydbm)
79 self.db2 = anydbm.Database(config, 'test')
80 setupSchema(self.db2, 0, anydbm)
82 def testStringChange(self):
83 self.db.issue.create(title="spam", status='1')
84 self.assertEqual(self.db.issue.get('1', 'title'), 'spam')
85 self.db.issue.set('1', title='eggs')
86 self.assertEqual(self.db.issue.get('1', 'title'), 'eggs')
87 self.db.commit()
88 self.assertEqual(self.db.issue.get('1', 'title'), 'eggs')
89 self.db.issue.create(title="spam", status='1')
90 self.db.commit()
91 self.assertEqual(self.db.issue.get('2', 'title'), 'spam')
92 self.db.issue.set('2', title='ham')
93 self.assertEqual(self.db.issue.get('2', 'title'), 'ham')
94 self.db.commit()
95 self.assertEqual(self.db.issue.get('2', 'title'), 'ham')
96 self.db.issue.set('1', title=None)
97 self.assertEqual(self.db.issue.get('1', "title"), None)
99 def testLinkChange(self):
100 self.db.issue.create(title="spam", status='1')
101 self.assertEqual(self.db.issue.get('1', "status"), '1')
102 self.db.issue.set('1', status='2')
103 self.assertEqual(self.db.issue.get('1', "status"), '2')
104 self.db.issue.set('1', status=None)
105 self.assertEqual(self.db.issue.get('1', "status"), None)
107 def testMultilinkChange(self):
108 u1 = self.db.user.create(username='foo')
109 u2 = self.db.user.create(username='bar')
110 self.db.issue.create(title="spam", nosy=[u1])
111 self.assertEqual(self.db.issue.get('1', "nosy"), [u1])
112 self.db.issue.set('1', nosy=[])
113 self.assertEqual(self.db.issue.get('1', "nosy"), [])
114 self.db.issue.set('1', nosy=[u1,u2])
115 self.assertEqual(self.db.issue.get('1', "nosy"), [u1,u2])
117 def testDateChange(self):
118 self.db.issue.create(title="spam", status='1')
119 a = self.db.issue.get('1', "deadline")
120 self.db.issue.set('1', deadline=date.Date())
121 b = self.db.issue.get('1', "deadline")
122 self.db.commit()
123 self.assertNotEqual(a, b)
124 self.assertNotEqual(b, date.Date('1970-1-1 00:00:00'))
125 self.db.issue.set('1', deadline=date.Date())
126 self.db.issue.set('1', deadline=None)
127 self.assertEqual(self.db.issue.get('1', "deadline"), None)
129 def testIntervalChange(self):
130 self.db.issue.create(title="spam", status='1')
131 a = self.db.issue.get('1', "foo")
132 self.db.issue.set('1', foo=date.Interval('-1d'))
133 self.assertNotEqual(self.db.issue.get('1', "foo"), a)
134 self.db.issue.set('1', foo=None)
135 self.assertEqual(self.db.issue.get('1', "foo"), None)
137 def testBooleanChange(self):
138 userid = self.db.user.create(username='foo', assignable=1)
139 self.db.user.create(username='foo2', assignable=0)
140 a = self.db.user.get(userid, 'assignable')
141 self.db.user.set(userid, assignable=0)
142 self.assertNotEqual(self.db.user.get(userid, 'assignable'), a)
143 self.db.user.set(userid, assignable=0)
144 self.db.user.set(userid, assignable=1)
145 self.db.user.set('1', assignable=None)
146 self.assertEqual(self.db.user.get('1', "assignable"), None)
148 def testNumberChange(self):
149 self.db.user.create(username='foo', age='1')
150 a = self.db.user.get('1', 'age')
151 self.db.user.set('1', age='3')
152 self.assertNotEqual(self.db.user.get('1', 'age'), a)
153 self.db.user.set('1', age='1.0')
154 self.db.user.set('1', age=None)
155 self.assertEqual(self.db.user.get('1', "age"), None)
157 def testNewProperty(self):
158 self.db.issue.create(title="spam", status='1')
159 self.db.issue.addprop(fixer=Link("user"))
160 # force any post-init stuff to happen
161 self.db.post_init()
162 props = self.db.issue.getprops()
163 keys = props.keys()
164 keys.sort()
165 self.assertEqual(keys, ['activity', 'assignedto', 'creation',
166 'creator', 'deadline', 'files', 'fixer', 'foo', 'id', 'messages',
167 'nosy', 'status', 'superseder', 'title'])
168 self.assertEqual(self.db.issue.get('1', "fixer"), None)
170 def testRetire(self):
171 self.db.issue.create(title="spam", status='1')
172 b = self.db.status.get('1', 'name')
173 a = self.db.status.list()
174 self.db.status.retire('1')
175 # make sure the list is different
176 self.assertNotEqual(a, self.db.status.list())
177 # can still access the node if necessary
178 self.assertEqual(self.db.status.get('1', 'name'), b)
179 self.db.commit()
180 self.assertEqual(self.db.status.get('1', 'name'), b)
181 self.assertNotEqual(a, self.db.status.list())
183 def testSerialisation(self):
184 self.db.issue.create(title="spam", status='1',
185 deadline=date.Date(), foo=date.Interval('-1d'))
186 self.db.commit()
187 assert isinstance(self.db.issue.get('1', 'deadline'), date.Date)
188 assert isinstance(self.db.issue.get('1', 'foo'), date.Interval)
189 self.db.user.create(username="fozzy",
190 password=password.Password('t. bear'))
191 self.db.commit()
192 assert isinstance(self.db.user.get('1', 'password'), password.Password)
194 def testTransactions(self):
195 # remember the number of items we started
196 num_issues = len(self.db.issue.list())
197 num_files = self.db.numfiles()
198 self.db.issue.create(title="don't commit me!", status='1')
199 self.assertNotEqual(num_issues, len(self.db.issue.list()))
200 self.db.rollback()
201 self.assertEqual(num_issues, len(self.db.issue.list()))
202 self.db.issue.create(title="please commit me!", status='1')
203 self.assertNotEqual(num_issues, len(self.db.issue.list()))
204 self.db.commit()
205 self.assertNotEqual(num_issues, len(self.db.issue.list()))
206 self.db.rollback()
207 self.assertNotEqual(num_issues, len(self.db.issue.list()))
208 self.db.file.create(name="test", type="text/plain", content="hi")
209 self.db.rollback()
210 self.assertEqual(num_files, self.db.numfiles())
211 for i in range(10):
212 self.db.file.create(name="test", type="text/plain",
213 content="hi %d"%(i))
214 self.db.commit()
215 num_files2 = self.db.numfiles()
216 self.assertNotEqual(num_files, num_files2)
217 self.db.file.create(name="test", type="text/plain", content="hi")
218 self.db.rollback()
219 self.assertNotEqual(num_files, self.db.numfiles())
220 self.assertEqual(num_files2, self.db.numfiles())
222 def testDestroyNoJournalling(self):
223 self.innerTestDestroy(klass=self.db.session)
225 def testDestroyJournalling(self):
226 self.innerTestDestroy(klass=self.db.issue)
228 def innerTestDestroy(self, klass):
229 newid = klass.create(title='Mr Friendly')
230 n = len(klass.list())
231 self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
232 klass.destroy(newid)
233 self.assertRaises(IndexError, klass.get, newid, 'title')
234 self.assertNotEqual(len(klass.list()), n)
235 if klass.do_journal:
236 self.assertRaises(IndexError, klass.history, newid)
238 # now with a commit
239 newid = klass.create(title='Mr Friendly')
240 n = len(klass.list())
241 self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
242 self.db.commit()
243 klass.destroy(newid)
244 self.assertRaises(IndexError, klass.get, newid, 'title')
245 self.db.commit()
246 self.assertRaises(IndexError, klass.get, newid, 'title')
247 self.assertNotEqual(len(klass.list()), n)
248 if klass.do_journal:
249 self.assertRaises(IndexError, klass.history, newid)
251 # now with a rollback
252 newid = klass.create(title='Mr Friendly')
253 n = len(klass.list())
254 self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
255 self.db.commit()
256 klass.destroy(newid)
257 self.assertNotEqual(len(klass.list()), n)
258 self.assertRaises(IndexError, klass.get, newid, 'title')
259 self.db.rollback()
260 self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
261 self.assertEqual(len(klass.list()), n)
262 if klass.do_journal:
263 self.assertNotEqual(klass.history(newid), [])
265 def testExceptions(self):
266 # this tests the exceptions that should be raised
267 ar = self.assertRaises
269 #
270 # class create
271 #
272 # string property
273 ar(TypeError, self.db.status.create, name=1)
274 # invalid property name
275 ar(KeyError, self.db.status.create, foo='foo')
276 # key name clash
277 ar(ValueError, self.db.status.create, name='unread')
278 # invalid link index
279 ar(IndexError, self.db.issue.create, title='foo', status='bar')
280 # invalid link value
281 ar(ValueError, self.db.issue.create, title='foo', status=1)
282 # invalid multilink type
283 ar(TypeError, self.db.issue.create, title='foo', status='1',
284 nosy='hello')
285 # invalid multilink index type
286 ar(ValueError, self.db.issue.create, title='foo', status='1',
287 nosy=[1])
288 # invalid multilink index
289 ar(IndexError, self.db.issue.create, title='foo', status='1',
290 nosy=['10'])
292 #
293 # key property
294 #
295 # key must be a String
296 ar(TypeError, self.db.user.setkey, 'password')
297 # key must exist
298 ar(KeyError, self.db.user.setkey, 'fubar')
300 #
301 # class get
302 #
303 # invalid node id
304 ar(IndexError, self.db.issue.get, '1', 'title')
305 # invalid property name
306 ar(KeyError, self.db.status.get, '2', 'foo')
308 #
309 # class set
310 #
311 # invalid node id
312 ar(IndexError, self.db.issue.set, '1', title='foo')
313 # invalid property name
314 ar(KeyError, self.db.status.set, '1', foo='foo')
315 # string property
316 ar(TypeError, self.db.status.set, '1', name=1)
317 # key name clash
318 ar(ValueError, self.db.status.set, '2', name='unread')
319 # set up a valid issue for me to work on
320 self.db.issue.create(title="spam", status='1')
321 # invalid link index
322 ar(IndexError, self.db.issue.set, '6', title='foo', status='bar')
323 # invalid link value
324 ar(ValueError, self.db.issue.set, '6', title='foo', status=1)
325 # invalid multilink type
326 ar(TypeError, self.db.issue.set, '6', title='foo', status='1',
327 nosy='hello')
328 # invalid multilink index type
329 ar(ValueError, self.db.issue.set, '6', title='foo', status='1',
330 nosy=[1])
331 # invalid multilink index
332 ar(IndexError, self.db.issue.set, '6', title='foo', status='1',
333 nosy=['10'])
334 # invalid number value
335 ar(TypeError, self.db.user.create, username='foo', age='a')
336 # invalid boolean value
337 ar(TypeError, self.db.user.create, username='foo', assignable='true')
338 self.db.user.create(username='foo')
339 # invalid number value
340 ar(TypeError, self.db.user.set, '3', username='foo', age='a')
341 # invalid boolean value
342 ar(TypeError, self.db.user.set, '3', username='foo', assignable='true')
344 def testJournals(self):
345 self.db.user.create(username="mary")
346 self.db.user.create(username="pete")
347 self.db.issue.create(title="spam", status='1')
348 self.db.commit()
350 # journal entry for issue create
351 journal = self.db.getjournal('issue', '1')
352 self.assertEqual(1, len(journal))
353 (nodeid, date_stamp, journaltag, action, params) = journal[0]
354 self.assertEqual(nodeid, '1')
355 self.assertEqual(journaltag, 'test')
356 self.assertEqual(action, 'create')
357 keys = params.keys()
358 keys.sort()
359 self.assertEqual(keys, ['assignedto', 'deadline', 'files',
360 'foo', 'messages', 'nosy', 'status', 'superseder', 'title'])
361 self.assertEqual(None,params['deadline'])
362 self.assertEqual(None,params['foo'])
363 self.assertEqual([],params['nosy'])
364 self.assertEqual('1',params['status'])
365 self.assertEqual('spam',params['title'])
367 # journal entry for link
368 journal = self.db.getjournal('user', '1')
369 self.assertEqual(1, len(journal))
370 self.db.issue.set('1', assignedto='1')
371 self.db.commit()
372 journal = self.db.getjournal('user', '1')
373 self.assertEqual(2, len(journal))
374 (nodeid, date_stamp, journaltag, action, params) = journal[1]
375 self.assertEqual('1', nodeid)
376 self.assertEqual('test', journaltag)
377 self.assertEqual('link', action)
378 self.assertEqual(('issue', '1', 'assignedto'), params)
380 # journal entry for unlink
381 self.db.issue.set('1', assignedto='2')
382 self.db.commit()
383 journal = self.db.getjournal('user', '1')
384 self.assertEqual(3, len(journal))
385 (nodeid, date_stamp, journaltag, action, params) = journal[2]
386 self.assertEqual('1', nodeid)
387 self.assertEqual('test', journaltag)
388 self.assertEqual('unlink', action)
389 self.assertEqual(('issue', '1', 'assignedto'), params)
391 # test disabling journalling
392 # ... get the last entry
393 time.sleep(1)
394 entry = self.db.getjournal('issue', '1')[-1]
395 (x, date_stamp, x, x, x) = entry
396 self.db.issue.disableJournalling()
397 self.db.issue.set('1', title='hello world')
398 self.db.commit()
399 entry = self.db.getjournal('issue', '1')[-1]
400 (x, date_stamp2, x, x, x) = entry
401 # see if the change was journalled when it shouldn't have been
402 self.assertEqual(date_stamp, date_stamp2)
403 self.db.issue.enableJournalling()
404 self.db.issue.set('1', title='hello world 2')
405 self.db.commit()
406 entry = self.db.getjournal('issue', '1')[-1]
407 (x, date_stamp2, x, x, x) = entry
408 # see if the change was journalled
409 self.assertNotEqual(date_stamp, date_stamp2)
411 def testPack(self):
412 self.db.issue.create(title="spam", status='1')
413 self.db.commit()
414 self.db.issue.set('1', status='2')
415 self.db.commit()
417 # sleep for at least a second, then get a date to pack at
418 time.sleep(1)
419 pack_before = date.Date('.')
421 # one more entry
422 self.db.issue.set('1', status='3')
423 self.db.commit()
425 # pack
426 self.db.pack(pack_before)
427 journal = self.db.getjournal('issue', '1')
429 # we should have the create and last set entries now
430 self.assertEqual(2, len(journal))
432 def testIDGeneration(self):
433 id1 = self.db.issue.create(title="spam", status='1')
434 id2 = self.db2.issue.create(title="eggs", status='2')
435 self.assertNotEqual(id1, id2)
437 def testSearching(self):
438 self.db.file.create(content='hello', type="text/plain")
439 self.db.file.create(content='world', type="text/frozz",
440 comment='blah blah')
441 self.db.issue.create(files=['1', '2'], title="flebble plop")
442 self.db.issue.create(title="flebble frooz")
443 self.db.commit()
444 self.assertEquals(self.db.indexer.search(['hello'], self.db.issue),
445 {'1': {'files': ['1']}})
446 self.assertEquals(self.db.indexer.search(['world'], self.db.issue), {})
447 self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue),
448 {'2': {}})
449 self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
450 {'2': {}, '1': {}})
452 def testReindexing(self):
453 self.db.issue.create(title="frooz")
454 self.db.commit()
455 self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue),
456 {'1': {}})
457 self.db.issue.set('1', title="dooble")
458 self.db.commit()
459 self.assertEquals(self.db.indexer.search(['dooble'], self.db.issue),
460 {'1': {}})
461 self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue), {})
463 def testForcedReindexing(self):
464 self.db.issue.create(title="flebble frooz")
465 self.db.commit()
466 self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
467 {'1': {}})
468 self.db.indexer.quiet = 1
469 self.db.indexer.force_reindex()
470 self.db.post_init()
471 self.db.indexer.quiet = 9
472 self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
473 {'1': {}})
475 class anydbmReadOnlyDBTestCase(MyTestCase):
476 def setUp(self):
477 from roundup.backends import anydbm
478 # remove previous test, ignore errors
479 if os.path.exists(config.DATABASE):
480 shutil.rmtree(config.DATABASE)
481 os.makedirs(config.DATABASE + '/files')
482 db = anydbm.Database(config, 'test')
483 setupSchema(db, 1, anydbm)
484 self.db = anydbm.Database(config)
485 setupSchema(self.db, 0, anydbm)
486 self.db2 = anydbm.Database(config, 'test')
487 setupSchema(self.db2, 0, anydbm)
489 def testExceptions(self):
490 # this tests the exceptions that should be raised
491 ar = self.assertRaises
493 # this tests the exceptions that should be raised
494 ar(DatabaseError, self.db.status.create, name="foo")
495 ar(DatabaseError, self.db.status.set, '1', name="foo")
496 ar(DatabaseError, self.db.status.retire, '1')
499 class bsddbDBTestCase(anydbmDBTestCase):
500 def setUp(self):
501 from roundup.backends import bsddb
502 # remove previous test, ignore errors
503 if os.path.exists(config.DATABASE):
504 shutil.rmtree(config.DATABASE)
505 os.makedirs(config.DATABASE + '/files')
506 self.db = bsddb.Database(config, 'test')
507 setupSchema(self.db, 1, bsddb)
508 self.db2 = bsddb.Database(config, 'test')
509 setupSchema(self.db2, 0, bsddb)
511 class bsddbReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
512 def setUp(self):
513 from roundup.backends import bsddb
514 # remove previous test, ignore errors
515 if os.path.exists(config.DATABASE):
516 shutil.rmtree(config.DATABASE)
517 os.makedirs(config.DATABASE + '/files')
518 db = bsddb.Database(config, 'test')
519 setupSchema(db, 1, bsddb)
520 self.db = bsddb.Database(config)
521 setupSchema(self.db, 0, bsddb)
522 self.db2 = bsddb.Database(config, 'test')
523 setupSchema(self.db2, 0, bsddb)
526 class bsddb3DBTestCase(anydbmDBTestCase):
527 def setUp(self):
528 from roundup.backends import bsddb3
529 # remove previous test, ignore errors
530 if os.path.exists(config.DATABASE):
531 shutil.rmtree(config.DATABASE)
532 os.makedirs(config.DATABASE + '/files')
533 self.db = bsddb3.Database(config, 'test')
534 setupSchema(self.db, 1, bsddb3)
535 self.db2 = bsddb3.Database(config, 'test')
536 setupSchema(self.db2, 0, bsddb3)
538 class bsddb3ReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
539 def setUp(self):
540 from roundup.backends import bsddb3
541 # remove previous test, ignore errors
542 if os.path.exists(config.DATABASE):
543 shutil.rmtree(config.DATABASE)
544 os.makedirs(config.DATABASE + '/files')
545 db = bsddb3.Database(config, 'test')
546 setupSchema(db, 1, bsddb3)
547 self.db = bsddb3.Database(config)
548 setupSchema(self.db, 0, bsddb3)
549 self.db2 = bsddb3.Database(config, 'test')
550 setupSchema(self.db2, 0, bsddb3)
553 class gadflyDBTestCase(anydbmDBTestCase):
554 ''' Gadfly doesn't support multiple connections to the one local
555 database
556 '''
557 def setUp(self):
558 from roundup.backends import gadfly
559 # remove previous test, ignore errors
560 if os.path.exists(config.DATABASE):
561 shutil.rmtree(config.DATABASE)
562 config.GADFLY_DATABASE = ('test', config.DATABASE)
563 os.makedirs(config.DATABASE + '/files')
564 self.db = gadfly.Database(config, 'test')
565 setupSchema(self.db, 1, gadfly)
567 def testIDGeneration(self):
568 id1 = self.db.issue.create(title="spam", status='1')
569 id2 = self.db.issue.create(title="eggs", status='2')
570 self.assertNotEqual(id1, id2)
572 def testNewProperty(self):
573 # gadfly doesn't have an ALTER TABLE command :(
574 pass
576 class gadflyReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
577 def setUp(self):
578 from roundup.backends import gadfly
579 # remove previous test, ignore errors
580 if os.path.exists(config.DATABASE):
581 shutil.rmtree(config.DATABASE)
582 config.GADFLY_DATABASE = ('test', config.DATABASE)
583 os.makedirs(config.DATABASE + '/files')
584 db = gadfly.Database(config, 'test')
585 setupSchema(db, 1, gadfly)
586 self.db = gadfly.Database(config)
587 setupSchema(self.db, 0, gadfly)
590 class metakitDBTestCase(anydbmDBTestCase):
591 def setUp(self):
592 from roundup.backends import metakit
593 import weakref
594 metakit._instances = weakref.WeakValueDictionary()
595 # remove previous test, ignore errors
596 if os.path.exists(config.DATABASE):
597 shutil.rmtree(config.DATABASE)
598 os.makedirs(config.DATABASE + '/files')
599 self.db = metakit.Database(config, 'test')
600 setupSchema(self.db, 1, metakit)
601 self.db2 = metakit.Database(config, 'test')
602 setupSchema(self.db2, 0, metakit)
604 def testTransactions(self):
605 # remember the number of items we started
606 num_issues = len(self.db.issue.list())
607 self.db.issue.create(title="don't commit me!", status='1')
608 self.assertNotEqual(num_issues, len(self.db.issue.list()))
609 self.db.rollback()
610 self.assertEqual(num_issues, len(self.db.issue.list()))
611 self.db.issue.create(title="please commit me!", status='1')
612 self.assertNotEqual(num_issues, len(self.db.issue.list()))
613 self.db.commit()
614 self.assertNotEqual(num_issues, len(self.db.issue.list()))
615 self.db.rollback()
616 self.assertNotEqual(num_issues, len(self.db.issue.list()))
617 self.db.file.create(name="test", type="text/plain", content="hi")
618 self.db.rollback()
619 for i in range(10):
620 self.db.file.create(name="test", type="text/plain",
621 content="hi %d"%(i))
622 self.db.commit()
623 # TODO: would be good to be able to ensure the file is not on disk after
624 # a rollback...
625 self.assertNotEqual(num_files, num_files2)
626 self.db.file.create(name="test", type="text/plain", content="hi")
627 self.db.rollback()
629 class metakitReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
630 def setUp(self):
631 from roundup.backends import metakit
632 import weakref
633 metakit._instances = weakref.WeakValueDictionary()
634 # remove previous test, ignore errors
635 if os.path.exists(config.DATABASE):
636 shutil.rmtree(config.DATABASE)
637 os.makedirs(config.DATABASE + '/files')
638 db = metakit.Database(config, 'test')
639 setupSchema(db, 1, metakit)
640 self.db = metakit.Database(config)
641 setupSchema(self.db, 0, metakit)
642 self.db2 = metakit.Database(config, 'test')
643 setupSchema(self.db2, 0, metakit)
645 def suite():
646 l = [
647 unittest.makeSuite(anydbmDBTestCase, 'test'),
648 unittest.makeSuite(anydbmReadOnlyDBTestCase, 'test')
649 ]
650 #return unittest.TestSuite(l)
652 try:
653 import bsddb
654 l.append(unittest.makeSuite(bsddbDBTestCase, 'test'))
655 l.append(unittest.makeSuite(bsddbReadOnlyDBTestCase, 'test'))
656 except:
657 print 'bsddb module not found, skipping bsddb DBTestCase'
659 try:
660 import bsddb3
661 l.append(unittest.makeSuite(bsddb3DBTestCase, 'test'))
662 l.append(unittest.makeSuite(bsddb3ReadOnlyDBTestCase, 'test'))
663 except:
664 print 'bsddb3 module not found, skipping bsddb3 DBTestCase'
666 try:
667 import gadfly
668 l.append(unittest.makeSuite(gadflyDBTestCase, 'test'))
669 l.append(unittest.makeSuite(gadflyReadOnlyDBTestCase, 'test'))
670 except:
671 print 'gadfly module not found, skipping gadfly DBTestCase'
673 try:
674 import metakit
675 l.append(unittest.makeSuite(metakitDBTestCase, 'test'))
676 l.append(unittest.makeSuite(metakitReadOnlyDBTestCase, 'test'))
677 except:
678 print 'metakit module not found, skipping metakit DBTestCase'
680 return unittest.TestSuite(l)
682 # vim: set filetype=python ts=4 sw=4 et si