# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2024 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Views for 'ordering' (purchasing) batches
"""
import os
import json
import openpyxl
from rattail.core import Object
from tailbone.db import Session
from tailbone.views.purchasing import PurchasingBatchView
class OrderingBatchView(PurchasingBatchView):
    """
    Master view for "ordering" batches.

    Extends the generic purchasing batch view with the "scanning" AJAX
    endpoints, the order form worksheet, an Excel download, and the
    config settings for order file parsers.
    """
    route_prefix = 'ordering'
    url_prefix = '/ordering'
    model_title = "Ordering Batch"
    model_title_plural = "Ordering Batches"
    index_title = "Ordering"
    rows_editable = True
    has_worksheet = True
    default_help_url = 'https://rattailproject.org/docs/rattail-manual/features/purchasing/ordering/index.html'
    downloadable = True
    configurable = True

    labels = {
        'po_total_calculated': "PO Total",
    }

    form_fields = [
        'id',
        'store',
        'vendor',
        'description',
        'workflow',
        'order_file',
        'order_parser_key',
        'buyer',
        'department',
        'params',
        'purchase',
        'vendor_email',
        'vendor_fax',
        'vendor_contact',
        'vendor_phone',
        'date_ordered',
        'po_number',
        'po_total_calculated',
        'notes',
        'created',
        'created_by',
        'status_code',
        'complete',
        'executed',
        'executed_by',
    ]

    row_labels = {
        'po_total_calculated': "PO Total",
    }

    row_grid_columns = [
        'sequence',
        'upc',
        # 'item_id',
        'brand_name',
        'description',
        'size',
        'cases_ordered',
        'units_ordered',
        # 'cases_received',
        # 'units_received',
        'po_total_calculated',
        # 'invoice_total',
        # 'credits',
        'status_code',
    ]

    row_form_fields = [
        'item_entry',
        'item_id',
        'upc',
        'product',
        'brand_name',
        'description',
        'size',
        'case_quantity',
        'cases_ordered',
        'units_ordered',
        'po_line_number',
        'po_unit_cost',
        'po_total_calculated',
        'status_code',
    ]

    order_form_header_columns = [
        "UPC",
        "Brand",
        "Description",
        "Case",
        "Vend. Code",
        "Pref.",
        "Unit Cost",
    ]

    @property
    def batch_mode(self):
        """
        The purchase batch mode handled by this view (always "ordering").
        """
        return self.enum.PURCHASE_BATCH_MODE_ORDERING

    def configure_form(self, f):
        """
        Apply ordering-specific tweaks to the batch form.

        The ``purchase`` field is only kept for an executed batch which
        has a purchase.  When creating, the order file fields are removed
        or made required depending on the ``workflow_key`` from the route.
        """
        super().configure_form(f)
        batch = f.model_instance
        workflow = self.request.matchdict.get('workflow_key')

        # purchase
        if self.creating or not batch.executed or not batch.purchase:
            f.remove_field('purchase')

        # now that all fields are setup, some final tweaks based on workflow
        if self.creating and workflow:

            if workflow == 'from_scratch':
                f.remove('order_file',
                         'order_parser_key')

            elif workflow == 'from_file':
                f.set_required('order_file')

    def get_batch_kwargs(self, batch, **kwargs):
        """
        Extend the parent's batch kwargs with ``ship_method`` and
        ``notes_to_vendor`` taken from the given batch.
        """
        kwargs = super().get_batch_kwargs(batch, **kwargs)
        kwargs['ship_method'] = batch.ship_method
        kwargs['notes_to_vendor'] = batch.notes_to_vendor
        return kwargs

    def scanning_entry(self):
        """
        AJAX view to handle user entry/fetch input for "scanning" feature.

        Reads ``entry`` from the JSON request body, hands it to the batch
        handler's ``quick_entry()`` and returns a JSON-friendly dict
        describing the resulting row.
        """
        data = self.request.json_body
        app = self.get_rattail_app()
        prodder = app.get_products_handler()

        batch = self.get_instance()
        entry = data['entry']
        row = self.handler.quick_entry(self.Session(), batch, entry)

        uom = self.enum.UNIT_OF_MEASURE_EACH
        if row.product and row.product.weighed:
            uom = self.enum.UNIT_OF_MEASURE_POUND

        # nb. zero quantities are reported as null, same as missing ones
        cases_ordered = float(row.cases_ordered) if row.cases_ordered else None
        units_ordered = float(row.units_ordered) if row.units_ordered else None

        po_case_cost = None
        if row.po_unit_cost is not None:
            po_case_cost = row.po_unit_cost * (row.case_quantity or 1)

        product_url = None
        if row.product_uuid:
            product_url = self.request.route_url('products.view', uuid=row.product_uuid)

        product_price = None
        if row.product and row.product.regular_price:
            product_price = row.product.regular_price.price

        product_price_display = None
        if product_price is not None:
            product_price_display = app.render_currency(product_price)

        return {
            'ok': True,
            'entry': entry,
            'row': {
                'uuid': row.uuid,
                'item_id': row.item_id,
                'upc_display': row.upc.pretty() if row.upc else None,
                'brand_name': row.brand_name,
                'description': row.description,
                'size': row.size,
                'unit_of_measure_display': self.enum.UNIT_OF_MEASURE[uom],
                'case_quantity': float(row.case_quantity) if row.case_quantity is not None else None,
                'cases_ordered': cases_ordered,
                'units_ordered': units_ordered,
                'po_unit_cost': float(row.po_unit_cost) if row.po_unit_cost is not None else None,
                'po_unit_cost_display': app.render_currency(row.po_unit_cost),
                'po_case_cost': float(po_case_cost) if po_case_cost is not None else None,
                'po_case_cost_display': app.render_currency(po_case_cost),
                'image_url': prodder.get_image_url(upc=row.upc),
                'product_url': product_url,
                'product_price_display': product_price_display,
            },
        }

    def scanning_update(self):
        """
        AJAX view to handle row data updates for "scanning" feature.

        Expects ``row_uuid`` in the JSON request body; the whole body is
        passed as keyword arguments to the handler's
        ``update_row_quantity()``.  Returns ``{'ok': True}`` on success,
        otherwise a dict with an ``error`` message.
        """
        data = self.request.json_body
        batch = self.get_instance()

        # nb. explicit checks instead of ``assert``, which is skipped
        # entirely when python runs with -O
        if batch.mode != self.enum.PURCHASE_BATCH_MODE_ORDERING:
            return {'error': "Batch is not an ordering batch"}
        if batch.executed or batch.complete:
            return {'error': "Batch is already complete or executed"}

        uuid = data.get('row_uuid')
        row = self.Session.get(self.model_row_class, uuid) if uuid else None
        if not row:
            return {'error': "Row not found"}
        if row.batch is not batch or row.removed:
            return {'error': "Row is not active for batch"}

        self.handler.update_row_quantity(row, **data)
        return {'ok': True}

    def worksheet(self):
        """
        View for editing batch row data as an order form worksheet.

        Redirects to the batch "view" page if the batch is already
        executed.  Otherwise the vendor's catalog costs are grouped by
        department and subdepartment, matched up with any existing batch
        rows, and rendered with the ``worksheet`` template.
        """
        batch = self.get_instance()
        if batch.executed:
            return self.redirect(self.get_action_url('view', batch))

        # organize existing batch rows by product
        order_items = {row.product_uuid: row
                       for row in batch.active_rows()}

        # organize vendor catalog costs by dept / subdept
        departments = {}
        costs = self.handler.get_order_form_costs(self.Session(), batch.vendor)
        costs = self.handler.sort_order_form_costs(costs)
        costs = list(costs) # we must have a stable list for the rest of this
        self.handler.decorate_order_form_costs(batch, costs)
        for cost in costs:

            # nb. products with no department share one placeholder,
            # stored under the ``None`` key
            department = cost.product.department
            if department:
                departments.setdefault(department.uuid, department)
            else:
                if None not in departments:
                    department = Object(name='', number=None)
                    departments[None] = department
                department = departments[None]

            subdepartments = getattr(department, '_order_subdepartments', None)
            if subdepartments is None:
                subdepartments = department._order_subdepartments = {}

            # nb. same placeholder approach for missing subdepartment
            subdepartment = cost.product.subdepartment
            if subdepartment:
                subdepartments.setdefault(subdepartment.uuid, subdepartment)
            else:
                if None not in subdepartments:
                    subdepartment = Object(name=None, number=None)
                    subdepartments[None] = subdepartment
                subdepartment = subdepartments[None]

            subdept_costs = getattr(subdepartment, '_order_costs', None)
            if subdept_costs is None:
                subdept_costs = subdepartment._order_costs = []
            subdept_costs.append(cost)
            cost._batchrow = order_items.get(cost.product_uuid)

        # fetch recent purchase history, sort/pad for template convenience
        history_depth = 6
        history = self.handler.get_order_form_history(batch, costs, history_depth)
        history.extend([None] * (history_depth - len(history)))
        history = list(reversed(history))

        title = self.get_instance_title(batch)
        order_date = batch.date_ordered
        if not order_date:
            order_date = self.app.today()

        return self.render_to_response('worksheet', {
            'batch': batch,
            'order_date': order_date,
            'instance': batch,
            'instance_title': title,
            'instance_url': self.get_action_url('view', batch),
            'vendor': batch.vendor,
            'departments': departments,
            'history': history,
            'get_upc': lambda p: p.upc.pretty() if p.upc else '',
            'header_columns': self.order_form_header_columns,
            'ignore_cases': not self.handler.allow_cases(),
            'worksheet_data': self.get_worksheet_data(departments),
        })

    def get_worksheet_data(self, departments):
        """
        Build the initial field values for the worksheet page.

        :param departments: Dict of department objects, each of which has
           an ``_order_subdepartments`` dict, whose values in turn each
           have an ``_order_costs`` list (as set up by :meth:`worksheet()`).

        :returns: Dict with three entries per cost, keyed as
           ``cost_<uuid>_cases``, ``cost_<uuid>_units`` and
           ``cost_<uuid>_total_display``.  Cases and units are ``None``
           when the cost has no batch row; the total display is an empty
           string when there is nothing to show.
        """
        data = {}
        for department in departments.values():
            for subdepartment in department._order_subdepartments.values():
                for cost in subdepartment._order_costs:
                    row = cost._batchrow
                    cases = int(row.cases_ordered or 0) if row else None
                    units = int(row.units_ordered or 0) if row else None
                    key = 'cost_{}'.format(cost.uuid)
                    data['{}_cases'.format(key)] = cases
                    data['{}_units'.format(key)] = units

                    total = 0
                    if row:
                        total = row.po_total_calculated or row.po_total or 0
                    if not (total or cases or units):
                        display = ''
                    else:
                        display = '${:0,.2f}'.format(total)
                    data['{}_total_display'.format(key)] = display

        return data

    @staticmethod
    def _parse_order_quantity(value, label):
        """
        Convert a submitted order quantity to an integer.

        :param value: Raw value from the request data; may be ``None``,
           an empty string, a number, or a numeric string.

        :param label: Human-friendly field name, used in the error
           message (e.g. ``"cases ordered"``).

        :returns: 2-tuple of ``(quantity, error)``.  Exactly one of them
           is ``None``; ``error`` is a message string when the value is
           not numeric or is too large.
        """
        if value is None or value == '':
            return 0, None

        if not isinstance(value, int):
            try:
                value = int(float(value))
            except (TypeError, ValueError, OverflowError):
                return None, "Invalid value for {}: {}".format(label, value)

        if value >= 100000: # TODO: really this depends on underlying column
            return None, "Invalid value for {}: {}".format(label, value)

        return value, None

    def worksheet_update(self):
        """
        Handles AJAX requests to update the order quantities for some row
        within the current batch, from the worksheet view.  POST data should
        include:

        * ``product_uuid``
        * ``cases_ordered``
        * ``units_ordered``

        If a row already exists for the given product, it will be updated;
        otherwise a new row is created for the product and then that is
        updated.  The handler's
        :meth:`~rattail:rattail.batch.purchase.PurchaseBatchHandler.update_row_quantity()`
        method is invoked to update the row.

        However, if both of the quantities given are empty, and a row exists
        for the given product, then that row is removed from the batch, instead
        of being updated.  If a matching row is not found, it will not be
        created.

        A quantity which is not numeric, or is too large, results in a
        response dict containing only an ``error`` message.
        """
        model = self.app.model
        batch = self.get_instance()

        # nb. fall back to form data if request body is not JSON
        try:
            data = self.request.json_body
        except json.JSONDecodeError:
            data = self.request.POST

        cases_ordered, error = self._parse_order_quantity(
            data.get('cases_ordered'), "cases ordered")
        if error:
            return {'error': error}

        units_ordered, error = self._parse_order_quantity(
            data.get('units_ordered'), "units ordered")
        if error:
            return {'error': error}

        uuid = data.get('product_uuid')
        product = self.Session.get(model.Product, uuid) if uuid else None
        if not product:
            return {'error': "Product not found"}

        # first we find out which existing row(s) match the given product
        matches = [row for row in batch.active_rows()
                   if row.product_uuid == product.uuid]
        if matches and len(matches) != 1:
            raise RuntimeError("found too many ({}) matches for product {} in batch {}".format(
                len(matches), product.uuid, batch.uuid))

        row = None
        if cases_ordered or units_ordered:

            # make a new row if necessary
            if matches:
                row = matches[0]
            else:
                row = self.handler.make_row()
                row.product = product
                self.handler.add_row(batch, row)

            # update row quantities
            try:
                self.handler.update_row_quantity(row, cases_ordered=cases_ordered,
                                                 units_ordered=units_ordered)
            except Exception as error:
                return {'error': str(error)}

        else: # empty order quantities

            # remove row if present
            if matches:
                row = matches[0]
                self.handler.do_remove_row(row)
                row = None

        def dollars(amount):
            # nb. missing amounts are shown as zero
            return '${:0,.2f}'.format(amount or 0)

        return {
            'row_cases_ordered': int(row.cases_ordered or 0) if row else None,
            'row_units_ordered': int(row.units_ordered or 0) if row else None,
            'row_po_total': dollars(row.po_total) if row else None,
            'row_po_total_calculated': dollars(row.po_total_calculated) if row else None,
            'row_po_total_display': dollars(row.po_total_calculated or row.po_total) if row else None,
            'batch_po_total': dollars(batch.po_total),
            'batch_po_total_calculated': dollars(batch.po_total_calculated),
            'batch_po_total_display': dollars(batch.po_total_calculated or batch.po_total),
        }

    def download_excel(self):
        """
        Download ordering batch as Excel spreadsheet.

        The workbook is saved as ``PO.<batch id>.xlsx`` in the batch data
        dir and then returned as a file response.
        """
        batch = self.get_instance()

        # populate Excel worksheet
        workbook = openpyxl.Workbook()
        worksheet = workbook.active
        worksheet.title = "Purchase Order"
        worksheet.append(["Store", "Vendor", "Date ordered"])
        date_ordered = batch.date_ordered.strftime('%m/%d/%Y') if batch.date_ordered else None
        worksheet.append([batch.store.name, batch.vendor.name, date_ordered])
        worksheet.append([])
        worksheet.append(['vendor_code', 'upc', 'brand_name', 'description', 'cases_ordered', 'units_ordered'])
        for row in batch.active_rows():
            # nb. leave cell empty (instead of writing "None") when the
            # row has no UPC, and skip missing description / size parts
            upc = str(row.upc) if row.upc else None
            description = ' '.join(part for part in (row.description, row.size)
                                   if part)
            worksheet.append([row.vendor_code, upc, row.brand_name,
                              description,
                              row.cases_ordered, row.units_ordered])

        # write Excel file to batch data dir
        filedir = batch.filedir(self.rattail_config)
        os.makedirs(filedir, exist_ok=True)
        filename = 'PO.{}.xlsx'.format(batch.id_str)
        path = batch.filepath(self.rattail_config, filename)
        workbook.save(path)

        return self.file_response(path)

    def get_execute_success_url(self, batch, result, **kwargs):
        """
        Return the URL to show after the batch is executed.

        If the execution result is a purchase, that purchase's view page
        is used; otherwise the parent logic decides.
        """
        model = self.app.model
        if isinstance(result, model.Purchase):
            return self.request.route_url('purchases.view', uuid=result.uuid)
        return super().get_execute_success_url(batch, result, **kwargs)

    def configure_get_simple_settings(self):
        """
        Return the list of "simple" settings exposed on the configure page.
        """
        return [

            # workflows
            {'section': 'rattail.batch',
             'option': 'purchase.allow_ordering_from_scratch',
             'type': bool,
             'default': True},
            {'section': 'rattail.batch',
             'option': 'purchase.allow_ordering_from_file',
             'type': bool,
             'default': True},

            # vendors
            {'section': 'rattail.batch',
             'option': 'purchase.allow_ordering_any_vendor',
             'type': bool,
             'default': True,
            },
        ]

    def configure_get_context(self):
        """
        Add the order parsers to the configure page context.

        ``order_parsers`` is the list of all parsers, and
        ``order_parsers_data`` maps each parser key to a flag indicating
        whether that parser is among the supported ones.
        """
        context = super().configure_get_context()
        vendor_handler = self.app.get_vendor_handler()

        all_parsers = vendor_handler.get_all_order_parsers()
        supported = vendor_handler.get_supported_order_parsers()
        context['order_parsers'] = all_parsers
        context['order_parsers_data'] = {parser.key: parser in supported
                                         for parser in all_parsers}

        return context

    def configure_gather_settings(self, data):
        """
        Add the "supported order parsers" setting to those gathered by
        the parent.

        A parser counts as supported when the submitted data has the
        value ``'true'`` for ``order_parser_<key>``.
        """
        settings = super().configure_gather_settings(data)
        vendor_handler = self.app.get_vendor_handler()

        supported = [
            parser.key
            for parser in vendor_handler.get_all_order_parsers()
            if data.get(f'order_parser_{parser.key}') == 'true'
        ]

        settings.append({'name': 'rattail.vendors.supported_order_parsers',
                         'value': ', '.join(supported)})

        return settings

    def configure_remove_settings(self):
        """
        Remove the "supported order parsers" setting, in addition to
        whatever the parent removes.
        """
        super().configure_remove_settings()

        names = [
            'rattail.vendors.supported_order_parsers',
        ]

        # nb. using thread-local session here; we do not use
        # self.Session b/c it may not point to Rattail
        session = Session()
        for name in names:
            self.app.delete_setting(session, name)

    @classmethod
    def defaults(cls, config):
        """
        Register all routes, views and permissions for this class.
        """
        cls._ordering_defaults(config)
        cls._purchase_batch_defaults(config)
        cls._batch_defaults(config)
        cls._defaults(config)

    @classmethod
    def _ordering_defaults(cls, config):
        """
        Register the routes, views and permissions which are specific to
        ordering batches: scanning entry/update and the Excel download.
        """
        route_prefix = cls.get_route_prefix()
        instance_url_prefix = cls.get_instance_url_prefix()
        permission_prefix = cls.get_permission_prefix()
        model_title = cls.get_model_title()
        model_title_plural = cls.get_model_title_plural()

        # fix permission group label
        config.add_tailbone_permission_group(permission_prefix, model_title_plural,
                                             overwrite=False)

        # scanning entry
        config.add_route(f'{route_prefix}.scanning_entry',
                         f'{instance_url_prefix}/scanning-entry')
        config.add_view(cls, attr='scanning_entry',
                        route_name=f'{route_prefix}.scanning_entry',
                        permission=f'{permission_prefix}.edit_row',
                        renderer='json')

        # scanning update
        config.add_route(f'{route_prefix}.scanning_update',
                         f'{instance_url_prefix}/scanning-update')
        config.add_view(cls, attr='scanning_update',
                        route_name=f'{route_prefix}.scanning_update',
                        permission=f'{permission_prefix}.edit_row',
                        renderer='json')

        # download as Excel
        config.add_route(f'{route_prefix}.download_excel',
                         f'{instance_url_prefix}/excel')
        config.add_view(cls, attr='download_excel',
                        route_name=f'{route_prefix}.download_excel',
                        permission=f'{permission_prefix}.download_excel')
        config.add_tailbone_permission(permission_prefix,
                                       f'{permission_prefix}.download_excel',
                                       f"Download {model_title} as Excel")


def defaults(config, **kwargs):
    """
    Register the ordering batch views with the given configurator.

    :param config: Configurator object, passed on to the view class's
       ``defaults()`` classmethod.

    :param \\**kwargs: May include ``OrderingBatchView`` to use a
       different view class instead of the one defined in this module.
    """
    # nb. the module-level class is only looked up when no override is
    # given, and the local name does not shadow it
    try:
        view_class = kwargs['OrderingBatchView']
    except KeyError:
        view_class = globals()['OrderingBatchView']

    view_class.defaults(config)


def includeme(config):
    """
    Include hook for this module; registers the default ordering batch
    views with the given configurator.
    """
    defaults(config)