# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2023 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
File Monitor for Windows
"""
import os
import time
import datetime
from six.moves import queue
import logging
from rattail.win32.service import Service
from rattail.threads import Thread
from rattail.filemon.config_ import load_profiles
from rattail.filemon.actions import perform_actions
from rattail.filemon.util import queue_existing
# NOTE(review): when pywin32's service machinery loads this file directly
# (e.g. the service class was registered while running this file as
# ``__main__``), the module is presumably imported under its bare file name,
# ``win32``.  Mapping that back to the real dotted path keeps log records
# under the package's logger hierarchy.  Confirm against service registration.
name = u'rattail.filemon.win32' if __name__ == u'win32' else __name__
log = logging.getLogger(name)
class RattailFileMonitor(Service):
    """
    The Rattail File Monitor, packaged as a Windows service.

    During initialization the service loads its monitor profiles and launches
    the watcher and action threads which do the actual work.
    """

    _svc_name_ = u"RattailFileMonitor"
    _svc_display_name_ = u"Rattail : File Monitoring Service"
    _svc_description_ = (
        u"Monitors one or more folders for incoming files, "
        u"and performs configured actions as new files arrive."
    )

    def Initialize(self, config):
        """
        Prepare the service for running.

        Loads the monitor profiles from ``config`` and stores them as
        ``self.monitored``.  Each profile then gets a fresh file queue and
        the following daemon threads:

        * one directory watcher per folder in ``profile.dirs``, after
          optionally queueing that folder's pre-existing files;
        * an optional fallback watcher, if ``fallback_watcher_enable`` is set;
        * one action thread, which consumes the profile's queue.

        :param config: Config object handed to ``load_profiles()``.
        :returns: ``False`` if no profiles were configured, else ``True``.
        """
        def launch(target, thread_name, *thread_args):
            # All helper threads are created the same way: marked as daemon
            # threads, then started immediately.
            worker = Thread(target=target, name=thread_name, args=thread_args)
            worker.daemon = True
            worker.start()

        self.monitored = load_profiles(config)
        if not self.monitored:
            # No profiles means there is nothing for this service to do.
            return False

        for key, profile in self.monitored.items():
            profile.queue = queue.Queue()

            for number, folder in enumerate(profile.dirs, 1):
                # Queue pre-existing files before the watcher starts, if configured.
                if profile.process_existing:
                    queue_existing(profile, folder)
                watcher_name = u'watcher_{0}-{1}'.format(key, number)
                log.debug(u"starting {0} thread for folder: {1}".format(
                    repr(watcher_name), repr(folder)))
                launch(watch_directory, watcher_name, profile, folder)

            # ReadDirectoryChangesW() may drop events, so a periodic
            # "fallback" scan can optionally run alongside the watchers.
            if profile.fallback_watcher_enable:
                fallback_name = 'fallback-watcher-{0}'.format(key)
                log.debug("starting fallback watcher thread: {0}".format(fallback_name))
                launch(fallback_watcher, fallback_name, profile)

            actions_name = u'actions_{0}'.format(key)
            log.debug(u"starting action thread: {0}".format(repr(actions_name)))
            launch(perform_actions, actions_name, profile)

        return True
def watch_directory(profile, path, buffer_size=1024):
    """
    Thread target which watches a single folder for new files.

    Opens ``path`` as a directory handle, then loops forever on
    ``ReadDirectoryChangesW()``.  Relevant file paths are put onto
    ``profile.queue`` as change notifications arrive.

    When ``profile.watch_locks`` is set, a file is queued only once its
    companion ``<file>.lock`` entry is removed; the ``.lock`` suffix is
    stripped to get the path which is queued.  Otherwise, any regular file
    which is added to the folder, or renamed into it, is queued.

    :param profile: Monitor profile; this reads ``watch_locks``, ``key`` and
       ``queue``.
    :param path: Folder to watch.  Subfolders are not watched.
    :param buffer_size: Size in bytes of the notification buffer handed to
       ``ReadDirectoryChangesW()``.  Per the Win32 documentation, if more
       changes pile up than fit in the buffer, all of them are discarded.
       A larger buffer therefore makes missed events less likely.
    """
    import win32file
    import win32con
    import winnt

    hDir = win32file.CreateFile(
        path,
        winnt.FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
        None,
        win32con.OPEN_EXISTING,
        # Needed in order to open a handle to a directory.
        win32con.FILE_FLAG_BACKUP_SEMANTICS,
        None)
    # NOTE(review): pywin32's CreateFile() normally raises pywintypes.error on
    # failure rather than returning INVALID_HANDLE_VALUE, so this branch may
    # be unreachable -- confirm.
    if hDir == win32file.INVALID_HANDLE_VALUE:
        log.error(u"can't open directory with CreateFile(): {0}".format(repr(path)))
        return

    # Watch file-name changes (create / delete / rename).  When watching for
    # lock removal, folder-name changes are included too, presumably so that
    # a lock which is a folder rather than a file is also noticed.
    dwNotifyFilter = win32con.FILE_NOTIFY_CHANGE_FILE_NAME
    if profile.watch_locks:
        dwNotifyFilter |= win32con.FILE_NOTIFY_CHANGE_DIR_NAME

    while True:
        # Synchronous call: waits until Windows reports changes.
        results = win32file.ReadDirectoryChangesW(
            hDir,
            buffer_size,
            False,  # bWatchSubtree: this folder only
            dwNotifyFilter)
        log.debug(u"ReadDirectoryChangesW() returned: {0}".format(repr(results)))

        if not results:
            # On buffer overflow Windows discards every pending change and
            # reports zero bytes, which arrives here as an empty list.  Make
            # the lost events visible instead of dropping them silently.
            log.warning(u"no changes reported for folder {0}; notification "
                        u"buffer may have overflowed, so some file events may "
                        u"have been missed".format(repr(path)))
            continue

        for action, fname in results:
            fpath = os.path.join(path, fname)
            # Named so as not to shadow the module-level ``queue`` import.
            should_queue = False
            if profile.watch_locks:
                # The real file is ready once its ".lock" companion goes away.
                if action == winnt.FILE_ACTION_REMOVED and fpath.endswith('.lock'):
                    fpath = fpath[:-5]
                    should_queue = True
            elif action in (winnt.FILE_ACTION_ADDED, winnt.FILE_ACTION_RENAMED_NEW_NAME):
                # Just in case... ignore folders etc.
                if os.path.isfile(fpath):
                    should_queue = True
            if should_queue:
                log.debug(u"queueing {0} file: {1}".format(repr(profile.key), repr(fpath)))
                profile.queue.put(fpath)
def fallback_watcher(profile):
    """
    Fallback watcher thread, to deal with any files which the primary watchers
    may have missed.  While it doesn't often happen, sometimes the Windows API
    `ReadDirectoryChangesW()`_ function can "miss" file events.  The primary
    watchers rely on that exclusively, so this function provides a workaround
    to the problem.  See also this `Stack Overflow post`_.

    Every ``profile.fallback_watcher_delay`` seconds, each folder in
    ``profile.dirs`` is scanned.  A regular file is put onto ``profile.queue``
    when its modification time falls between
    ``now - fallback_watcher_maxage`` and ``now - fallback_watcher_minage``;
    the lower bound is never earlier than the moment this thread started.
    Lock files are skipped when ``profile.watch_locks`` is set.

    NOTE: there is no de-duplication between passes.  A file which is still
    present and still inside the window on a later pass is queued again.

    This function never returns.

    :param profile: Monitor profile providing ``dirs``, ``queue``, ``key``,
       ``watch_locks`` and the ``fallback_watcher_*`` settings.

    .. _`ReadDirectoryChangesW()`: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365465%28v=vs.85%29.aspx

    .. _`Stack Overflow post`: http://stackoverflow.com/questions/57254/how-to-keep-readdirectorychangesw-from-missing-file-changes
    """
    started = datetime.datetime.now()
    max_delta = datetime.timedelta(seconds=profile.fallback_watcher_maxage)
    min_delta = datetime.timedelta(seconds=profile.fallback_watcher_minage)

    while True:
        # Wait the configured number of seconds before looking for files.
        time.sleep(profile.fallback_watcher_delay)

        # Recompute the window of "interesting" modification times each pass.
        # Files older than this thread may already have been queued at start-up
        # via ``process_existing``.  Files newer than ``min_delta`` are left
        # to the primary watcher threads.
        now = datetime.datetime.now()
        min_time = max(now - max_delta, started)
        max_time = now - min_delta

        for path in profile.dirs:
            try:
                filenames = os.listdir(path)
            except OSError:
                # A temporarily unreadable folder (e.g. an unavailable network
                # share) must not kill this thread for good; retry next pass.
                log.warning("could not list folder: {0}".format(path), exc_info=True)
                continue

            for fn in filenames:
                fpath = os.path.join(path, fn)

                # We only want files; no folders etc.  If the primary watcher
                # noticed and is handling this file, it may already be gone.
                # OSError (not the Windows-only WindowsError name) covers that
                # case on every platform.
                try:
                    if not os.path.isfile(fpath):
                        continue
                    mtime_stamp = os.path.getmtime(fpath)
                except OSError:
                    log.debug("file presumably disappeared: {0}".format(fpath))
                    continue

                modtime = datetime.datetime.fromtimestamp(mtime_stamp)
                if not (min_time <= modtime <= max_time):
                    # total_seconds() rather than .seconds: the latter drops
                    # whole days and misreports negative (future) deltas.
                    log.debug("ignoring file due to age ({0} seconds): {1}".format(
                        int((now - modtime).total_seconds()), fpath))
                    continue

                # Ignore lock files, but only if the profile is configured to
                # watch for them in general.  A lock file still hanging around
                # at this point presumably indicates a different problem than
                # the one being solved here.
                if profile.watch_locks and fpath.endswith('.lock'):
                    continue

                log.warning("queueing {0} file: {1}".format(repr(profile.key), repr(fpath)))
                profile.queue.put(fpath)
if __name__ == '__main__':
    # Run as a script: hand this service class to pywin32's standard
    # service command-line handler.
    from win32serviceutil import HandleCommandLine
    HandleCommandLine(RattailFileMonitor)