index 5e6214c3e72688fab0278b54c908408019aa09b5..3298389303473fe45865be789b69efc1e10ddecb 100644 (file)
+#$Id: back_postgresql.py,v 1.44 2008-08-07 05:50:03 richard Exp $
#
# Copyright (c) 2003 Martynas Sklyzmantas, Andrey Lebedev <andrey@micro.lt>
#
__docformat__ = 'restructuredtext'
import os, shutil, popen2, time
-import psycopg
+try:
+ import psycopg
+ from psycopg import QuotedString
+ from psycopg import ProgrammingError
+except:
+ from psycopg2 import psycopg1 as psycopg
+ from psycopg2.extensions import QuotedString
+ from psycopg2.psycopg1 import ProgrammingError
+import logging
from roundup import hyperdb, date
from roundup.backends import rdbms_common
+from roundup.backends import sessions_rdbms
+
+def connection_dict(config, dbnamestr=None):
+ ''' read_default_group is MySQL-specific, ignore it '''
+ d = rdbms_common.connection_dict(config, dbnamestr)
+ if d.has_key('read_default_group'):
+ del d['read_default_group']
+ if d.has_key('read_default_file'):
+ del d['read_default_file']
+ return d
def db_create(config):
"""Clear all database contents and drop database itself"""
- if __debug__:
- print >> hyperdb.DEBUG, '+++ create database +++'
- name = config.POSTGRESQL_DATABASE['database']
- n = 0
- while n < 10:
- cout,cin = popen2.popen4('createdb %s'%name)
- cin.close()
- response = cout.read().split('\n')[0]
- if response.find('FATAL') != -1:
- raise RuntimeError, response
- elif response.find('ERROR') != -1:
- if not response.find('is being accessed by other users') != -1:
- raise RuntimeError, response
- if __debug__:
- print >> hyperdb.DEBUG, '+++ SLEEPING +++'
- time.sleep(1)
- n += 1
- continue
- return
- raise RuntimeError, '10 attempts to create database failed'
+ command = "CREATE DATABASE %s WITH ENCODING='UNICODE'"%config.RDBMS_NAME
+ logging.getLogger('hyperdb').info(command)
+ db_command(config, command)
def db_nuke(config, fail_ok=0):
"""Clear all database contents and drop database itself"""
- if __debug__:
- print >> hyperdb.DEBUG, '+++ nuke database +++'
- name = config.POSTGRESQL_DATABASE['database']
- n = 0
+ command = 'DROP DATABASE %s'% config.RDBMS_NAME
+ logging.getLogger('hyperdb').info(command)
+ db_command(config, command)
+
if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE)
- while n < 10:
- cout,cin = popen2.popen4('dropdb %s'%name)
- cin.close()
- response = cout.read().split('\n')[0]
- if response.endswith('does not exist') and fail_ok:
- return
- elif response.find('FATAL') != -1:
+
+def db_command(config, command):
+ '''Perform some sort of database-level command. Retry 10 times if we
+ fail by conflicting with another user.
+ '''
+ template1 = connection_dict(config)
+ template1['database'] = 'template1'
+
+ try:
+ conn = psycopg.connect(**template1)
+ except psycopg.OperationalError, message:
+ raise hyperdb.DatabaseError, message
+
+ conn.set_isolation_level(0)
+ cursor = conn.cursor()
+ try:
+ for n in range(10):
+ if pg_command(cursor, command):
+ return
+ finally:
+ conn.close()
+ raise RuntimeError, '10 attempts to create database failed'
+
+def pg_command(cursor, command):
+ '''Execute the postgresql command, which may be blocked by some other
+ user connecting to the database, and return a true value if it succeeds.
+
+ If there is a concurrent update, retry the command.
+ '''
+ try:
+ cursor.execute(command)
+ except psycopg.ProgrammingError, err:
+ response = str(err).split('\n')[0]
+ if response.find('FATAL') != -1:
raise RuntimeError, response
- elif response.find('ERROR') != -1:
- if not response.find('is being accessed by other users') != -1:
- raise RuntimeError, response
- if __debug__:
- print >> hyperdb.DEBUG, '+++ SLEEPING +++'
- time.sleep(1)
- n += 1
- continue
- return
- raise RuntimeError, '10 attempts to nuke database failed'
+ else:
+ msgs = [
+ 'is being accessed by other users',
+ 'could not serialize access due to concurrent update',
+ ]
+ can_retry = 0
+ for msg in msgs:
+ if response.find(msg) == -1:
+ can_retry = 1
+ if can_retry:
+ time.sleep(1)
+ return 0
+ raise RuntimeError, response
+ return 1
def db_exists(config):
"""Check if database already exists"""
- db = getattr(config, 'POSTGRESQL_DATABASE')
+ db = connection_dict(config, 'database')
try:
conn = psycopg.connect(**db)
conn.close()
- if __debug__:
- print >> hyperdb.DEBUG, '+++ database exists +++'
return 1
except:
- if __debug__:
- print >> hyperdb.DEBUG, '+++ no database +++'
return 0
+class Sessions(sessions_rdbms.Sessions):
+ def set(self, *args, **kwargs):
+ try:
+ sessions_rdbms.Sessions.set(self, *args, **kwargs)
+ except ProgrammingError, err:
+ response = str(err).split('\n')[0]
+ if -1 != response.find('ERROR') and \
+ -1 != response.find('could not serialize access due to concurrent update'):
+ # another client just updated, and we're running on
+ # serializable isolation.
+ # see http://www.postgresql.org/docs/7.4/interactive/transaction-iso.html
+ self.db.rollback()
+
class Database(rdbms_common.Database):
arg = '%s'
+ # used by some code to switch styles of query
+ implements_intersect = 1
+
+ def getSessionManager(self):
+ return Sessions(self)
+
def sql_open_connection(self):
- db = getattr(self.config, 'POSTGRESQL_DATABASE')
+ db = connection_dict(self.config, 'database')
+ logging.getLogger('hyperdb').info('open database %r'%db['database'])
try:
conn = psycopg.connect(**db)
except psycopg.OperationalError, message:
if not db_exists(self.config):
db_create(self.config)
- if __debug__:
- print >>hyperdb.DEBUG, '+++ open database connection +++'
-
self.conn, self.cursor = self.sql_open_connection()
try:
self.load_dbschema()
- except:
+ except psycopg.ProgrammingError, message:
+ if str(message).find('schema') == -1:
+ raise
self.rollback()
self.init_dbschema()
self.sql("CREATE TABLE schema (schema TEXT)")
def create_version_2_tables(self):
# OTK store
- self.cursor.execute('''CREATE TABLE otks (otk_key VARCHAR(255),
- otk_value VARCHAR(255), otk_time FLOAT(20))''')
- self.cursor.execute('CREATE INDEX otks_key_idx ON otks(otk_key)')
+ self.sql('''CREATE TABLE otks (otk_key VARCHAR(255),
+ otk_value TEXT, otk_time REAL)''')
+ self.sql('CREATE INDEX otks_key_idx ON otks(otk_key)')
# Sessions store
- self.cursor.execute('''CREATE TABLE sessions (
- session_key VARCHAR(255), session_time FLOAT(20),
- session_value VARCHAR(255))''')
- self.cursor.execute('''CREATE INDEX sessions_key_idx ON
+ self.sql('''CREATE TABLE sessions (
+ session_key VARCHAR(255), session_time REAL,
+ session_value TEXT)''')
+ self.sql('''CREATE INDEX sessions_key_idx ON
sessions(session_key)''')
# full-text indexing store
- self.cursor.execute('CREATE SEQUENCE ___textids_ids')
- self.cursor.execute('''CREATE TABLE __textids (
+ self.sql('CREATE SEQUENCE ___textids_ids')
+ self.sql('''CREATE TABLE __textids (
_textid integer primary key, _class VARCHAR(255),
_itemid VARCHAR(255), _prop VARCHAR(255))''')
- self.cursor.execute('''CREATE TABLE __words (_word VARCHAR(30),
+ self.sql('''CREATE TABLE __words (_word VARCHAR(30),
_textid integer)''')
- self.cursor.execute('CREATE INDEX words_word_idx ON __words(_word)')
+ self.sql('CREATE INDEX words_word_idx ON __words(_word)')
+ self.sql('CREATE INDEX words_by_id ON __words (_textid)')
+ self.sql('CREATE UNIQUE INDEX __textids_by_props ON '
+ '__textids (_class, _itemid, _prop)')
+
+ def fix_version_2_tables(self):
+ # Convert journal date column to TIMESTAMP, params column to TEXT
+ self._convert_journal_tables()
+
+ # Convert all String properties to TEXT
+ self._convert_string_properties()
+
+ # convert session / OTK *_time columns to REAL
+ for name in ('otk', 'session'):
+ self.sql('drop index %ss_key_idx'%name)
+ self.sql('drop table %ss'%name)
+ self.sql('''CREATE TABLE %ss (%s_key VARCHAR(255),
+ %s_value VARCHAR(255), %s_time REAL)'''%(name, name, name,
+ name))
+ self.sql('CREATE INDEX %ss_key_idx ON %ss(%s_key)'%(name, name,
+ name))
+
+ def fix_version_3_tables(self):
+ rdbms_common.Database.fix_version_3_tables(self)
+ self.sql('''CREATE INDEX words_both_idx ON public.__words
+ USING btree (_word, _textid)''')
def add_actor_column(self):
# update existing tables to have the new actor column
tables = self.database_schema['tables']
for name in tables.keys():
- self.cursor.execute('ALTER TABLE _%s add __actor '
- 'VARCHAR(255)'%name)
+ self.sql('ALTER TABLE _%s add __actor VARCHAR(255)'%name)
def __repr__(self):
return '<roundpsycopgsql 0x%x>' % id(self)
+ def sql_commit(self, fail_ok=False):
+ ''' Actually commit to the database.
+ '''
+ logging.getLogger('hyperdb').info('commit')
+
+ try:
+ self.conn.commit()
+ except psycopg.ProgrammingError, message:
+ # we've been instructed that this commit is allowed to fail
+ if fail_ok and str(message).endswith('could not serialize '
+ 'access due to concurrent update'):
+ logging.getLogger('hyperdb').info('commit FAILED, but fail_ok')
+ else:
+ raise
+
+ # open a new cursor for subsequent work
+ self.cursor = self.conn.cursor()
+
def sql_stringquote(self, value):
''' psycopg.QuotedString returns a "buffer" object with the
single-quotes around it... '''
- return str(psycopg.QuotedString(str(value)))[1:-1]
+ return str(QuotedString(str(value)))[1:-1]
def sql_index_exists(self, table_name, index_name):
sql = 'select count(*) from pg_indexes where ' \
'tablename=%s and indexname=%s'%(self.arg, self.arg)
- self.cursor.execute(sql, (table_name, index_name))
+ self.sql(sql, (table_name, index_name))
return self.cursor.fetchone()[0]
- def create_class_table(self, spec):
- sql = 'CREATE SEQUENCE _%s_ids'%spec.classname
- if __debug__:
- print >>hyperdb.DEBUG, 'create_class_table', (self, sql)
- self.cursor.execute(sql)
+ def create_class_table(self, spec, create_sequence=1):
+ if create_sequence:
+ sql = 'CREATE SEQUENCE _%s_ids'%spec.classname
+ self.sql(sql)
return rdbms_common.Database.create_class_table(self, spec)
def drop_class_table(self, cn):
sql = 'drop table _%s'%cn
- if __debug__:
- print >>hyperdb.DEBUG, 'drop_class', (self, sql)
- self.cursor.execute(sql)
+ self.sql(sql)
sql = 'drop sequence _%s_ids'%cn
- if __debug__:
- print >>hyperdb.DEBUG, 'drop_class', (self, sql)
- self.cursor.execute(sql)
-
- def create_journal_table(self, spec):
- cols = ',' . join(['"%s" VARCHAR(255)'%x
- for x in 'nodeid date tag action params' . split()])
- sql = 'CREATE TABLE "%s__journal" (%s)'%(spec.classname, cols)
- if __debug__:
- print >>hyperdb.DEBUG, 'create_journal_table', (self, sql)
- self.cursor.execute(sql)
- self.create_journal_table_indexes(spec)
-
- def create_multilink_table(self, spec, ml):
- sql = '''CREATE TABLE "%s_%s" (linkid VARCHAR(255),
- nodeid VARCHAR(255))'''%(spec.classname, ml)
-
- if __debug__:
- print >>hyperdb.DEBUG, 'create_class', (self, sql)
-
- self.cursor.execute(sql)
- self.create_multilink_table_indexes(spec, ml)
+ self.sql(sql)
def newid(self, classname):
sql = "select nextval('_%s_ids') from dual"%classname
- if __debug__:
- print >>hyperdb.DEBUG, 'setid', (self, sql)
- self.cursor.execute(sql)
- return self.cursor.fetchone()[0]
+ self.sql(sql)
+ return str(self.cursor.fetchone()[0])
def setid(self, classname, setid):
sql = "select setval('_%s_ids', %s) from dual"%(classname, int(setid))
- if __debug__:
- print >>hyperdb.DEBUG, 'setid', (self, sql)
- self.cursor.execute(sql)
+ self.sql(sql)
+
+ def clear(self):
+ rdbms_common.Database.clear(self)
+
+ # reset the sequences
+ for cn in self.classes.keys():
+ self.cursor.execute('DROP SEQUENCE _%s_ids'%cn)
+ self.cursor.execute('CREATE SEQUENCE _%s_ids'%cn)
+class PostgresqlClass:
+ order_by_null_values = '(%s is not NULL)'
-class Class(rdbms_common.Class):
+class Class(PostgresqlClass, rdbms_common.Class):
pass
-class IssueClass(rdbms_common.IssueClass):
+class IssueClass(PostgresqlClass, rdbms_common.IssueClass):
pass
-class FileClass(rdbms_common.FileClass):
+class FileClass(PostgresqlClass, rdbms_common.FileClass):
pass
+# vim: set et sts=4 sw=4 :