Source code for rattail.batch.labels

# -*- coding: utf-8; -*-
################################################################################
#
#  Rattail -- Retail Software Framework
#  Copyright © 2010-2023 Lance Edgar
#
#  This file is part of Rattail.
#
#  Rattail is free software: you can redistribute it and/or modify it under the
#  terms of the GNU General Public License as published by the Free Software
#  Foundation, either version 3 of the License, or (at your option) any later
#  version.
#
#  Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
#  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
#  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
#  details.
#
#  You should have received a copy of the GNU General Public License along with
#  Rattail.  If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Handler for label batches
"""

import csv
import decimal
import logging

import json
import sqlalchemy as sa
from sqlalchemy import orm

from rattail import enum
from rattail.db.model import LabelBatch
from rattail.gpc import GPC
from rattail.batch import BatchHandler
from rattail.csvutil import UnicodeDictReader
from rattail.time import make_utc
from rattail.config import parse_bool


log = logging.getLogger(__name__)


[docs] class LabelBatchHandler(BatchHandler): """ Handler for Print Labels batches. """ batch_model_class = LabelBatch def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.products_handler = self.app.get_products_handler() def setup(self, batch, progress=None): self.now = make_utc() setup_populate = setup setup_refresh = setup setup_clone = setup
[docs] def make_batch(self, session, progress=None, **kwargs): """ Make a new batch, with initial rows if applicable. """ self.skip_first_line = parse_bool(kwargs.pop('skip_first_line', False)) self.calc_check_digit = kwargs.pop('calc_check_digit', False) if self.calc_check_digit != 'upc': self.calc_check_digit = parse_bool(self.calc_check_digit) file_has_options = parse_bool(kwargs.pop('file_has_options', False)) batch = super(LabelBatchHandler, self).make_batch(session, progress, **kwargs) batch.file_has_options = file_has_options return batch
[docs] def auto_executable(self, batch): """ Must return a boolean indicating whether the given bath is eligible for "automatic" execution, i.e. immediately after batch is created. """ if batch.filename and '.autoexecute.' in batch.filename: return True return False
[docs] def populate(self, batch, progress=None): """ Pre-fill batch with row data from handheld batch, etc. """ model = self.model session = self.app.get_session(batch) if batch.label_profile: self.label_profile = batch.label_profile else: self.label_profile = self.get_label_profile(session) if hasattr(batch, 'product_batch') and batch.product_batch: self.populate_from_product_batch(batch, progress=progress) return assert batch.handheld_batch or batch.filename or batch.products label_code = self.label_profile.code if self.label_profile else None def append(item, i): row = self.make_row() row.label_code = label_code row.label_profile = self.label_profile with session.no_autoflush: if isinstance(item, model.Product): row.product = item row.label_quantity = 1 if batch.static_prices and hasattr(item, '_batch_price'): row.regular_price = item._batch_price else: # item is handheld batch row row.product = item.product row.label_quantity = item.units or 1 # copy these in case product is null row.item_entry = item.item_entry row.item_id = item.item_id row.upc = item.upc row.brand_name = item.brand_name row.description = item.description row.size = item.size self.add_row(batch, row) if i % 200 == 0: session.flush() if batch.handheld_batch: data = batch.handheld_batch.active_rows() elif batch.filename: if batch.file_has_options: self.set_options_from_file(batch) if batch.label_profile: self.label_profile = batch.label_profile data = self.read_products_from_file(batch, progress=progress) elif batch.products: data = batch.products self.progress_loop(append, data, progress, message="Adding initial rows to batch")
[docs] def populate_from_product_batch(self, batch, progress=None): """ Populate label batch from product batch. """ session = orm.object_session(batch) product_batch = batch.product_batch label_code = self.label_profile.code if self.label_profile else None def add(prow, i): row = self.make_row() row.label_code = label_code row.label_profile = self.label_profile row.label_quantity = 1 with session.no_autoflush: row.product = prow.product self.add_row(batch, row) if i % 200 == 0: session.flush() self.progress_loop(add, product_batch.active_rows(), progress, message="Adding initial rows to batch")
[docs] def set_options_from_file(self, batch): """ Set various batch options, if any are present within the data file. """ model = self.model path = batch.filepath(self.config) with open(path, 'rt') as f: options = json.loads(f.readline()) if 'description' in options and options['description']: batch.description = options['description'] if 'notes' in options and options['notes']: batch.notes = options['notes'] if 'static_prices' in options: batch.static_prices = options['static_prices'] if 'label_code' in options: batch.label_code = options['label_code'] if batch.label_code: session = orm.object_session(batch) batch.label_profile = session.query(model.LabelProfile)\ .filter(model.LabelProfile.code == batch.label_code)\ .one()
[docs] def read_products_from_file(self, batch, progress=None): """ Returns list of Product objects based on lookup from CSV file data. # TODO: should this actually happen here? vs refresh and just mark product not found? """ path = batch.filepath(self.config) with open(path, 'rt') as f: if self.skip_first_line: f.readline() reader = csv.reader(f) data = [{'upc': row[0]} for row in reader] else: fields = None if batch.file_has_options: f.readline() reader = csv.reader(f) fields = next(reader) f.seek(0) f.readline() f.readline() reader = UnicodeDictReader(f, fieldnames=fields) data = list(reader) products = [] session = orm.object_session(batch) def append(entry, i): upc = entry['upc'].strip() if upc: try: upc = GPC(upc, calc_check_digit=self.calc_check_digit) except ValueError: pass else: product = self.products_handler.locate_product_for_gpc(session, upc) if product: if batch.static_prices and entry['regular_price']: product._batch_price = decimal.Decimal(entry['regular_price']) products.append(product) else: log.warning("product not found: {}".format(upc)) self.progress_loop(append, data, progress, message="Reading data from CSV file") return products
def get_label_profile(self, session): model = self.model code = self.config.get('rattail.batch', 'labels.default_code') if code: return session.query(model.LabelProfile)\ .filter(model.LabelProfile.code == code)\ .one() else: return session.query(model.LabelProfile)\ .order_by(model.LabelProfile.ordinal)\ .first()
[docs] def refresh_row(self, row): """ Inspect a row from the source data and populate additional attributes for it, according to what we find in the database. """ if not row.product: session = orm.object_session(row) if row.item_entry: row.product = self.locate_product_for_entry(session, row.item_entry) if not row.product and row.upc: row.product = self.products_handler.locate_product_for_gpc(session, row.upc) if not row.product: row.status_code = row.STATUS_PRODUCT_NOT_FOUND return self.refresh_product_basics(row) product = row.product category = product.category row.category_code = category.code if category else None row.category_name = category.name if category else None if not row.batch.static_prices: regular_price = product.regular_price row.regular_price = regular_price.price if regular_price else None row.pack_quantity = regular_price.pack_multiple if regular_price else None row.pack_price = regular_price.pack_price if regular_price else None sale_price = product.sale_price if sale_price: now = getattr(self, 'now', None) or make_utc() if (sale_price.type == enum.PRICE_TYPE_SALE and sale_price.starts and sale_price.starts <= now and sale_price.ends and sale_price.ends >= now): pass # this is what we want else: sale_price = None row.sale_price = sale_price.price if sale_price else None row.sale_start = sale_price.starts if sale_price else None row.sale_stop = sale_price.ends if sale_price else None tpr_price = product.tpr_price if tpr_price: now = getattr(self, 'now', None) or self.app.make_utc() if (tpr_price.type == self.enum.PRICE_TYPE_TPR and tpr_price.starts and tpr_price.starts <= now and tpr_price.ends and tpr_price.ends >= now): pass # this is what we want else: tpr_price = None row.tpr_price = tpr_price.price if tpr_price else None row.tpr_starts = tpr_price.starts if tpr_price else None row.tpr_ends = tpr_price.ends if tpr_price else None current_price = product.current_price if current_price: now = getattr(self, 'now', None) or self.app.make_utc() if (current_price.type in (self.enum.PRICE_TYPE_SALE, self.enum.PRICE_TYPE_TPR) and current_price.starts and current_price.starts <= now and current_price.ends and current_price.ends >= now): pass # this is what we want else: current_price = None row.current_price = current_price.price if current_price else None row.current_starts = current_price.starts if current_price else None row.current_ends = current_price.ends if current_price else None cost = product.cost vendor = cost.vendor if cost else None row.vendor_id = vendor.id if vendor else None row.vendor_name = vendor.name if vendor else None row.vendor_item_code = cost.code if cost else None row.case_quantity = cost.case_size if cost else None if row.regular_price: row.status_code = row.STATUS_OK else: row.status_code = row.STATUS_REGULAR_PRICE_UNKNOWN
[docs] def quick_entry(self, session, batch, entry): """ Quick entry is assumed to be a UPC scan or similar user input. If a matching product can be found, this will add a new row for the batch; otherwise an error is raised. """ product = self.locate_product_for_entry(session, entry) if not product: raise ValueError("Product not found: {}".format(entry)) row = self.make_row() row.product = product self.add_row(batch, row) return row
[docs] def get_effective_rows(self, batch): # filter out removed rows, and maybe inactive product rows rows = batch.active_rows() if self.config.getbool('rattail.batch', 'labels.exclude_inactive_products', default=False): rows = [row for row in rows if row.status_code not in (row.STATUS_PRODUCT_APPEARS_INACTIVE, row.STATUS_PRODUCT_NOT_FOUND)] return rows
[docs] def execute(self, batch, progress=None, **kwargs): """ Print some labels! """ rows = self.get_effective_rows(batch) self.print_labels(batch, rows, progress=progress) return True
[docs] def print_labels(self, batch, rows, progress=None): """ Print all labels for the given batch. """ label_handler = self.app.get_label_handler() profiles = {} def organize(row, i): profile = row.label_profile if not profile: return if profile.uuid not in profiles: profiles[profile.uuid] = profile profile.labels = [] data = row.get_data_dict() # TODO: should rename these columns in the schema; for now # just copy values in the dict to make other logic happy data['sale_starts'] = data['sale_start'] data['sale_ends'] = data['sale_stop'] # TODO: not sure what to make of this yet. printing # labels should not require a product i think, but it # currently does (cf. CommandFormatter.format_labels). # and then again it seems like it would be useful for some # label formatters to leverage things from the product # which aren't in the batch data. so for now we just # always pass it i guess. data['product'] = row.product profile.labels.append((data, row.label_quantity)) self.progress_loop(organize, rows, progress, message="Organizing labels by type") # okay now print for real for profile in profiles.values(): printer = label_handler.get_printer(profile) printer.print_labels(profile.labels, progress=progress)