1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
|
import time
import traceback
import os
import h5py
import queue
from typing import Union
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, DirCreatedEvent, FileCreatedEvent
class NewFileHandler(FileSystemEventHandler):
"""h5 file creation handler for Watchdog"""
def __init__(self):
self.file_queue = queue.Queue()
# callback for File/Directory created event, called by Observer.
def on_created(self, event: Union[DirCreatedEvent, FileCreatedEvent]):
if event.src_path[-4:] == "hdf5":
# run callback with path string
self.file_queue.put(event.src_path)
class ObserverWrapper:
"""Encapsulated Observer boilerplate"""
def __init__(self, path: str, recursive=True):
self.path = path
self.recursive = recursive
self.observer = Observer()
self.handler = NewFileHandler()
self.observer.schedule(self.handler, path=path, recursive=recursive)
self.start()
def start(self):
"""
Starts observing for filesystem events. Runs self.routine() every 1 second.
:param blocking: If true, blocks main thread until keyboard interrupt.
"""
self.observer.start()
def stop(self):
"""
Stops the observer. When running self.start(blocking=True) then you don't need to call this.
"""
self.observer.stop()
self.observer.join()
def wait_for_file(self):
"""
Wait and Process newly created files
"""
max_retry_count = 3500 # for test purposes now but want to set an upper bound on verifying a file is finished.
# will try h5 file for a max of 35 seconds (upper bound) to see if the file is finished.
# Files are usually finished within 20-30 seconds
#
retry_interval_seconds = .01 # every hundreth it will try the file to see if it finished writing
# wait for file to be added
#print(self.handler.file_queue.get(block=True))
file_path = self.handler.file_queue.get(block=True)
file_name = os.path.basename(file_path)
# try to open the file
retry_count = 0
while True:
try:
file = h5py.File(file_path, "r")
file.close()
return file_path, file_name
except OSError:
if retry_count < max_retry_count:
retry_count += 1
print(f"h5 file <{file_path}> is locked, retrying {retry_count}/{max_retry_count}")
time.sleep(retry_interval_seconds)
else:
print(f"h5 file <{file_path}> reached max retry count, skipping")
except Exception as err:
print(f"Got unexpected Error <{type(err).__name__}> while opening <{file_path}> ")
traceback.print_exc()
|