.. SPDX-FileCopyrightText: 2024 Red Hat, Inc
..
.. SPDX-License-Identifier: GPL-2.0-or-later

.. _consumers:

=========
Consumers
=========

This library aims to make implementing a message consumer as simple as
possible. It handles the common boilerplate code and offers a command line
interface to easily start a consumer as a service under init systems like
systemd.

Introduction
============

`AMQP consumers`_ configure a `queue`_ for their use in the message broker.
When a message is published to an `exchange`_ and matches the `bindings`_ the
consumer has declared, the message is placed in the queue and eventually
delivered to the consumer. Fedora uses a `topic exchange`_ for general-purpose
messages.

Fortunately, you don't need to manage the connection to the broker or configure
the queue. All you need to do is to implement some code to run when a message
is received. The API expects a callable object that accepts a single positional
argument::

    from fedora_messaging import api, config

    # The fedora_messaging API does not automatically configure logging so as
    # to not destroy application logging setup. This is a convenience method
    # to configure the Python logger with the fedora-messaging logging config.
    config.conf.setup_logging()

    # First, define a function to be used as our callback. This will be called
    # whenever a message is received from the server.
    def printer_callback(message):
        """
        Print the message to standard output.

        Args:
            message (fedora_messaging.message.Message): The message we received
                from the queue.
        """
        print(str(message))

    # Next, we need a queue to consume messages from. We can define
    # the queue and binding configurations in these dictionaries:
    queues = {
        'demo': {
            'durable': False,  # Delete the queue on broker restart
            'auto_delete': True,  # Delete the queue when the client terminates
            'exclusive': False,  # Allow multiple simultaneous consumers
            'arguments': {},
        },
    }
    binding = {
        'exchange': 'amq.topic',  # The AMQP exchange to bind our queue to
        'queue': 'demo',  # The unique name of our queue on the AMQP broker
        'routing_keys': ['#'],  # The topics that should be delivered to the queue
    }

    # Start consuming messages using our callback. This call will block until
    # a KeyboardInterrupt is raised, or the process receives a SIGINT or SIGTERM
    # signal.
    api.consume(printer_callback, bindings=binding, queues=queues)

In this example, there's one queue and the queue only has one binding, but it's
possible to consume from multiple queues and each queue can have multiple
bindings.
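
As a sketch of the multi-queue case, the ``bindings`` argument can be given as
a list of dictionaries, each naming the queue it applies to. The queue names,
routing keys, and the ``queue_options`` helper below are made up for
illustration, and the call to ``api.consume`` is wrapped in a function so the
configuration can be inspected without connecting to a broker::

    def queue_options():
        """Return a fresh copy of the queue options used in the example above."""
        return {
            "durable": False,
            "auto_delete": True,
            "exclusive": False,
            "arguments": {},
        }


    # Hypothetical queue names, each with its own options dictionary.
    queues = {
        "demo_builds": queue_options(),
        "demo_updates": queue_options(),
    }

    # Each binding names its queue; a binding may list several routing keys.
    bindings = [
        {
            "exchange": "amq.topic",
            "queue": "demo_builds",
            "routing_keys": ["org.example.build.started", "org.example.build.finished"],
        },
        {
            "exchange": "amq.topic",
            "queue": "demo_updates",
            "routing_keys": ["org.example.update.#"],
        },
    ]


    def printer(message):
        """Print each received message to standard output."""
        print(str(message))


    def main():
        # Imported here so the dictionaries above can be built and checked
        # without the library installed. This call blocks until interrupted.
        from fedora_messaging import api

        api.consume(printer, bindings=bindings, queues=queues)

Every ``queue`` value used in a binding should also appear as a key in
``queues``, so that the queue is declared with the intended options.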


Command Line Interface
======================

A command line interface, :ref:`fm-cli`, is included to make running
consumers easier. It's not necessary to write any boilerplate code calling the
API: just run ``fedora-messaging consume`` and provide it the Python path to
your callback::

    $ fedora-messaging consume --callback=fedora_messaging.example:printer

Consult the manual page for complete details on this command line interface.

.. note:: For users of fedmsg, this is roughly equivalent to ``fedmsg-hub``.


Consumer API
============

The introduction contains a very minimal callback. This section covers the
complete API for consumers.

Message Lifecycle Overview
--------------------------

This is the full path of a message, from the producer to the consumer(s).

.. figure:: Producer-Consumer.png
    :alt: Producers and consumers

    (:download:`diagram source <Producer-Consumer.drawio>` in `draw.io <https://draw.io>`_ format)

Message classes (schemas) are covered in the :ref:`messages` documentation.
They are not strictly required: you don't have to install the schemas for the
messages you are consuming, but without them you will not have access to their
advanced Python API.

The Callback
------------

The callback provided to :func:`fedora_messaging.api.consume` or the command-line
interface can be any callable Python object, so long as it accepts the message
object as a single positional argument.
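
For instance, :func:`functools.partial` produces a callable that still takes
the message as its only positional argument while carrying extra state. The
sketch below is illustrative only: ``save_topic`` and the ``seen`` list are
hypothetical names, it assumes the message object exposes a ``topic``
attribute, and a stand-in object replaces a real message so the snippet runs
without a broker::

    import functools
    from types import SimpleNamespace


    def save_topic(seen, message):
        """Record the topic of each message in the ``seen`` list."""
        seen.append(message.topic)


    seen = []
    # The resulting callable accepts a single positional argument: the message.
    callback = functools.partial(save_topic, seen)

    # Stand-in object used here instead of a real message from the broker.
    callback(SimpleNamespace(topic="org.example.demo"))
    print(seen)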

The API will also accept a Python class, which it will instantiate before
using as a callable object. This allows you to write a callback with easy
one-time initialization or a callback that maintains state between calls::

    import os

    from fedora_messaging import config


    class SaveMessage:
        """
        A fedora-messaging consumer that saves the message to a file.

        A single configuration key is used from fedora-messaging's
        "consumer_config" key, "path", which is where the consumer will save
        the messages::

            [consumer_config]
            path = "/tmp/fedora-messaging/messages.txt"
        """

        def __init__(self):
            """Perform some one-time initialization for the consumer."""
            self.path = config.conf["consumer_config"]["path"]

            # Ensure the parent directory exists before the consumer starts,
            # creating intermediate directories if needed
            os.makedirs(os.path.dirname(self.path), exist_ok=True)

        def __call__(self, message):
            """
            Invoked when a message is received by the consumer.

            Args:
                message (fedora_messaging.api.Message): The message from AMQP.
            """
            with open(self.path, "a") as fd:
                fd.write(str(message))

When running this type of callback from the command-line interface, specify
the Python path to the class object, not the ``__call__`` method::

    $ fedora-messaging consume --callback=package_name.module:SaveMessage


Exceptions
----------

* Consumers should raise the :class:`fedora_messaging.exceptions.Nack`
  exception if the consumer cannot handle the message at this time. The message
  will be re-queued, and the server will attempt to re-deliver it at a later
  time.

* Consumers should raise the :class:`fedora_messaging.exceptions.Drop` exception
  when they wish to explicitly indicate they do not want to handle the message.
  This is similar to simply returning from the callback, but the server is
  informed that the client dropped the message. What the server does next
  depends on its configuration.

* Consumers should raise the :class:`fedora_messaging.exceptions.HaltConsumer`
  exception if they wish to stop consuming messages.

If a consumer raises any other exception, a traceback will be logged at the
error level, the message being processed and any pre-fetched messages will be
returned to the queue for later delivery, and the consumer will be canceled.

If the CLI is being used, it will halt with a non-zero exit code. If the API
is being used directly, consult the API documentation for exact results, as
the synchronous and asynchronous APIs communicate failures differently.
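
A callback might combine these exceptions as follows. This is a sketch, not a
prescribed pattern: the topic names and the ``decide`` and
``backend_is_ready`` helpers are hypothetical, and the decision is kept in a
plain function so it can be exercised without a broker::

    def decide(topic, backend_ready):
        """Return the action to take for a message (a hypothetical policy)."""
        if topic == "org.example.shutdown":
            return "halt"
        if not topic.startswith("org.example."):
            return "drop"
        if not backend_ready:
            return "nack"
        return "process"


    def backend_is_ready():
        """Placeholder check; a real consumer would probe its own dependencies."""
        return True


    def callback(message):
        # Imported lazily so that decide() can be tested without the library.
        from fedora_messaging.exceptions import Drop, HaltConsumer, Nack

        action = decide(message.topic, backend_is_ready())
        if action == "halt":
            # Stop consuming messages.
            raise HaltConsumer()
        if action == "drop":
            # Tell the server we will not handle this message.
            raise Drop()
        if action == "nack":
            # Ask the server to re-queue the message for later delivery.
            raise Nack()
        print(message.topic)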


Synchronous and Asynchronous Calls
----------------------------------

The AMQP consumer runs in a Twisted event loop. When a message arrives, it
calls the callback in a separate Python thread to avoid blocking vital
operations like the connection heartbeat. The callback is free to use any
blocking (synchronous) calls it likes.

.. note:: Your callback does not need to be thread-safe. By default, messages
          are processed serially.

It is safe to start threads to perform IO-blocking work concurrently. If you
wish to make use of a Twisted API, you must use the
:func:`twisted.internet.threads.blockingCallFromThread` or
:class:`twisted.internet.interfaces.IReactorFromThreads` APIs.

You may also use asyncio-based asynchronous callbacks, either via an ``async``
function or via an object that has an async ``__call__()`` method. In this
case, the callback is not run in a separate thread. Instead, it is scheduled
as a regular asyncio task.
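
Both forms of asynchronous callback can be sketched as shown here. The names
``async_callback``, ``AsyncCounter`` and ``handled`` are hypothetical, the
``asyncio.sleep(0)`` calls stand in for real asynchronous work, and the
callbacks are driven locally with :func:`asyncio.run` and a stand-in message
object rather than by the consumer::

    import asyncio
    from types import SimpleNamespace

    handled = []


    async def async_callback(message):
        """Handle a message without blocking the event loop."""
        # Placeholder for real asynchronous work, such as an HTTP request.
        await asyncio.sleep(0)
        handled.append(message.topic)


    class AsyncCounter:
        """Callable object with an async ``__call__`` that keeps state."""

        def __init__(self):
            self.count = 0

        async def __call__(self, message):
            await asyncio.sleep(0)
            self.count += 1


    # Exercise both callbacks locally with a stand-in message object.
    counter = AsyncCounter()
    fake_message = SimpleNamespace(topic="org.example.demo")
    asyncio.run(async_callback(fake_message))
    asyncio.run(counter(fake_message))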


Consumer Configuration
----------------------

A special section of the fedora-messaging configuration will be available for
consumers to use if they need configuration options. Refer to the
:ref:`conf-consumer-config` in the Configuration documentation for details.


Monitoring
==========

A Fedora Messaging consumer can start a dedicated HTTP server to let users
monitor its state. The server is configured in the ``[monitoring]`` section of
the configuration file, using the ``address`` and ``port`` keys.
If the section is empty, the monitoring service is disabled.
The default value for ``address`` is an empty string, which means that the
monitoring server listens on all interfaces.
There is no default value for ``port``, so you have to choose one.

When a consumer is running with the monitoring server enabled, you can get the
following data::

    $ curl http://localhost:8070/live
    {"status": "OK"}

    $ curl http://localhost:8070/ready
    {"consuming": true, "published": 0, "consumed": {"received": 0, "processed": 0, "dropped": 0, "rejected": 0, "failed": 0}}

The ``/live`` endpoint always returns the same JSON data.
The statistics in the ``/ready`` endpoint are gathered from the start of the
service. They mean:

- ``consuming``: whether the consumer is running or not.
- ``published``: number of messages published on the bus.
- ``consumed.received``: number of messages received from the bus.
- ``consumed.processed``: number of messages successfully processed by the
  consumer.
- ``consumed.dropped``: number of messages dropped by the consumer (by raising
  the :class:`fedora_messaging.exceptions.Drop` exception).
- ``consumed.rejected``: number of messages rejected by the consumer (by raising
  the :class:`fedora_messaging.exceptions.Nack` exception). Those messages were
  put back in the queue.
- ``consumed.failed``: number of messages that caused another exception to be
  raised.
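
A monitoring script could read these statistics along the following lines.
This is a minimal sketch using only the standard library: ``fetch_ready`` and
``summarize`` are hypothetical helper names, the base URL reuses the port from
the example above, and the summary is computed from the sample payload so the
snippet runs without a consumer listening::

    import json
    import urllib.request


    def fetch_ready(base_url="http://localhost:8070"):
        """Fetch and decode the ``/ready`` document from a running consumer."""
        with urllib.request.urlopen(f"{base_url}/ready", timeout=5) as response:
            return json.load(response)


    def summarize(ready):
        """Return a one-line summary of a decoded ``/ready`` document."""
        consumed = ready["consumed"]
        state = "consuming" if ready["consuming"] else "stopped"
        return (
            f"{state}: received={consumed['received']} "
            f"processed={consumed['processed']} failed={consumed['failed']}"
        )


    # The sample payload shown above, decoded without contacting a server.
    sample = json.loads(
        '{"consuming": true, "published": 0, "consumed": {"received": 0, '
        '"processed": 0, "dropped": 0, "rejected": 0, "failed": 0}}'
    )
    print(summarize(sample))

Against a live consumer, ``summarize(fetch_ready())`` would report the current
counters instead of the sample values.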


systemd Service
===============

A systemd service file is also included in the Python package for your
convenience. It is called ``fm-consumer@.service`` and simply runs
``fedora-messaging consume`` with a configuration file from
``/etc/fedora-messaging/`` that matches the service name::

    $ systemctl start fm-consumer@sample.service  # uses /etc/fedora-messaging/sample.toml


.. _AMQP overview: https://www.rabbitmq.com/tutorials/amqp-concepts.html
.. _RabbitMQ tutorials: https://www.rabbitmq.com/getstarted.html
.. _pika: https://pika.readthedocs.io/
.. _bindings: https://www.rabbitmq.com/tutorials/amqp-concepts.html#bindings
.. _queue: https://www.rabbitmq.com/tutorials/amqp-concepts.html#queues
.. _AMQP consumers: https://www.rabbitmq.com/tutorials/amqp-concepts.html#consumers
.. _exchange: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges
.. _topic exchange: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-topic
