# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2023 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Data Batch Handlers
"""
import os
import shutil
import datetime
import warnings
import logging
import json
from sqlalchemy import orm
from rattail.barcodes import upce_to_upca
log = logging.getLogger(__name__)
[docs]
class BatchHandler(object):
"""
Base class and partial default implementation for batch handlers. It is
expected that all batch handlers will ultimately inherit from this base
class, therefore it defines the implementation "interface" loosely
speaking. Custom batch handlers are welcome to supplement or override this
as needed, and in fact must do so for certain aspects.
.. attribute:: pseudo_remove_rows
Flag indicating that when a row is "removed" it is merely
marked as such, but is not deleted outright. This flag is on
by default; if a handler sets it to false then rows will be
deleted outright instead.
"""
populate_batches = False
populate_with_versioning = True
refresh_with_versioning = True
repopulate_when_refresh = False
execute_with_versioning = True
delete_with_versioning = True
# TODO: this should probably default to false
pseudo_remove_rows = True
def __init__(self, config, **kwargs):
self.config = config
self.app = self.config.get_app()
self.enum = config.get_enum()
self.model = config.get_model()
@property
def batch_model_class(self):
"""
Reference to the data model class of the batch type for which this
handler is responsible,
e.g. :class:`~rattail.db.model.batch.labels.LabelBatch`. Each handler
must define this (or inherit from one that does).
"""
raise NotImplementedError("You must set the 'batch_model_class' attribute "
"for class '{}'".format(self.__class__.__name__))
@property
def batch_key(self):
"""
The "batch type key" for the handler, e.g. ``'labels'``. This is not
meant to uniquely identify the handler itself, but rather the *type* of
batch which it handles. Therefore multiple handlers may be defined
which share the same ``batch_key`` - although in practice usually each
app will need only one handler per batch type.
If the handler doesn't define this, it will be obtained from the
``batch_key`` attribute of the :attr:`batch_model_class` attribute.
"""
return self.batch_model_class.batch_key
@classmethod
def get_spec(cls):
return '{}:{}'.format(cls.__module__, cls.__name__)
def get_model_title(self):
return self.batch_model_class.get_model_title()
def allow_versioning(self, action):
if action == 'populate':
return self.populate_with_versioning
if action == 'refresh':
return self.refresh_with_versioning
if action == 'execute':
return self.execute_with_versioning
if action == 'delete':
return self.delete_with_versioning
log.warning("unknown batch action: %s", action)
return True
def is_mutable(self, batch):
"""
Returns a boolean indicating whether or not *any* modifications are to
be allowed for the given batch. By default this returns ``False`` if
the batch is already executed, or has been marked complete; otherwise
returns ``True``.
"""
if batch.executed:
return False
if batch.complete:
return False
return True
[docs]
def consume_batch_id(self, session, as_str=False):
"""
Consumes a new batch ID from the generator, and returns it.
:param session: Current session for Rattail DB.
:param as_str: Flag indicating whether the return value should be a
string, as opposed to the default of integer.
:returns: Batch ID as integer, or zero-padded string of 8 chars.
"""
batch_id = self.app.next_counter_value(session, 'batch_id')
if as_str:
return '{:08d}'.format(batch_id)
return batch_id
[docs]
def make_basic_batch(self, session, user=None, progress=None, **kwargs):
"""
Make a new "basic" batch, with no customization beyond what is provided
by ``kwargs``, which are passed directly to the batch class constructor.
Note that the new batch instance will be added to the provided session,
which will also be flushed.
Callers should use :meth:`make_batch()` instead of this method.
"""
kwargs.setdefault('rowcount', 0)
kwargs.setdefault('complete', False)
# we used just let the postgres sequence auto-generate the id,
# but now we are trying to support more than just postgres,
# and so we consume a batch id using shared logic which can
# accommodate more than just postgres
if 'id' not in kwargs:
kwargs['id'] = self.consume_batch_id(session)
# try to provide default creator
if user and 'created_by' not in kwargs and 'created_by_uuid' not in kwargs:
kwargs['created_by'] = user
batch = self.batch_model_class(**kwargs)
session.add(batch)
session.flush()
return batch
[docs]
def make_batch(self, session, progress=None, **kwargs):
"""
Make and return a new batch instance.
This is the method which callers should use. It invokes
:meth:`make_basic_batch()` to actually create the new batch instance,
and then :meth:`init_batch()` to perform any extra initialization for
it. Note that the batch returned will *not* yet be fully populated.
"""
batch = self.make_basic_batch(session, progress=progress, **kwargs)
kwargs['session'] = session
self.init_batch(batch, progress=progress, **kwargs)
return batch
[docs]
def init_batch(self, batch, progress=None, **kwargs):
"""
Perform extra initialization for the given batch, in whatever way might
make sense. Default of course is to do nothing here; handlers are free
to override as needed.
Note that initial population of a batch should *not* happen here; see
:meth:`populate()` for a place to define that logic.
"""
[docs]
def make_row(self, **kwargs):
"""
Make a new row for the batch. Note however that the row will **not**
be added to the batch; that should be done with :meth:`add_row()`.
:returns: A new row object, which does *not* yet belong to any batch.
"""
return self.batch_model_class.row_class(**kwargs)
[docs]
def add_row(self, batch, row):
"""
Add the given row to the given batch. This assumes it is a *new* row
which does not yet belong to any batch. This logic performs the
following steps:
The row is officially added to the batch, and is immediately
"refreshed" via :meth:`refresh_row()`.
The row is then examined to see if it has been marked as "removed" by
the refresh. If it was *not* removed then the batch's cached
``rowcount`` is incremented, and the :meth:`after_add_row()` hook is
invoked.
"""
session = orm.object_session(batch)
with session.no_autoflush:
batch.data_rows.append(row)
self.refresh_row(row)
if not row.removed:
batch.rowcount = (batch.rowcount or 0) + 1
self.after_add_row(batch, row)
[docs]
def after_add_row(self, batch, row):
"""
Event hook, called immediately after the given row has been "properly"
added to the batch. This is a good place to update totals for the
batch, to account for the new row, etc.
"""
def is_row_deletable(self, row, **kwargs):
"""
General logic to determine if deleting the given row from its
batch should be allowed.
"""
return True
[docs]
def purge_batches(self, session, before=None, before_days=90,
dry_run=False, delete_all_data=None,
progress=None, **kwargs):
"""
Purge all batches which were executed prior to a given date.
:param before: If provided, must be a timezone-aware datetime object.
If not provided, it will be calculated from the current date, using
``before_days``.
:param before_days: Number of days before the current date, to be used
as the cutoff date if ``before`` is not specified.
:param dry_run: Flag indicating that this is a "dry run" and all logic
involved should be (made) aware of that fact.
:param delete_all_data: Flag indicating whether *all* data should be
deleted for each batch being purged. This flag is passed along to
:meth:`delete()`; see that for more info. NOTE: This flag should
probably be deprecated, but so far has not been...but ``dry_run``
should be preferred for readability etc.
:returns: Integer indicating the number of batches purged.
"""
if delete_all_data and dry_run:
raise ValueError("You can enable (n)either of `dry_run` or "
"`delete_all_data` but both cannot be True")
delete_all_data = not dry_run
if not before:
before = self.app.today() - datetime.timedelta(days=before_days)
before = datetime.datetime.combine(before, datetime.time(0))
before = self.app.localtime(before)
log.info("will purge '%s' batches, executed before %s",
self.batch_key, before.date())
old_batches = session.query(self.batch_model_class)\
.filter(self.batch_model_class.executed < before)\
.options(orm.joinedload(self.batch_model_class.data_rows))\
.all()
log.info("found %s batches to purge", len(old_batches))
result = self.app.make_object()
result.purged = 0
def purge(batch, i):
self.do_delete(batch, dry_run=dry_run)
result.purged += 1
if i % 5 == 0:
session.flush()
self.progress_loop(purge, old_batches, progress,
message="Purging old batches")
session.flush()
if old_batches:
log.info("%spurged %s '%s' batches",
"(would have) " if dry_run else "",
result.purged, self.batch_key)
return result.purged
@property
def root_datadir(self):
"""
The absolute path of the root folder in which data for this particular
type of batch is stored. The structure of this path is as follows:
.. code-block:: none
/{root_batch_data_dir}/{batch_type_key}
* ``{root_batch_data_dir}`` - Value of the 'batch.files' option in the
[rattail] section of config file.
* ``{batch_type_key}`` - Unique key for the type of batch it is.
.. note::
While it is likely that the data folder returned by this method
already exists, this method does not guarantee it.
"""
return self.config.batch_filedir(self.batch_key)
def datadir(self, batch):
"""
Returns the absolute path of the folder in which the batch's source
data file(s) resides. Note that the batch must already have been
persisted to the database. The structure of the path returned is as
follows:
.. code-block:: none
/{root_datadir}/{uuid[:2]}/{uuid[2:]}
* ``{root_datadir}`` - Value returned by :meth:`root_datadir()`.
* ``{uuid[:2]}`` - First two characters of batch UUID.
* ``{uuid[2:]}`` - All batch UUID characters *after* the first two.
.. note::
While it is likely that the data folder returned by this method
already exists, this method does not guarantee any such thing. It
is typically assumed that the path will have been created by a
previous call to :meth:`make_batch()` however.
"""
return os.path.join(self.root_datadir, batch.uuid[:2], batch.uuid[2:])
def make_datadir(self, batch):
"""
Returns the data folder specific to the given batch, creating if necessary.
"""
datadir = self.datadir(batch)
if not os.path.exists(datadir):
os.makedirs(datadir)
return datadir
# TODO: remove default attr?
def set_input_file(self, batch, path, attr='filename'):
"""
Assign the data file found at ``path`` to the batch. This overwrites
the given attribute (``attr``) of the batch and places a copy of the
data file in the batch's data folder.
"""
datadir = self.make_datadir(batch)
filename = os.path.basename(path)
shutil.copyfile(path, os.path.join(datadir, filename))
setattr(batch, attr, filename)
[docs]
def should_populate(self, batch):
"""
Must return a boolean indicating whether the given batch should be
populated from an initial data source, i.e. at time of batch creation.
Override this method if you need to inspect the batch in order to
determine whether the populate step is needed. Default behavior is to
simply return the value of :attr:`populate_batches`.
"""
return self.populate_batches
[docs]
def setup_populate(self, batch, progress=None):
"""
Perform any setup (caching etc.) necessary for populating a batch.
"""
[docs]
def teardown_populate(self, batch, progress=None):
"""
Perform any teardown (cleanup etc.) necessary after populating a batch.
"""
[docs]
def do_populate(self, batch, user, progress=None):
"""
Perform initial population for the batch, i.e. fill it with data rows.
Where the handler obtains the data to do this, will vary greatly.
Note that callers *should* use this method, but custom batch handlers
should *not* override this method. Conversely, custom handlers
*should* override the :meth:`~populate()` method, but callers should
*not* use that one directly.
"""
self.setup_populate(batch, progress=progress)
self.populate(batch, progress=progress)
self.teardown_populate(batch, progress=progress)
self.refresh_batch_status(batch)
return True
[docs]
def populate(self, batch, progress=None):
"""
Populate the batch with initial data rows. It is assumed that the data
source to be used will be known by inspecting various properties of the
batch itself.
Note that callers should *not* use this method, but custom batch
handlers *should* override this method. Conversely, custom handlers
should *not* override the :meth:`~do_populate()` method, but callers
*should* use that one directly.
"""
raise NotImplementedError("Please implement `{}.populate()` method".format(self.__class__.__name__))
[docs]
def refreshable(self, batch):
"""
This method should return a boolean indicating whether or not the
handler supports a "refresh" operation for the batch, given its current
condition. The default assumes a refresh is allowed unless the batch
has already been executed.
"""
if batch.executed:
return False
return True
def progress_loop(self, *args, **kwargs):
return self.app.progress_loop(*args, **kwargs)
[docs]
def setup_refresh(self, batch, progress=None):
"""
Perform any setup (caching etc.) necessary for refreshing a batch.
"""
[docs]
def teardown_refresh(self, batch, progress=None):
"""
Perform any teardown (cleanup etc.) necessary after refreshing a batch.
"""
[docs]
def do_refresh(self, batch, user, progress=None):
"""
Perform a full data refresh for the batch, i.e. update any data which
may have become stale, etc.
Note that callers *should* use this method, but custom batch handlers
should *not* override this method. Conversely, custom handlers
*should* override the :meth:`~refresh()` method, but callers should
*not* use that one directly.
"""
self.refresh(batch, progress=progress)
return True
[docs]
def refresh(self, batch, progress=None):
"""
Perform a full data refresh for the batch. What exactly this means will
depend on the type of batch, and specific handler logic.
Generally speaking this refresh is meant to use queries etc. to obtain
"fresh" data for the batch (header) and all its rows. In most cases
certain data is expected to be "core" to the batch and/or rows, and
such data will be left intact, with all *other* data values being
re-calculated and/or reset etc.
Note that callers should *not* use this method, but custom batch
handlers *should* override this method. Conversely, custom handlers
should *not* override the :meth:`~do_refresh()` method, but callers
*should* use that one directly.
"""
session = orm.object_session(batch)
self.setup_refresh(batch, progress=progress)
if self.repopulate_when_refresh:
del batch.data_rows[:]
batch.rowcount = 0
session.flush()
self.populate(batch, progress=progress)
else:
batch.rowcount = 0
def refresh(row, i):
with session.no_autoflush:
self.refresh_row(row)
if not row.removed:
batch.rowcount += 1
self.progress_loop(refresh, batch.active_rows(), progress,
message="Refreshing batch data rows")
self.refresh_batch_status(batch)
self.teardown_refresh(batch, progress=progress)
return True
[docs]
def refresh_many(self, batches, user=None, progress=None):
"""
Refresh a set of batches, with given progress. Default behavior is to
simply refresh each batch in succession. Any batches which are already
executed are skipped.
Handlers may have to override this method if "grouping" or other
special behavior is needed.
"""
needs_refresh = [batch for batch in batches
if not batch.executed]
if not needs_refresh:
return
# TODO: should perhaps try to make the progress indicator reflect the
# "total" number of rows across all batches being refreshed? seems
# like that might be more accurate, for the user. but also harder.
for batch in needs_refresh:
self.do_refresh(batch, user, progress=progress)
[docs]
def refresh_row(self, row):
"""
This method will be passed a row object which has already been properly
added to a batch, and which has basic required fields already
populated. This method is then responsible for further populating all
applicable fields for the row, based on current data within the
appropriate system(s).
Note that in some cases this method may be called multiple times for
the same row, e.g. once when first creating the batch and then later
when a user explicitly refreshes the batch. The method logic must
account for this possibility.
"""
def refresh_product_basics(self, row):
"""
This method will refresh the "basic" product info for a row. It
assumes that the row is derived from
:class:`~rattail.db.model.batch.core.ProductBatchRowMixin` and that
``row.product`` is already set to a valid product.
"""
product = getattr(row, 'product', None)
if not product:
return
row.item_id = product.item_id
row.upc = product.upc
row.brand_name = str(product.brand or "")
row.description = product.description
row.size = product.size
department = product.department
row.department_number = department.number if department else None
row.department_name = department.name if department else None
subdepartment = product.subdepartment
row.subdepartment_number = subdepartment.number if subdepartment else None
row.subdepartment_name = subdepartment.name if subdepartment else None
def quick_entry(self, session, batch, entry):
"""
Handle a "quick entry" value, e.g. from user input. Most frequently this
value would represent a UPC or similar "ID" value for e.g. a product record,
and the handler's duty would be to either locate a corresponding row within
the batch (if one exists), or else add a new row to the batch.
In any event this method can be customized and in fact has no default
behavior, so must be defined by a handler.
:param session: Database sesssion.
:param batch: Batch for which the quick entry is to be handled. Note that
this batch is assumed to belong to the given ``session``.
:param entry: String value to be handled. This is generally assumed to
be from user input (e.g. UPC scan field) but may not always be.
:returns: New or existing "row" object, for the batch.
"""
raise NotImplementedError
[docs]
def locate_product_for_entry(self, session, entry, **kwargs):
"""
Convenience method which invokes
:meth:`rattail.products.ProductsHandler.locate_product_for_entry()`.
"""
products_handler = self.app.get_products_handler()
return products_handler.locate_product_for_entry(session, entry, **kwargs)
[docs]
def remove_row(self, row):
"""
Remove the given row from its batch, and update the batch accordingly.
How exactly the row is "removed" is up to this method. Default is to
set the row's ``removed`` flag, then invoke the
:meth:`refresh_batch_status()` method.
Note that callers should *not* use this method, but custom batch
handlers *should* override this method. Conversely, custom handlers
should *not* override the :meth:`do_remove_row()` method, but callers
*should* use that one directly.
"""
batch = row.batch
if self.pseudo_remove_rows:
row.removed = True
else:
session = self.app.get_session(batch)
session.delete(row)
session.flush()
self.refresh_batch_status(batch)
[docs]
def do_remove_row(self, row):
"""
Remove the given row from its batch, and update the batch accordingly.
Uses the following logic:
If the row's ``removed`` flag is already set, does nothing and returns
immediately.
Otherwise, it invokes :meth:`remove_row()` and then decrements the
batch ``rowcount`` attribute.
Note that callers *should* use this method, but custom batch handlers
should *not* override this method. Conversely, custom handlers
*should* override the :meth:`remove_row()` method, but callers should
*not* use that one directly.
"""
if row.removed:
return
self.remove_row(row)
batch = row.batch
if batch.rowcount is not None:
batch.rowcount -= 1
[docs]
def refresh_batch_status(self, batch):
"""
Update the batch status, as needed. This method does nothing by
default, but may be overridden if the overall batch status needs to be
updated according to the status of its rows. This method may be
invoked whenever rows are added, removed, updated etc.
"""
def write_worksheet(self, batch, progress=None):
"""
Write a worksheet file, to be downloaded by the user. Must return the
file path.
"""
raise NotImplementedError("Please define logic for `{}.write_worksheet()`".format(
self.__class__.__name__))
def update_from_worksheet(self, batch, path, progress=None):
"""
Save the given file to a batch-specific location, then update the
batch data from the file contents.
"""
raise NotImplementedError("Please define logic for `{}.update_from_worksheet()`".format(
self.__class__.__name__))
def mark_complete(self, batch, progress=None):
"""
Mark the given batch as "complete". This usually is just a matter of
setting the :attr:`~rattail.db.model.batch.BatchMixin.complete` flag
for the batch, with the idea that this should "freeze" the batch so
that another user can verify its state before finally executing it.
Each handler is of course free to expound on this idea, or to add extra
logic to this "event" of marking a batch complete.
"""
batch.complete = True
def mark_incomplete(self, batch, progress=None):
"""
Mark the given batch as "incomplete" (aka. pending). This usually is
just a matter of clearing the
:attr:`~rattail.db.model.batch.BatchMixin.complete` flag for the batch,
with the idea that this should "thaw" the batch so that it may be
further updated, i.e. it's not yet ready to execute.
Each handler is of course free to expound on this idea, or to add extra
logic to this "event" of marking a batch incomplete.
"""
batch.complete = False
[docs]
def why_not_execute(self, batch, user=None, **kwargs):
"""
This method should inspect the given batch and, if there is a reason
that execution should *not* be allowed for it, the method should return
a text string indicating that reason. It should return ``None`` if no
such reason could be identified, and execution should be allowed.
Note that it is assumed the batch has not already been executed, since
execution is globally prevented for such batches. In other words you
needn't check for that as a possible reason not to execute.
"""
[docs]
def executable(self, batch):
"""
This method should return a boolean indicating whether or not execution
should be allowed for the batch, given its current condition.
While you may override this method, you are encouraged to override
:meth:`why_not_execute()` instead. Default logic for this method is as
follows:
If the batch is ``None`` then the caller simply wants to know if "any"
batch may be executed, so we return ``True``.
If the batch has already been executed then we return ``False``.
If the :meth:`why_not_execute()` method returns a value, then we assume
execution is not allowed and return ``False``.
Finally we will return ``True`` if none of the above rules matched.
"""
if batch is None:
return True
if batch.executed:
return False
if self.why_not_execute(batch):
return False
return True
[docs]
def auto_executable(self, batch):
"""
Must return a boolean indicating whether the given bath is eligible for
"automatic" execution, i.e. immediately after batch is created.
"""
return False
def describe_execution(self, batch, **kwargs):
"""
This method should essentially return some text describing briefly what
will happen when the given batch is executed.
:param batch: The batch in question, which is a candidate for
execution.
:returns: String value describing the batch execution.
"""
[docs]
def do_execute(self, batch, user, progress=None, **kwargs):
"""
Perform final execution for the batch. What that means for any given
batch, will vary greatly.
Note that callers *should* use this method, but custom batch handlers
should *not* override this method. Conversely, custom handlers
*should* override the :meth:`~execute()` method, but callers should
*not* use that one directly.
"""
reason = self.why_not_execute(batch, user=user)
if reason:
raise RuntimeError(f"Execute not allowed: {reason}")
# make sure we declare who's responsible, if we can
# TODO: seems like if caller already knows user, they should
# have already done this. and probably bad form to do it here
session = self.app.get_session(batch)
session.set_continuum_user(user)
result = self.execute(batch, user=user, progress=progress, **kwargs)
if not result:
return False
batch.executed = self.app.make_utc()
batch.executed_by = user
# record the execution kwargs within batch params, if there
# were any. this is mostly for troubleshooting after the fact
if kwargs:
kwargs = self.get_executed_with_kwargs(batch, **kwargs)
try:
# first make sure kwargs are JSON-safe
json.dumps(kwargs)
except:
# TODO: may need to lower log level if this is common,
# although underlying causes are hopefully easy to fix
log.exception("kwargs are not JSON-safe: %s", kwargs)
else:
batch.set_param('_executed_with_kwargs_', kwargs)
return result
def get_executed_with_kwargs(self, batch, **kwargs):
"""
Should return a JSON-safe set of kwargs the batch was executed
with, for saving into the batch params.
If there are any "special" kwargs, they may need to be
type-coerced or perhaps simply removed.
"""
return kwargs
[docs]
def get_effective_rows(self, batch):
"""
Should return the set of rows from the given batch which are
considered "effective" - i.e. when the batch is executed,
these rows should be processed whereas the remainder should
not.
:param batch: A
:class:`~rattail.db.model.batch.vendorcatalog.VendorCatalogBatch`
instance.
:returns: List of
:class:`~rattail.db.model.batch.vendorcatalog.VendorCatalogBatchRow`
instances.
"""
return batch.active_rows()
[docs]
def execute(self, batch, progress=None, **kwargs):
"""
Execute the given batch, according to the given kwargs. This is really
where the magic happens, although each handler must define that magic,
since the default logic does nothing at all.
Note that callers should *not* use this method, but custom batch
handlers *should* override this method. Conversely, custom handlers
should *not* override the :meth:`~do_execute()` method, but callers
*should* use that one directly.
"""
[docs]
def execute_many(self, batches, progress=None, **kwargs):
"""
Execute a set of batches, with given progress and kwargs. Default
behavior is to simply execute each batch in succession. Any batches
which are already executed are skipped.
Handlers may have to override this method if "grouping" or other
special behavior is needed.
"""
now = self.app.make_utc()
for batch in batches:
if not batch.executed:
self.execute(batch, progress=progress, **kwargs)
batch.executed = now
batch.executed_by = kwargs['user']
return True
def do_delete(self, batch, dry_run=False, progress=None, **kwargs):
"""
Totally delete the given batch. This includes deleting the batch
itself, any rows and "extra" data such as files.
Note that callers *should* use this method, but custom batch handlers
should *not* override this method. Conversely, custom handlers
*should* override the :meth:`~delete()` method, but callers should
*not* use that one directly.
"""
session = orm.object_session(batch)
if 'delete_all_data' in kwargs:
warnings.warn("The 'delete_all_data' kwarg is not supported for "
"this method; please use 'dry_run' instead",
DeprecationWarning, stacklevel=2)
kwargs['delete_all_data'] = not dry_run
self.delete(batch, progress=progress, **kwargs)
session.delete(batch)
[docs]
def delete(self, batch, delete_all_data=True, progress=None, **kwargs):
"""
Delete all data for the batch, including any related (e.g. row)
records, as well as files on disk etc. This method should *not* delete
the batch itself however.
Note that callers should *not* use this method, but custom batch
handlers *should* override this method. Conversely, custom handlers
should *not* override the :meth:`~do_delete()` method, but callers
*should* use that one directly.
:param delete_all_data: Flag indicating whether *all* data should be
deleted. You should probably set this to ``False`` if in dry-run
mode, since deleting *all* data often implies deleting files from
disk, which is not transactional and therefore can't be rolled back.
"""
if delete_all_data:
self.delete_extra_data(batch, progress=progress)
# delete all rows from batch, one by one. maybe would be nicer if we
# could delete all in one fell swoop, but sometimes "extension" row
# records might exist, and can get FK constraint errors
# TODO: in other words i don't even know why this is necessary. seems
# to me that one fell swoop should not incur FK errors
if hasattr(batch, 'data_rows'):
session = orm.object_session(batch)
def delete(row, i):
session.delete(row)
if i % 200 == 0:
session.flush()
self.progress_loop(delete, batch.data_rows, progress,
message="Deleting rows from batch")
session.flush()
# even though we just deleted all rows, we must also "remove" all
# rows explicitly from the batch; otherwise when the batch itself
# is deleted, SQLAlchemy may complain about an unexpected number of
# rows being deleted
del batch.data_rows[:]
def get_filepath(self, batch, **kwargs):
"""
Get the absolute path of a particular file associated with the
given batch.
"""
# TODO: probably need to rework this somehow..
return batch.filepath(self.config, **kwargs)
def delete_extra_data(self, batch, progress=None, **kwargs):
"""
Delete all "extra" data for the batch. This method should *not* bother
trying to delete the batch itself, or rows thereof. It typically is
only concerned with deleting extra files on disk, related to the batch.
"""
path = self.config.batch_filepath(self.batch_key, batch.uuid)
if os.path.exists(path):
shutil.rmtree(path)
[docs]
def setup_clone(self, oldbatch, progress=None):
"""
Perform any setup (caching etc.) necessary for cloning batch. Note
that the ``oldbatch`` arg is the "old" batch, i.e. the one from which a
clone is to be created.
"""
[docs]
def teardown_clone(self, newbatch, progress=None):
"""
Perform any teardown (cleanup etc.) necessary after cloning a batch.
Note that the ``newbatch`` arg is the "new" batch, i.e. the one which
was just created by cloning the old batch.
"""
[docs]
def clone(self, oldbatch, created_by, progress=None, **kwargs):
"""
Clone the given batch as a new batch, and return the new batch.
"""
self.setup_clone(oldbatch, progress=progress)
batch_class = self.batch_model_class
batch_mapper = orm.class_mapper(batch_class)
newbatch = batch_class()
newbatch.created_by = created_by
newbatch.rowcount = 0
for name in batch_mapper.columns.keys():
if name not in ('uuid', 'id', 'created', 'created_by_uuid', 'rowcount', 'executed', 'executed_by_uuid'):
setattr(newbatch, name, getattr(oldbatch, name))
session = orm.object_session(oldbatch)
session.add(newbatch)
session.flush()
row_class = newbatch.row_class
row_mapper = orm.class_mapper(row_class)
def clone_row(oldrow, i):
newrow = self.clone_row(oldrow)
self.add_row(newbatch, newrow)
self.progress_loop(clone_row, self.get_clonable_rows(oldbatch), progress,
message="Cloning data rows for new batch")
self.refresh_batch_status(newbatch)
self.teardown_clone(newbatch, progress=progress)
return newbatch
def get_clonable_rows(self, batch, **kwargs):
return batch.data_rows
def clone_row(self, oldrow):
row_class = self.batch_model_class.row_class
row_mapper = orm.class_mapper(row_class)
newrow = row_class()
for name in row_mapper.columns.keys():
if name not in ('uuid', 'batch_uuid', 'sequence'):
setattr(newrow, name, getattr(oldrow, name))
return newrow
def cache_model(self, session, model_class, **kwargs):
return self.app.cache_model(session, model_class, **kwargs)
def get_batch_types(config):
"""
Returns the list of available batch type keys.
"""
model = config.get_model()
keys = []
for name in dir(model):
if name == 'BatchMixin':
continue
obj = getattr(model, name)
if isinstance(obj, type):
if issubclass(obj, model.Base):
if issubclass(obj, model.BatchMixin):
keys.append(obj.batch_key)
keys.sort()
return keys
[docs]
def get_batch_handler(config, batch_key, default=None, error=True): # pragma: no cover
warnings.warn("function is deprecated; please use "
"`app.get_batch_handler() instead",
DeprecationWarning, stacklevel=2)
app = config.get_app()
return app.get_batch_handler(batch_key, default=default, error=error)