# Source code for preprocessor.daemon

import redis
import structlog
import sys
import os

from .preprocess import preprocess_file

# Module-level structured logger, named after this module.
logger = structlog.getLogger(__name__)

# Threshold of newly preprocessed products that run_daemon compares its
# counter against before exiting. Read once at import time from the
# environment variable of the same name; falls back to 8 when unset.
CLEANUP_PRODUCT_LIMIT = int(os.getenv("CLEANUP_PRODUCT_LIMIT", "8"))


def run_daemon(
    config,
    host,
    port,
    listen_queue,
    write_queue,
):
    """Run the preprocessing daemon, listening on a redis queue
    for files to be preprocessed. After preprocessing the filename
    of the preprocessed files will be pushed to the output queue.

    The loop runs until the number of newly preprocessed products exceeds
    ``CLEANUP_PRODUCT_LIMIT``; the process then exits with status 0.
    Restarting the process afterwards is presumably handled by an external
    supervisor (not visible in this module).

    :param config: configuration passed on to ``preprocess_file``;
        ``config["target"]`` is consulted for the optional
        ``pass_further_when_exists`` flag (defaults to ``True``).
    :param host: host name of the redis server.
    :param port: port of the redis server.
    :param listen_queue: name of the redis list to pop incoming items from.
    :param write_queue: name of the redis list that results are pushed to.
    :raises SystemExit: once the product limit has been exceeded.
    """
    # initialize the queue client
    # NOTE: ``encoding`` replaces the deprecated ``charset`` keyword, which
    # newer redis-py releases no longer accept.
    client = redis.Redis(
        host=host, port=port, encoding="utf-8", decode_responses=True
    )
    logger.debug("waiting for redis queue '%s'..." % listen_queue)

    # number of products that were actually preprocessed (not merely found
    # to exist already) since this process started
    counter = 0
    while True:
        if counter > CLEANUP_PRODUCT_LIMIT:
            # report the real number of processed products; the check above
            # only fires once the limit has been exceeded
            logger.info(
                "%s products preprocessed. Restarting to clean up memory."
                % counter
            )
            sys.exit(0)

        # fetch an item from the queue to be preprocessed; blocks until one
        # is available. The returned queue name is not needed.
        _, value = client.brpop([listen_queue])
        with structlog.contextvars.bound_contextvars(file_path=value):
            # start the preprocessing on that file
            try:
                file_path_or_stac_item, already_existed = preprocess_file(
                    config, value
                )
                if already_existed:
                    if config["target"].get("pass_further_when_exists", True):
                        # pass item to next queue even if file already exists
                        client.lpush(write_queue, file_path_or_stac_item)
                else:
                    counter += 1
                    client.lpush(write_queue, file_path_or_stac_item)
            except Exception as e:
                # Keep the daemon alive on a failing item: the error is
                # logged and the loop moves on. The item has already been
                # popped and is not pushed back to the listen queue.
                logger.exception(e)