Wednesday, August 23, 2023

A unified file streaming API for local and remote storage

Oftentimes, we want a simple API for streaming IO that works seamlessly across multiple sources. I am looking for an interface that not only supports local file systems but also files on remote HTTP servers, object storage, and is extendable for any new remote storage that comes along. I want the capability to read from the stream (thus incrementing the file pointer), skip forward within the stream, and indicate to the caller that the stream is complete. The motivation for an API like this is parsing large binary file types. Unlike plain-text files, which are often read from start to end, binary data demands custom types and non-sequential navigation. For example, with BGZF or DWARF files, one must first read headers at the start which indicate where to skip around the file to grab the data you actually want.

Let's say we have a BGZF file loaded in memory, addressed by ptr. A common pattern looks like the following:

struct __attribute__((packed)) BgzfHeader {
  uint8_t  ID1;
  uint8_t  ID2;
  ...
  uint16_t XLEN;
};
BgzfHeader header = *(BgzfHeader*) ptr; // read header
ptr += sizeof(BgzfHeader); // increment file pointer
ptr += sizeof(BgzfExtraFields); // skip extra fields
 

Let's formalize this pattern as an interface. A user will only interact with three functions: (1) Read, (2) Skip, and (3) Done. The end result is an API that works the same on local filesystems, S3, blob storage, HTTP, or whatever storage medium you want to add!

class FileStreamer {
public:
  using StreamSize = size_t;

  virtual ~FileStreamer() = default;

  virtual StreamSize Read(U8* buffer, StreamSize size) = 0;
  virtual StreamSize Skip(StreamSize size) = 0;
  virtual bool Done() const = 0;
};



While some readers may, understandably, desire to seek backwards, I will forgo implementing that. Given the provided code, adding that functionality should be a straightforward exercise. 


// GetStreamer is a factory that picks a backend from the name (local path vs. URL)
FileStreamer* streamer = FileStreamer::GetStreamer(file_name);
Buffer buffer = Buffer::Allocate(1 << 20);
while (!streamer->Done())
{
  StreamSize n = streamer->Read(buffer.Data(), buffer.Size());
  // do stuff with the n bytes of data
}


We'll start with the local filesystem implementation as it is straightforward, familiar, and easily testable. FYI no guarantees on the correctness of the following code. While it probably compiles, treat it as pseudocode for inspiration. 

class DiskBackedFileStreamer : public FileStreamer {
public:
  explicit DiskBackedFileStreamer(String file_name)
    : m_fd{open(file_name.CString(), O_RDONLY)}
    , m_eof{false}
  {
    ASSERT(m_fd != -1);
  }

  virtual ~DiskBackedFileStreamer() { close(m_fd); }

  StreamSize Read(U8* buffer, StreamSize size) override
  {
    ssize_t r = read(m_fd, buffer, size);
    ASSERT(r >= 0);
    m_eof = (r == 0); // read() returns 0 at end-of-file
    return static_cast<StreamSize>(r);
  }

  StreamSize Skip(StreamSize skipBytes) override
  {
    ASSERT(skipBytes <= static_cast<StreamSize>(std::numeric_limits<off_t>::max()));
    off_t pos = lseek(m_fd, skipBytes, SEEK_CUR);
    ASSERT(pos >= 0);
    return skipBytes; // bytes skipped, to match the other implementations
  }

  bool Done() const override { return m_eof; }

private:
  int  m_fd;
  bool m_eof;
};



For completeness, below is a sample implementation of an HTTP streamer conforming to our FileStreamer interface. The high level takeaway is that we need to keep track of our offset into the file. This is baked into the variable m_read_head. When the user calls Read(buffer, size), a range request is crafted based on the read head for size bytes. For example, say we have done no reads (so the read head is 0) and the user requests 20 bytes. Then the header Range: bytes=0-19 is added to the request. 

class HttpFileStreamer : public FileStreamer {
public:
  explicit HttpFileStreamer(String url)
   : m_url{url}
   , m_read_head{0}
   , m_eof{false}
  {
  }

  StreamSize Read(U8* buffer, StreamSize size) override
  {
    ASSERT(size >= 1);
    // HTTP byte ranges are inclusive on both ends, hence the -1
    String range_header = FormatString("Range: bytes=%zu-%zu",
                                       m_read_head,
                                       m_read_head + size - 1);
    m_http.AddHeader(range_header);
    m_http.SetBodyBuffer(buffer, size); // use our buffer
    HttpResponse response = m_http.Get(m_url);
    ASSERT(response.status == HTTP::Ok ||
           response.status == HTTP::PartialContent);
    m_read_head += response.BodySize();

    // update eof from the total size in "Content-Range: bytes 0-19/1234";
    // ParseTotalSize is a helper (not shown) that reads the number after '/'
    StreamSize total = ParseTotalSize(response.GetHeader("Content-Range"));
    m_eof = (m_read_head >= total);
    return response.BodySize();
  }

  StreamSize Skip(StreamSize skipBytes) override
  {
    m_read_head += skipBytes; // no request needed; the next Read starts here
    return skipBytes;
  }

  bool Done() const override { return m_eof; }

private:
  String     m_url;
  StreamSize m_read_head;
  bool       m_eof;
  HttpClient m_http;
};


I felt it important to include pseudocode for how to implement the HTTP interface because there are some subtle tricks worth fleshing out. Object storage boils down to the exact same exercise as HTTP. Since S3 is built on top of HTTP, the AWS SDKs support range requests on GetObject; the header is plumbed through to the underlying HTTP request.

Now, here is where things get slightly interesting. An astute reader, for instance, might note that the following code performs terribly.

HttpFileStreamer streamer(url);
U8 buffer[1];
for (int i = 0; i < N; ++i) streamer.Read(buffer, 1);

In the case of local storage, your app will be context switching like crazy as it spams read() syscalls. And the story is even worse for network storage, incurring a latency cost for every byte. Fortunately, there is an elegant fix for this which can be done without modifying any of the code we have written thus far.

The idea here is to introduce a variant of the FileStreamer, called BufferedFileStreamer. The key insight is that BufferedFileStreamer itself takes a FileStreamer as input. In this class, the upstream FileStreamer is called the "provider". Whenever our local buffer runs dry, Read() proxies to the provider in buffer-sized chunks and caches the result in memory.

The code gets a little hairy. To keep things grounded, let's remember our high-level goals. Read(): exhaust our in-memory buffer, fetching from the upstream provider whenever we don't have enough data to satisfy the request. Skip(): first, advance our pointers in memory; if the skip exceeds what we have in memory, call Skip() on the provider. With all that in mind, let's take a look at the code.

class BufferedFileStreamer : public FileStreamer {
public:
  explicit BufferedFileStreamer(
      UniquePtr< FileStreamer > provider,
      size_t buffer_size = (1 << 20))
   : m_provider(std::move(provider))
   , m_buffer{Buffer::Allocate(buffer_size)}
   , m_read_head{m_buffer.Data()}
   , m_buffer_bytes_left{0}
  {
  }

  StreamSize Read(U8* output, StreamSize size) override
  {
    StreamSize bytes_read = 0;
    // keep going while the caller wants data and either our buffer
    // still holds some or the provider can produce more
    while (size > 0 && (m_buffer_bytes_left > 0 || !m_provider->Done()))
    {
      if (m_buffer_bytes_left == 0)
      {
        // memory exhausted. read from our upstream provider
        m_buffer_bytes_left = m_provider->Read(m_buffer.Data(),
                                               m_buffer.Size());
        m_read_head = m_buffer.Data();
      }

      // copy from our internal buffer to the user's
      StreamSize s = MIN(size, m_buffer_bytes_left);
      ::memcpy(output, m_read_head, s);
      bytes_read += s;
      output += s;
      m_read_head += s;
      size -= s;
      m_buffer_bytes_left -= s;
    }
    return bytes_read;
  }

  StreamSize Skip(StreamSize skipBytes) override
  {
    if (m_buffer_bytes_left >= skipBytes)
    {
      // skip is entirely in our memory buffer
      m_buffer_bytes_left -= skipBytes;
      m_read_head += skipBytes;
      return skipBytes;
    }
    else
    {
      // first, skip the remaining data in memory
      StreamSize skipped = m_buffer_bytes_left;
      StreamSize remaining = skipBytes - m_buffer_bytes_left;
      // second, skip on our upstream provider
      skipped += m_provider->Skip(remaining);

      // adjust pointers
      m_read_head = m_buffer.Data();
      m_buffer_bytes_left = 0;
      return skipped;
    }
  }

  bool Done() const override
  {
    return m_buffer_bytes_left == 0 && m_provider->Done();
  }

private:
  UniquePtr< FileStreamer > m_provider;
  Buffer                    m_buffer;
  U8*                       m_read_head;
  size_t                    m_buffer_bytes_left;
};



Below, I show how to utilize this new buffered streamer atop an HTTP streamer.

BufferedFileStreamer streamer(MakeUnique<HttpFileStreamer>(url));

Now, when a user calls streamer.Read(buffer, 1), behind the scenes an HTTP request is made for 1 MB and the result is stored in memory. If the user then calls Read(buffer, 1) in a loop another million times, still only that single HTTP request is issued.

In conclusion, the FileStreamer API offers a unified interface that simplifies reading binary files across different local and remote storage mediums. Often, the simplest interfaces prove the most adaptable. By introducing in-memory buffering we capture the convenience of C++ stringstreams without adding any new structures or sacrificing fine-grained control. Lastly, while the FileStreamer may be considered too minimalist for some, it is robust enough to serve as a foundation for higher-level constructs.
