Skip to content
Snippets Groups Projects
Commit 659c5822 authored by Benjamin Jurk's avatar Benjamin Jurk
Browse files

feat(dc): add vhost name to critical logs

parent 31723942
No related branches found
No related tags found
No related merge requests found
Pipeline #149372 passed
......@@ -32,6 +32,7 @@ def rabbitmq_callback(
sql_connection: SQLConnection,
dirty_jobs: dict[int, int],
callback_timer: CallbackTimer,
vhost_name: str,
spec: utils.specification.Specification,
channel: pika.adapters.blocking_connection.BlockingChannel,
method: pika.spec.Basic.Deliver,
......@@ -50,6 +51,7 @@ def rabbitmq_callback(
sql_connection: connection to sql db
dirty_jobs: collection of jobs which have problems going into the db
callback_timer: a timer which will log criticals if rabbitmq doesnt send any data
vhost_name: name of rabbitmq host, used for debugging
spec
channel: connection to rabbitmq
method: ?
......@@ -70,7 +72,9 @@ def rabbitmq_callback(
case "epilog":
job_id, query = on_finish(job_handler)
case _:
logger.critical("recieved unknown mode: %s", mode)
logger.critical(
"recieved unknown mode from %s: %s", vhost_name, mode
)
return
try:
......@@ -78,7 +82,9 @@ def rabbitmq_callback(
# respond with succesfull acknowledgement to rabbitmq
channel.basic_ack(delivery_tag=method.delivery_tag)
except SQLError as error:
check_jobdirtiness(dirty_jobs, channel, method, job_id, error)
check_jobdirtiness(
vhost_name, dirty_jobs, channel, method, job_id, error
)
def on_prolog(job_handler: JobHandler) -> tuple[int, InsertQuery]:
......@@ -139,6 +145,7 @@ def on_finish(job_handler: JobHandler) -> tuple[int, UpdateQuery]:
def check_jobdirtiness(
vhost_name: str,
dirty_jobs: dict[int, int],
channel: pika.adapters.blocking_connection.BlockingChannel,
method: pika.spec.Basic.Deliver,
......@@ -156,6 +163,8 @@ def check_jobdirtiness(
many fails
Args:
vhost_name: name of rabbitmq channel, used for debugging
dirty_jobs: collections of dirty jobs
channel (BlockingChannel): connection to rabbitmq
method (Basic.Deliver): ?
job_id (int): job_id which cant be correctly inserted into the db
......@@ -175,7 +184,9 @@ def check_jobdirtiness(
channel.basic_nack(delivery_tag=method.delivery_tag)
else:
dirty_jobs.pop(job_id)
logger.critical("job is dirty: %s, error: %s", job_id, error)
logger.critical(
"job is dirty: %s, error: %s, origin: %s", job_id, error, vhost_name
)
# respond with succesfull acknowledgement to rabbitmq, abandon the
# job
channel.basic_ack(delivery_tag=method.delivery_tag)
......@@ -192,7 +203,8 @@ def main(
callback_timer = CallbackTimer(
ONE_HOUR_IN_SEC,
lambda: logger.critical(
"no data has arrived from rabbitmq for an hour"
"no data has arrived from %s for an hour",
settings.rabbitmq.rabbitmq_vhost,
),
)
callback_timer.start_or_restart()
......@@ -209,6 +221,7 @@ def main(
sql_connection,
dirty_jobs,
callback_timer,
settings.rabbitmq.rabbitmq_vhost,
spec,
channel,
method,
......@@ -224,7 +237,10 @@ def main(
try:
data_source.connect_to_server()
except pika.exceptions.AMQPConnectionError:
logger.critical("cant connect to rabbitmq server, reconnecting...")
logger.critical(
"cant connect to %s-rabbitmq server, reconnecting...",
settings.rabbitmq.rabbitmq_vhost,
)
sleep(300)
continue
......@@ -234,7 +250,8 @@ def main(
data_source.collect()
except pika.exceptions.ChannelClosed as error:
logger.critical(
"rabbitmq channel was closed by the broker: %s, reconnecting...",
"%s-rabbitmq channel was closed by the broker: %s, reconnecting...",
settings.rabbitmq.rabbitmq_vhost,
error,
)
sleep(300)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment