rattail.datasync.daemon

DataSync for Linux

class rattail.datasync.daemon.DataSyncDaemon(pidfile, config=None, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null')

Linux daemon implementation of DataSync.

This is normally started from the command line, e.g.:

cd /srv/envs/poser
bin/rattail -c app/datasync.conf datasync start

Note

Even though the naming implies a “proper” daemon, it will not actually daemonize itself. For true daemon behavior, you should run this using a wrapper such as supervisor.

run()

Starts watcher and consumer threads according to configuration.

A separate thread is started for each watcher defined in config. watch_for_changes() is the target callable for each of these threads.

Additionally, a separate thread is started for each consumer defined for the watcher. consume_changes_perpetual() is the target callable for each of these threads.

Once all threads are started, this (main) thread loops forever (so as to stay alive, and hence keep child threads alive) but takes no further actions.
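The threading layout described above can be sketched roughly as follows. This is a minimal illustration, not the rattail implementation: the `watchers` mapping of key to `(watcher, consumers)` is an assumed stand-in for the real configuration, and `start_datasync_threads()` / `run_forever()` are hypothetical names. The sketch also assumes the child threads are daemon threads, which would explain why the main thread must keep looping to keep them alive.

```python
import threading
import time


def start_datasync_threads(watchers, watch_target, consume_target):
    """Start one thread per watcher, plus one thread per consumer of each watcher.

    ``watchers`` maps a key to a ``(watcher, consumers)`` pair; this layout is
    a hypothetical stand-in for the real datasync configuration.
    """
    threads = []
    for key, (watcher, consumers) in watchers.items():
        # one thread checks this watcher for new changes
        thread = threading.Thread(target=watch_target, args=(watcher,),
                                  name=f"watcher-{key}", daemon=True)
        thread.start()
        threads.append(thread)
        # one thread per consumer defined for this watcher
        for consumer in consumers:
            thread = threading.Thread(target=consume_target, args=(consumer,),
                                      name=f"consumer-{key}-{consumer}", daemon=True)
            thread.start()
            threads.append(thread)
    return threads


def run_forever(poll_seconds=1.0):
    # The main thread does no real work.  Because the children are daemon
    # threads (an assumption of this sketch), they would die with it, so
    # it simply keeps sleeping to stay alive.
    while True:
        time.sleep(poll_seconds)
```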

rattail.datasync.daemon.consume_changes_from(config, session, consumer, obtained)

Consume all changes which were “obtained” at the given timestamp.

This fetches all changes from the datasync queue table which correspond to the given consumer and whose obtained value matches the one specified.

There are two possibilities here: either the matching changes are part of a “true” batch (i.e. they have a batch_id value), or not.

This function therefore first looks for changes which do have a batch_id. If found, it then sorts those changes by batch_sequence to be sure they are processed in the correct order.

If none of the changes has a batch_id, then this function does not sort the changes in any way; they will be processed in (presumably) random order.
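The selection and ordering logic above might look something like this in outline. The sketch uses plain dicts in place of queue-table rows (an assumption for illustration; the keys mirror the column names mentioned above), and `select_changes()` is a hypothetical name. It further assumes that when any batched changes are present, only those are taken in this pass and the unbatched ones stay queued for a later pass.

```python
def select_changes(queue, consumer, obtained):
    """Return queued changes for ``consumer`` whose ``obtained`` matches.

    Rows are plain dicts here, standing in for datasync queue records.
    """
    matching = [c for c in queue
                if c["consumer"] == consumer and c["obtained"] == obtained]
    batched = [c for c in matching if c.get("batch_id") is not None]
    if batched:
        # a "true" batch: process strictly in batch_sequence order
        return sorted(batched, key=lambda c: c["batch_sequence"])
    # no batch_id on any change: leave them in whatever order they came
    return matching
```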

In any case, regardless of batch_id, the function has now identified a set of changes to be processed by the consumer as a “batch”. But the larger the batch, the longer the consumer will take to process it, which raises a couple of issues:

  • If the consumer is a Rattail DB and data versioning is enabled, a large batch may cause very heavy resource usage if too many data writes happen at once.

  • Additionally, datasync has no true “progress indicator” at this time. A semi-practical way to gauge progress is simply to view the queue table and see what, if anything, it contains (when it is empty, processing is complete). The set of changes being processed here is removed from the queue only after it has been processed, so the larger the batch, the “less granular” this “progress indicator” will be.

To address these issues, this function may “prune” the set of changes so that only a limited number are processed at a time.
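A minimal sketch of that limiting step, assuming a hypothetical `max_batch_size` setting (the actual config option, if any, is not named here). Slicing keeps the changes in the order chosen earlier, so a sorted batch is still processed by batch_sequence; anything left out is not deleted but simply remains in the queue for a later pass.

```python
def limit_changes(changes, max_batch_size=None):
    """Keep only the first ``max_batch_size`` changes for this pass.

    ``max_batch_size`` is a hypothetical setting; ``None`` or a non-positive
    value means "no limit".  Changes beyond the limit stay queued.
    """
    changes = list(changes)
    if max_batch_size is None or max_batch_size <= 0:
        return changes
    # slicing preserves the existing (e.g. batch_sequence) order
    return changes[:max_batch_size]
```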

Finally, with a (possibly smaller) set of changes in hand, this function asks the consumer to begin a new transaction, process the changes, and then commit the transaction.

Once processing is complete (i.e. assuming no error), those changes are removed from the queue.
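The transaction flow just described can be outlined as below. This is a sketch only: the consumer methods `begin_transaction()`, `process_changes()`, `commit_transaction()` and `rollback_transaction()` are hypothetical names chosen for illustration, and the queue is a plain list. The key point it shows is that changes are dequeued only after a successful commit, so a failure leaves them in place for a retry.

```python
def process_and_dequeue(queue, consumer, changes):
    """Hand ``changes`` to ``consumer`` in one transaction, then dequeue them.

    The consumer method names used here are hypothetical.
    Returns True on success, False on error.
    """
    try:
        consumer.begin_transaction()
        consumer.process_changes(changes)
        consumer.commit_transaction()
    except Exception:
        # hypothetical rollback; the changes stay queued for a later retry
        consumer.rollback_transaction()
        return False
    # only after a successful commit are the changes removed from the queue
    done = {id(c) for c in changes}
    queue[:] = [c for c in queue if id(c) not in done]
    return True
```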

Parameters:
  • config – Config object for the app.

  • session – Current session for Rattail DB.

  • consumer – Reference to datasync consumer object.

  • obtained – UTC “obtained” timestamp for the first change in the queue. This is used to filter existing changes, i.e. we only want to process changes with this same timestamp, as they are treated as a single “batch”.

Returns:

True if all goes well, False if error.

rattail.datasync.daemon.consume_changes_perpetual(config, consumer)

Target for datasync consumer threads.

This function will loop forever (barring an error) and periodically invoke the consumer object to process any changes in the queue, then pause before doing it again, etc. It also tries to detect errors and handle them gracefully.

This function is mostly just the loop itself; it calls consume_current_changes() during each iteration.
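The shape of such a loop might look roughly like this. It is one plausible way to "handle errors gracefully", not the actual rattail code: `consume_once` stands in for consume_current_changes(), `retry_delay` is a hypothetical longer pause after a failed pass, and `max_iterations` exists only so the sketch can terminate (the real loop runs forever).

```python
import logging
import time

log = logging.getLogger(__name__)


def consume_perpetually(consume_once, delay=1.0, max_iterations=None,
                        retry_delay=5.0):
    """Call ``consume_once()`` repeatedly, pausing between calls.

    ``max_iterations`` is only for demonstration; ``retry_delay`` is a
    hypothetical extra pause after a failed pass.
    """
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        iterations += 1
        try:
            ok = consume_once()
        except Exception:
            # log the error instead of letting the thread die
            log.exception("consumer pass raised an error")
            ok = False
        # pause before the next pass, a bit longer after a failure
        time.sleep(delay if ok else retry_delay)
    return iterations
```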

Parameters:
  • config – Config object for the app.

  • consumer – Reference to datasync consumer object.

rattail.datasync.daemon.consume_current_changes(config, consumer)

Consume all changes currently available for the given consumer.

The datasync queue table will be checked, and if it contains any changes applicable to the given consumer, then the consumer will be invoked to process the changes.

If there are no applicable changes in the queue, this function will return without taking any real action. But if there are changes, then it tries to be smart about processing them in the correct order, as follows:

The changes are sorted by obtained in order to determine the earliest timestamp. Then it calls consume_changes_from() with that timestamp.

Once all changes with that timestamp have been processed (consumed), this function again looks for any applicable changes in the queue, sorting by timestamp and then calling consume_changes_from() with the earliest timestamp.

This process repeats until there are no longer any changes in the queue which pertain to the given consumer.
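The repeat-until-empty logic can be sketched as follows, again with a plain list of dicts standing in for the queue table (an assumption for illustration). `consume_from` stands in for consume_changes_from() and must remove whatever it processes from the queue, otherwise the loop would never end. Taking `min()` of the obtained values is equivalent to sorting and picking the first.

```python
def consume_all_current(queue, consumer, consume_from):
    """Repeatedly consume the earliest "obtained" group for ``consumer``.

    ``consume_from(obtained)`` stands in for consume_changes_from(); it must
    remove the changes it processes from ``queue``.
    """
    while True:
        pending = [c for c in queue if c["consumer"] == consumer]
        if not pending:
            # nothing left for this consumer
            return True
        # same result as sorting by obtained and taking the first
        earliest = min(c["obtained"] for c in pending)
        if not consume_from(earliest):
            return False
```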

Parameters:
  • config – Config object for the app.

  • consumer – Reference to datasync consumer object.

Returns:

True if all goes well, False if error.

rattail.datasync.daemon.prune_changes(config, watcher, changes)

Tell the watcher to prune the original change records from its source database, if relevant.

rattail.datasync.daemon.record_or_process_changes(config, watcher, changes)

This function is responsible for the “initial” handling of changes obtained from a watcher. What this means will depend on the watcher, as follows:

Most watchers are just that: their only job is to report new changes to datasync. In this case, this function merely “records” the changes by inserting them into the datasync_change queue table for further processing by the consumer(s).

But some watchers “consume self”, meaning they are also responsible for processing any changes they find. In this case, this function “processes” (consumes) the changes directly by invoking the watcher to do so. These changes are not added to the queue table for any other consumer(s) to process.
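The two branches can be illustrated with a short sketch. The `consumes_self` attribute and `process_changes()` method are hypothetical names used here for illustration, and the queue is a plain list. Since the queue is read per consumer, the sketch assumes one queue row is written per (change, consumer) pair.

```python
def record_or_process(watcher, changes, queue, consumers):
    """Either let the watcher consume ``changes`` itself, or queue them.

    ``watcher.consumes_self`` and ``watcher.process_changes()`` are
    hypothetical names; ``queue`` stands in for the datasync_change table.
    """
    if getattr(watcher, "consumes_self", False):
        # the watcher processes its own changes; nothing is queued
        watcher.process_changes(changes)
        return True
    for change in changes:
        # assumed: one queue row per (change, consumer) pair
        for consumer in consumers:
            queue.append({"consumer": consumer, **change})
    return True
```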

Parameters:
  • config – Config object for the app.

  • watcher – Reference to the datasync watcher from which the changes came.

  • changes – List of changes obtained from the watcher.

Returns:

True if all goes well, False if error.

TODO: The current logic actually raises an error instead of returning False. That may be preferable, in which case these docs should be updated. In any case, it does still return True when no error occurs.

rattail.datasync.daemon.watch_for_changes(config, watcher)

Target for datasync watcher threads.

This function will loop forever (barring an error) and periodically invoke the watcher object to check for any new changes, then pause before doing it again, etc. It also tries to detect errors and handle them gracefully.

The watcher itself is of course responsible for the mechanics of checking for new changes. It also determines how frequently the checks should happen, etc.

Each time the watcher finds/returns changes, this function will invoke record_or_process_changes() with the result.
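A rough outline of the watcher loop, under stated assumptions: `get_changes` stands in for the watcher's own change check, `handle_changes` for record_or_process_changes(), and `delay` for whatever check interval the watcher chooses. `max_iterations` is a hypothetical parameter that exists only so the sketch can terminate; the real loop runs until an error stops it.

```python
import logging
import time

log = logging.getLogger(__name__)


def watch_perpetually(get_changes, handle_changes, delay=1.0, max_iterations=None):
    """Poll ``get_changes()`` and pass any non-empty result to ``handle_changes()``.

    ``max_iterations`` is only for demonstration purposes.
    """
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        iterations += 1
        try:
            changes = get_changes()
            if changes:
                handle_changes(changes)
        except Exception:
            # log and keep watching rather than letting the thread die
            log.exception("watcher pass raised an error")
        # the watcher decides how often checks should happen
        time.sleep(delay)
    return iterations
```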

Parameters:
  • config – Config object for the app.

  • watcher – Reference to datasync watcher object.