import redis
import structlog
import sys
import os
from .preprocess import preprocess_file
logger = structlog.getLogger(__name__)
CLEANUP_PRODUCT_LIMIT = int(os.environ.get("CLEANUP_PRODUCT_LIMIT", "8"))
[docs]def run_daemon(
config,
host,
port,
listen_queue,
write_queue,
):
"""Run the preprocessing daemon, listening on a redis queue
for files to be preprocessed. After preprocessing the filename
of the preprocessed files will be pushed to the output queue.
"""
# initialize the queue client
client = redis.Redis(host=host, port=port, charset="utf-8", decode_responses=True)
logger.debug("waiting for redis queue '%s'..." % listen_queue)
counter = 0
while True:
if counter > CLEANUP_PRODUCT_LIMIT:
logger.info(
"%s products preprocessed. Restarting to clean up memory."
% CLEANUP_PRODUCT_LIMIT
)
sys.exit(0)
# fetch an item from the queue to be preprocessed
(queue, value) = client.brpop([listen_queue])
with structlog.contextvars.bound_contextvars(file_path=value):
# start the preprocessing on that file
try:
file_path_or_stac_item, already_existed = preprocess_file(config, value)
if already_existed:
if config["target"].get("pass_further_when_exists", True):
# pass item to next queue even if file already exists
client.lpush(write_queue, file_path_or_stac_item)
else:
counter += 1
client.lpush(write_queue, file_path_or_stac_item)
except Exception as e:
logger.exception(e)
continue