summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 39b435c)
raw | patch | inline | side by side (parent: 39b435c)
author | richard <richard@57a73879-2fb5-44c3-a270-3262357dd7e2> | |
Mon, 13 Jan 2003 23:32:12 +0000 (23:32 +0000) | ||
committer | richard <richard@57a73879-2fb5-44c3-a270-3262357dd7e2> | |
Mon, 13 Jan 2003 23:32:12 +0000 (23:32 +0000) |
git-svn-id: http://svn.roundup-tracker.org/svnroot/roundup/trunk@1451 57a73879-2fb5-44c3-a270-3262357dd7e2
roundup/backends/back_mysql.py | [new file with mode: 0644] | patch | blob |
test/test_db.py | patch | blob | history |
diff --git a/roundup/backends/back_mysql.py b/roundup/backends/back_mysql.py
--- /dev/null
@@ -0,0 +1,98 @@
+from roundup.backends.rdbms_common import *
+import MySQLdb
+from MySQLdb.constants import ER
+
+class Database(Database):
+ arg = '%s'
+
+ def open_connection(self):
+ db = getattr(self.config, 'MYSQL_DATABASE')
+ try:
+ self.conn = MySQLdb.connect(*db)
+ except MySQLdb.OperationalError, message:
+ raise DatabaseError, message
+
+ self.cursor = self.conn.cursor()
+ try:
+ self.database_schema = self.load_dbschema()
+ except MySQLdb.ProgrammingError, message:
+ if message[0] != ER.NO_SUCH_TABLE:
+ raise DatabaseError, message
+ self.database_schema = {}
+ self.cursor.execute("CREATE TABLE schema (schema TEXT)")
+ self.cursor.execute("CREATE TABLE ids (name varchar(255), num INT)")
+
+ def close(self):
+ try:
+ self.conn.close()
+ except MySQLdb.OperationalError, message:
+ raise
+
+ def __repr__(self):
+ return '<myroundsql 0x%x>'%id(self)
+
+ def sql_fetchone(self):
+ return self.cursor.fetchone()
+
+ def sql_fetchall(self):
+ return self.cursor.fetchall()
+
+ def save_dbschema(self, schema):
+ s = repr(self.database_schema)
+ self.sql('INSERT INTO schema VALUES (%s)', (s,))
+
+ def load_dbschema(self):
+ self.cursor.execute('SELECT schema FROM schema')
+ return eval(self.cursor.fetchone()[0])
+
+ def save_journal(self, classname, cols, nodeid, journaldate,
+ journaltag, action, params):
+ params = repr(params)
+ entry = (nodeid, journaldate, journaltag, action, params)
+
+ a = self.arg
+ sql = 'insert into %s__journal (%s) values (%s,%s,%s,%s,%s)'%(classname,
+ cols, a, a, a, a, a)
+ if __debug__:
+ print >>hyperdb.DEBUG, 'addjournal', (self, sql, entry)
+ self.cursor.execute(sql, entry)
+
+ def load_journal(self, classname, cols, nodeid):
+ sql = 'select %s from %s__journal where nodeid=%s'%(cols, classname,
+ self.arg)
+ if __debug__:
+ print >>hyperdb.DEBUG, 'getjournal', (self, sql, nodeid)
+ self.cursor.execute(sql, (nodeid,))
+ res = []
+ for nodeid, date_stamp, user, action, params in self.cursor.fetchall():
+ params = eval(params)
+ res.append((nodeid, date.Date(date_stamp), user, action, params))
+ return res
+
+ def create_class_table(self, spec):
+ cols, mls = self.determine_columns(spec.properties.items())
+ cols.append('id')
+ cols.append('__retired__')
+ scols = ',' . join(['`%s` VARCHAR(255)'%x for x in cols])
+ sql = 'CREATE TABLE `_%s` (%s)'%(spec.classname, scols)
+ if __debug__:
+ print >>hyperdb.DEBUG, 'create_class', (self, sql)
+ self.cursor.execute(sql)
+ return cols, mls
+
+ def create_journal_table(self, spec):
+ cols = ',' . join(['`%s` VARCHAR(255)'%x
+ for x in 'nodeid date tag action params' . split()])
+ sql = 'CREATE TABLE `%s__journal` (%s)'%(spec.classname, cols)
+ if __debug__:
+ print >>hyperdb.DEBUG, 'create_class', (self, sql)
+ self.cursor.execute(sql)
+
+ def create_multilink_table(self, spec, ml):
+ sql = '''CREATE TABLE `%s_%s` (linkid VARCHAR(255),
+ nodeid VARCHAR(255))'''%(spec.classname, ml)
+ if __debug__:
+ print >>hyperdb.DEBUG, 'create_class', (self, sql)
+ self.cursor.execute(sql)
+
+
diff --git a/test/test_db.py b/test/test_db.py
index d44d23648daa19512a9f8e4b4d861d37fdf0713b..6592a35e9106377368a91e610f95297ab3dd53cd 100644 (file)
--- a/test/test_db.py
+++ b/test/test_db.py
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
-# $Id: test_db.py,v 1.65 2003-01-12 23:53:20 richard Exp $
+# $Id: test_db.py,v 1.66 2003-01-13 23:32:12 richard Exp $
import unittest, os, shutil, time
# remove previous test, ignore errors
if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE)
- config.MYSQL_DATABASE='mysql@localhost root rootpasswd'.split()
+ config.MYSQL_DATABASE = ('localhost', 'rounduptest', 'rounduptest',
+ 'rounduptest')
os.makedirs(config.DATABASE + '/files')
self.db = mysql.Database(config, 'admin')
setupSchema(self.db, 1, mysql)
# remove previous test, ignore errors
if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE)
- config.MYSQL_DATABASE='mysql@localhost root rootpasswd'.split()
+ config.MYSQL_DATABASE = ('localhost', 'rounduptest', 'rounduptest',
+ 'rounduptest')
os.makedirs(config.DATABASE + '/files')
db = mysql.Database(config, 'admin')
setupSchema(db, 1, mysql)
# return unittest.TestSuite(l)
from roundup import backends
-# if hasattr(backends, 'mysql'):
-# l.append(unittest.makeSuite(mysqlDBTestCase, 'test'))
-# l.append(unittest.makeSuite(mysqlReadOnlyDBTestCase, 'test'))
+ if hasattr(backends, 'mysql'):
+ l.append(unittest.makeSuite(mysqlDBTestCase, 'test'))
+ l.append(unittest.makeSuite(mysqlReadOnlyDBTestCase, 'test'))
# return unittest.TestSuite(l)
if hasattr(backends, 'gadfly'):