1 #
2 # Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
3 # This module is free software, and you may redistribute it and/or modify
4 # under the same terms as Python, so long as this copyright message and
5 # disclaimer are retained in their original form.
6 #
7 # IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
8 # DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
9 # OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
10 # POSSIBILITY OF SUCH DAMAGE.
11 #
12 # BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
13 # BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
14 # FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
15 # BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
16 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
17 #
18 # $Id: test_db.py,v 1.71 2003-02-15 23:19:01 kedder Exp $
20 import unittest, os, shutil, time
22 from roundup.hyperdb import String, Password, Link, Multilink, Date, \
23 Interval, DatabaseError, Boolean, Number
24 from roundup import date, password
25 from roundup.indexer import Indexer
def setupSchema(db, create, module):
    """Declare the test tracker schema on *db* and optionally seed it.

    db     -- an open backend Database instance
    create -- when true, also create the "admin" user and the four
              standard status nodes
    module -- the backend module supplying Class, FileClass and
              IssueClass
    """
    # plain classes, each looked up by a key property
    status_cls = module.Class(db, "status", name=String())
    status_cls.setkey("name")

    user_cls = module.Class(
        db, "user",
        username=String(),
        password=Password(),
        assignable=Boolean(),
        age=Number(),
        roles=String())
    user_cls.setkey("username")

    # the classes are bound to locals only to keep them referenced while
    # the schema is being built; the names are not used again
    file_cls = module.FileClass(
        db, "file",
        name=String(),
        type=String(),
        comment=String(indexme="yes"),
        fooz=Password())
    issue_cls = module.IssueClass(
        db, "issue",
        title=String(indexme="yes"),
        status=Link("status"),
        nosy=Multilink("user"),
        deadline=Date(),
        foo=Interval(),
        files=Multilink("file"),
        assignedto=Link('user'))

    # sessions are the one class that keeps no journal
    session_cls = module.Class(db, 'session', title=String())
    session_cls.disableJournalling()

    db.post_init()

    if create:
        user_cls.create(username="admin", roles='Admin')
        # creation order matters: the tests refer to these as '1'..'4'
        for status_name in ("unread", "in-progress", "testing", "resolved"):
            status_cls.create(name=status_name)
    db.commit()
class MyTestCase(unittest.TestCase):
    """Base class for the database tests.

    Subclasses are expected to set ``self.db`` in their setUp; this class
    only supplies the shared clean-up.
    """
    def tearDown(self):
        # Always remove the scratch tracker directory, even when closing
        # the database raises, so a failing test cannot leave stale files
        # behind. Any exception from close() still propagates.
        try:
            self.db.close()
        finally:
            if os.path.exists('_test_dir'):
                shutil.rmtree('_test_dir')
class config:
    """Configuration object handed to every backend Database in the tests."""

    # directory holding the scratch tracker
    DATABASE = '_test_dir'

    # mail settings
    MAILHOST = 'localhost'
    MAIL_DOMAIN = 'fill.me.in.'
    ADMIN_EMAIL = 'roundup-admin@%s' % MAIL_DOMAIN

    # tracker identity
    TRACKER_NAME = 'Roundup issue tracker'
    TRACKER_EMAIL = 'issue_tracker@%s' % MAIL_DOMAIN
    TRACKER_WEB = 'http://some.useful.url/'

    # presentation / access switches
    FILTER_POSITION = 'bottom'          # one of 'top', 'bottom', 'top and bottom'
    ANONYMOUS_ACCESS = 'deny'           # either 'deny' or 'allow'
    ANONYMOUS_REGISTER = 'deny'         # either 'deny' or 'allow'
    MESSAGES_TO_AUTHOR = 'no'           # either 'yes' or 'no'
    EMAIL_SIGNATURE_POSITION = 'bottom'

    # Mysql connection data
    MYSQL_DBHOST = 'localhost'
    MYSQL_DBUSER = 'rounduptest'
    MYSQL_DBPASSWORD = 'rounduptest'
    MYSQL_DBNAME = 'rounduptest'
    MYSQL_DATABASE = (
        MYSQL_DBHOST,
        MYSQL_DBUSER,
        MYSQL_DBPASSWORD,
        MYSQL_DBNAME,
    )
class nodbconfig(config):
    """Variant of ``config`` whose MySQL settings omit the database name."""
    # host, user and password only -- everything but the trailing DB name
    MYSQL_DATABASE = config.MYSQL_DATABASE[:3]
class anydbmDBTestCase(MyTestCase):
    """Read/write hyperdb tests, run here against the anydbm backend.

    The test cases for the other backends subclass this one and replace
    setUp (plus the odd test whose expectations differ for that backend).
    """
    def setUp(self):
        from roundup.backends import anydbm
        # remove previous test, ignore errors
        if os.path.exists(config.DATABASE):
            shutil.rmtree(config.DATABASE)
        os.makedirs(config.DATABASE + '/files')
        self.db = anydbm.Database(config, 'admin')
        setupSchema(self.db, 1, anydbm)

    #
    # property set / unset tests
    #
    # Most of these run twice: once without committing between steps and
    # once committing after every change.
    #
    def testIDGeneration(self):
        id1 = self.db.issue.create(title="spam", status='1')
        id2 = self.db.issue.create(title="eggs", status='2')
        self.assertNotEqual(id1, id2)

    def testStringChange(self):
        for commit in (0,1):
            # test set & retrieve
            nid = self.db.issue.create(title="spam", status='1')
            self.assertEqual(self.db.issue.get(nid, 'title'), 'spam')

            # change and make sure we retrieve the correct value
            self.db.issue.set(nid, title='eggs')
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, 'title'), 'eggs')

    def testStringUnset(self):
        for commit in (0,1):
            nid = self.db.issue.create(title="spam", status='1')
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, 'title'), 'spam')
            # make sure we can unset
            self.db.issue.set(nid, title=None)
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "title"), None)

    def testLinkChange(self):
        for commit in (0,1):
            nid = self.db.issue.create(title="spam", status='1')
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "status"), '1')
            self.db.issue.set(nid, status='2')
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "status"), '2')

    def testLinkUnset(self):
        for commit in (0,1):
            nid = self.db.issue.create(title="spam", status='1')
            if commit: self.db.commit()
            self.db.issue.set(nid, status=None)
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "status"), None)

    def testMultilinkChange(self):
        for commit in (0,1):
            # the usernames carry the pass number because username is the
            # key property and so must be unique across both passes
            u1 = self.db.user.create(username='foo%s'%commit)
            u2 = self.db.user.create(username='bar%s'%commit)
            nid = self.db.issue.create(title="spam", nosy=[u1])
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "nosy"), [u1])
            self.db.issue.set(nid, nosy=[])
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "nosy"), [])
            self.db.issue.set(nid, nosy=[u1,u2])
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "nosy"), [u1,u2])

    def testDateChange(self):
        for commit in (0,1):
            nid = self.db.issue.create(title="spam", status='1')
            a = self.db.issue.get(nid, "deadline")
            if commit: self.db.commit()
            self.db.issue.set(nid, deadline=date.Date())
            b = self.db.issue.get(nid, "deadline")
            if commit: self.db.commit()
            self.assertNotEqual(a, b)
            self.assertNotEqual(b, date.Date('1970-1-1 00:00:00'))

    def testDateUnset(self):
        for commit in (0,1):
            nid = self.db.issue.create(title="spam", status='1')
            self.db.issue.set(nid, deadline=date.Date())
            if commit: self.db.commit()
            self.assertNotEqual(self.db.issue.get(nid, "deadline"), None)
            self.db.issue.set(nid, deadline=None)
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "deadline"), None)

    def testIntervalChange(self):
        for commit in (0,1):
            nid = self.db.issue.create(title="spam", status='1')
            if commit: self.db.commit()
            a = self.db.issue.get(nid, "foo")
            i = date.Interval('-1d')
            self.db.issue.set(nid, foo=i)
            if commit: self.db.commit()
            self.assertNotEqual(self.db.issue.get(nid, "foo"), a)
            self.assertEqual(i, self.db.issue.get(nid, "foo"))
            j = date.Interval('1y')
            self.db.issue.set(nid, foo=j)
            if commit: self.db.commit()
            self.assertNotEqual(self.db.issue.get(nid, "foo"), i)
            self.assertEqual(j, self.db.issue.get(nid, "foo"))

    def testIntervalUnset(self):
        for commit in (0,1):
            nid = self.db.issue.create(title="spam", status='1')
            self.db.issue.set(nid, foo=date.Interval('-1d'))
            if commit: self.db.commit()
            self.assertNotEqual(self.db.issue.get(nid, "foo"), None)
            self.db.issue.set(nid, foo=None)
            if commit: self.db.commit()
            self.assertEqual(self.db.issue.get(nid, "foo"), None)

    def testBooleanChange(self):
        userid = self.db.user.create(username='foo', assignable=1)
        self.assertEqual(1, self.db.user.get(userid, 'assignable'))
        self.db.user.set(userid, assignable=0)
        self.assertEqual(self.db.user.get(userid, 'assignable'), 0)

    def testBooleanUnset(self):
        nid = self.db.user.create(username='foo', assignable=1)
        self.db.user.set(nid, assignable=None)
        self.assertEqual(self.db.user.get(nid, "assignable"), None)

    def testNumberChange(self):
        nid = self.db.user.create(username='foo', age=1)
        self.assertEqual(1, self.db.user.get(nid, 'age'))
        self.db.user.set(nid, age=3)
        self.assertNotEqual(self.db.user.get(nid, 'age'), 1)
        # no assertion follows: this only checks that a float is accepted
        self.db.user.set(nid, age=1.0)

    def testNumberUnset(self):
        nid = self.db.user.create(username='foo', age=1)
        self.db.user.set(nid, age=None)
        self.assertEqual(self.db.user.get(nid, "age"), None)

    #
    # keys, schema changes, retirement, serialisation
    #
    def testKeyValue(self):
        newid = self.db.user.create(username="spam")
        self.assertEqual(self.db.user.lookup('spam'), newid)
        self.db.commit()
        self.assertEqual(self.db.user.lookup('spam'), newid)
        self.db.user.retire(newid)
        self.assertRaises(KeyError, self.db.user.lookup, 'spam')

        # use the key again now that the old is retired
        newid2 = self.db.user.create(username="spam")
        self.assertNotEqual(newid, newid2)

    def testNewProperty(self):
        self.db.issue.create(title="spam", status='1')
        self.db.issue.addprop(fixer=Link("user"))
        # force any post-init stuff to happen
        self.db.post_init()
        props = self.db.issue.getprops()
        # list() so that sorting does not depend on keys() returning a list
        keys = list(props.keys())
        keys.sort()
        self.assertEqual(keys, ['activity', 'assignedto', 'creation',
            'creator', 'deadline', 'files', 'fixer', 'foo', 'id', 'messages',
            'nosy', 'status', 'superseder', 'title'])
        self.assertEqual(self.db.issue.get('1', "fixer"), None)

    def testRetire(self):
        self.db.issue.create(title="spam", status='1')
        b = self.db.status.get('1', 'name')
        a = self.db.status.list()
        self.db.status.retire('1')
        # make sure the list is different
        self.assertNotEqual(a, self.db.status.list())
        # can still access the node if necessary
        self.assertEqual(self.db.status.get('1', 'name'), b)
        self.db.commit()
        self.assertEqual(self.db.status.get('1', 'name'), b)
        self.assertNotEqual(a, self.db.status.list())

    def testSerialisation(self):
        # values must come back as their rich types after a commit
        nid = self.db.issue.create(title="spam", status='1',
            deadline=date.Date(), foo=date.Interval('-1d'))
        self.db.commit()
        assert isinstance(self.db.issue.get(nid, 'deadline'), date.Date)
        assert isinstance(self.db.issue.get(nid, 'foo'), date.Interval)
        uid = self.db.user.create(username="fozzy",
            password=password.Password('t. bear'))
        self.db.commit()
        assert isinstance(self.db.user.get(uid, 'password'), password.Password)

    #
    # transactions and destruction
    #
    def testTransactions(self):
        # remember the number of items we started
        num_issues = len(self.db.issue.list())
        num_files = self.db.numfiles()
        self.db.issue.create(title="don't commit me!", status='1')
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        self.db.rollback()
        self.assertEqual(num_issues, len(self.db.issue.list()))
        self.db.issue.create(title="please commit me!", status='1')
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        self.db.commit()
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        # a rollback after the commit must not undo the committed issue
        self.db.rollback()
        self.assertNotEqual(num_issues, len(self.db.issue.list()))
        self.db.file.create(name="test", type="text/plain", content="hi")
        self.db.rollback()
        self.assertEqual(num_files, self.db.numfiles())
        for i in range(10):
            self.db.file.create(name="test", type="text/plain",
                    content="hi %d"%(i))
            self.db.commit()
        num_files2 = self.db.numfiles()
        self.assertNotEqual(num_files, num_files2)
        self.db.file.create(name="test", type="text/plain", content="hi")
        self.db.rollback()
        self.assertNotEqual(num_files, self.db.numfiles())
        self.assertEqual(num_files2, self.db.numfiles())

    def testDestroyNoJournalling(self):
        self.innerTestDestroy(klass=self.db.session)

    def testDestroyJournalling(self):
        self.innerTestDestroy(klass=self.db.issue)

    def innerTestDestroy(self, klass):
        # Shared body of the two destroy tests: destroy a node without a
        # commit, with a commit, and finally followed by a rollback.
        newid = klass.create(title='Mr Friendly')
        n = len(klass.list())
        self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
        klass.destroy(newid)
        self.assertRaises(IndexError, klass.get, newid, 'title')
        self.assertNotEqual(len(klass.list()), n)
        if klass.do_journal:
            self.assertRaises(IndexError, klass.history, newid)

        # now with a commit
        newid = klass.create(title='Mr Friendly')
        n = len(klass.list())
        self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
        self.db.commit()
        klass.destroy(newid)
        self.assertRaises(IndexError, klass.get, newid, 'title')
        self.db.commit()
        self.assertRaises(IndexError, klass.get, newid, 'title')
        self.assertNotEqual(len(klass.list()), n)
        if klass.do_journal:
            self.assertRaises(IndexError, klass.history, newid)

        # now with a rollback
        newid = klass.create(title='Mr Friendly')
        n = len(klass.list())
        self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
        self.db.commit()
        klass.destroy(newid)
        self.assertNotEqual(len(klass.list()), n)
        self.assertRaises(IndexError, klass.get, newid, 'title')
        self.db.rollback()
        self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
        self.assertEqual(len(klass.list()), n)
        if klass.do_journal:
            self.assertNotEqual(klass.history(newid), [])

    def testExceptions(self):
        # this tests the exceptions that should be raised
        ar = self.assertRaises

        #
        # class create
        #
        # string property
        ar(TypeError, self.db.status.create, name=1)
        # invalid property name
        ar(KeyError, self.db.status.create, foo='foo')
        # key name clash
        ar(ValueError, self.db.status.create, name='unread')
        # invalid link index
        ar(IndexError, self.db.issue.create, title='foo', status='bar')
        # invalid link value
        ar(ValueError, self.db.issue.create, title='foo', status=1)
        # invalid multilink type
        ar(TypeError, self.db.issue.create, title='foo', status='1',
            nosy='hello')
        # invalid multilink index type
        ar(ValueError, self.db.issue.create, title='foo', status='1',
            nosy=[1])
        # invalid multilink index
        ar(IndexError, self.db.issue.create, title='foo', status='1',
            nosy=['10'])

        #
        # key property
        #
        # key must be a String
        ar(TypeError, self.db.file.setkey, 'fooz')
        # key must exist
        ar(KeyError, self.db.file.setkey, 'fubar')

        #
        # class get
        #
        # invalid node id
        ar(IndexError, self.db.issue.get, '99', 'title')
        # invalid property name
        ar(KeyError, self.db.status.get, '2', 'foo')

        #
        # class set
        #
        # invalid node id
        ar(IndexError, self.db.issue.set, '99', title='foo')
        # invalid property name
        ar(KeyError, self.db.status.set, '1', foo='foo')
        # string property
        ar(TypeError, self.db.status.set, '1', name=1)
        # key name clash
        ar(ValueError, self.db.status.set, '2', name='unread')
        # set up a valid issue for me to work on
        issue_id = self.db.issue.create(title="spam", status='1')
        # invalid link index
        ar(IndexError, self.db.issue.set, issue_id, title='foo', status='bar')
        # invalid link value
        ar(ValueError, self.db.issue.set, issue_id, title='foo', status=1)
        # invalid multilink type
        ar(TypeError, self.db.issue.set, issue_id, title='foo', status='1',
            nosy='hello')
        # invalid multilink index type
        ar(ValueError, self.db.issue.set, issue_id, title='foo', status='1',
            nosy=[1])
        # invalid multilink index
        ar(IndexError, self.db.issue.set, issue_id, title='foo', status='1',
            nosy=['10'])
        # NOTE: the following increment the username to avoid problems
        # within metakit's backend (it creates the node, and then sets the
        # info, so the create (and by a fluke the username set) go through
        # before the age/assignable/etc. set, which raises the exception)
        # invalid number value
        ar(TypeError, self.db.user.create, username='foo', age='a')
        # invalid boolean value
        ar(TypeError, self.db.user.create, username='foo2', assignable='true')
        nid = self.db.user.create(username='foo3')
        # invalid number value
        ar(TypeError, self.db.user.set, nid, age='a')
        # invalid boolean value
        ar(TypeError, self.db.user.set, nid, assignable='true')

    #
    # journal tests
    #
    def testJournals(self):
        self.db.user.create(username="mary")
        self.db.user.create(username="pete")
        self.db.issue.create(title="spam", status='1')
        self.db.commit()

        # journal entry for issue create
        journal = self.db.getjournal('issue', '1')
        self.assertEqual(1, len(journal))
        (nodeid, date_stamp, journaltag, action, params) = journal[0]
        self.assertEqual(nodeid, '1')
        self.assertEqual(journaltag, self.db.user.lookup('admin'))
        self.assertEqual(action, 'create')
        # list() so that sorting does not depend on keys() returning a list
        keys = list(params.keys())
        keys.sort()
        self.assertEqual(keys, [])

        # journal entry for link
        journal = self.db.getjournal('user', '1')
        self.assertEqual(1, len(journal))
        self.db.issue.set('1', assignedto='1')
        self.db.commit()
        journal = self.db.getjournal('user', '1')
        self.assertEqual(2, len(journal))
        (nodeid, date_stamp, journaltag, action, params) = journal[1]
        self.assertEqual('1', nodeid)
        self.assertEqual('1', journaltag)
        self.assertEqual('link', action)
        self.assertEqual(('issue', '1', 'assignedto'), params)

        # journal entry for unlink
        self.db.issue.set('1', assignedto='2')
        self.db.commit()
        journal = self.db.getjournal('user', '1')
        self.assertEqual(3, len(journal))
        (nodeid, date_stamp, journaltag, action, params) = journal[2]
        self.assertEqual('1', nodeid)
        self.assertEqual('1', journaltag)
        self.assertEqual('unlink', action)
        self.assertEqual(('issue', '1', 'assignedto'), params)

        # test disabling journalling
        # ... get the last entry
        time.sleep(1)
        entry = self.db.getjournal('issue', '1')[-1]
        (x, date_stamp, x, x, x) = entry
        self.db.issue.disableJournalling()
        self.db.issue.set('1', title='hello world')
        self.db.commit()
        entry = self.db.getjournal('issue', '1')[-1]
        (x, date_stamp2, x, x, x) = entry
        # see if the change was journalled when it shouldn't have been
        self.assertEqual(date_stamp, date_stamp2)
        time.sleep(1)
        self.db.issue.enableJournalling()
        self.db.issue.set('1', title='hello world 2')
        self.db.commit()
        entry = self.db.getjournal('issue', '1')[-1]
        (x, date_stamp2, x, x, x) = entry
        # see if the change was journalled
        self.assertNotEqual(date_stamp, date_stamp2)

    def testPack(self):
        issue_id = self.db.issue.create(title="spam", status='1')
        self.db.commit()
        self.db.issue.set(issue_id, status='2')
        self.db.commit()

        # sleep for at least a second, then get a date to pack at
        time.sleep(1)
        pack_before = date.Date('.')

        # wait another second and add one more entry
        time.sleep(1)
        self.db.issue.set(issue_id, status='3')
        self.db.commit()
        jlen = len(self.db.getjournal('issue', issue_id))

        # pack
        self.db.pack(pack_before)

        # we should have the create and last set entries now
        self.assertEqual(jlen-1, len(self.db.getjournal('issue', issue_id)))

    #
    # full-text indexer tests
    #
    def testSearching(self):
        self.db.file.create(content='hello', type="text/plain")
        self.db.file.create(content='world', type="text/frozz",
            comment='blah blah')
        self.db.issue.create(files=['1', '2'], title="flebble plop")
        self.db.issue.create(title="flebble frooz")
        self.db.commit()
        self.assertEqual(self.db.indexer.search(['hello'], self.db.issue),
            {'1': {'files': ['1']}})
        self.assertEqual(self.db.indexer.search(['world'], self.db.issue), {})
        self.assertEqual(self.db.indexer.search(['frooz'], self.db.issue),
            {'2': {}})
        self.assertEqual(self.db.indexer.search(['flebble'], self.db.issue),
            {'2': {}, '1': {}})

    def testReindexing(self):
        self.db.issue.create(title="frooz")
        self.db.commit()
        self.assertEqual(self.db.indexer.search(['frooz'], self.db.issue),
            {'1': {}})
        self.db.issue.set('1', title="dooble")
        self.db.commit()
        self.assertEqual(self.db.indexer.search(['dooble'], self.db.issue),
            {'1': {}})
        # the old title must no longer be found
        self.assertEqual(self.db.indexer.search(['frooz'], self.db.issue), {})

    def testForcedReindexing(self):
        self.db.issue.create(title="flebble frooz")
        self.db.commit()
        self.assertEqual(self.db.indexer.search(['flebble'], self.db.issue),
            {'1': {}})
        self.db.indexer.quiet = 1
        self.db.indexer.force_reindex()
        self.db.post_init()
        self.db.indexer.quiet = 9
        self.assertEqual(self.db.indexer.search(['flebble'], self.db.issue),
            {'1': {}})

    #
    # searching tests follow
    #
    def testFind(self):
        self.db.user.create(username='test')
        ids = []
        ids.append(self.db.issue.create(status="1", nosy=['1']))
        oddid = self.db.issue.create(status="2", nosy=['2'])
        ids.append(self.db.issue.create(status="1", nosy=['1','2']))
        self.db.issue.create(status="3", nosy=['1'])
        ids.sort()

        # should match first and third
        got = self.db.issue.find(status='1')
        got.sort()
        self.assertEqual(got, ids)

        # none
        self.assertEqual(self.db.issue.find(status='4'), [])

        # should match first three
        got = self.db.issue.find(status='1', nosy='2')
        got.sort()
        ids.append(oddid)
        ids.sort()
        self.assertEqual(got, ids)

        # none
        self.assertEqual(self.db.issue.find(status='4', nosy='3'), [])

    def testStringFind(self):
        ids = []
        ids.append(self.db.issue.create(title="spam"))
        self.db.issue.create(title="not spam")
        ids.append(self.db.issue.create(title="spam"))
        ids.sort()
        got = self.db.issue.stringFind(title='spam')
        got.sort()
        self.assertEqual(got, ids)
        self.assertEqual(self.db.issue.stringFind(title='fubar'), [])

    def filteringSetup(self):
        # Create three extra users and three issues, commit, and hand back
        # the (assertEqual, issue.filter) pair used by the filtering tests.
        for user in (
                {'username': 'bleep'},
                {'username': 'blop'},
                {'username': 'blorp'}):
            self.db.user.create(**user)
        for issue in (
                {'title': 'issue one', 'status': '2'},
                {'title': 'issue two', 'status': '1'},
                {'title': 'issue three', 'status': '1', 'nosy': ['1','2']}):
            self.db.issue.create(**issue)
        self.db.commit()
        return self.assertEqual, self.db.issue.filter

    def testFilteringID(self):
        ae, filt = self.filteringSetup()
        ae(filt(None, {'id': '1'}, ('+','id'), (None,None)), ['1'])

    def testFilteringString(self):
        ae, filt = self.filteringSetup()
        ae(filt(None, {'title': 'issue one'}, ('+','id'), (None,None)), ['1'])
        ae(filt(None, {'title': 'issue'}, ('+','id'), (None,None)),
            ['1','2','3'])

    def testFilteringLink(self):
        ae, filt = self.filteringSetup()
        ae(filt(None, {'status': '1'}, ('+','id'), (None,None)), ['2','3'])

    def testFilteringMultilink(self):
        ae, filt = self.filteringSetup()
        ae(filt(None, {'nosy': '2'}, ('+','id'), (None,None)), ['3'])

    def testFilteringMany(self):
        ae, filt = self.filteringSetup()
        ae(filt(None, {'nosy': '2', 'status': '1'}, ('+','id'), (None,None)),
            ['3'])

    # TODO test auditors and reactors
class anydbmReadOnlyDBTestCase(MyTestCase):
    """Checks that a database opened without a user name rejects writes
    (anydbm backend)."""
    def setUp(self):
        from roundup.backends import anydbm as backend
        home = config.DATABASE
        # start from an empty tracker directory
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # first pass: open as 'admin' and populate, then close again
        seeded = backend.Database(config, 'admin')
        setupSchema(seeded, 1, backend)
        seeded.close()
        # second pass: the database under test is opened with no user name
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)

    def testExceptions(self):
        # every modifying operation is expected to raise DatabaseError
        expect = self.assertRaises
        expect(DatabaseError, self.db.status.create, name="foo")
        expect(DatabaseError, self.db.status.set, '1', name="foo")
        expect(DatabaseError, self.db.status.retire, '1')
class bsddbDBTestCase(anydbmDBTestCase):
    """Runs the inherited read/write tests against the bsddb backend."""
    def setUp(self):
        from roundup.backends import bsddb as backend
        home = config.DATABASE
        # start from an empty tracker directory
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        self.db = backend.Database(config, 'admin')
        setupSchema(self.db, 1, backend)
class bsddbReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    """Runs the inherited read-only tests against the bsddb backend."""
    def setUp(self):
        from roundup.backends import bsddb as backend
        home = config.DATABASE
        # start from an empty tracker directory
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # populate as 'admin', close, then reopen with no user name
        seeded = backend.Database(config, 'admin')
        setupSchema(seeded, 1, backend)
        seeded.close()
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)
class bsddb3DBTestCase(anydbmDBTestCase):
    """Runs the inherited read/write tests against the bsddb3 backend."""
    def setUp(self):
        from roundup.backends import bsddb3 as backend
        home = config.DATABASE
        # start from an empty tracker directory
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        self.db = backend.Database(config, 'admin')
        setupSchema(self.db, 1, backend)
class bsddb3ReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    """Runs the inherited read-only tests against the bsddb3 backend."""
    def setUp(self):
        from roundup.backends import bsddb3 as backend
        home = config.DATABASE
        # start from an empty tracker directory
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # populate as 'admin', close, then reopen with no user name
        seeded = backend.Database(config, 'admin')
        setupSchema(seeded, 1, backend)
        seeded.close()
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)
class gadflyDBTestCase(anydbmDBTestCase):
    ''' Gadfly doesn't support multiple connections to the one local
        database
    '''
    def setUp(self):
        from roundup.backends import gadfly as backend
        home = config.DATABASE
        # start from an empty tracker directory
        if os.path.exists(home):
            shutil.rmtree(home)
        # gadfly-specific setting, stored on the shared config class
        config.GADFLY_DATABASE = ('test', home)
        os.makedirs(home + '/files')
        self.db = backend.Database(config, 'admin')
        setupSchema(self.db, 1, backend)

    def testFilteringString(self):
        # Override: only the exact-title filter is checked for gadfly.
        check, issue_filter = self.filteringSetup()
        matched = issue_filter(None, {'title': 'issue one'}, ('+','id'),
            (None,None))
        check(matched, ['1'])
        # XXX gadfly can't do substring LIKE searches
        #ae(filt(None, {'title': 'issue'}, ('+','id'), (None,None)),
        #    ['1','2','3'])
class gadflyReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    """Runs the inherited read-only tests against the gadfly backend."""
    def setUp(self):
        from roundup.backends import gadfly as backend
        home = config.DATABASE
        # start from an empty tracker directory
        if os.path.exists(home):
            shutil.rmtree(home)
        # gadfly-specific setting, stored on the shared config class
        config.GADFLY_DATABASE = ('test', home)
        os.makedirs(home + '/files')
        # populate as 'admin', close, then reopen with no user name
        seeded = backend.Database(config, 'admin')
        setupSchema(seeded, 1, backend)
        seeded.close()
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)
class mysqlDBTestCase(anydbmDBTestCase):
    """Runs the inherited read/write tests against the mysql backend."""
    def setUp(self):
        from roundup.backends import mysql as backend
        home = config.DATABASE
        # start from an empty tracker directory
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # open database for testing
        self.db = backend.Database(config, 'admin')
        setupSchema(self.db, 1, backend)

    def tearDown(self):
        # Replaces the base-class clean-up: close the connection and then
        # call the backend's nuke() with the test configuration.
        from roundup.backends import mysql as backend
        self.db.close()
        backend.Database.nuke(config)
class mysqlReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    """Runs the inherited read-only tests against the mysql backend."""
    def setUp(self):
        from roundup.backends import mysql as backend
        home = config.DATABASE
        # start from an empty tracker directory
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # NOTE(review): unlike the other read-only test cases in this
        # file, no populated database is created as 'admin' first --
        # confirm that this is intended.
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)

    def tearDown(self):
        # Replaces the base-class clean-up: close the connection and then
        # call the backend's nuke() with the test configuration.
        from roundup.backends import mysql as backend
        self.db.close()
        backend.Database.nuke(config)
class sqliteDBTestCase(anydbmDBTestCase):
    """Runs the inherited read/write tests against the sqlite backend."""
    def setUp(self):
        from roundup.backends import sqlite as backend
        home = config.DATABASE
        # start from an empty tracker directory
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        self.db = backend.Database(config, 'admin')
        setupSchema(self.db, 1, backend)
class sqliteReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    """Runs the inherited read-only tests against the sqlite backend."""
    def setUp(self):
        from roundup.backends import sqlite as backend
        home = config.DATABASE
        # start from an empty tracker directory
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # populate as 'admin', close, then reopen with no user name
        seeded = backend.Database(config, 'admin')
        setupSchema(seeded, 1, backend)
        seeded.close()
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)
class metakitDBTestCase(anydbmDBTestCase):
    """Runs the inherited read/write tests against the metakit backend,
    overriding the tests whose expectations differ for metakit."""
    def setUp(self):
        from roundup.backends import metakit as backend
        import weakref
        # replace the backend module's _instances mapping with an empty one
        backend._instances = weakref.WeakValueDictionary()
        home = config.DATABASE
        # start from an empty tracker directory
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        self.db = backend.Database(config, 'admin')
        setupSchema(self.db, 1, backend)

    def testTransactions(self):
        # issue count before anything is created
        issues_before = len(self.db.issue.list())

        # an uncommitted create disappears on rollback
        self.db.issue.create(title="don't commit me!", status='1')
        self.assertNotEqual(issues_before, len(self.db.issue.list()))
        self.db.rollback()
        self.assertEqual(issues_before, len(self.db.issue.list()))

        # a committed create survives a later rollback
        self.db.issue.create(title="please commit me!", status='1')
        self.assertNotEqual(issues_before, len(self.db.issue.list()))
        self.db.commit()
        self.assertNotEqual(issues_before, len(self.db.issue.list()))
        self.db.rollback()
        self.assertNotEqual(issues_before, len(self.db.issue.list()))

        # same exercise for file nodes
        self.db.file.create(name="test", type="text/plain", content="hi")
        self.db.rollback()
        files_before = len(self.db.file.list())
        for count in range(10):
            self.db.file.create(name="test", type="text/plain",
                    content="hi %d"%(count))
            self.db.commit()
        # TODO: would be good to be able to ensure the file is not on disk after
        # a rollback...
        files_committed = len(self.db.file.list())
        self.assertNotEqual(files_before, files_committed)

        # one more uncommitted file: rolling back must drop both the node
        # and exactly one entry from the on-disk directory
        self.db.file.create(name="test", type="text/plain", content="hi")
        on_disk_pending = len(os.listdir(
            self.db.config.DATABASE + '/files/file/0'))
        self.db.rollback()
        on_disk_after = len(os.listdir(
            self.db.config.DATABASE + '/files/file/0'))
        self.assertEqual(files_committed, len(self.db.file.list()))
        self.assertEqual(on_disk_after, on_disk_pending-1)

    def testBooleanUnset(self):
        # XXX: metakit can't unset Booleans :(
        userid = self.db.user.create(username='foo', assignable=1)
        self.db.user.set(userid, assignable=None)
        # so the value reads back as 0 rather than None
        self.assertEqual(self.db.user.get(userid, "assignable"), 0)

    def testNumberUnset(self):
        # XXX: metakit can't unset Numbers :(
        userid = self.db.user.create(username='foo', age=1)
        self.db.user.set(userid, age=None)
        # so the value reads back as 0 rather than None
        self.assertEqual(self.db.user.get(userid, "age"), 0)
class metakitReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    """Runs the inherited read-only tests against the metakit backend."""
    def setUp(self):
        from roundup.backends import metakit as backend
        import weakref
        # replace the backend module's _instances mapping with an empty one
        backend._instances = weakref.WeakValueDictionary()
        home = config.DATABASE
        # start from an empty tracker directory
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # populate as 'admin', close, then reopen with no user name
        seeded = backend.Database(config, 'admin')
        setupSchema(seeded, 1, backend)
        seeded.close()
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)
def suite():
    """Return a TestSuite of the database tests for the available backends.

    A backend is considered available when ``roundup.backends`` has an
    attribute of that name. The mysql tests are additionally skipped, with
    a warning on stdout, unless the test database can be selected and
    contains no tables.
    """
    import sys
    from roundup import backends

    l = []      # the per-class suites, in the order they will run
    p = []      # names of the backends being tested, for the banner

    # NOTE(review): anydbmDBTestCase / anydbmReadOnlyDBTestCase are not
    # added to the suite; the code that did so was commented out in the
    # previous revision of this function -- confirm that is intended.

    if hasattr(backends, 'mysql'):
        from roundup.backends import mysql
        try:
            # Check if we can run mysql tests
            import MySQLdb
            db = mysql.Database(nodbconfig, 'admin')
            try:
                db.conn.select_db(config.MYSQL_DBNAME)
                db.sql("SHOW TABLES")
                tables = db.sql_fetchall()
                if tables:
                    # Database should be empty. We don't dare to delete
                    # any data
                    raise DatabaseError("(Database %s contains tables)" %
                        config.MYSQL_DBNAME)
                db.sql("DROP DATABASE %s" % config.MYSQL_DBNAME)
                db.sql("CREATE DATABASE %s" % config.MYSQL_DBNAME)
            finally:
                # close the probe connection on the failure paths too
                db.close()
        except (MySQLdb.ProgrammingError, DatabaseError):
            # fetched via sys.exc_info() so the clause needs no
            # interpreter-version-specific binding syntax
            msg = sys.exc_info()[1]
            warning = "Warning! Mysql tests will not be performed"
            print("%s %s" % (warning, msg))
            print("See doc/mysql.txt for more details.")
        else:
            p.append('mysql')
            l.append(unittest.makeSuite(mysqlDBTestCase, 'test'))
            l.append(unittest.makeSuite(mysqlReadOnlyDBTestCase, 'test'))

    # the remaining backends need no probing: (name, test case classes)
    optional_backends = (
        ('gadfly', (gadflyDBTestCase, gadflyReadOnlyDBTestCase)),
        ('sqlite', (sqliteDBTestCase, sqliteReadOnlyDBTestCase)),
        ('bsddb', (bsddbDBTestCase, bsddbReadOnlyDBTestCase)),
        ('bsddb3', (bsddb3DBTestCase, bsddb3ReadOnlyDBTestCase)),
        ('metakit', (metakitDBTestCase, metakitReadOnlyDBTestCase)),
    )
    for name, cases in optional_backends:
        if hasattr(backends, name):
            p.append(name)
            for case in cases:
                l.append(unittest.makeSuite(case, 'test'))

    print('running %s backend tests'%(', '.join(p)))
    return unittest.TestSuite(l)
914 # vim: set filetype=python ts=4 sw=4 et si