Source code for rattail.filemon.win32

# -*- coding: utf-8; -*-
################################################################################
#
#  Rattail -- Retail Software Framework
#  Copyright © 2010-2023 Lance Edgar
#
#  This file is part of Rattail.
#
#  Rattail is free software: you can redistribute it and/or modify it under the
#  terms of the GNU General Public License as published by the Free Software
#  Foundation, either version 3 of the License, or (at your option) any later
#  version.
#
#  Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
#  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
#  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
#  details.
#
#  You should have received a copy of the GNU General Public License along with
#  Rattail.  If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
File Monitor for Windows
"""

import os
import time
import datetime
from six.moves import queue
import logging

from rattail.win32.service import Service
from rattail.threads import Thread
from rattail.filemon.config_ import load_profiles
from rattail.filemon.actions import perform_actions
from rattail.filemon.util import queue_existing


# TODO: Would be nice to have a note explaining why this hack exists.
name = __name__
if name == u'win32':
   name = u'rattail.filemon.win32'
log = logging.getLogger(name)


[docs] class RattailFileMonitor(Service): """ Windows service implementation of the File Monitor. """ _svc_name_ = u"RattailFileMonitor" _svc_display_name_ = u"Rattail : File Monitoring Service" _svc_description_ = (u"Monitors one or more folders for incoming files, " u"and performs configured actions as new files arrive.")
[docs] def Initialize(self, config): """ Service initialization. """ # Read monitor profile(s) from config. self.monitored = load_profiles(config) # Make sure we have something to do. if not self.monitored: return False # Create monitor and action threads for each profile. for key, profile in self.monitored.items(): # Create a file queue for the profile. profile.queue = queue.Queue() # Perform setup for each of the watched folders. for i, path in enumerate(profile.dirs, 1): # Maybe put all pre-existing files in the queue. if profile.process_existing: queue_existing(profile, path) # Create a watcher thread for the folder. name = u'watcher_{0}-{1}'.format(key, i) log.debug(u"starting {0} thread for folder: {1}".format(repr(name), repr(path))) thread = Thread(target=watch_directory, name=name, args=(profile, path)) thread.daemon = True thread.start() # Since `ReadDirectoryChangesW()` doesn't guarantee we'll receive # all file events, maybe make a "fallback" watcher thread as well. if profile.fallback_watcher_enable: name = 'fallback-watcher-{0}'.format(key) log.debug("starting fallback watcher thread: {0}".format(name)) thread = Thread(target=fallback_watcher, name=name, args=(profile,)) thread.daemon = True thread.start() # Create an action thread for the profile. name = u'actions_{0}'.format(key) log.debug(u"starting action thread: {0}".format(repr(name))) thread = Thread(target=perform_actions, name=name, args=(profile,)) thread.daemon = True thread.start() return True
[docs] def watch_directory(profile, path): """ Callable target for watcher threads. """ import win32file import win32con import winnt hDir = win32file.CreateFile( path, winnt.FILE_LIST_DIRECTORY, win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE, None, win32con.OPEN_EXISTING, win32con.FILE_FLAG_BACKUP_SEMANTICS, None) if hDir == win32file.INVALID_HANDLE_VALUE: log.error(u"can't open directory with CreateFile(): {0}".format(repr(path))) return dwNotifyFilter = win32con.FILE_NOTIFY_CHANGE_FILE_NAME if profile.watch_locks: dwNotifyFilter |= win32con.FILE_NOTIFY_CHANGE_DIR_NAME while True: results = win32file.ReadDirectoryChangesW( hDir, 1024, False, dwNotifyFilter) log.debug(u"ReadDirectoryChangesW() returned: {0}".format(repr(results))) for action, fname in results: fpath = os.path.join(path, fname) queue = False if profile.watch_locks: if action == winnt.FILE_ACTION_REMOVED and fpath.endswith('.lock'): fpath = fpath[:-5] queue = True else: if action in (winnt.FILE_ACTION_ADDED, winnt.FILE_ACTION_RENAMED_NEW_NAME): if os.path.isfile(fpath): # just in case..ignore folders etc. queue = True if queue: log.debug(u"queueing {0} file: {1}".format(repr(profile.key), repr(fpath))) profile.queue.put(fpath)
[docs] def fallback_watcher(profile): """ Fallback watcher thread, to deal with any files which the primary watchers may have missed. While it doesn't often happen, sometimes the Windows API `ReadDirectoryChangesW()`_ function can "miss" file events. The primary watchers rely on that exclusively, so this function provides a workaround to the problem. See also this `Stack Overflow post`_. .. _`ReadDirectoryChangesW()`: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365465%28v=vs.85%29.aspx .. _`Stack Overflow post`: http://stackoverflow.com/questions/57254/how-to-keep-readdirectorychangesw-from-missing-file-changes """ started = datetime.datetime.now() max_delta = datetime.timedelta(seconds=profile.fallback_watcher_maxage) min_delta = datetime.timedelta(seconds=profile.fallback_watcher_minage) while True: # Wait the configured number of seconds before looking for files. time.sleep(profile.fallback_watcher_delay) # Find the timestamp range we're interested in, each time we look. now = datetime.datetime.now() min_time = now - max_delta if min_time < started: min_time = started max_time = now - min_delta # Inspect all files within all watched folders. for path in profile.dirs: for fn in os.listdir(path): fpath = os.path.join(path, fn) # We only want files; no folders etc. Note that if the primary # watcher noticed and is handling this file, it very well could # have disappeared already. try: if not os.path.isfile(fpath): continue modtime = os.path.getmtime(fpath) except WindowsError: log.debug("file presumably disappeared: {0}".format(fpath)) continue # Queue file for processing only if its last modification time # falls within our calculated range. We filter thus in order to # avoid files which may be old enough to have already been queued # at service start via the `process_existing` setting, and to # avoid files which may be new enough to have been queued by the # primary watcher threads. modtime = datetime.datetime.fromtimestamp(modtime) if min_time <= modtime <= max_time: # Ignore lock files, but only if the profile is configured to # watch for them in general. If lock files are *supposed* to be # involved, and one is still hanging around at this point, that # presumably points to a different problem than the one we're # trying to solve here... if profile.watch_locks and fpath.endswith('.lock'): continue log.warning("queueing {0} file: {1}".format(repr(profile.key), repr(fpath))) profile.queue.put(fpath) else: log.debug("ignoring file due to age ({0} seconds): {1}".format( (now - modtime).seconds, fpath))
if __name__ == '__main__': import win32serviceutil win32serviceutil.HandleCommandLine(RattailFileMonitor)