1 #
2 # Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
3 # This module is free software, and you may redistribute it and/or modify
4 # under the same terms as Python, so long as this copyright message and
5 # disclaimer are retained in their original form.
6 #
7 # IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
8 # DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
9 # OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
10 # POSSIBILITY OF SUCH DAMAGE.
11 #
12 # BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
13 # BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
14 # FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
15 # BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
16 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
17 #
18 # $Id: test_db.py,v 1.55 2002-09-24 01:59:44 richard Exp $
20 import unittest, os, shutil, time
22 from roundup.hyperdb import String, Password, Link, Multilink, Date, \
23 Interval, DatabaseError, Boolean, Number
24 from roundup import date, password
25 from roundup.indexer import Indexer
27 def setupSchema(db, create, module):
28 status = module.Class(db, "status", name=String())
29 status.setkey("name")
30 user = module.Class(db, "user", username=String(), password=Password(),
31 assignable=Boolean(), age=Number(), roles=String())
32 user.setkey("username")
33 file = module.FileClass(db, "file", name=String(), type=String(),
34 comment=String(indexme="yes"), fooz=Password())
35 issue = module.IssueClass(db, "issue", title=String(indexme="yes"),
36 status=Link("status"), nosy=Multilink("user"), deadline=Date(),
37 foo=Interval(), files=Multilink("file"), assignedto=Link('user'))
38 session = module.Class(db, 'session', title=String())
39 session.disableJournalling()
40 db.post_init()
41 if create:
42 user.create(username="admin", roles='Admin')
43 status.create(name="unread")
44 status.create(name="in-progress")
45 status.create(name="testing")
46 status.create(name="resolved")
47 db.commit()
49 class MyTestCase(unittest.TestCase):
50 def tearDown(self):
51 self.db.close()
52 if hasattr(self, 'db2'):
53 self.db2.close()
54 if os.path.exists('_test_dir'):
55 shutil.rmtree('_test_dir')
57 class config:
58 DATABASE='_test_dir'
59 MAILHOST = 'localhost'
60 MAIL_DOMAIN = 'fill.me.in.'
61 TRACKER_NAME = 'Roundup issue tracker'
62 TRACKER_EMAIL = 'issue_tracker@%s'%MAIL_DOMAIN
63 TRACKER_WEB = 'http://some.useful.url/'
64 ADMIN_EMAIL = 'roundup-admin@%s'%MAIL_DOMAIN
65 FILTER_POSITION = 'bottom' # one of 'top', 'bottom', 'top and bottom'
66 ANONYMOUS_ACCESS = 'deny' # either 'deny' or 'allow'
67 ANONYMOUS_REGISTER = 'deny' # either 'deny' or 'allow'
68 MESSAGES_TO_AUTHOR = 'no' # either 'yes' or 'no'
69 EMAIL_SIGNATURE_POSITION = 'bottom'
71 class anydbmDBTestCase(MyTestCase):
72 def setUp(self):
73 from roundup.backends import anydbm
74 # remove previous test, ignore errors
75 if os.path.exists(config.DATABASE):
76 shutil.rmtree(config.DATABASE)
77 os.makedirs(config.DATABASE + '/files')
78 self.db = anydbm.Database(config, 'admin')
79 setupSchema(self.db, 1, anydbm)
80 self.db2 = anydbm.Database(config, 'admin')
81 setupSchema(self.db2, 0, anydbm)
83 def testStringChange(self):
84 # test set & retrieve
85 self.db.issue.create(title="spam", status='1')
86 self.assertEqual(self.db.issue.get('1', 'title'), 'spam')
88 # change and make sure we retrieve the correct value
89 self.db.issue.set('1', title='eggs')
90 self.assertEqual(self.db.issue.get('1', 'title'), 'eggs')
92 # do some commit stuff
93 self.db.commit()
94 self.assertEqual(self.db.issue.get('1', 'title'), 'eggs')
95 self.db.issue.create(title="spam", status='1')
96 self.db.commit()
97 self.assertEqual(self.db.issue.get('2', 'title'), 'spam')
98 self.db.issue.set('2', title='ham')
99 self.assertEqual(self.db.issue.get('2', 'title'), 'ham')
100 self.db.commit()
101 self.assertEqual(self.db.issue.get('2', 'title'), 'ham')
103 # make sure we can unset
104 self.db.issue.set('1', title=None)
105 self.assertEqual(self.db.issue.get('1', "title"), None)
107 def testLinkChange(self):
108 self.db.issue.create(title="spam", status='1')
109 self.assertEqual(self.db.issue.get('1', "status"), '1')
110 self.db.issue.set('1', status='2')
111 self.assertEqual(self.db.issue.get('1', "status"), '2')
112 self.db.issue.set('1', status=None)
113 self.assertEqual(self.db.issue.get('1', "status"), None)
115 def testMultilinkChange(self):
116 u1 = self.db.user.create(username='foo')
117 u2 = self.db.user.create(username='bar')
118 self.db.issue.create(title="spam", nosy=[u1])
119 self.assertEqual(self.db.issue.get('1', "nosy"), [u1])
120 self.db.issue.set('1', nosy=[])
121 self.assertEqual(self.db.issue.get('1', "nosy"), [])
122 self.db.issue.set('1', nosy=[u1,u2])
123 self.assertEqual(self.db.issue.get('1', "nosy"), [u1,u2])
125 def testDateChange(self):
126 self.db.issue.create(title="spam", status='1')
127 a = self.db.issue.get('1', "deadline")
128 self.db.issue.set('1', deadline=date.Date())
129 b = self.db.issue.get('1', "deadline")
130 self.db.commit()
131 self.assertNotEqual(a, b)
132 self.assertNotEqual(b, date.Date('1970-1-1 00:00:00'))
133 self.db.issue.set('1', deadline=date.Date())
134 self.db.issue.set('1', deadline=None)
135 self.assertEqual(self.db.issue.get('1', "deadline"), None)
137 def testIntervalChange(self):
138 self.db.issue.create(title="spam", status='1')
139 a = self.db.issue.get('1', "foo")
140 self.db.issue.set('1', foo=date.Interval('-1d'))
141 self.assertNotEqual(self.db.issue.get('1', "foo"), a)
142 self.db.issue.set('1', foo=None)
143 self.assertEqual(self.db.issue.get('1', "foo"), None)
145 def testBooleanChange(self):
146 userid = self.db.user.create(username='foo', assignable=1)
147 self.assertEqual(1, self.db.user.get(userid, 'assignable'))
148 self.db.user.set(userid, assignable=0)
149 self.assertEqual(self.db.user.get(userid, 'assignable'), 0)
150 self.db.user.set(userid, assignable=None)
151 self.assertEqual(self.db.user.get('1', "assignable"), None)
153 def testNumberChange(self):
154 nid = self.db.user.create(username='foo', age=1)
155 self.assertEqual(1, self.db.user.get(nid, 'age'))
156 self.db.user.set('1', age=3)
157 self.assertNotEqual(self.db.user.get('1', 'age'), 1)
158 self.db.user.set('1', age=1.0)
159 self.db.user.set('1', age=None)
160 self.assertEqual(self.db.user.get('1', "age"), None)
162 def testKeyValue(self):
163 newid = self.db.user.create(username="spam")
164 self.assertEqual(self.db.user.lookup('spam'), newid)
165 self.db.commit()
166 self.assertEqual(self.db.user.lookup('spam'), newid)
167 self.db.user.retire(newid)
168 self.assertRaises(KeyError, self.db.user.lookup, 'spam')
170 def testNewProperty(self):
171 self.db.issue.create(title="spam", status='1')
172 self.db.issue.addprop(fixer=Link("user"))
173 # force any post-init stuff to happen
174 self.db.post_init()
175 props = self.db.issue.getprops()
176 keys = props.keys()
177 keys.sort()
178 self.assertEqual(keys, ['activity', 'assignedto', 'creation',
179 'creator', 'deadline', 'files', 'fixer', 'foo', 'id', 'messages',
180 'nosy', 'status', 'superseder', 'title'])
181 self.assertEqual(self.db.issue.get('1', "fixer"), None)
183 def testRetire(self):
184 self.db.issue.create(title="spam", status='1')
185 b = self.db.status.get('1', 'name')
186 a = self.db.status.list()
187 self.db.status.retire('1')
188 # make sure the list is different
189 self.assertNotEqual(a, self.db.status.list())
190 # can still access the node if necessary
191 self.assertEqual(self.db.status.get('1', 'name'), b)
192 self.db.commit()
193 self.assertEqual(self.db.status.get('1', 'name'), b)
194 self.assertNotEqual(a, self.db.status.list())
196 def testSerialisation(self):
197 self.db.issue.create(title="spam", status='1',
198 deadline=date.Date(), foo=date.Interval('-1d'))
199 self.db.commit()
200 assert isinstance(self.db.issue.get('1', 'deadline'), date.Date)
201 assert isinstance(self.db.issue.get('1', 'foo'), date.Interval)
202 self.db.user.create(username="fozzy",
203 password=password.Password('t. bear'))
204 self.db.commit()
205 assert isinstance(self.db.user.get('1', 'password'), password.Password)
207 def testTransactions(self):
208 # remember the number of items we started
209 num_issues = len(self.db.issue.list())
210 num_files = self.db.numfiles()
211 self.db.issue.create(title="don't commit me!", status='1')
212 self.assertNotEqual(num_issues, len(self.db.issue.list()))
213 self.db.rollback()
214 self.assertEqual(num_issues, len(self.db.issue.list()))
215 self.db.issue.create(title="please commit me!", status='1')
216 self.assertNotEqual(num_issues, len(self.db.issue.list()))
217 self.db.commit()
218 self.assertNotEqual(num_issues, len(self.db.issue.list()))
219 self.db.rollback()
220 self.assertNotEqual(num_issues, len(self.db.issue.list()))
221 self.db.file.create(name="test", type="text/plain", content="hi")
222 self.db.rollback()
223 self.assertEqual(num_files, self.db.numfiles())
224 for i in range(10):
225 self.db.file.create(name="test", type="text/plain",
226 content="hi %d"%(i))
227 self.db.commit()
228 num_files2 = self.db.numfiles()
229 self.assertNotEqual(num_files, num_files2)
230 self.db.file.create(name="test", type="text/plain", content="hi")
231 self.db.rollback()
232 self.assertNotEqual(num_files, self.db.numfiles())
233 self.assertEqual(num_files2, self.db.numfiles())
235 def testDestroyNoJournalling(self):
236 self.innerTestDestroy(klass=self.db.session)
238 def testDestroyJournalling(self):
239 self.innerTestDestroy(klass=self.db.issue)
241 def innerTestDestroy(self, klass):
242 newid = klass.create(title='Mr Friendly')
243 n = len(klass.list())
244 self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
245 klass.destroy(newid)
246 self.assertRaises(IndexError, klass.get, newid, 'title')
247 self.assertNotEqual(len(klass.list()), n)
248 if klass.do_journal:
249 self.assertRaises(IndexError, klass.history, newid)
251 # now with a commit
252 newid = klass.create(title='Mr Friendly')
253 n = len(klass.list())
254 self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
255 self.db.commit()
256 klass.destroy(newid)
257 self.assertRaises(IndexError, klass.get, newid, 'title')
258 self.db.commit()
259 self.assertRaises(IndexError, klass.get, newid, 'title')
260 self.assertNotEqual(len(klass.list()), n)
261 if klass.do_journal:
262 self.assertRaises(IndexError, klass.history, newid)
264 # now with a rollback
265 newid = klass.create(title='Mr Friendly')
266 n = len(klass.list())
267 self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
268 self.db.commit()
269 klass.destroy(newid)
270 self.assertNotEqual(len(klass.list()), n)
271 self.assertRaises(IndexError, klass.get, newid, 'title')
272 self.db.rollback()
273 self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
274 self.assertEqual(len(klass.list()), n)
275 if klass.do_journal:
276 self.assertNotEqual(klass.history(newid), [])
278 def testExceptions(self):
279 # this tests the exceptions that should be raised
280 ar = self.assertRaises
282 #
283 # class create
284 #
285 # string property
286 ar(TypeError, self.db.status.create, name=1)
287 # invalid property name
288 ar(KeyError, self.db.status.create, foo='foo')
289 # key name clash
290 ar(ValueError, self.db.status.create, name='unread')
291 # invalid link index
292 ar(IndexError, self.db.issue.create, title='foo', status='bar')
293 # invalid link value
294 ar(ValueError, self.db.issue.create, title='foo', status=1)
295 # invalid multilink type
296 ar(TypeError, self.db.issue.create, title='foo', status='1',
297 nosy='hello')
298 # invalid multilink index type
299 ar(ValueError, self.db.issue.create, title='foo', status='1',
300 nosy=[1])
301 # invalid multilink index
302 ar(IndexError, self.db.issue.create, title='foo', status='1',
303 nosy=['10'])
305 #
306 # key property
307 #
308 # key must be a String
309 ar(TypeError, self.db.file.setkey, 'fooz')
310 # key must exist
311 ar(KeyError, self.db.file.setkey, 'fubar')
313 #
314 # class get
315 #
316 # invalid node id
317 ar(IndexError, self.db.issue.get, '99', 'title')
318 # invalid property name
319 ar(KeyError, self.db.status.get, '2', 'foo')
321 #
322 # class set
323 #
324 # invalid node id
325 ar(IndexError, self.db.issue.set, '99', title='foo')
326 # invalid property name
327 ar(KeyError, self.db.status.set, '1', foo='foo')
328 # string property
329 ar(TypeError, self.db.status.set, '1', name=1)
330 # key name clash
331 ar(ValueError, self.db.status.set, '2', name='unread')
332 # set up a valid issue for me to work on
333 id = self.db.issue.create(title="spam", status='1')
334 # invalid link index
335 ar(IndexError, self.db.issue.set, id, title='foo', status='bar')
336 # invalid link value
337 ar(ValueError, self.db.issue.set, id, title='foo', status=1)
338 # invalid multilink type
339 ar(TypeError, self.db.issue.set, id, title='foo', status='1',
340 nosy='hello')
341 # invalid multilink index type
342 ar(ValueError, self.db.issue.set, id, title='foo', status='1',
343 nosy=[1])
344 # invalid multilink index
345 ar(IndexError, self.db.issue.set, id, title='foo', status='1',
346 nosy=['10'])
347 # invalid number value
348 ar(TypeError, self.db.user.create, username='foo', age='a')
349 # invalid boolean value
350 ar(TypeError, self.db.user.create, username='foo', assignable='true')
351 nid = self.db.user.create(username='foo')
352 # invalid number value
353 ar(TypeError, self.db.user.set, nid, username='foo', age='a')
354 # invalid boolean value
355 ar(TypeError, self.db.user.set, nid, username='foo', assignable='true')
357 def testJournals(self):
358 self.db.user.create(username="mary")
359 self.db.user.create(username="pete")
360 self.db.issue.create(title="spam", status='1')
361 self.db.commit()
363 # journal entry for issue create
364 journal = self.db.getjournal('issue', '1')
365 self.assertEqual(1, len(journal))
366 (nodeid, date_stamp, journaltag, action, params) = journal[0]
367 self.assertEqual(nodeid, '1')
368 self.assertEqual(journaltag, self.db.user.lookup('admin'))
369 self.assertEqual(action, 'create')
370 keys = params.keys()
371 keys.sort()
372 self.assertEqual(keys, ['assignedto', 'deadline', 'files',
373 'foo', 'messages', 'nosy', 'status', 'superseder', 'title'])
374 self.assertEqual(None,params['deadline'])
375 self.assertEqual(None,params['foo'])
376 self.assertEqual([],params['nosy'])
377 self.assertEqual('1',params['status'])
378 self.assertEqual('spam',params['title'])
380 # journal entry for link
381 journal = self.db.getjournal('user', '1')
382 self.assertEqual(1, len(journal))
383 self.db.issue.set('1', assignedto='1')
384 self.db.commit()
385 journal = self.db.getjournal('user', '1')
386 self.assertEqual(2, len(journal))
387 (nodeid, date_stamp, journaltag, action, params) = journal[1]
388 self.assertEqual('1', nodeid)
389 self.assertEqual('1', journaltag)
390 self.assertEqual('link', action)
391 self.assertEqual(('issue', '1', 'assignedto'), params)
393 # journal entry for unlink
394 self.db.issue.set('1', assignedto='2')
395 self.db.commit()
396 journal = self.db.getjournal('user', '1')
397 self.assertEqual(3, len(journal))
398 (nodeid, date_stamp, journaltag, action, params) = journal[2]
399 self.assertEqual('1', nodeid)
400 self.assertEqual('1', journaltag)
401 self.assertEqual('unlink', action)
402 self.assertEqual(('issue', '1', 'assignedto'), params)
404 # test disabling journalling
405 # ... get the last entry
406 time.sleep(1)
407 entry = self.db.getjournal('issue', '1')[-1]
408 (x, date_stamp, x, x, x) = entry
409 self.db.issue.disableJournalling()
410 self.db.issue.set('1', title='hello world')
411 self.db.commit()
412 entry = self.db.getjournal('issue', '1')[-1]
413 (x, date_stamp2, x, x, x) = entry
414 # see if the change was journalled when it shouldn't have been
415 self.assertEqual(date_stamp, date_stamp2)
416 time.sleep(1)
417 self.db.issue.enableJournalling()
418 self.db.issue.set('1', title='hello world 2')
419 self.db.commit()
420 entry = self.db.getjournal('issue', '1')[-1]
421 (x, date_stamp2, x, x, x) = entry
422 # see if the change was journalled
423 self.assertNotEqual(date_stamp, date_stamp2)
425 def testPack(self):
426 id = self.db.issue.create(title="spam", status='1')
427 self.db.commit()
428 self.db.issue.set(id, status='2')
429 self.db.commit()
431 # sleep for at least a second, then get a date to pack at
432 time.sleep(2)
433 pack_before = date.Date('.')
434 time.sleep(2)
436 # one more entry
437 self.db.issue.set(id, status='3')
438 self.db.commit()
440 # pack
441 self.db.pack(pack_before)
442 journal = self.db.getjournal('issue', id)
444 # we should have the create and last set entries now
445 self.assertEqual(2, len(journal))
447 def testIDGeneration(self):
448 id1 = self.db.issue.create(title="spam", status='1')
449 id2 = self.db2.issue.create(title="eggs", status='2')
450 self.assertNotEqual(id1, id2)
452 def testSearching(self):
453 self.db.file.create(content='hello', type="text/plain")
454 self.db.file.create(content='world', type="text/frozz",
455 comment='blah blah')
456 self.db.issue.create(files=['1', '2'], title="flebble plop")
457 self.db.issue.create(title="flebble frooz")
458 self.db.commit()
459 self.assertEquals(self.db.indexer.search(['hello'], self.db.issue),
460 {'1': {'files': ['1']}})
461 self.assertEquals(self.db.indexer.search(['world'], self.db.issue), {})
462 self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue),
463 {'2': {}})
464 self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
465 {'2': {}, '1': {}})
467 def testReindexing(self):
468 self.db.issue.create(title="frooz")
469 self.db.commit()
470 self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue),
471 {'1': {}})
472 self.db.issue.set('1', title="dooble")
473 self.db.commit()
474 self.assertEquals(self.db.indexer.search(['dooble'], self.db.issue),
475 {'1': {}})
476 self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue), {})
478 def testForcedReindexing(self):
479 self.db.issue.create(title="flebble frooz")
480 self.db.commit()
481 self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
482 {'1': {}})
483 self.db.indexer.quiet = 1
484 self.db.indexer.force_reindex()
485 self.db.post_init()
486 self.db.indexer.quiet = 9
487 self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
488 {'1': {}})
490 def testStringFind(self):
491 ids = []
492 ids.append(self.db.issue.create(title="spam"))
493 self.db.issue.create(title="not spam")
494 ids.append(self.db.issue.create(title="spam"))
495 ids.sort()
496 got = self.db.issue.stringFind(title='spam')
497 got.sort()
498 self.assertEqual(got, ids)
499 self.assertEqual(self.db.issue.stringFind(title='fubar'), [])
501 def filteringSetup(self):
502 for user in (
503 {'username': 'bleep'},
504 {'username': 'blop'},
505 {'username': 'blorp'}):
506 self.db.user.create(**user)
507 iss = self.db.issue
508 for issue in (
509 {'title': 'issue one', 'status': '2'},
510 {'title': 'issue two', 'status': '1'},
511 {'title': 'issue three', 'status': '1', 'nosy': ['1','2']}):
512 self.db.issue.create(**issue)
513 self.db.commit()
514 return self.assertEqual, self.db.issue.filter
516 def testFilteringString(self):
517 ae, filt = self.filteringSetup()
518 ae(filt(None, {'title': 'issue one'}, ('+','id'), (None,None)), ['1'])
519 ae(filt(None, {'title': 'issue'}, ('+','id'), (None,None)),
520 ['1','2','3'])
522 def testFilteringLink(self):
523 ae, filt = self.filteringSetup()
524 ae(filt(None, {'status': '1'}, ('+','id'), (None,None)), ['2','3'])
526 def testFilteringMultilink(self):
527 ae, filt = self.filteringSetup()
528 ae(filt(None, {'nosy': '2'}, ('+','id'), (None,None)), ['3'])
530 def testFilteringMany(self):
531 ae, filt = self.filteringSetup()
532 ae(filt(None, {'nosy': '2', 'status': '1'}, ('+','id'), (None,None)),
533 ['3'])
535 class anydbmReadOnlyDBTestCase(MyTestCase):
536 def setUp(self):
537 from roundup.backends import anydbm
538 # remove previous test, ignore errors
539 if os.path.exists(config.DATABASE):
540 shutil.rmtree(config.DATABASE)
541 os.makedirs(config.DATABASE + '/files')
542 db = anydbm.Database(config, 'admin')
543 setupSchema(db, 1, anydbm)
544 self.db = anydbm.Database(config)
545 setupSchema(self.db, 0, anydbm)
546 self.db2 = anydbm.Database(config, 'admin')
547 setupSchema(self.db2, 0, anydbm)
549 def testExceptions(self):
550 # this tests the exceptions that should be raised
551 ar = self.assertRaises
553 # this tests the exceptions that should be raised
554 ar(DatabaseError, self.db.status.create, name="foo")
555 ar(DatabaseError, self.db.status.set, '1', name="foo")
556 ar(DatabaseError, self.db.status.retire, '1')
559 class bsddbDBTestCase(anydbmDBTestCase):
560 def setUp(self):
561 from roundup.backends import bsddb
562 # remove previous test, ignore errors
563 if os.path.exists(config.DATABASE):
564 shutil.rmtree(config.DATABASE)
565 os.makedirs(config.DATABASE + '/files')
566 self.db = bsddb.Database(config, 'admin')
567 setupSchema(self.db, 1, bsddb)
568 self.db2 = bsddb.Database(config, 'admin')
569 setupSchema(self.db2, 0, bsddb)
571 class bsddbReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
572 def setUp(self):
573 from roundup.backends import bsddb
574 # remove previous test, ignore errors
575 if os.path.exists(config.DATABASE):
576 shutil.rmtree(config.DATABASE)
577 os.makedirs(config.DATABASE + '/files')
578 db = bsddb.Database(config, 'admin')
579 setupSchema(db, 1, bsddb)
580 self.db = bsddb.Database(config)
581 setupSchema(self.db, 0, bsddb)
582 self.db2 = bsddb.Database(config, 'admin')
583 setupSchema(self.db2, 0, bsddb)
586 class bsddb3DBTestCase(anydbmDBTestCase):
587 def setUp(self):
588 from roundup.backends import bsddb3
589 # remove previous test, ignore errors
590 if os.path.exists(config.DATABASE):
591 shutil.rmtree(config.DATABASE)
592 os.makedirs(config.DATABASE + '/files')
593 self.db = bsddb3.Database(config, 'admin')
594 setupSchema(self.db, 1, bsddb3)
595 self.db2 = bsddb3.Database(config, 'admin')
596 setupSchema(self.db2, 0, bsddb3)
598 class bsddb3ReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
599 def setUp(self):
600 from roundup.backends import bsddb3
601 # remove previous test, ignore errors
602 if os.path.exists(config.DATABASE):
603 shutil.rmtree(config.DATABASE)
604 os.makedirs(config.DATABASE + '/files')
605 db = bsddb3.Database(config, 'admin')
606 setupSchema(db, 1, bsddb3)
607 self.db = bsddb3.Database(config)
608 setupSchema(self.db, 0, bsddb3)
609 self.db2 = bsddb3.Database(config, 'admin')
610 setupSchema(self.db2, 0, bsddb3)
613 class gadflyDBTestCase(anydbmDBTestCase):
614 ''' Gadfly doesn't support multiple connections to the one local
615 database
616 '''
617 def setUp(self):
618 from roundup.backends import gadfly
619 # remove previous test, ignore errors
620 if os.path.exists(config.DATABASE):
621 shutil.rmtree(config.DATABASE)
622 config.GADFLY_DATABASE = ('test', config.DATABASE)
623 os.makedirs(config.DATABASE + '/files')
624 self.db = gadfly.Database(config, 'admin')
625 setupSchema(self.db, 1, gadfly)
627 def testIDGeneration(self):
628 id1 = self.db.issue.create(title="spam", status='1')
629 id2 = self.db.issue.create(title="eggs", status='2')
630 self.assertNotEqual(id1, id2)
632 class gadflyReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
633 def setUp(self):
634 from roundup.backends import gadfly
635 # remove previous test, ignore errors
636 if os.path.exists(config.DATABASE):
637 shutil.rmtree(config.DATABASE)
638 config.GADFLY_DATABASE = ('test', config.DATABASE)
639 os.makedirs(config.DATABASE + '/files')
640 db = gadfly.Database(config, 'admin')
641 setupSchema(db, 1, gadfly)
642 self.db = gadfly.Database(config)
643 setupSchema(self.db, 0, gadfly)
646 class sqliteDBTestCase(anydbmDBTestCase):
647 def setUp(self):
648 from roundup.backends import sqlite
649 # remove previous test, ignore errors
650 if os.path.exists(config.DATABASE):
651 shutil.rmtree(config.DATABASE)
652 os.makedirs(config.DATABASE + '/files')
653 self.db = sqlite.Database(config, 'admin')
654 setupSchema(self.db, 1, sqlite)
656 def testIDGeneration(self):
657 pass
659 class sqliteReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
660 def setUp(self):
661 from roundup.backends import sqlite
662 # remove previous test, ignore errors
663 if os.path.exists(config.DATABASE):
664 shutil.rmtree(config.DATABASE)
665 os.makedirs(config.DATABASE + '/files')
666 db = sqlite.Database(config, 'admin')
667 setupSchema(db, 1, sqlite)
668 self.db = sqlite.Database(config)
669 setupSchema(self.db, 0, sqlite)
672 class metakitDBTestCase(anydbmDBTestCase):
673 def setUp(self):
674 from roundup.backends import metakit
675 import weakref
676 metakit._instances = weakref.WeakValueDictionary()
677 # remove previous test, ignore errors
678 if os.path.exists(config.DATABASE):
679 shutil.rmtree(config.DATABASE)
680 os.makedirs(config.DATABASE + '/files')
681 self.db = metakit.Database(config, 'admin')
682 setupSchema(self.db, 1, metakit)
684 def testIDGeneration(self):
685 id1 = self.db.issue.create(title="spam", status='1')
686 id2 = self.db.issue.create(title="eggs", status='2')
687 self.assertNotEqual(id1, id2)
689 def testTransactions(self):
690 # remember the number of items we started
691 num_issues = len(self.db.issue.list())
692 self.db.issue.create(title="don't commit me!", status='1')
693 self.assertNotEqual(num_issues, len(self.db.issue.list()))
694 self.db.rollback()
695 self.assertEqual(num_issues, len(self.db.issue.list()))
696 self.db.issue.create(title="please commit me!", status='1')
697 self.assertNotEqual(num_issues, len(self.db.issue.list()))
698 self.db.commit()
699 self.assertNotEqual(num_issues, len(self.db.issue.list()))
700 self.db.rollback()
701 self.assertNotEqual(num_issues, len(self.db.issue.list()))
702 self.db.file.create(name="test", type="text/plain", content="hi")
703 self.db.rollback()
704 for i in range(10):
705 self.db.file.create(name="test", type="text/plain",
706 content="hi %d"%(i))
707 self.db.commit()
708 # TODO: would be good to be able to ensure the file is not on disk after
709 # a rollback...
710 self.assertNotEqual(num_files, num_files2)
711 self.db.file.create(name="test", type="text/plain", content="hi")
712 self.db.rollback()
714 class metakitReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
715 def setUp(self):
716 from roundup.backends import metakit
717 import weakref
718 metakit._instances = weakref.WeakValueDictionary()
719 # remove previous test, ignore errors
720 if os.path.exists(config.DATABASE):
721 shutil.rmtree(config.DATABASE)
722 os.makedirs(config.DATABASE + '/files')
723 db = metakit.Database(config, 'admin')
724 setupSchema(db, 1, metakit)
725 self.db = metakit.Database(config)
726 setupSchema(self.db, 0, metakit)
728 def suite():
729 l = [
730 unittest.makeSuite(anydbmDBTestCase, 'test'),
731 unittest.makeSuite(anydbmReadOnlyDBTestCase, 'test')
732 ]
733 # return unittest.TestSuite(l)
735 from roundup import backends
736 if hasattr(backends, 'gadfly'):
737 l.append(unittest.makeSuite(gadflyDBTestCase, 'test'))
738 l.append(unittest.makeSuite(gadflyReadOnlyDBTestCase, 'test'))
740 if hasattr(backends, 'sqlite'):
741 l.append(unittest.makeSuite(sqliteDBTestCase, 'test'))
742 l.append(unittest.makeSuite(sqliteReadOnlyDBTestCase, 'test'))
744 if hasattr(backends, 'bsddb'):
745 l.append(unittest.makeSuite(bsddbDBTestCase, 'test'))
746 l.append(unittest.makeSuite(bsddbReadOnlyDBTestCase, 'test'))
748 if hasattr(backends, 'bsddb3'):
749 l.append(unittest.makeSuite(bsddb3DBTestCase, 'test'))
750 l.append(unittest.makeSuite(bsddb3ReadOnlyDBTestCase, 'test'))
752 if hasattr(backends, 'metakit'):
753 l.append(unittest.makeSuite(metakitDBTestCase, 'test'))
754 l.append(unittest.makeSuite(metakitReadOnlyDBTestCase, 'test'))
756 return unittest.TestSuite(l)
758 # vim: set filetype=python ts=4 sw=4 et si