summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 17524d8)
raw | patch | inline | side by side (parent: 17524d8)
author | richard <richard@57a73879-2fb5-44c3-a270-3262357dd7e2> | |
Fri, 12 Mar 2004 04:09:00 +0000 (04:09 +0000) | ||
committer | richard <richard@57a73879-2fb5-44c3-a270-3262357dd7e2> | |
Fri, 12 Mar 2004 04:09:00 +0000 (04:09 +0000) |
git-svn-id: http://svn.roundup-tracker.org/svnroot/roundup/trunk@2145 57a73879-2fb5-44c3-a270-3262357dd7e2
index c9ebfd69ad92f57e99dc17d2ec5ab8fb2a5c82b9..b5ffa5a394bd9eed7013c8026c89e6e709d3bba3 100644 (file)
import os, shutil
from MySQLdb.constants import ER
import os, shutil
from MySQLdb.constants import ER
-# Database maintenance functions
+
def db_nuke(config):
"""Clear all database contents and drop database itself"""
def db_nuke(config):
"""Clear all database contents and drop database itself"""
- db = Database(config, 'admin')
- try:
- db.sql_commit()
- db.sql("DROP DATABASE %s" % config.MYSQL_DBNAME)
- db.sql("CREATE DATABASE %s" % config.MYSQL_DBNAME)
- finally:
- db.close()
+ if db_exists(config):
+ conn = MySQLdb.connect(config.MYSQL_DBHOST, config.MYSQL_DBUSER,
+ config.MYSQL_DBPASSWORD)
+ try:
+ conn.select_db(config.MYSQL_DBNAME)
+ except:
+ # no, it doesn't exist
+ pass
+ else:
+ cursor = conn.cursor()
+ cursor.execute("SHOW TABLES")
+ tables = cursor.fetchall()
+ for table in tables:
+ if __debug__:
+ print >>hyperdb.DEBUG, 'DROP TABLE %s'%table[0]
+ cursor.execute("DROP TABLE %s"%table[0])
+ if __debug__:
+ print >>hyperdb.DEBUG, "DROP DATABASE %s"%config.MYSQL_DBNAME
+ cursor.execute("DROP DATABASE %s"%config.MYSQL_DBNAME)
+ conn.commit()
+ conn.close()
+
if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE)
if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE)
+def db_create(config):
+ """Create the database."""
+ conn = MySQLdb.connect(config.MYSQL_DBHOST, config.MYSQL_DBUSER,
+ config.MYSQL_DBPASSWORD)
+ cursor = conn.cursor()
+ if __debug__:
+ print >>hyperdb.DEBUG, "CREATE DATABASE %s"%config.MYSQL_DBNAME
+ cursor.execute("CREATE DATABASE %s"%config.MYSQL_DBNAME)
+ conn.commit()
+ conn.close()
+
def db_exists(config):
def db_exists(config):
- """Check if database already exists"""
- # Yes, this is a hack, but we must must open connection without
- # selecting a database to prevent creation of some tables
- config.MYSQL_DATABASE = (config.MYSQL_DBHOST, config.MYSQL_DBUSER,
- config.MYSQL_DBPASSWORD)
- db = Database(config, 'admin')
+ """Check if database already exists."""
+ conn = MySQLdb.connect(config.MYSQL_DBHOST, config.MYSQL_DBUSER,
+ config.MYSQL_DBPASSWORD)
+# tables = None
try:
try:
- db.conn.select_db(config.MYSQL_DBNAME)
- config.MYSQL_DATABASE = (config.MYSQL_DBHOST, config.MYSQL_DBUSER,
- config.MYSQL_DBPASSWORD, config.MYSQL_DBNAME)
- db.sql("SHOW TABLES")
- tables = db.sql_fetchall()
+ try:
+ conn.select_db(config.MYSQL_DBNAME)
+# cursor = conn.cursor()
+# cursor.execute("SHOW TABLES")
+# tables = cursor.fetchall()
+# if __debug__:
+# print >>hyperdb.DEBUG, "tables %s"%(tables,)
+ except MySQLdb.OperationalError:
+ if __debug__:
+ print >>hyperdb.DEBUG, "no database '%s'"%config.MYSQL_DBNAME
+ return 0
finally:
finally:
- db.close()
- if tables or os.path.exists(config.DATABASE):
- return 1
- return 0
+ conn.close()
+ if __debug__:
+ print >>hyperdb.DEBUG, "database '%s' exists"%config.MYSQL_DBNAME
+ return 1
+
class Database(Database):
arg = '%s'
class Database(Database):
arg = '%s'
#mysql_backend = 'BDB'
def sql_open_connection(self):
#mysql_backend = 'BDB'
def sql_open_connection(self):
+ # make sure the database actually exists
+ if not db_exists(self.config):
+ db_create(self.config)
+
db = getattr(self.config, 'MYSQL_DATABASE')
try:
self.conn = MySQLdb.connect(*db)
db = getattr(self.config, 'MYSQL_DATABASE')
try:
self.conn = MySQLdb.connect(*db)
except MySQLdb.ProgrammingError, message:
if message[0] != ER.NO_SUCH_TABLE:
raise DatabaseError, message
except MySQLdb.ProgrammingError, message:
if message[0] != ER.NO_SUCH_TABLE:
raise DatabaseError, message
- self.database_schema = {}
+ self.init_dbschema()
self.sql("CREATE TABLE schema (schema TEXT) TYPE=%s"%
self.mysql_backend)
# TODO: use AUTO_INCREMENT for generating ids:
self.sql("CREATE TABLE schema (schema TEXT) TYPE=%s"%
self.mysql_backend)
# TODO: use AUTO_INCREMENT for generating ids:
self.create_version_2_tables()
def create_version_2_tables(self):
self.create_version_2_tables()
def create_version_2_tables(self):
- self.cursor.execute('CREATE TABLE otks (key VARCHAR(255), '
- 'value VARCHAR(255), __time FLOAT(20))')
- self.cursor.execute('CREATE INDEX otks_key_idx ON otks(key)')
- self.cursor.execute('CREATE TABLE sessions (key VARCHAR(255), '
- 'last_use FLOAT(20), user VARCHAR(255))')
- self.cursor.execute('CREATE INDEX sessions_key_idx ON sessions(key)')
+ self.cursor.execute('CREATE TABLE otks (otk_key VARCHAR(255), '
+ 'otk_value VARCHAR(255), otk_time FLOAT(20))')
+ self.cursor.execute('CREATE INDEX otks_key_idx ON otks(otk_key)')
+ self.cursor.execute('CREATE TABLE sessions (s_key VARCHAR(255), '
+ 's_last_use FLOAT(20), s_user VARCHAR(255))')
+ self.cursor.execute('CREATE INDEX sessions_key_idx ON sessions(s_key)')
def __repr__(self):
return '<myroundsql 0x%x>'%id(self)
def __repr__(self):
return '<myroundsql 0x%x>'%id(self)
index 147cb83f1b6960298f1bd58301ec0edcdc9fb83a..c58505658b00db012035d3b518cd39555f8c0608 100644 (file)
__docformat__ = 'restructuredtext'
__docformat__ = 'restructuredtext'
+import os, shutil, popen2, time
+import psycopg
+
from roundup import hyperdb, date
from roundup.backends import rdbms_common
from roundup import hyperdb, date
from roundup.backends import rdbms_common
-import psycopg
-import os, shutil, popen2
+
+def db_create(config):
+ """Clear all database contents and drop database itself"""
+ if __debug__:
+ print >> hyperdb.DEBUG, '+++ create database +++'
+ name = config.POSTGRESQL_DATABASE['database']
+ n = 0
+ while n < 10:
+ cout,cin = popen2.popen4('createdb %s'%name)
+ cin.close()
+ response = cout.read().split('\n')[0]
+ if response.find('FATAL') != -1:
+ raise RuntimeError, response
+ elif response.find('ERROR') != -1:
+ if not response.find('is being accessed by other users') != -1:
+ raise RuntimeError, response
+ if __debug__:
+ print >> hyperdb.DEBUG, '+++ SLEEPING +++'
+ time.sleep(1)
+ n += 1
+ continue
+ return
+ raise RuntimeError, '10 attempts to create database failed'
+
+def db_nuke(config, fail_ok=0):
+ """Clear all database contents and drop database itself"""
+ if __debug__:
+ print >> hyperdb.DEBUG, '+++ nuke database +++'
+ name = config.POSTGRESQL_DATABASE['database']
+ n = 0
+ if os.path.exists(config.DATABASE):
+ shutil.rmtree(config.DATABASE)
+ while n < 10:
+ cout,cin = popen2.popen4('dropdb %s'%name)
+ cin.close()
+ response = cout.read().split('\n')[0]
+ if response.endswith('does not exist') and fail_ok:
+ return
+ elif response.find('FATAL') != -1:
+ raise RuntimeError, response
+ elif response.find('ERROR') != -1:
+ if not response.find('is being accessed by other users') != -1:
+ raise RuntimeError, response
+ if __debug__:
+ print >> hyperdb.DEBUG, '+++ SLEEPING +++'
+ time.sleep(1)
+ n += 1
+ continue
+ return
+ raise RuntimeError, '10 attempts to nuke database failed'
+
+def db_exists(config):
+ """Check if database already exists"""
+ db = getattr(config, 'POSTGRESQL_DATABASE')
+ try:
+ conn = psycopg.connect(**db)
+ conn.close()
+ if __debug__:
+ print >> hyperdb.DEBUG, '+++ database exists +++'
+ return 1
+ except:
+ if __debug__:
+ print >> hyperdb.DEBUG, '+++ no database +++'
+ return 0
class Database(rdbms_common.Database):
arg = '%s'
def sql_open_connection(self):
class Database(rdbms_common.Database):
arg = '%s'
def sql_open_connection(self):
+ if not db_exists(self.config):
+ db_create(self.config)
+
+ if __debug__:
+ print >>hyperdb.DEBUG, '+++ open database connection +++'
+
db = getattr(self.config, 'POSTGRESQL_DATABASE')
try:
self.conn = psycopg.connect(**db)
db = getattr(self.config, 'POSTGRESQL_DATABASE')
try:
self.conn = psycopg.connect(**db)
self.load_dbschema()
except:
self.rollback()
self.load_dbschema()
except:
self.rollback()
- self.database_schema = {}
+ self.init_dbschema()
self.sql("CREATE TABLE schema (schema TEXT)")
self.sql("CREATE TABLE ids (name VARCHAR(255), num INT4)")
self.sql("CREATE INDEX ids_name_idx ON ids(name)")
self.create_version_2_tables()
def create_version_2_tables(self):
self.sql("CREATE TABLE schema (schema TEXT)")
self.sql("CREATE TABLE ids (name VARCHAR(255), num INT4)")
self.sql("CREATE INDEX ids_name_idx ON ids(name)")
self.create_version_2_tables()
def create_version_2_tables(self):
- self.cursor.execute('CREATE TABLE otks (key VARCHAR(255), '
- 'value VARCHAR(255), __time NUMERIC)')
- self.cursor.execute('CREATE INDEX otks_key_idx ON otks(key)')
- self.cursor.execute('CREATE TABLE sessions (key VARCHAR(255), '
- 'last_use NUMERIC, user VARCHAR(255))')
- self.cursor.execute('CREATE INDEX sessions_key_idx ON sessions(key)')
+ self.cursor.execute('CREATE TABLE otks (otk_key VARCHAR(255), '
+ 'otk_value VARCHAR(255), otk_time FLOAT(20))')
+ self.cursor.execute('CREATE INDEX otks_key_idx ON otks(otk_key)')
+ self.cursor.execute('CREATE TABLE sessions (s_key VARCHAR(255), '
+ 's_last_use FLOAT(20), s_user VARCHAR(255))')
+ self.cursor.execute('CREATE INDEX sessions_key_idx ON sessions(s_key)')
def __repr__(self):
return '<roundpsycopgsql 0x%x>' % id(self)
def __repr__(self):
return '<roundpsycopgsql 0x%x>' % id(self)
index 6d28cea8d53d32bedc76ed5bc104ee9cb806bdd3..1f1013c285c9302cf373bacea0e6927b38931c13 100644 (file)
-# $Id: back_sqlite.py,v 1.14 2004-03-05 00:08:09 richard Exp $
+# $Id: back_sqlite.py,v 1.15 2004-03-12 04:08:59 richard Exp $
'''Implements a backend for SQLite.
See https://pysqlite.sourceforge.net/ for pysqlite info
'''Implements a backend for SQLite.
See https://pysqlite.sourceforge.net/ for pysqlite info
except sqlite.DatabaseError, error:
if str(error) != 'no such table: schema':
raise
except sqlite.DatabaseError, error:
if str(error) != 'no such table: schema':
raise
- self.database_schema = {}
+ self.init_dbschema()
self.cursor.execute('create table schema (schema varchar)')
self.cursor.execute('create table ids (name varchar, num integer)')
self.cursor.execute('create index ids_name_idx on ids(name)')
self.create_version_2_tables()
def create_version_2_tables(self):
self.cursor.execute('create table schema (schema varchar)')
self.cursor.execute('create table ids (name varchar, num integer)')
self.cursor.execute('create index ids_name_idx on ids(name)')
self.create_version_2_tables()
def create_version_2_tables(self):
- self.cursor.execute('create table otks (key varchar, '
- 'value varchar, __time varchar)')
- self.cursor.execute('create index otks_key_idx on otks(key)')
- self.cursor.execute('create table sessions (key varchar, '
- 'last_use varchar, user varchar)')
- self.cursor.execute('create index sessions_key_idx on sessions(key)')
+ self.cursor.execute('create table otks (otk_key varchar, '
+ 'otk_value varchar, otk_time varchar)')
+ self.cursor.execute('create index otks_key_idx on otks(otk_key)')
+ self.cursor.execute('create table sessions (s_key varchar, '
+ 's_last_use varchar, s_user varchar)')
+ self.cursor.execute('create index sessions_key_idx on sessions(s_key)')
def sql_close(self):
''' Squash any error caused by us already having closed the
def sql_close(self):
''' Squash any error caused by us already having closed the
index 16392c68aba7f229095fb0044a34bb525f5a0c03..352220f247a64a01bb4240e3d8397cc30a6f6786 100644 (file)
-# $Id: rdbms_common.py,v 1.76 2004-03-05 00:08:09 richard Exp $
+# $Id: rdbms_common.py,v 1.77 2004-03-12 04:08:59 richard Exp $
''' Relational database (SQL) backend common code.
Basics:
''' Relational database (SQL) backend common code.
Basics:
The schema of the hyperdb being mapped to the database is stored in the
database itself as a repr()'ed dictionary of information about each Class
that maps to a table. If that information differs from the hyperdb schema,
The schema of the hyperdb being mapped to the database is stored in the
database itself as a repr()'ed dictionary of information about each Class
that maps to a table. If that information differs from the hyperdb schema,
-then we update it. We also store in the schema dict a __version__ which
+then we update it. We also store in the schema dict a version which
allows us to upgrade the database schema when necessary. See upgrade_db().
'''
__docformat__ = 'restructuredtext'
allows us to upgrade the database schema when necessary. See upgrade_db().
'''
__docformat__ = 'restructuredtext'
'''
return re.sub("'", "''", str(value))
'''
return re.sub("'", "''", str(value))
+ def init_dbschema(self):
+ self.database_schema = {
+ 'version': self.current_db_version,
+ 'tables': {}
+ }
+
def load_dbschema(self):
''' Load the schema definition that the database currently implements
'''
self.cursor.execute('select schema from schema')
def load_dbschema(self):
''' Load the schema definition that the database currently implements
'''
self.cursor.execute('select schema from schema')
- self.database_schema = eval(self.cursor.fetchone()[0])
+ schema = self.cursor.fetchone()
+ if schema:
+ self.database_schema = eval(schema[0])
+ else:
+ self.database_schema = {}
def save_dbschema(self, schema):
''' Save the schema definition that the database currently implements
def save_dbschema(self, schema):
''' Save the schema definition that the database currently implements
We should now confirm that the schema defined by our "classes"
attribute actually matches the schema in the database.
'''
We should now confirm that the schema defined by our "classes"
attribute actually matches the schema in the database.
'''
- self.upgrade_db()
+ save = self.upgrade_db()
# now detect changes in the schema
# now detect changes in the schema
- save = 0
+ tables = self.database_schema['tables']
for classname, spec in self.classes.items():
for classname, spec in self.classes.items():
- if self.database_schema.has_key(classname):
- dbspec = self.database_schema[classname]
+ if tables.has_key(classname):
+ dbspec = tables[classname]
if self.update_class(spec, dbspec):
if self.update_class(spec, dbspec):
- self.database_schema[classname] = spec.schema()
+ tables[classname] = spec.schema()
save = 1
else:
self.create_class(spec)
save = 1
else:
self.create_class(spec)
- self.database_schema[classname] = spec.schema()
+ tables[classname] = spec.schema()
save = 1
save = 1
- for classname, spec in self.database_schema.items():
+ for classname, spec in tables.items():
if not self.classes.has_key(classname):
if not self.classes.has_key(classname):
- self.drop_class(classname, spec)
- del self.database_schema[classname]
+ self.drop_class(classname, tables[classname])
+ del tables[classname]
save = 1
# update the database version of the schema
save = 1
# update the database version of the schema
current_db_version = 2
def upgrade_db(self):
''' Update the SQL database to reflect changes in the backend code.
current_db_version = 2
def upgrade_db(self):
''' Update the SQL database to reflect changes in the backend code.
+
+ Return boolean whether we need to save the schema.
'''
'''
- version = self.database_schema.get('__version', 1)
+ version = self.database_schema.get('version', 1)
if version == 1:
# version 1 doesn't have the OTK, session and indexing in the
# database
self.create_version_2_tables()
if version == 1:
# version 1 doesn't have the OTK, session and indexing in the
# database
self.create_version_2_tables()
+ else:
+ return 0
- self.database_schema['__version'] = self.current_db_version
+ self.database_schema['version'] = self.current_db_version
+ return 1
def refresh_database(self):
def refresh_database(self):
def sql_commit(self):
''' Actually commit to the database.
'''
def sql_commit(self):
''' Actually commit to the database.
'''
+ if __debug__:
+ print >>hyperdb.DEBUG, '+++ commit database connection +++'
self.conn.commit()
def commit(self):
self.conn.commit()
def commit(self):
return (classname, nodeid)
def sql_close(self):
return (classname, nodeid)
def sql_close(self):
+ if __debug__:
+ print >>hyperdb.DEBUG, '+++ close database connection +++'
self.conn.close()
def close(self):
self.conn.close()
def close(self):
diff --git a/run_tests.py b/run_tests.py
index 401c74a8e61808ccf01c213868a241d5013dc642..70311a2d23829e5d51293ec585894548f88dded7 100644 (file)
--- a/run_tests.py
+++ b/run_tests.py
# Hack sys.path
self.cwd = os.getcwd()
sys.path.insert(0, os.path.join(self.cwd, self.libdir))
# Hack sys.path
self.cwd = os.getcwd()
sys.path.insert(0, os.path.join(self.cwd, self.libdir))
- print sys.path
# Hack again for external products.
global functional
kind = functional and "functional" or "unit"
# Hack again for external products.
global functional
kind = functional and "functional" or "unit"
def runner(files, test_filter, debug):
runner = ImmediateTestRunner(verbosity=VERBOSE, debug=debug,
def runner(files, test_filter, debug):
runner = ImmediateTestRunner(verbosity=VERBOSE, debug=debug,
- progress=progress)
+ progress=progress)
suite = unittest.TestSuite()
for file in files:
s = get_suite(file)
suite = unittest.TestSuite()
for file in files:
s = get_suite(file)
index 586ea721b8960b2a181dae44e8c513e46f97f2bd..dd934852d79725fafc0507d1f47bba4e837d4f39 100644 (file)
<th tal:condition="request/show/assignedto">Assigned To</th>
</tr>
<tal:block tal:repeat="i batch">
<th tal:condition="request/show/assignedto">Assigned To</th>
</tr>
<tal:block tal:repeat="i batch">
- <tr tal:condition="python:request.group[1] and
- batch.propchanged(request.group[1])">
+ <tr tal:define="group python:request.group[1]"
+ tal:condition="python:group and batch.propchanged(group)">
<th tal:attributes="colspan python:len(request.columns)"
<th tal:attributes="colspan python:len(request.columns)"
- tal:content="python:i[request.group[1]]" class="group">
+ tal:content="python:str(i[group]) or '(no %s set)'%group" class="group">
</th>
</tr>
</th>
</tr>
diff --git a/test/db_test_base.py b/test/db_test_base.py
index ed021c408aabb84bf422f2469895dadd62fe0929..c7a52d25b4d91358c6285127938ded2aa42b461a 100644 (file)
--- a/test/db_test_base.py
+++ b/test/db_test_base.py
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
-# $Id: db_test_base.py,v 1.14 2004-01-20 05:55:51 richard Exp $
+# $Id: db_test_base.py,v 1.15 2004-03-12 04:09:00 richard Exp $
import unittest, os, shutil, errno, imp, sys, time, pprint
import unittest, os, shutil, errno, imp, sys, time, pprint
def testFilteringID(self):
ae, filt = self.filteringSetup()
ae(filt(None, {'id': '1'}, ('+','id'), (None,None)), ['1'])
def testFilteringID(self):
ae, filt = self.filteringSetup()
ae(filt(None, {'id': '1'}, ('+','id'), (None,None)), ['1'])
+ ae(filt(None, {'id': '2'}, ('+','id'), (None,None)), ['2'])
+ ae(filt(None, {'id': '10'}, ('+','id'), (None,None)), [])
def testFilteringString(self):
ae, filt = self.filteringSetup()
def testFilteringString(self):
ae, filt = self.filteringSetup()
diff --git a/test/test_mysql.py b/test/test_mysql.py
index 1388d47e8df0401009eaa265dc871169da67c5f0..54f7dcf6ffb7a739390585c640f228acef401303 100644 (file)
--- a/test/test_mysql.py
+++ b/test/test_mysql.py
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
-# $Id: test_mysql.py,v 1.6 2003-11-14 00:11:19 richard Exp $
+# $Id: test_mysql.py,v 1.7 2004-03-12 04:09:00 richard Exp $
import unittest, os, shutil, time, imp
import unittest, os, shutil, time, imp
config.MYSQL_DATABASE = (config.MYSQL_DBHOST, config.MYSQL_DBUSER,
config.MYSQL_DBPASSWORD, config.MYSQL_DBNAME)
config.MYSQL_DATABASE = (config.MYSQL_DBHOST, config.MYSQL_DBUSER,
config.MYSQL_DBPASSWORD, config.MYSQL_DBNAME)
-class nodbconfig(config):
- MYSQL_DATABASE = (config.MYSQL_DBHOST, config.MYSQL_DBUSER, config.MYSQL_DBPASSWORD)
-
class mysqlOpener:
if hasattr(backends, 'mysql'):
from roundup.backends import mysql as module
class mysqlOpener:
if hasattr(backends, 'mysql'):
from roundup.backends import mysql as module
+ def setUp(self):
+ self.module.db_nuke(config)
+
def tearDown(self):
self.db.close()
self.nuke_database()
def tearDown(self):
self.db.close()
self.nuke_database()
self.module.db_nuke(config)
class mysqlDBTest(mysqlOpener, DBTest):
self.module.db_nuke(config)
class mysqlDBTest(mysqlOpener, DBTest):
- pass
+ def setUp(self):
+ mysqlOpener.setUp(self)
+ DBTest.setUp(self)
class mysqlROTest(mysqlOpener, ROTest):
class mysqlROTest(mysqlOpener, ROTest):
- pass
+ def setUp(self):
+ mysqlOpener.setUp(self)
+ ROTest.setUp(self)
class mysqlSchemaTest(mysqlOpener, SchemaTest):
class mysqlSchemaTest(mysqlOpener, SchemaTest):
- pass
+ def setUp(self):
+ mysqlOpener.setUp(self)
+ SchemaTest.setUp(self)
class mysqlClassicInitTest(mysqlOpener, ClassicInitTest):
backend = 'mysql'
class mysqlClassicInitTest(mysqlOpener, ClassicInitTest):
backend = 'mysql'
MYSQL_DBNAME = 'rounduptest'
MYSQL_DATABASE = (MYSQL_DBHOST, MYSQL_DBUSER, MYSQL_DBPASSWORD, MYSQL_DBNAME)
'''
MYSQL_DBNAME = 'rounduptest'
MYSQL_DATABASE = (MYSQL_DBHOST, MYSQL_DBUSER, MYSQL_DBPASSWORD, MYSQL_DBNAME)
'''
- if hasattr(backends, 'mysql'):
- from roundup.backends import mysql as module
+ def setUp(self):
+ mysqlOpener.setUp(self)
+ ClassicInitTest.setUp(self)
def tearDown(self):
ClassicInitTest.tearDown(self)
self.nuke_database()
def tearDown(self):
ClassicInitTest.tearDown(self)
self.nuke_database()
# Check if we can run mysql tests
import MySQLdb
db = mysql.Database(config, 'admin')
# Check if we can run mysql tests
import MySQLdb
db = mysql.Database(config, 'admin')
- db.conn.select_db(config.MYSQL_DBNAME)
- db.sql("SHOW TABLES");
- tables = db.sql_fetchall()
- # TODO: reinstate the check here
- if 0: #tables:
- # Database should be empty. We don't dare to delete any data
- raise DatabaseError, "Database %s contains tables"%\
- config.MYSQL_DBNAME
- db.sql("DROP DATABASE %s" % config.MYSQL_DBNAME)
- db.sql("CREATE DATABASE %s" % config.MYSQL_DBNAME)
db.close()
except (MySQLdb.ProgrammingError, DatabaseError), msg:
print "Skipping mysql tests (%s)"%msg
db.close()
except (MySQLdb.ProgrammingError, DatabaseError), msg:
print "Skipping mysql tests (%s)"%msg
index 5841f8419e2fa9add8c1d1aefc662bc8dc1f33f0..ac628be2e6ef16269c81cfcf411d2fef3e633919 100644 (file)
--- a/test/test_postgresql.py
+++ b/test/test_postgresql.py
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
-# $Id: test_postgresql.py,v 1.4 2003-11-14 00:11:19 richard Exp $
+# $Id: test_postgresql.py,v 1.5 2004-03-12 04:09:00 richard Exp $
-import sys, unittest, os, shutil, time, popen2
+import unittest
from roundup.hyperdb import DatabaseError
from roundup.hyperdb import DatabaseError
config.POSTGRESQL_DATABASE = {'database': 'rounduptest'}
from roundup import backends
config.POSTGRESQL_DATABASE = {'database': 'rounduptest'}
from roundup import backends
-
-def db_create():
- """Clear all database contents and drop database itself"""
- name = config.POSTGRESQL_DATABASE['database']
- cout,cin = popen2.popen4('createdb %s'%name)
- cin.close()
- response = cout.read().split('\n')[0]
- if response.find('FATAL') != -1 or response.find('ERROR') != -1:
- raise RuntimeError, response
-
-def db_nuke(fail_ok=0):
- """Clear all database contents and drop database itself"""
- name = config.POSTGRESQL_DATABASE['database']
- cout,cin = popen2.popen4('dropdb %s'%name)
- cin.close()
- response = cout.read().split('\n')[0]
- if response.endswith('does not exist') and fail_ok:
- return
- if response.find('FATAL') != -1 or response.find('ERROR') != -1:
- raise RuntimeError, response
- if os.path.exists(config.DATABASE):
- shutil.rmtree(config.DATABASE)
-
-def db_exists(config):
- """Check if database already exists"""
- try:
- db = Database(config, 'admin')
- return 1
- except:
- return 0
+from roundup.backends.back_postgresql import db_nuke, db_create, db_exists
class postgresqlOpener:
if hasattr(backends, 'postgresql'):
from roundup.backends import postgresql as module
def setUp(self):
class postgresqlOpener:
if hasattr(backends, 'postgresql'):
from roundup.backends import postgresql as module
def setUp(self):
- db_nuke(1)
- db_create()
+ #db_nuke(config, 1)
+ pass
def tearDown(self):
self.nuke_database()
def nuke_database(self):
def tearDown(self):
self.nuke_database()
def nuke_database(self):
- # clear out the database - easiest way is to nuke and re-created it
- db_nuke()
- db_create()
+ # clear out the database - easiest way is to nuke and re-create it
+ db_nuke(config)
class postgresqlDBTest(postgresqlOpener, DBTest):
def setUp(self):
class postgresqlDBTest(postgresqlOpener, DBTest):
def setUp(self):
if not hasattr(backends, 'postgresql'):
return suite
if not hasattr(backends, 'postgresql'):
return suite
- # Check if we can run postgresql tests
+ # make sure we start with a clean slate
+ db_nuke(config, 1)
+
+ # TODO: Check if we can run postgresql tests
print 'Including postgresql tests'
suite.addTest(unittest.makeSuite(postgresqlDBTest))
suite.addTest(unittest.makeSuite(postgresqlROTest))
print 'Including postgresql tests'
suite.addTest(unittest.makeSuite(postgresqlDBTest))
suite.addTest(unittest.makeSuite(postgresqlROTest))