6d5cabc0cd6cbc9bafe2c7101cba30e14c5a55cc
1 #
2 # Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
3 # This module is free software, and you may redistribute it and/or modify
4 # under the same terms as Python, so long as this copyright message and
5 # disclaimer are retained in their original form.
6 #
7 # IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
8 # DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
9 # OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
10 # POSSIBILITY OF SUCH DAMAGE.
11 #
12 # BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
13 # BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
14 # FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
15 # BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
16 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
17 #
18 # $Id: test_db.py,v 1.49 2002-09-18 07:04:39 richard Exp $
20 import unittest, os, shutil, time
22 from roundup.hyperdb import String, Password, Link, Multilink, Date, \
23 Interval, DatabaseError, Boolean, Number
24 from roundup import date, password
25 from roundup.indexer import Indexer
def setupSchema(db, create, module):
    """Declare the schema shared by all backend tests on *db*.

    db     -- an open Database instance from the backend under test
    create -- when true, also create the four initial "status" nodes
    module -- the backend module supplying Class, FileClass and IssueClass
    """
    status_cls = module.Class(db, "status", name=String())
    status_cls.setkey("name")

    user_cls = module.Class(db, "user", username=String(),
        password=Password(), assignable=Boolean(), age=Number(),
        roles=String())
    user_cls.setkey("username")

    # the "file" and "issue" classes are only reached through db.file and
    # db.issue by the tests, so the returned objects are not kept here
    module.FileClass(db, "file", name=String(), type=String(),
        comment=String(indexme="yes"))
    module.IssueClass(db, "issue", title=String(indexme="yes"),
        status=Link("status"), nosy=Multilink("user"), deadline=Date(),
        foo=Interval(), files=Multilink("file"), assignedto=Link('user'))

    # sessions are the one class that is exercised without a journal
    session_cls = module.Class(db, 'session', title=String())
    session_cls.disableJournalling()

    db.post_init()
    if create:
        for status_name in ("unread", "in-progress", "testing", "resolved"):
            status_cls.create(name=status_name)
    db.commit()
class MyTestCase(unittest.TestCase):
    """Common base for the database tests: provides the shared tearDown."""

    def tearDown(self):
        # self.db is always present; self.db2 only exists for backends whose
        # setUp opens a second connection
        connections = [self.db]
        if hasattr(self, 'db2'):
            connections.append(self.db2)
        for connection in connections:
            connection.close()

        # throw away the on-disk database so the next test starts clean
        if os.path.exists('_test_dir'):
            shutil.rmtree('_test_dir')
class config:
    """Stand-in tracker configuration passed to every backend Database."""

    # directory holding the test database; removed again by tearDown
    DATABASE = '_test_dir'

    # mail settings
    MAILHOST = 'localhost'
    MAIL_DOMAIN = 'fill.me.in.'

    # tracker identity
    TRACKER_NAME = 'Roundup issue tracker'
    TRACKER_EMAIL = 'issue_tracker@%s' % MAIL_DOMAIN
    TRACKER_WEB = 'http://some.useful.url/'
    ADMIN_EMAIL = 'roundup-admin@%s' % MAIL_DOMAIN

    # one of 'top', 'bottom', 'top and bottom'
    FILTER_POSITION = 'bottom'
    # either 'deny' or 'allow'
    ANONYMOUS_ACCESS = 'deny'
    # either 'deny' or 'allow'
    ANONYMOUS_REGISTER = 'deny'
    # either 'yes' or 'no'
    MESSAGES_TO_AUTHOR = 'no'
    EMAIL_SIGNATURE_POSITION = 'bottom'
class anydbmDBTestCase(MyTestCase):
    """Read/write database tests, run here against the anydbm backend.

    The other backend test cases in this file subclass this one, replacing
    setUp() (and the odd test a backend cannot support) and inheriting the
    remaining tests.

    Test methods are explained with comments rather than docstrings so that
    unittest's verbose output keeps showing the method names.
    """

    def setUp(self):
        from roundup.backends import anydbm
        # start from an empty database directory
        if os.path.exists(config.DATABASE):
            shutil.rmtree(config.DATABASE)
        os.makedirs(config.DATABASE + '/files')
        # first connection creates the schema and the initial status nodes
        self.db = anydbm.Database(config, 'test')
        setupSchema(self.db, 1, anydbm)
        # second connection onto the same database, used by testIDGeneration
        self.db2 = anydbm.Database(config, 'test')
        setupSchema(self.db2, 0, anydbm)

    def testStringChange(self):
        # test set & retrieve
        self.db.issue.create(title="spam", status='1')
        self.assertEqual(self.db.issue.get('1', 'title'), 'spam')

        # change and make sure we retrieve the correct value
        self.db.issue.set('1', title='eggs')
        self.assertEqual(self.db.issue.get('1', 'title'), 'eggs')

        # do some commit stuff
        self.db.commit()
        self.assertEqual(self.db.issue.get('1', 'title'), 'eggs')
        self.db.issue.create(title="spam", status='1')
        self.db.commit()
        self.assertEqual(self.db.issue.get('2', 'title'), 'spam')
        self.db.issue.set('2', title='ham')
        self.assertEqual(self.db.issue.get('2', 'title'), 'ham')
        self.db.commit()
        self.assertEqual(self.db.issue.get('2', 'title'), 'ham')

        # make sure we can unset
        self.db.issue.set('1', title=None)
        self.assertEqual(self.db.issue.get('1', "title"), None)

    def testLinkChange(self):
        # a Link can be set, changed and unset
        self.db.issue.create(title="spam", status='1')
        self.assertEqual(self.db.issue.get('1', "status"), '1')
        self.db.issue.set('1', status='2')
        self.assertEqual(self.db.issue.get('1', "status"), '2')
        self.db.issue.set('1', status=None)
        self.assertEqual(self.db.issue.get('1', "status"), None)

    def testMultilinkChange(self):
        # a Multilink can be set, emptied and refilled
        u1 = self.db.user.create(username='foo')
        u2 = self.db.user.create(username='bar')
        self.db.issue.create(title="spam", nosy=[u1])
        self.assertEqual(self.db.issue.get('1', "nosy"), [u1])
        self.db.issue.set('1', nosy=[])
        self.assertEqual(self.db.issue.get('1', "nosy"), [])
        self.db.issue.set('1', nosy=[u1,u2])
        self.assertEqual(self.db.issue.get('1', "nosy"), [u1,u2])

    def testDateChange(self):
        self.db.issue.create(title="spam", status='1')
        a = self.db.issue.get('1', "deadline")
        self.db.issue.set('1', deadline=date.Date())
        b = self.db.issue.get('1', "deadline")
        self.db.commit()
        self.assertNotEqual(a, b)
        self.assertNotEqual(b, date.Date('1970-1-1 00:00:00'))
        # set once more, then make sure we can unset
        self.db.issue.set('1', deadline=date.Date())
        self.db.issue.set('1', deadline=None)
        self.assertEqual(self.db.issue.get('1', "deadline"), None)

    def testIntervalChange(self):
        self.db.issue.create(title="spam", status='1')
        a = self.db.issue.get('1', "foo")
        self.db.issue.set('1', foo=date.Interval('-1d'))
        self.assertNotEqual(self.db.issue.get('1', "foo"), a)
        self.db.issue.set('1', foo=None)
        self.assertEqual(self.db.issue.get('1', "foo"), None)

    def testBooleanChange(self):
        userid = self.db.user.create(username='foo', assignable=1)
        self.assertEqual(1, self.db.user.get(userid, 'assignable'))
        self.db.user.set(userid, assignable=0)
        self.assertEqual(self.db.user.get(userid, 'assignable'), 0)
        self.db.user.set(userid, assignable=None)
        # read back the node we created rather than assuming its id is '1'
        self.assertEqual(self.db.user.get(userid, "assignable"), None)

    def testNumberChange(self):
        # use the id returned by create(), as testBooleanChange does
        userid = self.db.user.create(username='foo', age=1)
        self.assertEqual(1, self.db.user.get(userid, 'age'))
        self.db.user.set(userid, age=3)
        self.assertNotEqual(self.db.user.get(userid, 'age'), 1)
        self.db.user.set(userid, age=1.0)
        self.db.user.set(userid, age=None)
        self.assertEqual(self.db.user.get(userid, "age"), None)

    def testNewProperty(self):
        self.db.issue.create(title="spam", status='1')
        self.db.issue.addprop(fixer=Link("user"))
        # force any post-init stuff to happen
        self.db.post_init()
        props = self.db.issue.getprops()
        # list() keeps this working where keys() does not return a list
        keys = list(props.keys())
        keys.sort()
        self.assertEqual(keys, ['activity', 'assignedto', 'creation',
            'creator', 'deadline', 'files', 'fixer', 'foo', 'id', 'messages',
            'nosy', 'status', 'superseder', 'title'])
        self.assertEqual(self.db.issue.get('1', "fixer"), None)

    def testRetire(self):
        self.db.issue.create(title="spam", status='1')
        b = self.db.status.get('1', 'name')
        a = self.db.status.list()
        self.db.status.retire('1')
        # make sure the list is different
        self.assertNotEqual(a, self.db.status.list())
        # can still access the node if necessary
        self.assertEqual(self.db.status.get('1', 'name'), b)
        self.db.commit()
        self.assertEqual(self.db.status.get('1', 'name'), b)
        self.assertNotEqual(a, self.db.status.list())

    def testSerialisation(self):
        # values must come back from a commit as their rich types
        self.db.issue.create(title="spam", status='1',
            deadline=date.Date(), foo=date.Interval('-1d'))
        self.db.commit()
        assert isinstance(self.db.issue.get('1', 'deadline'), date.Date)
        assert isinstance(self.db.issue.get('1', 'foo'), date.Interval)
        self.db.user.create(username="fozzy",
            password=password.Password('t. bear'))
        self.db.commit()
        assert isinstance(self.db.user.get('1', 'password'), password.Password)

    def testTransactions(self):
        # remember the number of items we started
        num_issues = len(self.db.issue.list())
        num_files = self.db.numfiles()
        self.db.issue.create(title="don't commit me!", status='1')
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        self.db.rollback()
        self.assertEqual(num_issues, len(self.db.issue.list()))
        self.db.issue.create(title="please commit me!", status='1')
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        self.db.commit()
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        # a rollback after a commit must not undo the commit
        self.db.rollback()
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        # file content is covered by the transaction too
        self.db.file.create(name="test", type="text/plain", content="hi")
        self.db.rollback()
        self.assertEqual(num_files, self.db.numfiles())
        for i in range(10):
            self.db.file.create(name="test", type="text/plain",
                content="hi %d"%(i))
            self.db.commit()
        num_files2 = self.db.numfiles()
        self.assertNotEqual(num_files, num_files2)
        self.db.file.create(name="test", type="text/plain", content="hi")
        self.db.rollback()
        self.assertNotEqual(num_files, self.db.numfiles())
        self.assertEqual(num_files2, self.db.numfiles())

    def testDestroyNoJournalling(self):
        self.innerTestDestroy(klass=self.db.session)

    def testDestroyJournalling(self):
        self.innerTestDestroy(klass=self.db.issue)

    def innerTestDestroy(self, klass):
        # shared body of the two testDestroy* tests; klass is the hyperdb
        # class whose nodes are created and destroyed
        newid = klass.create(title='Mr Friendly')
        n = len(klass.list())
        self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
        klass.destroy(newid)
        self.assertRaises(IndexError, klass.get, newid, 'title')
        self.assertNotEqual(len(klass.list()), n)
        if klass.do_journal:
            self.assertRaises(IndexError, klass.history, newid)

        # now with a commit
        newid = klass.create(title='Mr Friendly')
        n = len(klass.list())
        self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
        self.db.commit()
        klass.destroy(newid)
        self.assertRaises(IndexError, klass.get, newid, 'title')
        self.db.commit()
        self.assertRaises(IndexError, klass.get, newid, 'title')
        self.assertNotEqual(len(klass.list()), n)
        if klass.do_journal:
            self.assertRaises(IndexError, klass.history, newid)

        # now with a rollback
        newid = klass.create(title='Mr Friendly')
        n = len(klass.list())
        self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
        self.db.commit()
        klass.destroy(newid)
        self.assertNotEqual(len(klass.list()), n)
        self.assertRaises(IndexError, klass.get, newid, 'title')
        self.db.rollback()
        self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
        self.assertEqual(len(klass.list()), n)
        if klass.do_journal:
            self.assertNotEqual(klass.history(newid), [])

    def testExceptions(self):
        # this tests the exceptions that should be raised
        ar = self.assertRaises

        #
        # class create
        #
        # string property
        ar(TypeError, self.db.status.create, name=1)
        # invalid property name
        ar(KeyError, self.db.status.create, foo='foo')
        # key name clash
        ar(ValueError, self.db.status.create, name='unread')
        # invalid link index
        ar(IndexError, self.db.issue.create, title='foo', status='bar')
        # invalid link value
        ar(ValueError, self.db.issue.create, title='foo', status=1)
        # invalid multilink type
        ar(TypeError, self.db.issue.create, title='foo', status='1',
            nosy='hello')
        # invalid multilink index type
        ar(ValueError, self.db.issue.create, title='foo', status='1',
            nosy=[1])
        # invalid multilink index
        ar(IndexError, self.db.issue.create, title='foo', status='1',
            nosy=['10'])

        #
        # key property
        #
        # key must be a String
        ar(TypeError, self.db.user.setkey, 'password')
        # key must exist
        ar(KeyError, self.db.user.setkey, 'fubar')

        #
        # class get
        #
        # invalid node id
        ar(IndexError, self.db.issue.get, '1', 'title')
        # invalid property name
        ar(KeyError, self.db.status.get, '2', 'foo')

        #
        # class set
        #
        # invalid node id
        ar(IndexError, self.db.issue.set, '1', title='foo')
        # invalid property name
        ar(KeyError, self.db.status.set, '1', foo='foo')
        # string property
        ar(TypeError, self.db.status.set, '1', name=1)
        # key name clash
        ar(ValueError, self.db.status.set, '2', name='unread')
        # set up a valid issue for me to work on
        self.db.issue.create(title="spam", status='1')
        # invalid link index
        ar(IndexError, self.db.issue.set, '6', title='foo', status='bar')
        # invalid link value
        ar(ValueError, self.db.issue.set, '6', title='foo', status=1)
        # invalid multilink type
        ar(TypeError, self.db.issue.set, '6', title='foo', status='1',
            nosy='hello')
        # invalid multilink index type
        ar(ValueError, self.db.issue.set, '6', title='foo', status='1',
            nosy=[1])
        # invalid multilink index
        ar(IndexError, self.db.issue.set, '6', title='foo', status='1',
            nosy=['10'])
        # invalid number value
        ar(TypeError, self.db.user.create, username='foo', age='a')
        # invalid boolean value
        ar(TypeError, self.db.user.create, username='foo', assignable='true')
        self.db.user.create(username='foo')
        # invalid number value
        ar(TypeError, self.db.user.set, '3', username='foo', age='a')
        # invalid boolean value
        ar(TypeError, self.db.user.set, '3', username='foo', assignable='true')

    def testJournals(self):
        self.db.user.create(username="mary")
        self.db.user.create(username="pete")
        self.db.issue.create(title="spam", status='1')
        self.db.commit()

        # journal entry for issue create
        journal = self.db.getjournal('issue', '1')
        self.assertEqual(1, len(journal))
        (nodeid, date_stamp, journaltag, action, params) = journal[0]
        self.assertEqual(nodeid, '1')
        self.assertEqual(journaltag, 'test')
        self.assertEqual(action, 'create')
        # list() keeps this working where keys() does not return a list
        keys = list(params.keys())
        keys.sort()
        self.assertEqual(keys, ['assignedto', 'deadline', 'files',
            'foo', 'messages', 'nosy', 'status', 'superseder', 'title'])
        self.assertEqual(None,params['deadline'])
        self.assertEqual(None,params['foo'])
        self.assertEqual([],params['nosy'])
        self.assertEqual('1',params['status'])
        self.assertEqual('spam',params['title'])

        # journal entry for link
        journal = self.db.getjournal('user', '1')
        self.assertEqual(1, len(journal))
        self.db.issue.set('1', assignedto='1')
        self.db.commit()
        journal = self.db.getjournal('user', '1')
        self.assertEqual(2, len(journal))
        (nodeid, date_stamp, journaltag, action, params) = journal[1]
        self.assertEqual('1', nodeid)
        self.assertEqual('test', journaltag)
        self.assertEqual('link', action)
        self.assertEqual(('issue', '1', 'assignedto'), params)

        # journal entry for unlink
        self.db.issue.set('1', assignedto='2')
        self.db.commit()
        journal = self.db.getjournal('user', '1')
        self.assertEqual(3, len(journal))
        (nodeid, date_stamp, journaltag, action, params) = journal[2]
        self.assertEqual('1', nodeid)
        self.assertEqual('test', journaltag)
        self.assertEqual('unlink', action)
        self.assertEqual(('issue', '1', 'assignedto'), params)

        # test disabling journalling
        # ... get the last entry
        # (the sleep makes any later entry carry a different date stamp)
        time.sleep(1)
        entry = self.db.getjournal('issue', '1')[-1]
        (x, date_stamp, x, x, x) = entry
        self.db.issue.disableJournalling()
        self.db.issue.set('1', title='hello world')
        self.db.commit()
        entry = self.db.getjournal('issue', '1')[-1]
        (x, date_stamp2, x, x, x) = entry
        # see if the change was journalled when it shouldn't have been
        self.assertEqual(date_stamp, date_stamp2)
        self.db.issue.enableJournalling()
        self.db.issue.set('1', title='hello world 2')
        self.db.commit()
        entry = self.db.getjournal('issue', '1')[-1]
        (x, date_stamp2, x, x, x) = entry
        # see if the change was journalled
        self.assertNotEqual(date_stamp, date_stamp2)

    def testPack(self):
        self.db.issue.create(title="spam", status='1')
        self.db.commit()
        self.db.issue.set('1', status='2')
        self.db.commit()

        # sleep for at least a second, then get a date to pack at
        time.sleep(1)
        pack_before = date.Date('.')

        # one more entry
        self.db.issue.set('1', status='3')
        self.db.commit()

        # pack
        self.db.pack(pack_before)
        journal = self.db.getjournal('issue', '1')

        # we should have the create and last set entries now
        self.assertEqual(2, len(journal))

    def testIDGeneration(self):
        # two connections onto one database must not hand out the same id
        id1 = self.db.issue.create(title="spam", status='1')
        id2 = self.db2.issue.create(title="eggs", status='2')
        self.assertNotEqual(id1, id2)

    def testSearching(self):
        self.db.file.create(content='hello', type="text/plain")
        self.db.file.create(content='world', type="text/frozz",
            comment='blah blah')
        self.db.issue.create(files=['1', '2'], title="flebble plop")
        self.db.issue.create(title="flebble frooz")
        self.db.commit()
        self.assertEqual(self.db.indexer.search(['hello'], self.db.issue),
            {'1': {'files': ['1']}})
        self.assertEqual(self.db.indexer.search(['world'], self.db.issue), {})
        self.assertEqual(self.db.indexer.search(['frooz'], self.db.issue),
            {'2': {}})
        self.assertEqual(self.db.indexer.search(['flebble'], self.db.issue),
            {'2': {}, '1': {}})

    def testReindexing(self):
        self.db.issue.create(title="frooz")
        self.db.commit()
        self.assertEqual(self.db.indexer.search(['frooz'], self.db.issue),
            {'1': {}})
        # after a title change only the new word may match
        self.db.issue.set('1', title="dooble")
        self.db.commit()
        self.assertEqual(self.db.indexer.search(['dooble'], self.db.issue),
            {'1': {}})
        self.assertEqual(self.db.indexer.search(['frooz'], self.db.issue), {})

    def testForcedReindexing(self):
        self.db.issue.create(title="flebble frooz")
        self.db.commit()
        self.assertEqual(self.db.indexer.search(['flebble'], self.db.issue),
            {'1': {}})
        # NOTE(review): "quiet" looks like an indexer verbosity level that
        # is lowered for the rebuild and then put back to 9 -- confirm
        self.db.indexer.quiet = 1
        self.db.indexer.force_reindex()
        self.db.post_init()
        self.db.indexer.quiet = 9
        self.assertEqual(self.db.indexer.search(['flebble'], self.db.issue),
            {'1': {}})
class anydbmReadOnlyDBTestCase(MyTestCase):
    """Checks that a read-only anydbm connection refuses modifications.

    Backend-specific subclasses replace setUp() and inherit the test.
    """

    def setUp(self):
        from roundup.backends import anydbm
        # start from an empty database directory
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')
        # a writable connection populates the database first
        writer = anydbm.Database(config, 'test')
        setupSchema(writer, 1, anydbm)
        # the connection under test is opened without the 'test' argument;
        # testExceptions expects writes through it to fail
        self.db = anydbm.Database(config)
        setupSchema(self.db, 0, anydbm)
        self.db2 = anydbm.Database(config, 'test')
        setupSchema(self.db2, 0, anydbm)

    def testExceptions(self):
        # every modifying call must raise DatabaseError
        expect_raises = self.assertRaises
        status = self.db.status
        expect_raises(DatabaseError, status.create, name="foo")
        expect_raises(DatabaseError, status.set, '1', name="foo")
        expect_raises(DatabaseError, status.retire, '1')
class bsddbDBTestCase(anydbmDBTestCase):
    """Runs the inherited read/write tests against the bsddb backend."""

    def setUp(self):
        from roundup.backends import bsddb
        # start from an empty database directory
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')
        # first connection creates schema and initial data, second one
        # attaches to the same database
        self.db = bsddb.Database(config, 'test')
        setupSchema(self.db, 1, bsddb)
        self.db2 = bsddb.Database(config, 'test')
        setupSchema(self.db2, 0, bsddb)
class bsddbReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    """Runs the inherited read-only test against the bsddb backend."""

    def setUp(self):
        from roundup.backends import bsddb
        # start from an empty database directory
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')
        # a writable connection populates the database first
        writer = bsddb.Database(config, 'test')
        setupSchema(writer, 1, bsddb)
        # connection under test: opened without the 'test' argument
        self.db = bsddb.Database(config)
        setupSchema(self.db, 0, bsddb)
        self.db2 = bsddb.Database(config, 'test')
        setupSchema(self.db2, 0, bsddb)
class bsddb3DBTestCase(anydbmDBTestCase):
    """Runs the inherited read/write tests against the bsddb3 backend."""

    def setUp(self):
        from roundup.backends import bsddb3
        # start from an empty database directory
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')
        # first connection creates schema and initial data, second one
        # attaches to the same database
        self.db = bsddb3.Database(config, 'test')
        setupSchema(self.db, 1, bsddb3)
        self.db2 = bsddb3.Database(config, 'test')
        setupSchema(self.db2, 0, bsddb3)
class bsddb3ReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    """Runs the inherited read-only test against the bsddb3 backend."""

    def setUp(self):
        from roundup.backends import bsddb3
        # start from an empty database directory
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')
        # a writable connection populates the database first
        writer = bsddb3.Database(config, 'test')
        setupSchema(writer, 1, bsddb3)
        # connection under test: opened without the 'test' argument
        self.db = bsddb3.Database(config)
        setupSchema(self.db, 0, bsddb3)
        self.db2 = bsddb3.Database(config, 'test')
        setupSchema(self.db2, 0, bsddb3)
class gadflyDBTestCase(anydbmDBTestCase):
    ''' Gadfly doesn't support multiple connections to the one local
        database, so only self.db is opened and the id test is redone
        on that single connection.
    '''

    def setUp(self):
        from roundup.backends import gadfly
        # start from an empty database directory
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        # extra setting for this backend, stored on the shared config class
        config.GADFLY_DATABASE = ('test', db_dir)
        os.makedirs(db_dir + '/files')
        self.db = gadfly.Database(config, 'test')
        setupSchema(self.db, 1, gadfly)

    def testIDGeneration(self):
        # both nodes come from the one connection; ids must still differ
        first = self.db.issue.create(title="spam", status='1')
        second = self.db.issue.create(title="eggs", status='2')
        self.assertNotEqual(first, second)
class gadflyReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    """Runs the inherited read-only test against the gadfly backend."""

    def setUp(self):
        from roundup.backends import gadfly
        # start from an empty database directory
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        # extra setting for this backend, stored on the shared config class
        config.GADFLY_DATABASE = ('test', db_dir)
        os.makedirs(db_dir + '/files')
        # a writable connection populates the database first
        writer = gadfly.Database(config, 'test')
        setupSchema(writer, 1, gadfly)
        # connection under test: opened without the 'test' argument
        # (no self.db2 is opened for this backend)
        self.db = gadfly.Database(config)
        setupSchema(self.db, 0, gadfly)
class sqliteDBTestCase(anydbmDBTestCase):
    """Runs the inherited read/write tests against the sqlite backend."""

    def setUp(self):
        from roundup.backends import sqlite
        # start from an empty database directory
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')
        # only one connection is opened; there is no self.db2 here
        self.db = sqlite.Database(config, 'test')
        setupSchema(self.db, 1, sqlite)

    def testIDGeneration(self):
        # deliberately a no-op: the inherited test needs self.db2, which
        # this setUp does not create
        pass
class sqliteReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    """Runs the inherited read-only test against the sqlite backend."""

    def setUp(self):
        from roundup.backends import sqlite
        # start from an empty database directory
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')
        # a writable connection populates the database first
        writer = sqlite.Database(config, 'test')
        setupSchema(writer, 1, sqlite)
        # connection under test: opened without the 'test' argument
        # (no self.db2 is opened for this backend)
        self.db = sqlite.Database(config)
        setupSchema(self.db, 0, sqlite)
class metakitDBTestCase(anydbmDBTestCase):
    """Runs the inherited read/write tests against the metakit backend.

    Only one connection is opened, so testIDGeneration is redone on it, and
    testTransactions is replaced by a variant that does not call
    db.numfiles().
    """

    def setUp(self):
        from roundup.backends import metakit
        import weakref
        # replace the backend's module-level _instances mapping with a fresh
        # one (presumably so nothing cached by an earlier test is reused --
        # TODO confirm against the backend)
        metakit._instances = weakref.WeakValueDictionary()
        # start from an empty database directory
        if os.path.exists(config.DATABASE):
            shutil.rmtree(config.DATABASE)
        os.makedirs(config.DATABASE + '/files')
        self.db = metakit.Database(config, 'test')
        setupSchema(self.db, 1, metakit)

    def testIDGeneration(self):
        # both nodes come from the one connection; ids must still differ
        id1 = self.db.issue.create(title="spam", status='1')
        id2 = self.db.issue.create(title="eggs", status='2')
        self.assertNotEqual(id1, id2)

    def testTransactions(self):
        # remember the number of items we started
        num_issues = len(self.db.issue.list())
        self.db.issue.create(title="don't commit me!", status='1')
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        self.db.rollback()
        self.assertEqual(num_issues, len(self.db.issue.list()))
        self.db.issue.create(title="please commit me!", status='1')
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        self.db.commit()
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        # a rollback after a commit must not undo the commit
        self.db.rollback()
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        self.db.file.create(name="test", type="text/plain", content="hi")
        self.db.rollback()
        # The original asserted on num_files/num_files2 without ever
        # defining them (NameError).  Count the file nodes instead, before
        # and after the committed creates.
        num_files = len(self.db.file.list())
        for i in range(10):
            self.db.file.create(name="test", type="text/plain",
                content="hi %d"%(i))
            self.db.commit()
        num_files2 = len(self.db.file.list())
        # TODO: would be good to be able to ensure the file is not on disk
        # after a rollback...
        self.assertNotEqual(num_files, num_files2)
        self.db.file.create(name="test", type="text/plain", content="hi")
        self.db.rollback()
class metakitReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    """Runs the inherited read-only test against the metakit backend."""

    def setUp(self):
        import weakref
        from roundup.backends import metakit
        # replace the backend's module-level _instances mapping with a fresh
        # one (presumably so nothing cached by an earlier test is reused --
        # TODO confirm against the backend)
        metakit._instances = weakref.WeakValueDictionary()
        # start from an empty database directory
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')
        # a writable connection populates the database first
        writer = metakit.Database(config, 'test')
        setupSchema(writer, 1, metakit)
        # connection under test: opened without the 'test' argument
        # (no self.db2 is opened for this backend)
        self.db = metakit.Database(config)
        setupSchema(self.db, 0, metakit)
def suite():
    """Return a TestSuite covering every backend that can be imported.

    The anydbm test cases are always included.  Each optional backend adds
    its read/write and read-only test cases only when its underlying module
    imports; otherwise a one-line notice is printed and it is skipped.
    """
    l = [
        unittest.makeSuite(anydbmDBTestCase, 'test'),
        unittest.makeSuite(anydbmReadOnlyDBTestCase, 'test')
    ]

    # (module the backend needs, read/write test case, read-only test case)
    optional_backends = [
        ('bsddb', bsddbDBTestCase, bsddbReadOnlyDBTestCase),
        ('bsddb3', bsddb3DBTestCase, bsddb3ReadOnlyDBTestCase),
        ('gadfly', gadflyDBTestCase, gadflyReadOnlyDBTestCase),
        ('sqlite', sqliteDBTestCase, sqliteReadOnlyDBTestCase),
        ('metakit', metakitDBTestCase, metakitReadOnlyDBTestCase),
    ]
    for module_name, rw_case, ro_case in optional_backends:
        try:
            __import__(module_name)
        except ImportError:
            # Only a missing module means "skip"; anything else (including
            # KeyboardInterrupt) propagates instead of being swallowed.
            # The sqlite notice used to name gadfly by mistake.
            print('%s module not found, skipping %s DBTestCase' % (
                module_name, module_name))
        else:
            l.append(unittest.makeSuite(rw_case, 'test'))
            l.append(unittest.makeSuite(ro_case, 'test'))

    return unittest.TestSuite(l)
720 # vim: set filetype=python ts=4 sw=4 et si