NSQ Queue Reader Best Practices

Introduction

This post should serve as a guide for best practices when dealing with services that consume messages from queues and process those messages (we refer to them as QRs, or ‘Queue Readers’). The best practices detailed below cover both general programming idioms and useful performance patterns.

We’re focusing primarily on QRs that use the NSQ data pipeline, and specifically on services written in Python (although I imagine most of the items discussed translate well enough to your queue mechanism of choice).

As with all ‘best practice’ guidelines, they are just that: guidelines. Not everything listed here will be applicable to your needs, so remember to start by verifying your own application’s requirements and specific use cases.

Ephemeral Channels?

Imagine your server instance needs to be restarted, or its nsqd daemon (which receives, queues, and delivers messages to clients) is unexpectedly terminated, or maybe the nsqd exceeds the allocated mem-queue-size (which determines the number of messages that should be kept in memory).

With an ephemeral channel, messages in those scenarios are simply dropped rather than written to disk. If you’re OK with that scenario and its outcome, then you should append #ephemeral to your channels…

nsq_channel: 'qr_name_goes_here#ephemeral'

Otherwise, the default behaviour for NSQ channels is to overflow messages to disk once mem-queue-size is exceeded, rather than dropping them. Which you choose will depend on your application and how critical you consider the messages to be.
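For illustration, here’s roughly how an ephemeral channel might be wired up with pynsq; the topic name, lookupd address, handler and max_in_flight value below are made up:

import nsq

def handler(msg):
    # trivial handler: mark every message as processed
    return True

reader = nsq.Reader(
    topic='page_events',                    # hypothetical topic
    channel='qr_name_goes_here#ephemeral',  # messages are dropped rather than spilled to disk
    lookupd_http_addresses=['http://127.0.0.1:4161'],
    message_handler=handler,
    max_in_flight=200,
)
nsq.run()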

Fail quickly

When processing a high throughput of messages it’s beneficial to identify invalid messages quickly and mark them as “processed”, so you can exit your handler as soon as possible and avoid putting undue processing stress on your application and/or upstream service(s).

You should wrap potentially problematic code in a try/except (e.g. a function that makes HTTP requests can raise multiple types of exception). Doing this means you can isolate that specific call and handle the failure scenarios appropriately.
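As a rough sketch (the upstream URL and payload shape are made up), a handler that fails quickly might look like this: invalid messages are marked as processed immediately, while transient upstream errors cause a requeue:

import json
import requests

UPSTREAM_URL = 'http://upstream.internal/api'  # hypothetical upstream service

def handler(msg):
    try:
        payload = json.loads(msg.body)
    except ValueError:
        # invalid JSON will never succeed, so mark it as processed and move on
        return True

    try:
        resp = requests.post(UPSTREAM_URL, json=payload, timeout=2)
        resp.raise_for_status()
    except requests.RequestException:
        # transient upstream failure: requeue the message to try again later
        return False

    return True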

Verify your message handling logic

You should understand the complete request flow of your message handling function(s) and be sure you are correctly dropping and/or re-queuing messages at the appropriate places within your application code. It’s very easy to forget to re-queue (or drop) a message by mistake.

When processing messages synchronously you typically just return True (message was processed) or False (requeue this message) from your handler. But in order to process messages asynchronously you need to call nsq_msg.enable_async(), and then you must make sure you explicitly call (and return) either nsq_msg.finish() or nsq_msg.requeue().
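To make that concrete, a minimal sketch of both styles might look like this (is_valid and process are hypothetical helpers):

def sync_handler(msg):
    if not is_valid(msg.body):
        return True   # drop: mark the invalid message as processed
    process(msg.body)
    return True       # processed successfully

def async_handler(msg):
    msg.enable_async()
    try:
        process(msg.body)
    except Exception:
        return msg.requeue()   # must explicitly requeue on failure
    return msg.finish()        # must explicitly finish on success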

Be wary of global variables

Most of the time global variables can be more performant, as you’re reusing a reference to some data, but there are cases where a long-living (and large) global object (such as a boto S3 connection) might end up leaking memory. This is something that should be measured and verified with the appropriate Python profiling tools first, though.

Instrument timers around your primary message handler

It’s important to be able to identify anomalies in the performance of your message handlers. By using a decorator to time the function you can set up appropriate dashboards and alarms.

from your_metrics_abstraction import metrics

@metrics.timed("message_handler.time")
async def message_handler(nsq_msg):
    ...
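If you don’t have a metrics abstraction to hand, a bare-bones stand-in for such a decorator might look like the following sketch (it just logs the duration; in practice you’d emit to your metrics backend):

import functools
import logging
import time

logger = logging.getLogger(__name__)

def timed(metric_name):
    def deco(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            start = time.monotonic()
            try:
                return await fn(*args, **kwargs)
            finally:
                # record how long the handler took
                logger.info("%s: %.3fs", metric_name, time.monotonic() - start)
        return wrapper
    return deco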

Pynsq doesn’t support coroutines

The pynsq library only supports a ‘callback’ style of asynchronous message processing. This means that if you define a message handler with the @gen.coroutine decorator or the native async def syntax (either one turns the function into a coroutine), it will end up breaking the QR application by exiting the handler immediately.

See the next section for an example code snippet that works around this issue by utilising Tornado’s ioloop directly to schedule the handler’s asynchronous execution.

Prevent messages backing up in the queue

Messages can build up and cause alarms to fire if they are not pulled from the queue and successfully processed by your application in a timely fashion. You can help resolve this by configuring the nsq.Reader#max_in_flight attribute (how many messages the reader will handle at once) and/or by processing your messages asynchronously, as the snippet below demonstrates:

from tornado import ioloop

async def coro_handler(msg):
    # ... do stuff
    return msg.finish()

def handler(msg):
    # tell pynsq we will finish/requeue the message ourselves
    msg.enable_async()
    # schedule the coroutine on the ioloop so the reader isn't blocked
    ioloop.IOLoop.current().add_callback(coro_handler, msg)

You can also look to tweak the nsq.Reader#max_tries attribute, which defines the number of times a message can be requeued before it is permanently dropped (this prevents cyclic errors).

There is also the nsq.Message#touch method, which lets you tell the NSQ daemon that you need more time to process a message, postponing (for a little while at least) the message timing out and being automatically re-queued (any re-queues caused by time-outs still count towards the max_tries limit).
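As a sketch, a long-running asynchronous handler might keep the message alive with a Tornado PeriodicCallback (do_expensive_work is a hypothetical coroutine):

from tornado import ioloop

async def long_running_handler(msg):
    # tell nsqd every 30 seconds that we're still working on this message
    keepalive = ioloop.PeriodicCallback(msg.touch, 30000)
    keepalive.start()
    try:
        await do_expensive_work(msg.body)  # hypothetical long-running coroutine
    finally:
        keepalive.stop()
    return msg.finish()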

Avoid API libraries auto-retrying expensive operations

Some API libraries, such as boto, allow you to configure operations to be retried N times before finally failing. This can be helpful to ensure a temporary network blip or error doesn’t cause a message to be unnecessarily dropped or requeued, but it can also bring a performance overhead if the operation in question is very slow. Review the API calls you are making and evaluate how expensive they are. In some cases you might prefer to turn retries off and let NSQ handle these temporary errors (i.e. by re-queuing messages).

Below is an example of how to configure boto to not retry operations:

import boto3
from botocore.client import Config

session = boto3.session.Session()
s3_resource = session.resource("s3", config=Config(
    connect_timeout=2,
    read_timeout=2,
    retries={'max_attempts': 0},
))

Note: as per the example above, it’s worth tweaking the connection/read timeouts as well. For example, we noticed that calls for .xml files from S3 were really slow, so in that service we had to increase the read_timeout by a significant amount (but not too much; you don’t want the client to sit hanging for a long period of time, so it requires some fine tuning to get it right).

Place blocking IO operations into a thread pool

Some libraries do not provide asynchronous support (such as Python’s redis library). So if your message handler is asynchronous, and you’re also executing a potentially long-running blocking operation (such as an S3 object GET), then you’ll end up blocking the ioloop and preventing multiple messages from being handled concurrently.

from app.threadpool import run_on_executor

async def message_handler(nsq_msg):
    # fn is whatever blocking callable needs to run off the ioloop (e.g. an S3 GET)
    result = await run_on_executor(fn, arg1, arg2, ...)

Then the app.threadpool referenced in the above snippet would look something like:

from tornado import gen
from concurrent.futures import ThreadPoolExecutor

from bf_rig import settings


THREAD_POOL = ThreadPoolExecutor(settings.get('pool_max_workers'))  # 10


@gen.coroutine
def run_on_executor(*args, **kwargs):
    result = yield THREAD_POOL.submit(*args, **kwargs)
    raise gen.Return(result)

The above example uses Tornado’s @gen.coroutine decorator because the concurrent.futures.Future returned by ThreadPoolExecutor.submit can’t be awaited directly from a native coroutine. Doing so would require asyncio.wrap_future, which isn’t much better than just using Tornado’s own decorator.
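For comparison, a native-coroutine equivalent might look something like the following (assuming Tornado 5+, where the ioloop runs on top of asyncio):

import asyncio
from concurrent.futures import ThreadPoolExecutor

THREAD_POOL = ThreadPoolExecutor(10)

async def run_on_executor(fn, *args, **kwargs):
    # wrap the concurrent.futures.Future so a native coroutine can await it
    return await asyncio.wrap_future(THREAD_POOL.submit(fn, *args, **kwargs))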

Note: the ThreadPoolExecutor will only help you with IO-bound tasks that need to be handled asynchronously (and whose library doesn’t support async natively). If the task to be executed is actually CPU-bound then you’ll want to use a ProcessPoolExecutor instead.
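Swapping executors keeps the same shape; a rough sketch (the worker count is arbitrary, and the submitted function and its arguments must be picklable):

from concurrent.futures import ProcessPoolExecutor

from tornado import gen

# CPU-bound work: separate processes avoid contending on the GIL
PROCESS_POOL = ProcessPoolExecutor(4)


@gen.coroutine
def run_on_process_pool(*args, **kwargs):
    result = yield PROCESS_POOL.submit(*args, **kwargs)
    raise gen.Return(result)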

Rate limit yourself

In a service where there’s a potential for lots of duplicate messages it can be useful to implement some simple rate limiting logic. In one of our QR services we use Redis to track duplicate requests and then execute some basic rate limiting logic in order to prevent overwhelming any upstream services that would otherwise be called.
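As an illustration only (not our production code), a simple fixed-window limiter using redis-py might look like this; the key prefix and limits are made up:

import redis

r = redis.Redis()  # hypothetical shared redis instance

def allow(key, limit=100, window_seconds=60):
    """Fixed-window limiter: allow at most `limit` calls per window."""
    count = r.incr('ratelimit:%s' % key)
    if count == 1:
        # first hit in this window, so start the expiry clock
        r.expire('ratelimit:%s' % key, window_seconds)
    return count <= limit

A handler could then drop or requeue when allow() returns False; as the note below explains, requeuing during a rate-limit period comes with its own trade-offs.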

Note: be aware that the rate limit you set can cause unwanted side-effects. For example, if you start to requeue messages during a rate-limit period, you may find that messages aren’t being processed quickly enough, so the queue depth will begin to increase (i.e. the queue will start to back up and fill) and this might cause monitors (e.g. Datadog/Nagios) to trigger.

Disable yourself

Consider your upstream services and identify whether there’s ever a point where your service needs to stop making requests to them. Most services sit behind an API gateway, which will likely enforce rate limiting on you, but that might not always be the case.

One example of this is a QR service which makes requests to a separate rendering service for HTML content to be backed up into AWS S3. There are periods where this rendering service will dynamically purge its cache (both its internal application cache and the outer CDN cache layer). To prevent the QR service from overloading the rendering service during this period where it’s vulnerable(†), we automatically disable the QR service: a status key in a shared redis cluster acts as the switch, and flipping its value tells the QR service to stop processing.

† due to it having no cache! None of these services are vulnerable in the security sense, as they’re internal-only within a VPC.

The example below demonstrates the implementation used in one of our QR services: a Python decorator.

from tornado import ioloop

from app.foo import toggle_qr

@toggle_qr('status')
def _message_handler(nsq_msg, *args, **kwargs):
    nsq_msg.enable_async()
    ioloop.IOLoop.current().add_callback(message_handler, nsq_msg)

The app.foo code then looked something like the following:

from functools import wraps
from typing import Callable

from nsq import Message


def toggle_qr(status_key: str) -> Callable:
    """
    When status_key is set to 'stopped' in redis,
    this decorator will finish the nsq message and return immediately,
    otherwise it will proceed with event handling.

    Arguments:
        status_key: name of the status key in redis.
    Returns:
        Wrapped message handler.
    """
    def deco(function):
        @wraps(function)
        def wrapper(nsq_msg: Message, *args, **kwargs):
            assert 'redis' in kwargs, 'redis parameter is required'
            redis = kwargs['redis']
            try:
                status = redis.get(status_key)
            except Exception:
                status = None
            if status == b'stopped':
                nsq_msg.finish()
                return
            return function(nsq_msg, *args, **kwargs)
        return wrapper
    return deco

Drop or Requeue?

Considering the previous section about disabling a QR service to protect an upstream (e.g. where rate limiting yourself doesn’t make sense, or being rate limited by the upstream isn’t possible), you might then need to decide what to do with the messages that are building up in the message queue.

Those messages will eventually reach a threshold, and in some cases it might make more sense not to requeue them while the QR service is ‘disabled’, but to drop them completely. The answer, and your approach, will depend on the message source itself: are these messages you can afford to drop? Are they generated fairly regularly?