
Totally async EventBridge


I'm a big fan of the CloudWatch Embedded Metric Format. It lets you write metrics to the Lambda log without the overhead of a service call. Alas, there is no Embedded EventBridge Format; you need to make a synchronous call to EventBridge to put events onto the bus. Can we make it more asynchronous? In this post, I'll describe a back door for publishing events to EventBridge by just writing JSON to the Lambda log.

The trick

CloudWatch Logs has a feature called subscription filters, which lets you filter your logs for a specific pattern and route the matching entries to a destination such as a Kinesis data stream, Firehose, or a Lambda function for processing. In this case, we'll use a Lambda function to convert the log entries into PutEvents calls to EventBridge. Here's a diagram of what we're going to do:

Sequence Diagram

Writing events to the log

The first step is to write an event to the Lambda log. I use the PutEventsCommand type to build an object, stringify its input into JSON, and write it to the log with the prefix EVENT_BRIDGE:

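import { PutEventsCommand } from '@aws-sdk/client-eventbridge';

// Inside the Lambda handler: the command is built only for its typed input shape.
const parsedBody = JSON.parse(event.body!);
const putEventsCommand = new PutEventsCommand({
  Entries: [
    {
      EventBusName: process.env.DEFAULT_EVENT_BUS_NAME!,
      Source: `cc.speedrun.${context.functionName}`,
      DetailType: 'Lambda Function Invocation',
      Detail: JSON.stringify({ message: parsedBody.message }),
    },
  ],
});
// The command is never sent; its input is logged for the subscription filter to pick up.
console.log('EVENT_BRIDGE: ' + JSON.stringify(putEventsCommand.input));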
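
If several functions publish this way, the logging step could be wrapped in a small helper so that every producer uses the same prefix. This is a minimal sketch, not code from the sample project: `formatEventLogLine`, `LOG_PREFIX`, and the local `BusEntry` type are hypothetical names, and `BusEntry` mirrors only the fields used above rather than the SDK's own entry type.

```typescript
// Hypothetical local type mirroring the entry fields used in this post.
interface BusEntry {
  EventBusName: string;
  Source: string;
  DetailType: string;
  Detail: string; // JSON-encoded string
}

// Prefix the subscription filter is assumed to match on.
const LOG_PREFIX = 'EVENT_BRIDGE: ';

// Build the single log line that carries a PutEvents-shaped payload.
function formatEventLogLine(entries: BusEntry[]): string {
  return LOG_PREFIX + JSON.stringify({ Entries: entries });
}

// Usage: log one event without calling EventBridge directly.
const line = formatEventLogLine([
  {
    EventBusName: 'default',
    Source: 'cc.speedrun.example',
    DetailType: 'Lambda Function Invocation',
    Detail: JSON.stringify({ message: 'hello' }),
  },
]);
console.log(line);
```

Keeping the payload on one line matters here, because each log event is handled as a single message by the publisher.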

Publishing events

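Assuming there is a subscription filter matching EVENT_BRIDGE, we now just need to rehydrate the PutEventsCommand and publish it to EventBridge. CloudWatch Logs delivers the matching log events to the Lambda function base64-encoded and gzip-compressed, so the handler decodes them first.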

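import { promisify } from 'util';
import * as zlib from 'zlib';
import { EventBridgeClient, PutEventsCommand } from '@aws-sdk/client-eventbridge';

const gunzip = promisify(zlib.gunzip);
const eventBridgeClient = new EventBridgeClient({});

// Inside the subscription handler: the payload is base64-encoded, gzipped JSON.
const payload = Buffer.from(event.awslogs.data, 'base64');
const unzippedEvent = await gunzip(payload);
const parsedEvent = JSON.parse(unzippedEvent.toString('utf8'));
if (parsedEvent.messageType === 'DATA_MESSAGE') {
  for (const logEvent of parsedEvent.logEvents) {
    // Everything from the first '{' onward is the logged PutEvents input.
    const json = logEvent.message.substring(logEvent.message.indexOf('{'));
    const message = new PutEventsCommand({ Entries: JSON.parse(json).Entries });
    const result = await eventBridgeClient.send(message);
    // PutEvents can fail individual entries without throwing, so check the count.
    if ((result.FailedEntryCount ?? 0) > 0) {
      throw new Error('Not all events were sent to EventBridge');
    }
  }
}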
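
The decoding and parsing steps can be tried locally without AWS. The sketch below is an illustration under assumptions, not the sample project's code: `decodeLogsPayload`, `extractEntries`, and the `LogsPayload` type are hypothetical names, and the sample payload is built by hand to imitate what CloudWatch Logs delivers. Unlike the handler above, it returns null for a log line that cannot be parsed instead of throwing.

```typescript
import { gunzipSync, gzipSync } from 'zlib';

// Shape of the decoded subscription payload (only the fields used here).
interface LogsPayload {
  messageType: string;
  logEvents: { id: string; timestamp: number; message: string }[];
}

// Decode the base64 + gzip `awslogs.data` field into a typed object.
function decodeLogsPayload(data: string): LogsPayload {
  const json = gunzipSync(Buffer.from(data, 'base64')).toString('utf8');
  return JSON.parse(json) as LogsPayload;
}

// Pull the Entries array out of a log line, or return null if it is not usable.
function extractEntries(message: string): unknown[] | null {
  const start = message.indexOf('{');
  if (start < 0) return null;
  try {
    const body = JSON.parse(message.substring(start));
    return Array.isArray(body.Entries) ? body.Entries : null;
  } catch {
    return null; // malformed JSON: skip this line rather than fail the batch
  }
}

// Usage: build a sample payload, encode it, then decode and extract.
const sample: LogsPayload = {
  messageType: 'DATA_MESSAGE',
  logEvents: [
    { id: '1', timestamp: 0, message: 'INFO\tEVENT_BRIDGE: {"Entries":[{"Source":"demo"}]}' },
    { id: '2', timestamp: 1, message: 'INFO\tEVENT_BRIDGE: {broken' },
  ],
};
const encoded = gzipSync(Buffer.from(JSON.stringify(sample))).toString('base64');
const decoded = decodeLogsPayload(encoded);
const results = decoded.logEvents.map((e) => extractEntries(e.message));
console.log(results);
```

Whether to skip or fail on a bad line is a design choice: skipping keeps one malformed event from blocking the rest, but it also means that event is silently dropped unless it is reported somewhere.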

Thoughts and limitations

Pros

  • No direct dependency on EventBridge in latency/availability sensitive code
  • More opportunity for batching event publishing
  • You can search for events in CloudWatch Logs
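
To illustrate the batching point: a single PutEvents request accepts up to 10 entries, so a publisher could collect the entries from every log event in a payload and send them in chunks rather than one call per log line. A minimal sketch, where `chunk` and `MAX_ENTRIES_PER_CALL` are hypothetical names and the entries are placeholder objects:

```typescript
// PutEvents accepts at most 10 entries per request.
const MAX_ENTRIES_PER_CALL = 10;

// Split a list into consecutive batches of at most `size` items.
function chunk<T>(items: T[], size: number = MAX_ENTRIES_PER_CALL): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Usage: 23 collected entries become batches of 10, 10 and 3.
const collected = Array.from({ length: 23 }, (_, i) => ({ Source: `demo.${i}` }));
const batchSizes = chunk(collected).map((b) => b.length);
console.log(batchSizes); // [ 10, 10, 3 ]
```

Each batch would then become the Entries of one PutEventsCommand.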

Cons

  • Added latency overhead of SubscriptionFilter/Lambda
  • It's possible to log an invalid event
  • You need to encrypt PII because it will be logged
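
The invalid-event risk can be reduced by checking an entry before it is logged, since nothing rejects it until the publisher runs later. A rough sketch, assuming a hypothetical `validateEntry` helper; it checks only that Source, DetailType, and Detail are present and that Detail parses as a JSON object, which is not everything EventBridge enforces:

```typescript
// Hypothetical pre-logging check; returns a list of problems (empty when OK).
function validateEntry(entry: {
  Source?: string;
  DetailType?: string;
  Detail?: string;
}): string[] {
  const problems: string[] = [];
  if (!entry.Source) problems.push('Source is missing');
  if (!entry.DetailType) problems.push('DetailType is missing');
  if (!entry.Detail) {
    problems.push('Detail is missing');
  } else {
    try {
      const detail = JSON.parse(entry.Detail);
      if (typeof detail !== 'object' || detail === null || Array.isArray(detail)) {
        problems.push('Detail must be a JSON object');
      }
    } catch {
      problems.push('Detail is not valid JSON');
    }
  }
  return problems;
}

// Usage: one well-formed entry and one with two problems.
const good = validateEntry({ Source: 'demo', DetailType: 'Test', Detail: '{"message":"hi"}' });
const bad = validateEntry({ Source: 'demo', Detail: 'oops' });
console.log(good, bad);
```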

This approach is a useful trick in certain use cases. It works best for things like API billing events or notifications where you don't want to impact the latency of your API but a second or two of additional latency to process the event is acceptable. It can also be applied in cases where you have a low power/low capability device that periodically connects to the internet to publish logs.

Code

I've published a sample CDK project that demonstrates this technique here. It is meant to illustrate the technique only; the error handling isn't sufficient for production use. I have not included the EventBridge rule/destination that publishes my messages to Slack because I created it with ClickOps, roughly following this article. It uses my Bot User OAuth Token with the chat:write scope to authenticate with Slack.

To invoke the Lambda, I used this speedrun block.

Further Reading

EventBridge documentation