Code

PostgreSQL backend minor improvement: database creation less likely to fail
[roundup.git] / roundup / backends / back_postgresql.py
1 #$Id: back_postgresql.py,v 1.44 2008-08-07 05:50:03 richard Exp $
2 #
3 # Copyright (c) 2003 Martynas Sklyzmantas, Andrey Lebedev <andrey@micro.lt>
4 #
5 # This module is free software, and you may redistribute it and/or modify
6 # under the same terms as Python, so long as this copyright message and
7 # disclaimer are retained in their original form.
8 #
9 '''Postgresql backend via psycopg for Roundup.'''
10 __docformat__ = 'restructuredtext'
12 import os, shutil, time
13 try:
14     import psycopg
15     from psycopg import QuotedString
16     from psycopg import ProgrammingError
17 except:
18     from psycopg2 import psycopg1 as psycopg
19     from psycopg2.extensions import QuotedString
20     from psycopg2.psycopg1 import ProgrammingError
21 import logging
23 from roundup import hyperdb, date
24 from roundup.backends import rdbms_common
25 from roundup.backends import sessions_rdbms
27 def connection_dict(config, dbnamestr=None):
28     ''' read_default_group is MySQL-specific, ignore it '''
29     d = rdbms_common.connection_dict(config, dbnamestr)
30     if 'read_default_group' in d:
31         del d['read_default_group']
32     if 'read_default_file' in d:
33         del d['read_default_file']
34     return d
36 def db_create(config):
37     """Clear all database contents and drop database itself"""
38     command = "CREATE DATABASE %s WITH ENCODING='UNICODE'"%config.RDBMS_NAME
39     if config.RDBMS_TEMPLATE :
40         command = command + " TEMPLATE=%s" % config.RDBMS_TEMPLATE
41     logging.getLogger('roundup.hyperdb').info(command)
42     db_command(config, command)
44 def db_nuke(config, fail_ok=0):
45     """Clear all database contents and drop database itself"""
46     command = 'DROP DATABASE %s'% config.RDBMS_NAME
47     logging.getLogger('roundup.hyperdb').info(command)
48     db_command(config, command)
50     if os.path.exists(config.DATABASE):
51         shutil.rmtree(config.DATABASE)
53 def db_command(config, command, database='postgres'):
54     '''Perform some sort of database-level command. Retry 10 times if we
55     fail by conflicting with another user.
57     Since PostgreSQL version 8.1 there is a database "postgres",
58     before "template1" seems to habe been used, so we fall back to it. 
59     Compare to issue2550543.
60     '''
61     template1 = connection_dict(config)
62     template1['database'] = database
64     try:
65         conn = psycopg.connect(**template1)
66     except psycopg.OperationalError, message:
67         if str(message).find('database "postgres" does not exist') >= 0:
68             return db_command(config, command, database='template1')
69         raise hyperdb.DatabaseError(message)
71     conn.set_isolation_level(0)
72     cursor = conn.cursor()
73     try:
74         for n in range(10):
75             if pg_command(cursor, command):
76                 return
77     finally:
78         conn.close()
79     raise RuntimeError('10 attempts to create database failed')
81 def pg_command(cursor, command):
82     '''Execute the postgresql command, which may be blocked by some other
83     user connecting to the database, and return a true value if it succeeds.
85     If there is a concurrent update, retry the command.
86     '''
87     try:
88         cursor.execute(command)
89     except psycopg.ProgrammingError, err:
90         response = str(err).split('\n')[0]
91         if response.find('FATAL') != -1:
92             raise RuntimeError(response)
93         else:
94             msgs = [
95                 'is being accessed by other users',
96                 'could not serialize access due to concurrent update',
97             ]
98             can_retry = 0
99             for msg in msgs:
100                 if response.find(msg) == -1:
101                     can_retry = 1
102             if can_retry:
103                 time.sleep(1)
104                 return 0
105             raise RuntimeError(response)
106     return 1
108 def db_exists(config):
109     """Check if database already exists"""
110     db = connection_dict(config, 'database')
111     try:
112         conn = psycopg.connect(**db)
113         conn.close()
114         return 1
115     except:
116         return 0
118 class Sessions(sessions_rdbms.Sessions):
119     def set(self, *args, **kwargs):
120         try:
121             sessions_rdbms.Sessions.set(self, *args, **kwargs)
122         except ProgrammingError, err:
123             response = str(err).split('\n')[0]
124             if -1 != response.find('ERROR') and \
125                -1 != response.find('could not serialize access due to concurrent update'):
126                 # another client just updated, and we're running on
127                 # serializable isolation.
128                 # see http://www.postgresql.org/docs/7.4/interactive/transaction-iso.html
129                 self.db.rollback()
131 class Database(rdbms_common.Database):
132     arg = '%s'
134     # used by some code to switch styles of query
135     implements_intersect = 1
137     def getSessionManager(self):
138         return Sessions(self)
140     def sql_open_connection(self):
141         db = connection_dict(self.config, 'database')
142         logging.getLogger('roundup.hyperdb').info(
143             'open database %r'%db['database'])
144         try:
145             conn = psycopg.connect(**db)
146         except psycopg.OperationalError, message:
147             raise hyperdb.DatabaseError(message)
149         cursor = conn.cursor()
151         return (conn, cursor)
153     def open_connection(self):
154         if not db_exists(self.config):
155             db_create(self.config)
157         self.conn, self.cursor = self.sql_open_connection()
159         try:
160             self.load_dbschema()
161         except psycopg.ProgrammingError, message:
162             if str(message).find('schema') == -1:
163                 raise
164             self.rollback()
165             self.init_dbschema()
166             self.sql("CREATE TABLE schema (schema TEXT)")
167             self.sql("CREATE TABLE dual (dummy integer)")
168             self.sql("insert into dual values (1)")
169             self.create_version_2_tables()
171     def create_version_2_tables(self):
172         # OTK store
173         self.sql('''CREATE TABLE otks (otk_key VARCHAR(255),
174             otk_value TEXT, otk_time REAL)''')
175         self.sql('CREATE INDEX otks_key_idx ON otks(otk_key)')
177         # Sessions store
178         self.sql('''CREATE TABLE sessions (
179             session_key VARCHAR(255), session_time REAL,
180             session_value TEXT)''')
181         self.sql('''CREATE INDEX sessions_key_idx ON
182             sessions(session_key)''')
184         # full-text indexing store
185         self.sql('CREATE SEQUENCE ___textids_ids')
186         self.sql('''CREATE TABLE __textids (
187             _textid integer primary key, _class VARCHAR(255),
188             _itemid VARCHAR(255), _prop VARCHAR(255))''')
189         self.sql('''CREATE TABLE __words (_word VARCHAR(30),
190             _textid integer)''')
191         self.sql('CREATE INDEX words_word_idx ON __words(_word)')
192         self.sql('CREATE INDEX words_by_id ON __words (_textid)')
193         self.sql('CREATE UNIQUE INDEX __textids_by_props ON '
194                  '__textids (_class, _itemid, _prop)')
196     def fix_version_2_tables(self):
197         # Convert journal date column to TIMESTAMP, params column to TEXT
198         self._convert_journal_tables()
200         # Convert all String properties to TEXT
201         self._convert_string_properties()
203         # convert session / OTK *_time columns to REAL
204         for name in ('otk', 'session'):
205             self.sql('drop index %ss_key_idx'%name)
206             self.sql('drop table %ss'%name)
207             self.sql('''CREATE TABLE %ss (%s_key VARCHAR(255),
208                 %s_value VARCHAR(255), %s_time REAL)'''%(name, name, name,
209                 name))
210             self.sql('CREATE INDEX %ss_key_idx ON %ss(%s_key)'%(name, name,
211                 name))
213     def fix_version_3_tables(self):
214         rdbms_common.Database.fix_version_3_tables(self)
215         self.sql('''CREATE INDEX words_both_idx ON public.__words
216             USING btree (_word, _textid)''')
218     def add_actor_column(self):
219         # update existing tables to have the new actor column
220         tables = self.database_schema['tables']
221         for name in tables:
222             self.sql('ALTER TABLE _%s add __actor VARCHAR(255)'%name)
224     def __repr__(self):
225         return '<roundpsycopgsql 0x%x>' % id(self)
227     def sql_commit(self, fail_ok=False):
228         ''' Actually commit to the database.
229         '''
230         logging.getLogger('roundup.hyperdb').info('commit')
232         try:
233             self.conn.commit()
234         except psycopg.ProgrammingError, message:
235             # we've been instructed that this commit is allowed to fail
236             if fail_ok and str(message).endswith('could not serialize '
237                     'access due to concurrent update'):
238                 logging.getLogger('roundup.hyperdb').info(
239                     'commit FAILED, but fail_ok')
240             else:
241                 raise
243         # open a new cursor for subsequent work
244         self.cursor = self.conn.cursor()
246     def sql_stringquote(self, value):
247         ''' psycopg.QuotedString returns a "buffer" object with the
248             single-quotes around it... '''
249         return str(QuotedString(str(value)))[1:-1]
251     def sql_index_exists(self, table_name, index_name):
252         sql = 'select count(*) from pg_indexes where ' \
253             'tablename=%s and indexname=%s'%(self.arg, self.arg)
254         self.sql(sql, (table_name, index_name))
255         return self.cursor.fetchone()[0]
257     def create_class_table(self, spec, create_sequence=1):
258         if create_sequence:
259             sql = 'CREATE SEQUENCE _%s_ids'%spec.classname
260             self.sql(sql)
262         return rdbms_common.Database.create_class_table(self, spec)
264     def drop_class_table(self, cn):
265         sql = 'drop table _%s'%cn
266         self.sql(sql)
268         sql = 'drop sequence _%s_ids'%cn
269         self.sql(sql)
271     def newid(self, classname):
272         sql = "select nextval('_%s_ids') from dual"%classname
273         self.sql(sql)
274         return str(self.cursor.fetchone()[0])
276     def setid(self, classname, setid):
277         sql = "select setval('_%s_ids', %s) from dual"%(classname, int(setid))
278         self.sql(sql)
280     def clear(self):
281         rdbms_common.Database.clear(self)
283         # reset the sequences
284         for cn in self.classes:
285             self.cursor.execute('DROP SEQUENCE _%s_ids'%cn)
286             self.cursor.execute('CREATE SEQUENCE _%s_ids'%cn)
288 class PostgresqlClass:
289     order_by_null_values = '(%s is not NULL)'
291 class Class(PostgresqlClass, rdbms_common.Class):
292     pass
293 class IssueClass(PostgresqlClass, rdbms_common.IssueClass):
294     pass
295 class FileClass(PostgresqlClass, rdbms_common.FileClass):
296     pass
298 # vim: set et sts=4 sw=4 :