Kinesis Firehose writes concatenated JSON objects to S3. Most Python solutions for reading that data rely on pre-processing with a Lambda, or struggle to deal with large files. A better solution is to use a streaming parser. Luckily, I have just the thing.

As part of building out our home-brewed stats system, we need to write a couple of Lambdas to read the stored metrics. These metrics are written as JSON blobs to S3 by Kinesis Firehose. The individual hits are recorded as JSON objects, but Firehose has an interesting quirk - it writes files full of JSON objects concatenated together, with no delimiter.

For example, one hit in our schema looks like this:

{
  "u": "https://codefiend.co.uk/parsing-kinesis-firehose-json-in-python",
  "r": "www.google.com",
  "t": 1644670501,
  "v": 1900
}

but when we receive multiple hits in a short space of time, they're written to the same file, in a continuous stream like this:

{ "u": "https://codefiend.co.uk/parsing-kinesis-firehose-json-in-python", "r": "www.google.com","t": 1644670501, "v": 1900}{"u":"https://codefiend.co.uk/","r":"t.co","t":1644670500,v:500}{"u":"https://codefiend.co.uk/tackling-the-delivery-service-kata","r":"t.co","t":1644670517,v:950}

This breaks the standard Python json parser, which is only built to handle a single object at a time. I've seen some solutions around the internet that either use a Lambda to add delimiters to the data[1] before it's written to S3, or read the whole file into memory and split it apart[2] before processing it. What we'd like is to be able to stream large files, pulling out JSON objects as we encounter them. I found it surprising that there wasn't much information on how to deal with Kinesis' default processing mode in Python, which is probably the most common data-engineering language in AWS.
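
Concretely, handing a blob like that straight to json.loads fails as soon as it reaches the second object (a quick illustration, not code from our stats system):

import json

blob = '{"u": "https://codefiend.co.uk/", "t": 1644670500}{"u": "https://codefiend.co.uk/", "t": 1644670517}'

# The parser reads the first object, then complains about everything after it
json.loads(blob)  # raises json.JSONDecodeError: "Extra data: ..."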

Streaming JSON for fun and profit

There are a few streaming JSON parsers out there already. Before writing this post I did a few spikes with NAYA[3] and Yajl-py[4]. Both seemed like reasonable options, but then I stumbled across the raw_decode method of the JSONDecoder in the Python standard library.

The raw_decode method does almost exactly what we want - it reads a string and extracts a JSON object from it, ignoring any data after the object closes. It also returns the index where the remaining data starts, so we can call the method a second time and read the next object.
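
For example, using just the standard library and a couple of the hits from above:

from json.decoder import JSONDecoder

data = '{"u": "https://codefiend.co.uk/", "t": 1644670500}{"u": "https://codefiend.co.uk/", "t": 1644670517}'
decoder = JSONDecoder()

# raw_decode returns the first object plus the index where it ended
first, idx = decoder.raw_decode(data)

# Starting the next call at that index reads the second object
second, idx = decoder.raw_decode(data, idx)

assert first["t"] == 1644670500
assert second["t"] == 1644670517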

Reading the source code for JSONDecoder, we find that internally it uses a scanner function. This is the magic we need. The scanner reads a string and pulls a JSON object out of it, starting from a given index. It then returns that object and the index of the string where the object finished. If no object is found, it raises StopIteration.

from json import scanner, dumps
from json.decoder import JSONDecoder

import pytest

def test_scanner():
    decoder = JSONDecoder()
    
    # A scanner uses a JSONDecoder to produce objects
    scan = scanner.make_scanner(decoder)

    datum = {"A": 1, "B": 2}
    data = dumps(datum) * 2

    # When we scan, we get back the first json object in the string
    result, idx = scan(data, 0)
    assert result == datum

    # We can scan a second time from the previous index
    result, idx = scan(data, idx)
    assert result == datum

    # And when we reach the end of the string, we get StopIteration
    with pytest.raises(StopIteration):
        scan(data, idx)
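
Wrapping that scanner in a generator gets us most of the way to a parser. Here's a minimal sketch, assuming the whole string is already in memory (the iter_json_objects name is mine, and this isn't the firehose-sipper code itself):

from json import scanner
from json.decoder import JSONDecoder

def iter_json_objects(text):
    # Yield every JSON object found in a string of concatenated objects
    decoder = JSONDecoder()
    scan = scanner.make_scanner(decoder)
    idx, end = 0, len(text)
    while idx < end:
        obj, idx = scan(text, idx)
        yield obj
        while idx < end and text[idx] in " \t\r\n":
            idx += 1  # tolerate stray whitespace or newlines between objects

print(list(iter_json_objects('{"a": 1}{"a": 2}')))  # [{'a': 1}, {'a': 2}]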

I've written a little package called firehose-sipper that uses this technique to stream large files straight out of S3 and read the concatenated JSON objects.

import firehose_sipper

# Read a single file out of S3

for entry in firehose_sipper.sip(bucket=some_bucket, key=some_key):
    print(entry)
    
# or go nuts and read all objects under a prefix
for entry in firehose_sipper.sip(bucket=some_bucket, prefix=some_prefix):
    print(entry)
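
Under the hood, the trick is to read the S3 object in chunks and pull complete objects out of a buffer as it grows. The sketch below shows roughly how that can work; it isn't firehose-sipper's actual implementation, and the stream_objects name, the one-megabyte chunk size and the bare-bones error handling are my own:

import codecs
import json

import boto3

def stream_objects(bucket, key, chunk_size=1024 * 1024):
    # Sketch only: yield JSON objects from a Firehose file in S3 without
    # loading the whole file into memory
    decoder = json.JSONDecoder()
    utf8 = codecs.getincrementaldecoder("utf-8")()  # chunk boundaries can split characters
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"]

    buffer = ""
    for chunk in body.iter_chunks(chunk_size):
        buffer += utf8.decode(chunk)
        idx = 0
        while idx < len(buffer):
            try:
                obj, idx = decoder.raw_decode(buffer, idx)
            except json.JSONDecodeError:
                break  # the next object is split across chunks; wait for more data
            yield obj
            while idx < len(buffer) and buffer[idx] in " \t\r\n":
                idx += 1  # tolerate a trailing newline between objects
        buffer = buffer[idx:]

The buffering keeps memory use proportional to the largest single object rather than the whole file, which is what makes this approach comfortable with large Firehose deliveries.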

Now I can leave the Firehose on its default configuration, skip the extra Lambdas, and read the results with a one-liner.