# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2022 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Handler for Vendor Catalog batches
"""
from __future__ import unicode_literals, absolute_import
import decimal
from sqlalchemy import orm
from rattail.db import model
from rattail.batch import BatchHandler
[docs]
class VendorCatalogHandler(BatchHandler):
    """
    Handler for vendor catalog batches.

    Class-level settings declared here:

    * ``batch_model_class`` -- the batch model this handler is for.
    * ``populate_with_versioning``, ``refresh_with_versioning``,
      ``execute_with_versioning`` and ``version_catchup_execute`` --
      versioning workaround settings; see comments below.
    * ``case_cost_diff_threshold`` and ``unit_cost_diff_threshold`` --
      optional minimum amounts; cost diffs below these are ignored.
    """
    batch_model_class = model.VendorCatalogBatch

    # these are all disabled, so that the web app knows to employ its
    # versioning workarounds for populate / refresh / execute
    # TODO: i am actually not sure why these *always* seem to be needed for
    # this batch?  maybe the product relationships are too "tight" somehow?
    populate_with_versioning = False
    refresh_with_versioning = False
    execute_with_versioning = False
    version_catchup_execute = ['ProductCost']

    # can set these to e.g. Decimal('0.01') to "ignore" cost diffs below that
    case_cost_diff_threshold = None
    unit_cost_diff_threshold = None
[docs]
def allow_future(self):
    """
    Check config to see whether "future" cost changes should be
    allowed.

    This reads the boolean ``vendor_catalog.allow_future`` option from
    the ``rattail.batch`` config section; when the option is not
    defined the answer defaults to ``False``.

    :returns: ``True`` if future cost changes allowed; else ``False``.
    """
    section = 'rattail.batch'
    option = 'vendor_catalog.allow_future'
    return self.config.getbool(section, option, default=False)
[docs]
def should_populate(self, batch):
    """
    Indicate whether the given batch should be populated.

    The answer is always yes for this handler, because all vendor
    catalogs must come from a data file.

    :param batch: The batch in question; it is not inspected.

    :returns: ``True``.
    """
    return True
def setup(self, batch, progress=None):
model = self.model
# TODO: deprecate / remove this
self.vendor = batch.vendor
# maybe pre-cache all products
if batch.get_param('cache_products'):
self.products = {'upc': {}, 'vendor_code': {}}
session = self.app.get_session(batch)
products = session.query(model.Product)\
.options(orm.joinedload(model.Product.brand))\
.options(orm.joinedload(model.Product.costs))\
.all()
def cache(product, i):
if product.upc:
self.products['upc'][product.upc] = product
cost = product.cost_for_vendor(batch.vendor)
product.vendor_cost = cost
if cost and cost.code:
self.products['vendor_code'][cost.code] = product
self.progress_loop(cache, products, progress,
message="Caching products by UPC and vendor code")
setup_populate = setup
setup_refresh = setup
[docs]
def populate(self, batch, progress=None):
    """
    Populate the given batch.

    Default logic simply hands off to :meth:`populate_from_file()`
    and returns whatever that returns.

    :param batch: The batch to be populated.

    :param progress: Optional progress factory.
    """
    outcome = self.populate_from_file(batch, progress=progress)
    return outcome
[docs]
def populate_from_file(self, batch, progress=None):
    """
    Populate the given batch using data from its input file.  A
    catalog parser is obtained from the vendor handler and asked to
    read row data from the file.  Each row is then added to the
    batch.

    The batch must have valid
    :attr:`~rattail.db.model.batch.vendorcatalog.VendorCatalogBatch.filename`
    and
    :attr:`~rattail.db.model.batch.vendorcatalog.VendorCatalogBatch.parser_key`
    attributes.  The path to the input file is determined by
    invoking the
    :meth:`~rattail.db.model.batch.core.BatchMixin.filepath()`
    method on the batch.

    If the batch has no effective date yet, the parser is asked to
    get one from the file.  While rows are added, the batch params
    ``input_has_case_sizes`` and ``input_has_vendor_codes`` are
    maintained; each starts out false and becomes true as soon as
    any row has a non-null value for the corresponding field.

    :param batch: The batch to be populated.

    :param progress: Optional progress factory.

    :raises ValueError: If the batch has no filename or parser key.
    """
    if not batch.filename:
        raise ValueError("batch does not have a filename: {}".format(batch))
    if not batch.parser_key:
        raise ValueError("batch does not have a parser_key: {}".format(batch))

    session = self.app.get_session(batch)
    path = batch.filepath(self.config)

    # fetch the parser, and give it some context to work with
    parser = self.app.get_vendor_handler().get_catalog_parser(
        batch.parser_key, require=True)
    parser.session = session
    parser.vendor = batch.vendor

    if not batch.effective:
        batch.effective = parser.parse_effective_date(path)

    # assume input has neither of these, until some row says otherwise
    self._input_has_case_sizes = False
    batch.set_param('input_has_case_sizes', False)
    self._input_has_vendor_codes = False
    batch.set_param('input_has_vendor_codes', False)

    def add_parsed_row(row, i):
        if not self._input_has_case_sizes:
            if row.case_size is not None:
                self._input_has_case_sizes = True
                batch.set_param('input_has_case_sizes', True)
        if not self._input_has_vendor_codes:
            if row.vendor_code is not None:
                self._input_has_vendor_codes = True
                batch.set_param('input_has_vendor_codes', True)
        self.add_row(batch, row)
        # flush periodically as rows are added
        if not i % 500: # pragma: no cover
            session.flush()

    parsed_rows = list(parser.parse_rows(path, progress=progress))
    self.progress_loop(add_parsed_row, parsed_rows, progress,
                       message="Adding initial rows to batch")
[docs]
def identify_product(self, row):
    """
    Try to locate the product represented by the given row.

    The row's ``item_entry`` is tried first, via the products
    handler.  Failing that, lookups are done using the ``upc`` and
    then the ``vendor_code`` attributes of the row.

    Under normal circumstances the batch handler will have
    pre-cached all existing products, for quicker lookup.  For
    instance this is the case for the full populate and refresh
    actions.  But this logic is able to do its own slower lookups
    if there is no cache available.

    :param row: A
       :class:`~rattail.db.model.batch.vendorcatalog.VendorCatalogBatchRow`
       instance.

    :returns: A :class:`~rattail.db.model.products.Product`
       instance, or ``None`` if no match could be found.
    """
    products_handler = self.app.get_products_handler()
    session = self.app.get_session(row)

    # first try generic logic based on raw entry
    # TODO: this does not use the setup cache (self.products),
    # which means there is no point in even having a cache b/c we
    # take a hit up front when making it and then continually as
    # we query for items.  either should improve the cache usage
    # or abandon it...
    if row.item_entry:
        match = products_handler.locate_product_for_entry(session,
                                                          row.item_entry)
        if match:
            return match

    # then fall back to whatever catalog-specific logic we do
    # (using cache when possible)
    have_cache = hasattr(self, 'products')
    match = None

    if row.upc:
        if have_cache:
            match = self.products['upc'].get(row.upc)
        else:
            match = products_handler.locate_product_for_gpc(session,
                                                            row.upc)

    # vendor code is the last resort, only tried if UPC got us nowhere
    if not match:
        if row.vendor_code:
            if have_cache:
                match = self.products['vendor_code'].get(row.vendor_code)
            else:
                match = products_handler.locate_product_for_vendor_code(
                    session, row.vendor_code, vendor=row.batch.vendor)

    return match
[docs]
def refresh_row(self, row):
    """
    Refresh data attributes and status for the given row.

    For a vendor catalog, the typical thing is done for basic
    product attributes: if the row has no product yet,
    :meth:`identify_product()` is used to find one, and the row's
    UPC, item ID, brand name, description and size are then copied
    from the product.  A row with no product gets status
    ``STATUS_PRODUCT_NOT_FOUND`` and is not processed further.

    If case cost is known but unit cost is not, the latter will be
    calculated if possible, i.e. when the case size is known and
    is not zero.

    "Old" (i.e. "current" prior to batch execution) values are
    taken from the product's cost record for the batch vendor, and
    "diff" values are re-calculated.  A product with no such cost
    record gets status ``STATUS_NEW_COST``; otherwise the status is
    determined by :meth:`set_status_per_diffs()`.

    :param row: A
       :class:`~rattail.db.model.batch.vendorcatalog.VendorCatalogBatchRow`
       instance.
    """
    batch = row.batch

    # clear this first in case it's set
    row.status_text = None

    if not row.product:
        row.product = self.identify_product(row)
    if not row.product:
        row.status_code = row.STATUS_PRODUCT_NOT_FOUND
        return

    product = row.product
    row.upc = product.upc
    row.item_id = product.item_id
    row.brand_name = product.brand.name if product.brand else None
    row.description = product.description
    row.size = product.size

    # maybe get case size from product master
    if (batch.has_param('input_has_case_sizes')
        and not batch.get_param('input_has_case_sizes')):
        products_handler = self.app.get_products_handler()
        row.case_size = products_handler.get_case_size(product)

    # maybe calculate unit cost from case cost
    # nb. a case size of zero cannot yield a unit cost (dividing by
    # it would raise an error), so it is treated same as unknown
    if row.unit_cost is None and row.case_cost is not None:
        if row.case_size:
            # nb. sometimes both case size and cost are integers,
            # and simple division yields a float!  this approach
            # should hopefully work regardless, to get a decimal.
            row.unit_cost = decimal.Decimal('{:0.4f}'.format(
                row.case_cost / row.case_size))

    # prefer the cost which setup() may have attached to the product
    if hasattr(product, 'vendor_cost'):
        old_cost = product.vendor_cost
    else:
        old_cost = product.cost_for_vendor(batch.vendor)
    if not old_cost:
        row.status_code = row.STATUS_NEW_COST
        return

    # maybe get vendor code from product master
    if (batch.has_param('input_has_vendor_codes')
        and not batch.get_param('input_has_vendor_codes')):
        row.vendor_code = old_cost.code

    row.cost = old_cost
    row.old_vendor_code = old_cost.code
    row.old_case_size = old_cost.case_size
    row.old_case_cost = old_cost.case_cost
    row.old_unit_cost = old_cost.unit_cost

    # only consider vendor match if product does in fact have a vendor
    # TODO: at least i assume that's a reasonable idea?
    if product.costs:
        row.is_preferred_vendor = product.costs[0].vendor is batch.vendor

    self.refresh_cost_diffs(row)
    self.set_status_per_diffs(row)
def refresh_cost_diffs(self, row):
    """
    Re-calculate the cost "diff" values for the given row.

    This sets ``case_cost_diff``, ``unit_cost_diff`` and
    ``unit_cost_diff_percent`` on the row, each being the new value
    as compared to the old.  Any diff which cannot be calculated,
    because the new or old cost is unknown, is set to ``None``.

    If the old unit cost is zero, no true percentage exists; the
    percent diff is then set to 100 if the unit cost changed at
    all, or else 0.

    :param row: The batch row to be updated.
    """
    # start from a clean slate, so that values calculated during an
    # earlier refresh can never outlive the costs they came from
    row.case_cost_diff = None
    row.unit_cost_diff = None
    row.unit_cost_diff_percent = None

    # old_case_cost
    if row.case_cost is not None and row.old_case_cost is not None:
        row.case_cost_diff = row.case_cost - row.old_case_cost

    # old_unit_cost
    if row.unit_cost is not None and row.old_unit_cost is not None:
        row.unit_cost_diff = row.unit_cost - row.old_unit_cost
        if row.old_unit_cost:
            row.unit_cost_diff_percent = 100 * row.unit_cost_diff / row.old_unit_cost
        elif row.unit_cost_diff:
            # cost went from zero to something
            row.unit_cost_diff_percent = 100
        else:
            # cost was zero and still is, i.e. nothing changed
            row.unit_cost_diff_percent = 0
def set_status_per_diffs(self, row):
    """
    Set the row status according to how its new values compare to
    the old ones.

    Checks are made in this order, and the first difference found
    determines the status: vendor item code, case size, case cost,
    unit cost.  If no difference is found, status is set to
    ``STATUS_NO_CHANGE``.

    A cost difference is ignored when the corresponding threshold
    (``case_cost_diff_threshold`` or ``unit_cost_diff_threshold``)
    is set, both the new and old cost are known, and the absolute
    difference is less than the threshold.

    :param row: The batch row to be updated.
    """

    def below_threshold(threshold, new_cost, old_cost):
        # nb. when either cost is unknown there is no amount to
        # compare with the threshold, so the diff always counts
        if not threshold or new_cost is None or old_cost is None:
            return False
        return threshold > abs(new_cost - old_cost)

    if row.vendor_code != row.old_vendor_code:
        row.status_code = row.STATUS_CHANGE_VENDOR_ITEM_CODE
        row.status_text = "new vendor item code {} differs from old code {}".format(
            repr(row.vendor_code), repr(row.old_vendor_code))
        return

    if row.case_size != row.old_case_size:
        row.status_code = row.STATUS_CHANGE_CASE_SIZE
        row.status_text = "new case size {} differs from old case size {}".format(
            repr(row.case_size), repr(row.old_case_size))
        return

    if row.case_cost != row.old_case_cost:
        if not below_threshold(self.case_cost_diff_threshold,
                               row.case_cost, row.old_case_cost):
            row.status_code = row.STATUS_CHANGE_COST
            row.status_text = "new case cost {} differs from old cost {}".format(
                repr(row.case_cost), repr(row.old_case_cost))
            return

    if row.unit_cost != row.old_unit_cost:
        if not below_threshold(self.unit_cost_diff_threshold,
                               row.unit_cost, row.old_unit_cost):
            row.status_code = row.STATUS_CHANGE_COST
            row.status_text = "new unit cost {} differs from old cost {}".format(
                repr(row.unit_cost), repr(row.old_unit_cost))
            return

    row.status_code = row.STATUS_NO_CHANGE
# TODO: who uses this..?
[docs]
def cost_differs(self, row, cost):
    """
    Compare a batch row with a cost record to determine whether they
    match or differ.

    The vendor code, case cost and unit cost are compared in that
    order.  Any of these which is ``None`` on the row is skipped.

    :param row: The batch row, with the "new" values.

    :param cost: The cost record, with the "old" values.

    :returns: Message describing the first difference found, or
       ``None`` if no difference was found.
    """
    message = None
    if row.vendor_code is not None and row.vendor_code != cost.code:
        template = "new vendor code {} differs from old code {}"
        message = template.format(repr(row.vendor_code), repr(cost.code))
    elif row.case_cost is not None and row.case_cost != cost.case_cost:
        template = "new case cost {} differs from old cost {}"
        message = template.format(row.case_cost, cost.case_cost)
    elif row.unit_cost is not None and row.unit_cost != cost.unit_cost:
        template = "new unit cost {} differs from old cost {}"
        message = template.format(row.unit_cost, cost.unit_cost)
    return message