rattail.datasync.daemon¶
DataSync for Linux
- class rattail.datasync.daemon.DataSyncDaemon(pidfile, config=None, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null')[source]¶
Linux daemon implementation of DataSync.
This is normally started via command line, e.g.:
cd /srv/envs/poser
bin/rattail -c app/datasync.conf datasync start
Note
Even though the naming implies a “proper” daemon, it will not actually daemonize itself. For true daemon behavior, you should run this using a wrapper such as supervisor.
- run()[source]¶
Starts watcher and consumer threads according to configuration.
A separate thread is started for each watcher defined in config; watch_for_changes() is the target callable for each of these threads.
Additionally, a separate thread is started for each consumer defined for the watcher; consume_changes_perpetual() is the target callable for each of these threads.
Once all threads are started, this (main) thread loops forever (so as to stay alive, and hence keep child threads alive) but takes no further action.
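The thread-spawning wiring described above can be sketched roughly as follows. Note that `run_sketch`, `consumers_for`, and the callable parameters are hypothetical stand-ins for illustration, not the actual Rattail API:

```python
import threading

def run_sketch(watchers, consumers_for, watch_target, consume_target, stop_event):
    """Rough sketch of the daemon's run() wiring: one thread per
    watcher, one thread per consumer of that watcher, then the
    main thread idles to keep the child threads alive."""
    threads = []
    for watcher in watchers:
        t = threading.Thread(target=watch_target, args=(watcher,), daemon=True)
        t.start()
        threads.append(t)
        for consumer in consumers_for(watcher):
            t = threading.Thread(target=consume_target, args=(consumer,), daemon=True)
            t.start()
            threads.append(t)
    # main thread loops forever, taking no further action
    while not stop_event.wait(timeout=0.01):
        pass
    return threads
```

In the real daemon the main loop never exits; the `stop_event` here exists only so the sketch can terminate cleanly.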
- rattail.datasync.daemon.consume_changes_from(config, session, consumer, obtained)[source]¶
Consume all changes which were “obtained” at the given timestamp.
This fetches all changes from the datasync queue table which correspond to the given consumer and which have an obtained value matching the one specified.
There are two possibilities here: either the matching changes are part of a “true” batch (i.e. they have a batch_id value), or not.
This function therefore first looks for changes which do have a batch_id. If found, it sorts those changes by batch_sequence to be sure they are processed in the correct order. If none of the changes have a batch_id then this function does not sort the changes in any way; they will be processed in (presumably) random order.
In any case, regardless of batch_id, at this point the function has identified a set of changes to be processed as a “batch” by the consumer. But the larger the batch, the longer the consumer will take to process it. This brings a couple of issues:
If the consumer is a Rattail DB, and data versioning is enabled, a large batch may cause rather massive resource usage, since many data writes happen at once.
Additionally, there is no true “progress indicator” for datasync at this time. A semi-practical way to gauge progress is simply to view the queue table and see what, if anything, it contains (when empty, processing is complete). The set of changes being processed here is removed from the queue only after being processed; hence, the larger the batch, the less granular this “progress indicator” will be.
To address these issues, this function may “prune” the set of changes such that only so many are processed at a time.
Finally, with a (possibly smaller) set of changes in hand, this function asks the consumer to begin a new transaction, processes the changes, and ultimately commits the transaction.
Once processing is complete (i.e. assuming no error), those changes are removed from the queue.
- Parameters:
config¶ – Config object for the app.
session¶ – Current session for Rattail DB.
consumer¶ – Reference to datasync consumer object.
obtained¶ – UTC “obtained” timestamp for the first change in the queue. This is used to filter existing changes, i.e. we only want to process changes with this same timestamp, as they are treated as a single “batch”.
- Returns:
True if all goes well, False if error.
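The selection and ordering logic described above can be sketched as follows. This is an illustrative stand-in, assuming changes are plain dicts rather than the actual ORM records, and `select_batch` and `max_size` are hypothetical names:

```python
def select_batch(changes, obtained, max_size=None):
    """Sketch of the change-selection logic: filter by the
    'obtained' timestamp, sort by batch_sequence when a true
    batch_id is present, then optionally prune so each consumer
    transaction stays small."""
    matching = [c for c in changes if c["obtained"] == obtained]
    if any(c.get("batch_id") for c in matching):
        # a "true" batch: process in declared sequence order
        matching.sort(key=lambda c: c["batch_sequence"])
    if max_size is not None:
        # prune, trading throughput for smaller transactions
        # and a more granular "progress indicator"
        matching = matching[:max_size]
    return matching
```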
- rattail.datasync.daemon.consume_changes_perpetual(config, consumer)[source]¶
Target for datasync consumer threads.
This function will loop forever (barring an error) and periodically invoke the consumer object to process any changes in the queue, then pause before doing it again, etc. It also tries to detect errors and handle them gracefully.
This function is mostly just the loop itself; it calls consume_current_changes() during each iteration.
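A minimal sketch of this loop, assuming hypothetical names (`consume_perpetual_sketch`, `max_iterations`, the latter existing only so the sketch can stop):

```python
import time

def consume_perpetual_sketch(consume_current, delay=0.01, max_iterations=None):
    """Sketch of the consumer loop: invoke the consumer, pause,
    repeat; errors are caught so one bad cycle does not kill
    the thread."""
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        try:
            consume_current()
        except Exception:
            pass  # real code would log the error, possibly alert
        time.sleep(delay)
        iterations += 1
```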
- rattail.datasync.daemon.consume_current_changes(config, consumer)[source]¶
Consume all changes currently available for the given consumer.
The datasync queue table will be checked, and if it contains any changes applicable to the given consumer, then the consumer will be invoked to process the changes.
If there are no applicable changes in the queue, this function will return without taking any real action. But if there are changes, then it tries to be smart about processing them in the correct order, as follows:
The changes are sorted by obtained in order to determine the earliest timestamp. Then consume_changes_from() is called with that timestamp.
Once all changes with that timestamp have been processed (consumed), this function again looks for any applicable changes in the queue, sorting by timestamp and then calling consume_changes_from() with the earliest timestamp.
This process repeats until there are no longer any changes in the queue which pertain to the given consumer.
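The drain-by-earliest-timestamp loop can be sketched as below; the queue is modeled as a plain list of dicts, and `consume_from` stands in for consume_changes_from(), which is expected to remove the changes it consumes:

```python
def consume_current_sketch(queue, consume_from):
    """Sketch of the ordering logic: repeatedly find the earliest
    'obtained' timestamp among queued changes, hand that timestamp
    to the batch consumer, and stop once the queue is drained."""
    timestamps = []
    while queue:
        earliest = min(c["obtained"] for c in queue)
        timestamps.append(earliest)
        consume_from(queue, earliest)  # must remove consumed changes
    return timestamps
```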
- rattail.datasync.daemon.prune_changes(config, watcher, changes)[source]¶
Tell the watcher to prune the original change records from its source database, if relevant.
- rattail.datasync.daemon.record_or_process_changes(config, watcher, changes)[source]¶
This function is responsible for the “initial” handling of changes obtained from a watcher. What this means will depend on the watcher, as follows:
Most watchers are just that - their only job is to report new changes to datasync. In this case this function will merely “record” the changes, by inserting them into the datasync_change queue table for further processing by the consumer(s).
But some watchers “consume self” - meaning they are also responsible for processing any changes they find. In this case this function will “process” (consume) the changes directly, by invoking the watcher to do so. These changes will not be added to the queue table for any other consumer(s) to process.
- Parameters:
config¶ – Config object for the app.
watcher¶ – Reference to datasync watcher object.
changes¶ – List of changes obtained from the watcher.
- Returns:
True if all goes well, False if error.
TODO: Actually current logic will raise an error instead of returning False. That may be preferable, in which case docs should be updated. But technically it does still return True if no error.
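The two-way dispatch described above can be sketched as follows; `record_or_process_sketch`, `watcher_consumes_self`, and the callable parameters are illustrative names, not the actual Rattail API:

```python
def record_or_process_sketch(watcher_consumes_self, changes, process, record):
    """Sketch of the dispatch: a "consume self" watcher processes
    its own changes directly; otherwise each change is recorded
    (i.e. inserted into the datasync_change queue table) for
    separate consumer threads to pick up."""
    if watcher_consumes_self:
        process(changes)
    else:
        for change in changes:
            record(change)
    # current logic raises on error rather than returning False
    return True
```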
- rattail.datasync.daemon.watch_for_changes(config, watcher)[source]¶
Target for datasync watcher threads.
This function will loop forever (barring an error) and periodically invoke the watcher object to check for any new changes, then pause before doing it again, etc. It also tries to detect errors and handle them gracefully.
The watcher itself is of course responsible for the mechanics of checking for new changes. It also determines how frequently the checks should happen, etc.
Each time the watcher finds/returns changes, this function will invoke record_or_process_changes() with the result.
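A sketch of the watcher loop, mirroring the consumer loop but skipping iterations where the watcher reports nothing; `watch_sketch`, `get_changes`, `handle`, and `max_iterations` are hypothetical names (the last exists only so the sketch can stop):

```python
import time

def watch_sketch(get_changes, handle, delay=0.01, max_iterations=None):
    """Sketch of the watcher loop: poll for new changes, hand any
    results to the record-or-process step, pause, repeat.  In the
    real daemon the watcher itself dictates the polling delay."""
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        try:
            changes = get_changes()
            if changes:
                handle(changes)  # i.e. record_or_process_changes()
        except Exception:
            pass  # real code would log and handle gracefully
        time.sleep(delay)
        iterations += 1
```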