# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2023 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
DataSync for Rattail
"""
import logging
from sqlalchemy import orm
from rattail.db.util import make_topo_sortkey
from rattail.datasync import DataSyncWatcher, DataSyncConsumer, DataSyncImportConsumer
from rattail.config import parse_list
log = logging.getLogger(__name__)
class RattailWatcher(DataSyncWatcher):
    """
    DataSync watcher for Rattail databases.

    Reads pending ``Change`` records from the database identified by
    this watcher's ``dbkey`` and converts them into ``DataSyncChange``
    records for the datasync daemon.
    """
    prunes_changes = True

    def __init__(self, *args, **kwargs):
        super(RattailWatcher, self).__init__(*args, **kwargs)
        self.engine = self.config.rattail_engines[self.dbkey]
        self.topo_sortkey = make_topo_sortkey(self.config.get_model())

    def get_changes(self, lastrun):
        """
        Checks the :class:`~rattail.db.model.core.Change` table in a
        Rattail database, to see if there are any pending changes for
        the datasync daemon.

        :param lastrun: Not used by this implementation.

        :returns: ``None`` if there are no pending changes.  Otherwise
           a list of 2-tuples ``(key, datasync_change)`` where ``key``
           is the UUID of the originating ``Change`` record (or
           ``None`` when there is nothing to prune) and
           ``datasync_change`` is a new ``DataSyncChange`` instance.
        """
        model = self.model
        session = self.app.make_session(bind=self.engine)
        try:
            changes = session.query(model.Change).all()
            # detach the records so they remain usable after close
            session.expunge_all()

            # nb. fetch config flag while we have session open
            deleted_first = self.config.getbool(
                'rattail.datasync', 'rattail_watcher_deleted_first',
                session=session,
                default=False)
        finally:
            # always release the session, even if the query fails
            session.close()

        if not changes:
            return None

        # here we sort our payload types (class names) topologically, to take
        # foreign key dependencies into account etc.  but we start with a
        # simple lexical sort on class name, just for kicks maybe..?
        class_names = sorted({c.class_name for c in changes})
        class_names.sort(key=self.topo_sortkey)

        # collect datasync changes for new/dirty and deleted
        dirty = self.get_new_dirty(changes, class_names)
        deleted = self.get_deleted(changes, class_names)

        # we traditionally have processed new/dirty first, then
        # deleted.  but config can reverse that, to test new logic
        # which is meant to help with one particular scenario: when a
        # record with "unique code" is effectively moved; the new
        # record can't be created until the old is deleted

        # TODO: once "deleted first" has been tested, should make it
        # the default, unless there is some reason not to...

        # TODO: original logic comments said: note that we must first
        # add new/dirty changes only, then we add deleted at the end.
        # so that deletes are processed last, by the consumer(s), to
        # hopefully avoid dependency issues
        if deleted_first:
            final = deleted + dirty
        else:
            final = dirty + deleted

        # when a Product has a price relationship change (e.g. it is
        # given or loses the "current price"), the Product record is
        # the only thing technically updated, but the simple Rattail
        # -> Rattail sync will use an importer which *skips* those
        # price relation fields.  so here we explicitly add changes to
        # the queue, which can force processing of the price relation
        # changes.  and we definitely want these to happen *last*.
        if 'Product' in class_names:
            for change in changes:
                if change.class_name == 'Product' and not change.deleted:
                    final.append((None, # no uuid since this need not be pruned
                                  model.DataSyncChange(
                                      payload_type='ProductPriceAssociation',
                                      payload_key=change.instance_uuid)))

        return final

    def get_new_dirty(self, changes, class_names):
        """
        Return ``(key, datasync_change)`` tuples for all changes which
        are *not* flagged as deleted, ordered according to
        ``class_names``.
        """
        return self._collect_changes(changes, class_names, deleted=False)

    def get_deleted(self, changes, class_names):
        """
        Return ``(key, datasync_change)`` tuples for all changes which
        *are* flagged as deleted, ordered according to ``class_names``.
        """
        return self._collect_changes(changes, class_names, deleted=True)

    def _collect_changes(self, changes, class_names, deleted):
        """
        Build ``(change.uuid, DataSyncChange)`` tuples for the changes
        whose ``deleted`` flag has the given truthiness.

        Output follows the order of ``class_names``; within one class
        name the original order of ``changes`` is kept.
        """
        model = self.model

        # group matching changes by class name in a single pass, rather
        # than re-scanning the whole list once per class name
        grouped = {}
        for change in changes:
            if bool(change.deleted) is deleted:
                grouped.setdefault(change.class_name, []).append(change)

        return [
            (change.uuid,
             model.DataSyncChange(
                 payload_type=class_name,
                 payload_key=change.instance_uuid,
                 deletion=change.deleted))
            for class_name in class_names
            for change in grouped.get(class_name, ())
        ]

    def prune_changes(self, keys):
        """
        Delete the ``Change`` records identified by ``keys``.

        :param keys: Sequence of ``Change`` UUID values; falsy entries
           (e.g. ``None``) are skipped.

        :returns: Number of records which were deleted.

        Any error is logged and re-raised; a failure while deleting
        rolls back the whole batch.
        """
        model = self.model
        deleted = 0

        # nb. the bare ``except`` clauses below always re-raise; they
        # exist only to log (and roll back) before propagating
        try:
            session = self.app.make_session(bind=self.engine)
        except:
            log.exception("failed to make session for pruning changes")
            raise

        try:
            for key in keys:
                if key: # note that key can sometimes be None
                    change = session.get(model.Change, key)
                    if change:
                        session.delete(change)
                        session.flush()
                        deleted += 1
        except:
            log.exception("failed to prune changes")
            session.rollback()
            raise
        else:
            session.commit()
        finally:
            session.close()

        return deleted
# TODO: this is awful and needs to go away!
class RattailConsumer(DataSyncConsumer):
    """
    DataSync consumer for Rattail databases.

    Applies changes to the database identified by this consumer's
    ``dbkey``, by merging instances read from the host session or by
    deleting the corresponding local instances.
    """

    # Set this to a sequence of model names if you wish to *consume* data
    # *only* for those models (and ignore all others).
    consume_models = None

    # Set this to a sequence of model names if you wish to *ignore* data *only*
    # for those models (and consume all others).
    ignore_models = None

    def __init__(self, *args, **kwargs):
        super(RattailConsumer, self).__init__(*args, **kwargs)
        self.engine = self.config.rattail_engines[self.dbkey]
        self.model = self.get_data_model()
        self.topo_sortkey = make_topo_sortkey(self.model)

    # TODO: deprecate / remove this
    def get_data_model(self):
        """
        Subclasses may override this if they have extended the schema.
        Defaults to ``rattail.db.model``.
        """
        return self.config.get_model()

    def process_changes(self, host_session, changes):
        """
        Process changes for a Rattail database.

        :param host_session: Session from which source ("host")
           instances are fetched for add/update changes.

        :param changes: Sequence of change objects, each with
           ``payload_type``, ``payload_key`` and ``deletion``
           attributes.

        The target session is committed once all changes have been
        applied, and is always closed, even when processing fails.
        """
        # group the changes by payload type, in a single pass
        changes_by_type = {}
        for change in changes:
            changes_by_type.setdefault(change.payload_type, []).append(change)

        # First determine which models are represented in the change set.  Some
        # models may be ignored, depending on the subclass logic etc.
        class_names = set(changes_by_type)
        if self.consume_models is not None:
            class_names = [c for c in class_names if c in self.consume_models]
        elif self.ignore_models is not None:
            class_names = [c for c in class_names if c not in self.ignore_models]

        # The order will matter because of table foreign key dependencies.  We
        # start with a lexical sort just for kicks maybe...
        class_names = sorted(class_names)
        class_names.sort(key=self.topo_sortkey)

        session = self.app.make_session(bind=self.engine)
        try:
            if self.runas_username:
                session.set_continuum_user(self.runas_username)

            for class_name in class_names:

                # this consumer does not need to process these changes; see
                # FromRattailToRattailBase for one which can process them
                if class_name == 'ProductPriceAssociation':
                    continue

                cls = getattr(self.model, class_name)

                for change in changes_by_type[class_name]:
                    log.debug("processing %s for %s %s",
                              "deletion" if change.deletion else "change",
                              change.payload_type, change.payload_key)

                    if change.deletion:
                        instance = session.get(cls, change.payload_key)
                        if instance:
                            self.delete_instance(session, instance)
                            session.flush()
                        else:
                            log.warning("could not find instance to delete")

                    else: # add/update
                        host_instance = host_session.get(cls, change.payload_key)
                        if host_instance:
                            # if processing a pack item, must process its unit item
                            # first.  this is because both may be "new" in which
                            # case the unit item must exist before the pack may
                            # reference it.
                            if class_name == 'Product' and host_instance.is_pack_item():
                                self.merge_instance(session, host_instance.unit)
                            self.merge_instance(session, host_instance)
                            session.flush()
                        else:
                            log.warning("could not find host instance to merge")

            session.commit()
        finally:
            # nb. closing without commit discards any pending work, so
            # a failure above never leaves the session dangling
            session.close()

    def merge_instance(self, session, instance):
        """
        Merge the given model instance into the given database session.

        Subclasses may define model-specific methods instead of or in addition
        to overriding this generic one.  A method named
        ``merge_<classname>`` (class name lower-cased) is used when
        present; otherwise ``session.merge()`` is called directly.
        """
        class_name = instance.__class__.__name__.lower()
        merger = getattr(self, 'merge_{0}'.format(class_name), None)
        if merger:
            return merger(session, instance)

        # Nothing special defined, so just do the normal thing.
        return session.merge(instance)

    def delete_instance(self, session, instance):
        """
        Delete the given model instance from the given database session.

        Subclasses may define model-specific methods instead of or in addition
        to overriding this generic one.  A method named
        ``delete_<classname>`` fully replaces the default logic.
        Otherwise a ``predelete_<classname>`` method, if present, is
        called before the instance is deleted from the session.
        """
        class_name = instance.__class__.__name__.lower()

        deleter = getattr(self, 'delete_{0}'.format(class_name), None)
        if deleter:
            return deleter(session, instance)

        predeleter = getattr(self, 'predelete_{0}'.format(class_name), None)
        if predeleter:
            predeleter(session, instance)

        session.delete(instance)

    def merge_product(self, session, source_product):
        """
        This method is somewhat of a hack, in order to properly handle
        :class:`rattail.db.model.Product` instances and the interdependent
        nature of the related :class:`rattail.db.model.ProductPrice` instances.

        :returns: The merged (target) product instance.
        """
        price_names = (
            'regular_price',
            'tpr_price',
            'sale_price',
            'current_price',
            'suggested_price',
        )

        target_product = session.merge(source_product)

        # I'm not 100% sure I understand this correctly, but here's my
        # thinking: First we clear the price relationships in case they've
        # actually gone away; then we re-establish any which are currently
        # valid.

        # Setting the price relationship attributes to ``None`` isn't enough to
        # force the ORM to notice a change, since the UUID field is ultimately
        # what it's watching.  So we must explicitly use that field here.
        for name in price_names:
            setattr(target_product, name + '_uuid', None)

        # If the source instance has currently valid price relationships, then
        # we re-establish them.  We must merge the source price instance in
        # order to be certain it will exist in the target session, and avoid
        # foreign key errors.  However we *still* must also set the UUID fields
        # because again, the ORM is watching those...  This was noticed to be
        # the source of some bugs where successive database syncs were
        # effectively "toggling" the price relationship.  Setting the UUID
        # field explicitly seems to solve it.
        for name in price_names:
            if getattr(source_product, name + '_uuid'):
                setattr(target_product, name,
                        session.merge(getattr(source_product, name)))
                setattr(target_product, name + '_uuid',
                        getattr(target_product, name).uuid)

        return target_product

    def predelete_department(self, session, department):
        """
        Detach all subdepartments and products from the department
        which is about to be deleted.
        """
        model = self.model

        # Disconnect from all subdepartments.
        q = session.query(model.Subdepartment).filter(
            model.Subdepartment.department == department)
        for subdept in q:
            subdept.department = None

        # Disconnect from all products.
        q = session.query(model.Product).filter(
            model.Product.department == department)
        for product in q:
            product.department = None

    def predelete_subdepartment(self, session, subdepartment):
        """
        Detach all products from the subdepartment which is about to
        be deleted.
        """
        model = self.model

        # Disconnect from all products.
        q = session.query(model.Product).filter(
            model.Product.subdepartment == subdepartment)
        for product in q:
            product.subdepartment = None

    def predelete_family(self, session, family):
        """
        Detach all products from the family which is about to be
        deleted.
        """
        model = self.model

        # Disconnect from all products.
        q = session.query(model.Product).filter(
            model.Product.family == family)
        for product in q:
            product.family = None

    def predelete_vendor(self, session, vendor):
        """
        Delete all product costs which belong to the vendor which is
        about to be deleted.
        """
        model = self.model

        # Remove all product costs.
        q = session.query(model.ProductCost).filter(
            model.ProductCost.vendor == vendor)
        for cost in q:
            session.delete(cost)

    def predelete_customergroup(self, session, group):
        """
        Delete all customer assignments for the customer group which
        is about to be deleted.
        """
        model = self.model

        # Disconnect from all customers.
        q = session.query(model.CustomerGroupAssignment).filter(
            model.CustomerGroupAssignment.group == group)
        for assignment in q:
            session.delete(assignment)
class FromRattailToRattailBase(DataSyncImportConsumer):
    """
    Shared base for the Rattail -> Rattail datasync consumers.
    """

    def setup(self):
        """
        Run the parent setup, then establish the topological sort key
        and the "handle price xref" flag for this consumer.
        """
        super(FromRattailToRattailBase, self).setup()
        self.topo_sortkey = make_topo_sortkey(self.model)
        self.handle_price_xref = self.get_handle_price_xref()

    def get_handle_price_xref(self):
        """
        Return the flag which says whether product price reference
        changes should be handled.

        An existing ``handle_price_xref`` attribute wins; otherwise the
        value is read from config and defaults to false.
        """
        try:
            return self.handle_price_xref
        except AttributeError:
            pass

        # TODO: this should default to True, once logic is proven
        return self.config.getbool('rattail.datasync',
                                   'rattail.handle_price_xref',
                                   default=False)

    def get_importers(self):
        """
        Return the importers from the parent class, supplemented with
        ``Role`` and (optionally) ``ProductPriceAssociation`` entries
        when those keys are not already present.
        """
        available = super(FromRattailToRattailBase, self).get_importers()

        # the GlobalRole importer stands in for Role, unless a Role
        # importer was already provided
        if 'Role' not in available:
            available['Role'] = self.handler.get_importer('GlobalRole')

        # product price reference changes get their own importer, but
        # only when that feature is enabled
        if self.get_handle_price_xref():
            if 'ProductPriceAssociation' not in available:
                available['ProductPriceAssociation'] = self.handler.get_importer(
                    'ProductPriceAssociation')

        return available

    def get_host_object(self, session, change):
        """
        Fetch the host object for the given change, from the given
        session.

        A ``ProductPriceAssociation`` change is resolved to the
        ``Product`` identified by the payload key; any other payload
        type is looked up as a model class of the same name.
        """
        payload_type = change.payload_type
        if payload_type != 'ProductPriceAssociation':
            model_class = getattr(self.model, payload_type)
        else:
            model_class = self.model.Product
        return session.get(model_class, change.payload_key)
class FromRattailToRattailExportConsumer(FromRattailToRattailBase):
    """
    Consumer which exports data changes from the "local" Rattail
    database to another one.
    """
    handler_spec = None

    def __init__(self, *args, **kwargs):
        super(FromRattailToRattailExportConsumer, self).__init__(*args, **kwargs)
        engines = self.config.rattail_engines
        self.target_engine = engines[self.dbkey]
        self.model = self.config.get_model()

    def make_target_session(self):
        """
        Return a new session bound to the target engine.
        """
        target_session = self.app.make_session(bind=self.target_engine)
        return target_session

    def begin_transaction(self):
        """
        Open a fresh local session and a fresh target session.
        """
        self.local_session = self.app.make_session()
        self.target_session = self.make_target_session()

    def rollback_transaction(self):
        """
        Roll back and close the target session, then the local one.
        """
        for open_session in (self.target_session, self.local_session):
            open_session.rollback()
            open_session.close()

    def commit_transaction(self):
        """
        Commit and close the local session, then the target one.
        """
        for open_session in (self.local_session, self.target_session):
            open_session.commit()
            open_session.close()

    def pre_process_changes(self, session, changes):
        """
        Prepare the sessions and importers before changes are processed.
        """
        username = self.runas_username
        if username:
            self.target_session.set_continuum_user(username)

        # every importer reads from the local session and writes to
        # the target session
        for each in self.importers.values():
            each.host_session = self.local_session
            each.session = self.target_session
class FromRattailToRattailImportConsumer(FromRattailToRattailBase):
    """
    Import data changes from another Rattail to "local"
    """
    handler_spec = None

    def __init__(self, *args, **kwargs):
        super(FromRattailToRattailImportConsumer, self).__init__(*args, **kwargs)
        # the host engine is the one the watcher reads from; the local
        # engine is the one this consumer writes to
        self.host_engine = self.config.rattail_engines[self.watcher.dbkey]
        self.local_engine = self.config.rattail_engines[self.dbkey]
        self.model = self.config.get_model()

    def process_changes(self, session, changes):
        """
        Process the given changes, by invoking the relevant importer
        for each one.

        :param session: Session which is passed along as-is to
           ``invoke_importer()``.

        :param changes: Sequence of change objects, each with a
           ``payload_type`` attribute.

        The local session is committed once all changes are processed.
        Both the local and host sessions are always closed, even when
        an importer fails.
        """
        # group the changes by payload type, in a single pass
        changes_by_type = {}
        for change in changes:
            changes_by_type.setdefault(change.payload_type, []).append(change)

        # topographically sort changes, i.e. per schema table dependencies.
        # we start with a lexical sort just for kicks maybe...
        class_names = sorted(changes_by_type)
        class_names.sort(key=self.topo_sortkey)

        self.host_session = self.app.make_session(bind=self.host_engine)
        try:
            local_session = self.app.make_session(bind=self.local_engine)
            try:
                if self.runas_username:
                    local_session.set_continuum_user(self.runas_username)

                # update all importers with current sessions
                for importer in self.importers.values():
                    importer.host_session = self.host_session
                    importer.session = local_session

                # nb. payload types are deliberately *not* resolved to
                # model classes here; some of them (presumably e.g.
                # 'ProductPriceAssociation') have no model class of the
                # same name, and the importer does not need one
                for class_name in class_names:
                    for change in changes_by_type[class_name]:
                        self.invoke_importer(session, change)

                local_session.commit()
            finally:
                local_session.close()
        finally:
            # TODO: should the host session ever be committed?  for
            # now it is only ever closed
            self.host_session.close()

    def get_host_object(self, session, change):
        """
        Fetch the host object for the given change.

        The ``session`` argument is ignored; the host session opened
        by :meth:`process_changes()` is used instead.
        """
        # we use base logic to get the object, but must substitute the
        # session to ensure we read from actual host and not local
        return super(FromRattailToRattailImportConsumer, self).get_host_object(
            self.host_session, change)