Source code for rattail.importing.postgresql
# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2018 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
PostgreSQL data importers
"""
from __future__ import unicode_literals, absolute_import
import os
import datetime
import logging
import six
from rattail.importing import BulkImporter, ToSQLAlchemy
from rattail.time import make_utc
log = logging.getLogger(__name__)
[docs]
class BulkToPostgreSQL(BulkImporter, ToSQLAlchemy):
"""
Base class for bulk data importers which target PostgreSQL on the local side.
"""
@property
def data_path(self):
return os.path.join(self.config.workdir(),
'import_bulk_postgresql_{}.csv'.format(self.model_name))
[docs]
def setup(self):
self.data_buffer = open(self.data_path, 'wb')
[docs]
def teardown(self):
self.data_buffer.close()
os.remove(self.data_path)
self.data_buffer = None
[docs]
def create_object(self, key, data):
data = self.prep_data_for_postgres(data)
self.data_buffer.write('{}\n'.format('\t'.join([data[field] for field in self.fields])).encode('utf-8'))
def prep_data_for_postgres(self, data):
data = dict(data)
for key, value in data.items():
data[key] = self.prep_value_for_postgres(value)
return data
def prep_value_for_postgres(self, value):
if value is None:
return '\\N'
if value is True:
return 't'
if value is False:
return 'f'
if isinstance(value, datetime.datetime):
value = make_utc(value, tzinfo=False)
elif isinstance(value, six.string_types):
value = value.replace('\\', '\\\\')
value = value.replace('\r', '\\r')
value = value.replace('\n', '\\n')
value = value.replace('\t', '\\t') # TODO: add test for this
return six.text_type(value)
[docs]
def flush_create_update(self):
pass
[docs]
def flush_create_update_final(self):
log.info("copying {} data from buffer to PostgreSQL".format(self.model_name))
self.data_buffer.close()
self.data_buffer = open(self.data_path, 'rb')
cursor = self.session.connection().connection.cursor()
table_name = '"{}"'.format(self.model_table.name)
cursor.copy_from(self.data_buffer, table_name, columns=self.fields)
log.debug("PostgreSQL data copy completed")