# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2022 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
DataSync for Linux
"""
from __future__ import unicode_literals, absolute_import
import sys
import time
import logging
from traceback import format_exception
from rattail.daemon import Daemon
from rattail.threads import Thread
from rattail.datasync.config import load_profiles
from rattail.datasync.util import next_batch_id
log = logging.getLogger(__name__)
class DataSyncDaemon(Daemon):
    """
    Linux daemon implementation of DataSync.

    This is normally started via command line, e.g.:

    .. code-block:: sh

       cd /srv/envs/poser
       bin/rattail -c app/datasync.conf datasync start

    .. note::
       Even though the naming implies a "proper" daemon, it will not
       actually daemonize itself.  For true daemon behavior, you
       should run this using a wrapper such as `supervisor`_.

       .. _supervisor: http://supervisord.org
    """

    def run(self):
        """
        Starts watcher and consumer threads according to configuration.

        A separate thread is started for each watcher defined in
        config.  :func:`watch_for_changes()` is the target callable
        for each of these threads.

        Additionally, a separate thread is started for each consumer
        defined for the watcher.  :func:`consume_changes_perpetual()`
        is the target callable for each of these threads.

        Once all threads are started, this (main) thread loops forever
        (so as to stay alive, and hence keep child threads alive) but
        takes no further actions.
        """
        for key, profile in load_profiles(self.config).items():

            # create watcher thread for the profile
            name = '{}-watcher'.format(key)
            # note: lazy %-args so the message is only rendered if emitted
            log.debug("starting thread '%s' with watcher: %s",
                      name, profile.watcher_spec)
            thread = Thread(target=watch_for_changes, name=name,
                            args=(self.config, profile.watcher))
            thread.daemon = True
            thread.start()

            # create consumer threads, unless watcher consumes itself
            if not profile.watcher.consumes_self:
                # create a thread for each consumer
                for consumer in profile.consumers:
                    name = '{}-consumer-{}'.format(key, consumer.key)
                    log.debug("starting thread '%s' with consumer: %s",
                              name, consumer.spec)
                    thread = Thread(target=consume_changes_perpetual, name=name,
                                    args=(self.config, consumer))
                    thread.daemon = True
                    thread.start()

        # loop indefinitely.  since this is the main thread, the app will
        # terminate when this method ends; all other threads are
        # "subservient" to this one.
        while True:
            time.sleep(.01)
def watch_for_changes(config, watcher):
    """
    Target for datasync watcher threads.

    This function will loop forever (barring an error) and
    periodically invoke the ``watcher`` object to check for any new
    changes, then pause before doing it again, etc.  It also tries to
    detect errors and handle them gracefully.

    The watcher itself is of course responsible for the mechanics of
    checking for new changes.  It also determines how frequently the
    checks should happen, etc.

    Each time the watcher finds/returns changes, this function will
    invoke :func:`record_or_process_changes()` with the result.

    :param config: Config object for the app.

    :param watcher: Reference to datasync watcher object.
    """
    app = config.get_app()
    datasync_handler = app.get_datasync_handler()

    # let watcher do any setup it needs
    watcher.setup()

    # the 'last run' value is maintained as zone-aware UTC
    lastrun = datasync_handler.get_watcher_lastrun(watcher.key)
    lastrun_setting = datasync_handler.get_watcher_lastrun_setting(watcher.key)
    timefmt = datasync_handler.get_lastrun_timefmt()

    # outer loop - this should never terminate unless an unhandled
    # exception happens, or the daemon is stopped
    while True:

        # reset for inner loop
        thisrun = app.make_utc(tzinfo=True)
        attempts = 0
        last_error_type = None

        # inner loop - this is for sake of retry
        while True:
            attempts += 1

            try:
                changes = watcher.get_changes(lastrun)

            except Exception as error:
                exc_type, exc, traceback = sys.exc_info()

                # if we've reached our final attempt, stop retrying
                if attempts >= watcher.retry_attempts:
                    log.warning("attempt #%s of %s failed calling `watcher.get_changes()`, "
                                "this thread will now *terminate* until datasync restart",
                                attempts, watcher.retry_attempts, exc_info=True)
                    app.send_email('datasync_error_watcher_get_changes', {
                        'watcher': watcher,
                        'error': exc,
                        'attempts': attempts,
                        'traceback': ''.join(format_exception(exc_type, exc, traceback)).strip(),
                        'datasync_url': config.datasync_url(),
                    })
                    raise

                # if this exception is not the first, and is of a different
                # type than seen previously, do *not* continue to retry
                if last_error_type is not None and not isinstance(error, last_error_type):
                    log.exception("new exception differs from previous one(s), "
                                  "giving up on watcher.get_changes()")
                    raise

                # remember which type of error this was; pause for next retry
                last_error_type = type(error)
                log.warning("attempt #%s of %s for watcher.get_changes() failed",
                            attempts, watcher.retry_attempts, exc_info=True)
                log.debug("pausing for %s seconds before making next attempt",
                          watcher.retry_delay)
                if watcher.retry_delay:
                    time.sleep(watcher.retry_delay)

            else: # watcher got changes okay (possibly empty set)

                # record new lastrun time
                lastrun = thisrun
                try:
                    session = app.make_session()
                except:
                    log.exception("failed to make session, to save lastrun time")
                    raise
                else:
                    try:
                        app.save_setting(session, lastrun_setting,
                                         lastrun.strftime(timefmt))
                        session.commit()
                    except:
                        session.rollback()
                        log.exception("failed to save lastrun time")
                        raise
                    finally:
                        session.close()

                if changes:
                    log.debug("got %s changes from watcher", len(changes))

                    # record or process changes (depends on watcher)
                    try:
                        record_or_process_changes(config, watcher, changes)
                    except:
                        log.exception("failed to record/process changes")
                        raise

                    # prune changes if applicable (depends on watcher)
                    try:
                        prune_changes(config, watcher, changes)
                    except:
                        log.exception("failed to prune changes")
                        raise

                # break out of inner loop to reset the attempt count
                # for next grab
                break

        # pause between successful change grabs
        time.sleep(watcher.delay)
def record_or_process_changes(config, watcher, changes):
    """
    This function is responsible for the "initial" handling of changes
    obtained from a watcher.  What this means will depend on the
    watcher, as follows:

    Most watchers are just that - their only job is to report new
    changes to datasync.  In this case this function will merely
    "record" the changes, by inserting them into the
    ``datasync_change`` queue table for further processing by the
    consumer(s).

    But some watchers "consume self" - meaning any changes they find,
    they also are responsible for processing.  In this case this
    function will "process" (consume) the changes directly, by
    invoking the watcher to do so.  These changes will *not* be added
    to the queue table for any other consumer(s) to process.

    :param config: Config object for the app.

    :param watcher: Reference to datasync watcher, from whence changes
       came.

    :param changes: List of changes obtained from the watcher.

    :returns: ``True`` if all goes well, ``False`` if error.

       TODO: Actually current logic will raise an error instead of
       returning ``False``.  That may be preferable, in which case
       docs should be updated.  But technically it does still return
       ``True`` if no error.
    """
    app = config.get_app()
    model = config.get_model()

    # if watcher consumes itself, then it will process its own
    # changes.  note that there are no assumptions made about the
    # format or structure of these change objects.
    if watcher.consumes_self:
        try:
            session = app.make_session()
        except:
            log.exception("failed to make session")
            raise
        try:
            watcher.process_changes(session, changes)
        except:
            log.exception("watcher failed to process its changes")
            session.rollback()
            raise
        else:
            session.commit()
            log.debug("watcher has consumed its own changes")
            return True
        finally:
            session.close()

    # will record changes to consumer queue...

    # give all change stubs the same timestamp, to help identify them
    # as a "batch" of sorts, so consumers can process them as such.
    # (note, this is less important for identifying a batch now that
    # we have batch_id, but is probably still helpful anyway)
    now = app.make_utc()

    # save change stub records to rattail database, for consumer
    # thread to find and process
    saved = 0
    try:
        session = app.make_session()
    except:
        log.exception("failed to make session for recording changes")
        raise
    try:
        # assign new/unique batch_id so that consumers can keep things
        # straight
        batch_id = next_batch_id(session)
        batch_seq = 0
        for key, change in changes:
            batch_seq += 1
            # one queue row per (change, consumer) pair
            for consumer in watcher.consumer_stub_keys:
                session.add(model.DataSyncChange(
                    source=watcher.key,
                    batch_id=batch_id,
                    batch_sequence=batch_seq,
                    payload_type=change.payload_type,
                    payload_key=change.payload_key,
                    deletion=change.deletion,
                    obtained=now,
                    consumer=consumer))
                saved += 1
            session.flush()
    except:
        log.exception("failed to record changes")
        session.rollback()
        raise
    else:
        session.commit()
    finally:
        session.close()

    log.debug("saved %s '%s' changes to datasync queue", saved, watcher.key)
    return True
def prune_changes(config, watcher, changes):
    """
    Tell the watcher to prune the original change records from its source
    database, if relevant.

    :param config: Config object for the app (not used here, but kept
       for signature consistency with sibling functions).

    :param watcher: Reference to datasync watcher object.

    :param changes: List of ``(key, change)`` tuples as obtained from
       the watcher; only the keys are passed along for pruning.
    """
    # some watchers have nothing to prune
    if not watcher.prunes_changes:
        return

    try:
        # note that we only give it the keys for this
        pruned = watcher.prune_changes([c[0] for c in changes])
    except:
        log.exception("failed to prune changes")
        raise

    # watcher may return None to indicate "count unknown / nothing to report"
    if pruned is not None:
        log.debug("watcher pruned %s changes", pruned)
def consume_changes_perpetual(config, consumer):
    """
    Target for datasync consumer threads.

    This function will loop forever (barring an error) and
    periodically invoke the ``consumer`` object to process any changes
    in the queue, then pause before doing it again, etc.  It also
    tries to detect errors and handle them gracefully.

    This function is mostly just the loop itself; it calls
    :func:`consume_current_changes()` during each iteration.

    :param config: Config object for the app.

    :param consumer: Reference to datasync consumer object.
    """
    # tell consumer to do initial setup
    consumer.setup()

    # begin thread perma-loop
    while True:

        # try to consume all current changes
        try:
            result = consume_current_changes(config, consumer)
        except:
            log.exception("failed to consume current changes")
            raise

        if not result:
            # consumption failed, so exit the perma-loop (this thread
            # is now dead)
            break

        # wait 1 sec by default, then look for more changes
        time.sleep(consumer.delay)
def consume_current_changes(config, consumer):
    """
    Consume all changes currently available for the given consumer.

    The datasync queue table will be checked, and if it contains any
    changes applicable to the given consumer, then the consumer will
    be invoked to process the changes.

    If there are no applicable changes in the queue, this function
    will return without taking any real action.  But if there are
    changes, then it tries to be smart about processing them in the
    correct order, as follows:

    The changes are sorted by
    :attr:`~rattail.db.model.datasync.DataSyncChange.obtained` in
    order to determine the earliest timestamp.  Then it calls
    :func:`consume_changes_from()` with that timestamp.

    Once all changes with that timestamp have been processed
    (consumed), this function again looks for any applicable changes
    in the queue, sorting by timestamp and then calling
    :func:`consume_changes_from()` with earliest timestamp.

    This process repeats until there are no longer any changes in the
    queue which pertain to the given consumer.

    :param config: Config object for the app.

    :param consumer: Reference to datasync consumer object.

    :returns: ``True`` if all goes well, ``False`` if error.
    """
    app = config.get_app()
    model = config.get_model()

    try:
        session = app.make_session()
    except:
        log.exception("failed to make session for consuming changes")
        raise

    def get_first_change():
        # earliest-obtained change for this (watcher, consumer) pair,
        # or None when the queue is empty for them
        change = session.query(model.DataSyncChange)\
                        .filter(model.DataSyncChange.source == consumer.watcher.key)\
                        .filter(model.DataSyncChange.consumer == consumer.key)\
                        .order_by(model.DataSyncChange.obtained)\
                        .first()
        return change

    # determine first 'obtained' timestamp
    try:
        first = get_first_change()
    except:
        log.exception("failed to get first change")
        session.close()
        return False

    error = False
    while first:

        # try to consume these changes
        try:
            if not consume_changes_from(config, session, consumer,
                                        first.obtained):
                error = True
        except:
            error = True
            log.exception("failed to consume changes obtained at: %s",
                          first.obtained)
        if error:
            break

        # fetch next 'obtained' timestamp
        try:
            first = get_first_change()
        except:
            log.exception("failed to get next-first change")
            break

    # no more changes! (or perhaps an error)
    session.close()
    return not error
def consume_changes_from(config, session, consumer, obtained):
    """
    Consume all changes which were "obtained" at the given timestamp.

    This fetches all changes from the datasync queue table, which
    correspond to the given consumer and which have an
    :attr:`~rattail.db.model.datasync.DataSyncChange.obtained` value
    matching the one specified.

    There are two possibilities here: either the matching changes are
    part of a "true" batch (i.e. they have a
    :attr:`~rattail.db.model.datasync.DataSyncChange.batch_id` value),
    or not.

    This function therefore first looks for changes which *do* have a
    ``batch_id``.  If found, it then sorts those changes by
    :attr:`~rattail.db.model.datasync.DataSyncChange.batch_sequence`
    to be sure they are processed in the correct order.

    If none of the changes have a ``batch_id`` then this function does
    not sort the changes in any way; they will be processed in
    (presumably) random order.

    In any case, regardless of ``batch_id``, at this point the
    function has identified a set of changes to be processed as a
    "batch" by the consumer.  But the larger the batch, the longer it
    will take for the consumer to process it.  This brings a couple of
    issues:

    If the consumer is a Rattail DB, and data versioning is enabled,
    this may cause rather massive resource usage if too many data
    writes happen.

    Additionally, there is no true "progress indicator" for datasync
    at this time.  A semi-practical way to determine its progress is
    simply to view the queue table and see what if anything it
    contains (when empty, processing is complete).  The set of changes
    being processed here, will be removed from the queue only after
    being processed.  Hence, the larger the batch, the "less granular"
    the "progress indicator" will be.

    To address these issues then, this function may "prune" the set of
    changes such that only so many are processed at a time.

    And finally, we have a (possibly smaller) set of changes to be
    processed.  This function will then ask the consumer to begin a
    new transaction, then process the changes, and ultimately commit
    the transaction.

    Once processing is complete (i.e. assuming no error) then those
    changes are removed from the queue.

    :param config: Config object for the app.

    :param session: Current session for Rattail DB.

    :param consumer: Reference to datasync consumer object.

    :param obtained: UTC "obtained" timestamp for the first change in
       the queue.  This is used to filter existing changes, i.e. we
       only want to process changes with this same timestamp, as they
       are treated as a single "batch".

    :returns: ``True`` if all goes well, ``False`` if error.
    """
    app = config.get_app()
    model = config.get_model()

    # we only want changes "obtained" at the given time.  however, at least
    # until all code has been refactored, we must take two possibilities into
    # account here: some changes may have been given a batch ID, but some may
    # not.  we will prefer those with batch ID, or fall back to those without.
    changes = session.query(model.DataSyncChange)\
                     .filter(model.DataSyncChange.source == consumer.watcher.key)\
                     .filter(model.DataSyncChange.consumer == consumer.key)\
                     .filter(model.DataSyncChange.obtained == obtained)\
                     .filter(model.DataSyncChange.batch_id != None)\
                     .order_by(model.DataSyncChange.batch_id,
                               model.DataSyncChange.batch_sequence)\
                     .all()
    if changes:
        # okay, we got some with a batch ID, now we must prune that list down
        # so that we're only dealing with a single batch
        batch_id = changes[0].batch_id
        changes = [c for c in changes if c.batch_id == batch_id]
    else:
        # no changes with batch ID, so let's get all without ID instead
        changes = session.query(model.DataSyncChange)\
                         .filter(model.DataSyncChange.source == consumer.watcher.key)\
                         .filter(model.DataSyncChange.consumer == consumer.key)\
                         .filter(model.DataSyncChange.obtained == obtained)\
                         .filter(model.DataSyncChange.batch_id == None)\
                         .all()

    # maybe limit size of batch to process.  this can be useful e.g. when
    # large amounts of changes land in the queue with same timestamp, and
    # versioning is also enabled.
    batch_size = config.getint('rattail.datasync', 'batch_size_limit',
                               session=session)
    if batch_size and len(changes) > batch_size:
        changes = changes[:batch_size]

    log.debug("will process %s changes from %s", len(changes),
              app.localtime(obtained, from_utc=True))

    # first retry loop is to begin the transaction
    attempts = 0
    errtype = None
    while True:
        attempts += 1

        try:
            consumer.begin_transaction()

        except Exception as errobj: # processing failed!
            exc_type, exc, traceback = sys.exc_info()

            # if we've reached our final attempt, stop retrying
            if attempts >= consumer.retry_attempts:
                log.warning("attempt #%s failed calling `consumer.begin_transaction()`; "
                            "this thread will now *terminate* until datasync restart",
                            attempts, exc_info=True)
                # NOTE(review): this reuses the 'process_changes' email
                # template for a begin_transaction failure; presumably
                # intentional, but confirm a dedicated template isn't wanted
                app.send_email('datasync_error_consumer_process_changes', {
                    'watcher': consumer.watcher,
                    'consumer': consumer,
                    'error': exc,
                    'attempts': attempts,
                    'traceback': ''.join(format_exception(exc_type, exc, traceback)).strip(),
                    'datasync_url': config.datasync_url(session=session),
                })
                return False

            # if this exception is not the first, and is of a different type
            # than seen previously, do *not* continue to retry
            if errtype is not None and not isinstance(errobj, errtype):
                log.exception("new exception differs from previous one(s), "
                              "giving up on consumer.begin_transaction()")
                return False

            # record the type of exception seen; maybe pause before next retry
            errtype = type(errobj)
            log.warning("attempt #%s failed for '%s' -> '%s' consumer.begin_transaction()",
                        attempts, consumer.watcher.key, consumer.key)
            log.debug("pausing for %s seconds before making attempt #%s of %s",
                      consumer.retry_delay, attempts + 1, consumer.retry_attempts)
            if consumer.retry_delay:
                time.sleep(consumer.retry_delay)

        else: # transaction began okay
            # can stop the attempt/retry loop now
            break

    # second retry loop is to process the changes
    attempts = 0
    errtype = None
    while True:
        attempts += 1

        try:
            consumer.process_changes(session, changes)

        except Exception as errobj: # processing failed!
            exc_type, exc, traceback = sys.exc_info()

            # un-do whatever consumer started; a failed rollback is fatal
            try:
                consumer.rollback_transaction()
            except:
                log.exception("consumer failed to rollback transaction")
                return False

            # if we've reached our final attempt, stop retrying
            if attempts >= consumer.retry_attempts:
                log.warning("attempt #%s failed calling `consumer.process_changes()`; "
                            "this thread will now *terminate* until datasync restart",
                            attempts, exc_info=True)
                app.send_email('datasync_error_consumer_process_changes', {
                    'watcher': consumer.watcher,
                    'consumer': consumer,
                    'error': exc,
                    'attempts': attempts,
                    'traceback': ''.join(format_exception(exc_type, exc, traceback)).strip(),
                    'datasync_url': config.datasync_url(session=session),
                })
                return False

            else: # more attempts to be made, but log error for debug
                log.debug("attempt #%s failed calling `consumer.process_changes()`",
                          attempts, exc_info=True)

            # if this exception is not the first, and is of a different type
            # than seen previously, do *not* continue to retry
            if errtype is not None and not isinstance(errobj, errtype):
                log.exception("new exception differs from previous one(s), "
                              "giving up on consumer.process_changes()")
                return False

            # record the type of exception seen; maybe pause before next retry
            errtype = type(errobj)
            log.warning("attempt #%s failed for '%s' -> '%s' consumer.process_changes()",
                        attempts, consumer.watcher.key, consumer.key)
            log.debug("pausing for %s seconds before making attempt #%s of %s",
                      consumer.retry_delay, attempts + 1, consumer.retry_attempts)
            if consumer.retry_delay:
                time.sleep(consumer.retry_delay)

        else: # consumer processed changes okay

            # commit consumer transaction
            try:
                consumer.commit_transaction()
            except:
                log.exception("consumer failed to commit transaction")
                return False

            # delete these changes from datasync queue
            try:
                for i, change in enumerate(changes):
                    session.delete(change)
                    # flush periodically to keep the unit of work small
                    if i % 200 == 0:
                        session.flush()
                session.commit()
            except:
                log.exception("failed to delete changes from queue")
                return False

            # can stop the attempt/retry loop now
            log.debug("processed %s changes", len(changes))
            break

    return True