
Handling dead letters in a streaming system

Chakravarthy Varaga | Jan 10, 2023

In a streaming data system, a dead letter message is a message that cannot be processed successfully by the system. This can occur for a variety of reasons, such as if the message is malformed or if it contains invalid data.

When a dead letter message is encountered in a streaming data system, it is typically moved to a dead letter queue for further processing. This allows the system to continue processing other messages without being blocked by the dead letter message. The dead letter queue is a data structure that is used to store dead letter messages until they can be handled appropriately.
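A minimal sketch of this routing, assuming JSON payloads and an in-memory deque standing in for the dead letter queue. The names `process` and `run`, and the required `id` field, are hypothetical and chosen only for illustration.

```python
import json
from collections import deque


def process(raw: bytes) -> dict:
    """Hypothetical handler: parse a JSON event and require an 'id' field."""
    event = json.loads(raw)  # raises ValueError (JSONDecodeError) on malformed input
    if not isinstance(event, dict) or "id" not in event:
        raise ValueError("missing required field 'id'")
    return event


def run(messages, dead_letters: deque) -> list:
    """Process every message; park failures instead of stopping the stream."""
    processed = []
    for raw in messages:
        try:
            processed.append(process(raw))
        except ValueError as err:
            # Keep the original payload and the reason for later diagnosis.
            dead_letters.append({"payload": raw, "error": str(err)})
    return processed


dead_letters = deque()
stream = [b'{"id": 1}', b"not json", b'{"name": "x"}', b'{"id": 2}']
processed = run(stream, dead_letters)
```

The two bad messages end up in `dead_letters` with their error text, while the two valid ones are processed in order.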

There are several strategies that can be used to handle dead letter messages in a streaming data system. For example, the system might attempt to automatically fix the issues with the message and then re-process it. Alternatively, the system might send an alert to a human operator, who can then investigate the issue and take appropriate action. In some cases, it may be necessary to discard the dead letter message if it cannot be processed successfully.

Handling dead letter messages is an important aspect of streaming data systems, as it allows the system to continue processing messages smoothly, even when problems arise.

Requirement

It is generally hard to achieve zero data loss with dead letter messages in streaming systems, as there will always be some messages that cannot be processed successfully. However, there are steps that can be taken to minimize the impact of dead letter messages and to minimize data loss.

One approach is to use a dead letter queue to store dead letter messages, as described above. This allows the system to continue processing other messages without being blocked by the dead letter message. The system can then attempt to process the dead letter message at a later time, or alert a human operator if the issue cannot be automatically resolved.

Another approach is to use message acknowledgement and retry mechanisms to help ensure that messages are delivered and processed successfully. For example, the system might send a message multiple times if it is not acknowledged by the recipient, or it might use a message broker to store messages until they can be delivered.
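As a rough illustration of acknowledgement with bounded retries, the sketch below resends a message until the recipient acknowledges it or the attempts run out, and dead-letters it in the latter case. `deliver_with_retry`, `FlakyRecipient`, and the limit of three attempts are assumptions made for the example, not part of any particular broker's API.

```python
def deliver_with_retry(message, send, max_attempts=3):
    """Call send(message) until it returns True (acknowledged) or attempts run out.

    Returns the number of attempts used, or None if never acknowledged.
    """
    for attempt in range(1, max_attempts + 1):
        if send(message):
            return attempt
    return None


class FlakyRecipient:
    """Hypothetical recipient that acknowledges only after `fail_first` failures."""

    def __init__(self, fail_first):
        self.fail_first = fail_first
        self.calls = 0

    def __call__(self, message):
        self.calls += 1
        return self.calls > self.fail_first


dead_letters = []
for msg, recipient in [("a", FlakyRecipient(2)), ("b", FlakyRecipient(5))]:
    if deliver_with_retry(msg, recipient) is None:
        dead_letters.append(msg)  # retries exhausted: hand over to the DLQ
```

Because a message may be sent more than once, the recipient has to tolerate duplicates. A production version would usually also wait between attempts.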

Finally, it is important to have robust monitoring and alerting in place to identify and address issues with the system as they arise. This can help to minimize the impact of dead letter messages and to prevent data loss.

Design overview

Firehose comprises the following key components:

  • Consumer: Kafka consumer threads that consume data from Kafka topic partitions in batches.
  • BQ Worker: Worker threads that process the batch and push it to BigQuery for a Firehose BigQuery sink.
  • Committer Thread: Threads that commit the offset to Kafka.
  • Blocking Queues: Each thread group is decoupled through Queues. This design enables us to vertically scale individual components based on the application’s requirements.

The worker threads work on the batched/polled consumer records from Kafka, deserialise them using the protobuf schema, flatten the data set, and store the records in BigQuery tables.
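The thread groups and the queues between them can be sketched as follows. This is not Firehose's code: it uses one thread per stage, in-memory lists in place of Kafka and BigQuery, and hypothetical function names, only to show how bounded queues decouple the stages.

```python
import queue
import threading

STOP = object()  # sentinel that tells a stage to shut down


def consumer(batches, work_q):
    """Stand-in for a Kafka consumer thread: hands polled batches to workers."""
    for batch in batches:
        work_q.put(batch)  # blocks while the bounded queue is full
    work_q.put(STOP)


def worker(work_q, commit_q, sink):
    """Stand-in for a BQ worker: 'writes' a batch, then passes its last offset on."""
    while True:
        batch = work_q.get()
        if batch is STOP:
            commit_q.put(STOP)
            return
        sink.extend(record for _, record in batch)
        commit_q.put(batch[-1][0])  # highest offset in the batch


def committer(commit_q, committed):
    """Stand-in for the committer thread: records offsets that are safe to commit."""
    while True:
        offset = commit_q.get()
        if offset is STOP:
            return
        committed.append(offset)


# Each batch is a list of (offset, record) pairs.
batches = [[(0, "a"), (1, "b")], [(2, "c"), (3, "d")], [(4, "e")]]
work_q, commit_q = queue.Queue(maxsize=2), queue.Queue(maxsize=2)
sink, committed = [], []

threads = [
    threading.Thread(target=consumer, args=(batches, work_q)),
    threading.Thread(target=worker, args=(work_q, commit_q, sink)),
    threading.Thread(target=committer, args=(commit_q, committed)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Offsets reach the committer only after the worker has written the batch, so nothing is committed before it is stored. The bounded `maxsize` is what makes a stalled worker eventually block the consumer.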

Problem statement

Firehose can be used to push tens of thousands of gigabytes of data per day. A Firehose instance consumes from one topic and deserialises the bytes based on the schema enabled for that topic. Having one Firehose instance per topic and schema allows each topic to be scaled individually based on its ingestion volume or throughput.

Erroneous events that are part of the data stream could fail different parts of the system, which can lead to constant crashes. Kafka consumer lag then builds up and eventually results in data loss.

Unbounded data are streaming events pushed by various producers in the system (application domain). Events conform to a protobuf schema. Sometimes, the producers supply invalid records that fail our processing. In general, erroneous events fall into the following types:

  • Invalid Data: These are semantic validation errors where the source protobuf schema is intact, but the data is invalid.
  • Invalid Schema: This is when the source protobuf schema is invalid. Cases like a retyped field fall into this category.
  • Partition Key Out Of Range: Each event has a mandatory field of type ‘timestamp’ on which the BigQuery tables are partitioned. This field signifies the time the event was produced and is set by the producers. The partition key is out of range when this timestamp falls outside the allowed range, say, 1 year in the future.
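The out-of-range case comes down to a window check on the event timestamp. The sketch below assumes a window of one year either side of the current time. Both bounds are illustrative placeholders (the real limits are whatever the sink enforces), and `partition_key_in_range` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

# Illustrative bounds only; the real limits are whatever the sink enforces.
MAX_PAST = timedelta(days=365)
MAX_FUTURE = timedelta(days=365)


def partition_key_in_range(event_time, now, max_past=MAX_PAST, max_future=MAX_FUTURE):
    """Return True if event_time lies within [now - max_past, now + max_future]."""
    return now - max_past <= event_time <= now + max_future


now = datetime(2023, 1, 10, tzinfo=timezone.utc)
ok = partition_key_in_range(datetime(2023, 1, 9, tzinfo=timezone.utc), now)
too_new = partition_key_in_range(datetime(2025, 1, 1, tzinfo=timezone.utc), now)
too_old = partition_key_in_range(datetime(2020, 1, 1, tzinfo=timezone.utc), now)
```

Passing `now` in explicitly keeps the check deterministic and easy to test.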

BigQuery rejects inserts when the event time falls outside the allowed range, as mentioned above. When this happens, the worker threads unwind the stack and exit, and the internal queue is no longer drained. The consumer threads fill up the bounded queues since the BigQuery sink no longer processes them, eventually time out, and exit, which fails the Firehose instance. On restart, the instance resumes from the uncommitted offset, which still contains these invalid records, so Firehose fails again.
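The restart behaviour can be reproduced with a toy model. It assumes a list standing in for a topic partition and a per-record commit, and `run_once` is a hypothetical function. The point is only that the committed offset never moves past the rejected record.

```python
def run_once(log, committed, sink_accepts):
    """One instance lifetime. Returns (new_committed_offset, crashed)."""
    offset = committed
    while offset < len(log):
        if not sink_accepts(log[offset]):
            return offset, True  # crash: the bad record is never committed
        offset += 1  # commit after each successful insert
    return offset, False


def accepts(record):
    """Hypothetical sink rule: reject anything marked as bad."""
    return not record.startswith("bad")


log = ["ok-0", "ok-1", "bad-2", "ok-3", "ok-4"]

committed, restarts = 0, 0
for _ in range(5):  # cap restarts so the demo terminates
    committed, crashed = run_once(log, committed, accepts)
    if not crashed:
        break
    restarts += 1
```

Every restart resumes at offset 2 and fails there again, so the valid records at offsets 3 and 4 are never reached.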

When Firehose is in a constant CrashLoop, enormous volumes of unprocessed data sit in Kafka and back pressure eventually builds up. The invalid data that broke the instance needs to be cleared. Manual intervention is needed to enable Firehose to continue processing the valid records: the consumer group is changed so that it starts from the latest offset, which leads to data loss. Troubleshooting in such cases is expensive.

Storing dead letters

Firehose can be configured with a ‘Dead Letter Store’ for out-of-range partition keys, which enables the following:

  • Unprocessed messages are stored in a separate store, where they can be exposed to applications for diagnosis.
  • Improves reliability.
  • Includes metrics to segment applications that produce these messages.
  • Alerts relevant applications.

One alternative is to publish these erroneous records to Kafka. With the deployment architecture described above, however, having a Kafka dead letter topic per Firehose deployment is cumbersome. There were a few challenges with this approach:

  • Additional overhead in managing the DLQ topics. The number of topics needed would be roughly (number of clusters * number of topics that run Firehose). This would be thousands of topics overall!
  • Overhead in running additional instances (one per topic or schema) to move these messages to an additional store.
  • Increased cost to maintain the cluster.

As an alternative, Firehose dumps these invalid messages (dead letters) to blob storage (e.g. Google Cloud Storage, GCS).

Key aspects

  • The validity of a record is not checked when an event is consumed, as that would increase processing latency and reduce throughput. We handle invalid messages only when a batch fails to insert into the sink.
  • On failure to push to the sink, we assess the validity of the failed batch and split the records into valid and invalid ones. The invalid records are stored in GCS. The valid records are retried against the sink. This is processed synchronously in the same Worker thread.
  • The error records are gzip-compressed and stored as GCS objects, partitioned into the respective topic/dt=date-time paths.
  • Firehose also updates metrics on the processed erroneous records, which are shown in Grafana. Alerting can be enabled on these metrics.
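The flow in the points above can be sketched as follows. It is a simplified illustration, not Firehose's implementation: the sink is a fake that rejects a whole batch and reports the offending row indexes, a dict stands in for the GCS bucket, a plain counter stands in for the metrics, and the exact object naming under `topic/dt=...` is an assumption. `SinkError`, `dead_letter_path`, `push_batch`, and `make_fake_sink` are hypothetical names.

```python
import gzip
import json
from datetime import datetime, timezone


class SinkError(Exception):
    """Raised by the hypothetical sink; carries indexes of rejected records."""

    def __init__(self, bad_indexes):
        super().__init__(f"{len(bad_indexes)} record(s) rejected")
        self.bad_indexes = set(bad_indexes)


def dead_letter_path(topic, when):
    """Build an object name under topic/dt=<date>/ (layout is an assumption)."""
    return f"{topic}/dt={when:%Y-%m-%d}/{when:%H%M%S}.json.gz"


def push_batch(batch, topic, insert, store, metrics, now):
    """Insert a batch; on failure, dead-letter the invalid records and retry the rest."""
    try:
        insert(batch)
        return
    except SinkError as err:
        bad = err.bad_indexes
    invalid = [r for i, r in enumerate(batch) if i in bad]
    valid = [r for i, r in enumerate(batch) if i not in bad]
    # Invalid records: newline-delimited JSON, gzip-compressed, one object per batch.
    lines = "\n".join(json.dumps(r) for r in invalid)
    store[dead_letter_path(topic, now)] = gzip.compress(lines.encode("utf-8"))
    metrics["dead_letters"] = metrics.get("dead_letters", 0) + len(invalid)
    if valid:
        insert(valid)  # synchronous retry in the same worker; errors propagate


def make_fake_sink(rows, max_ts=2_000_000_000):
    """Fake sink: rejects the whole batch if any 'ts' is beyond max_ts."""

    def insert(batch):
        bad = [i for i, r in enumerate(batch) if r["ts"] > max_ts]
        if bad:
            raise SinkError(bad)
        rows.extend(batch)

    return insert


rows, store, metrics = [], {}, {}
batch = [
    {"id": 1, "ts": 1_673_308_800},
    {"id": 2, "ts": 9_999_999_999},  # timestamp far in the future
    {"id": 3, "ts": 1_673_308_801},
]
now = datetime(2023, 1, 10, 12, 0, tzinfo=timezone.utc)
push_batch(batch, "orders", make_fake_sink(rows), store, metrics, now)
```

Validation cost is paid only on the failure path, which matches the first point above. The happy path is a single `insert` call.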