How to read Protobuf files with Dask?

Has anyone tried reading Protobuf files with Dask? Each Protobuf file I have contains multiple records, and each record is prefixed with its length (4 bytes, little endian), as shown in the snippet below.

This is what the current code to read/parse these files looks like:

import struct

import mystream.MyPacket_pb2 as pb

def read_uint32(data):
    # Each record is length-prefixed with a little-endian uint32.
    return struct.unpack('<L', data)[0]

def parse(filepath):
    packets = []
    prefix = 4  # size of the length prefix in bytes
    with open(filepath, 'rb') as f:
        data = f.read()
    pos = 0
    while pos < len(data):
        # Read the length of the next record, then parse that many bytes.
        size = read_uint32(data[pos:pos + prefix])
        packet = pb.MyPacket()
        packet.ParseFromString(data[pos + prefix:pos + prefix + size])
        pos += prefix + size
        packets.append(packet)
    return packets

# Some more processing to follow after
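In case it helps to reproduce the file layout, here is a minimal sketch of how such a file could be generated. The field assignments are omitted (the packets below are empty placeholders), since they depend on the actual schema:

import struct
import mystream.MyPacket_pb2 as pb

def write_packets(filepath, packets):
    # Write each record behind a 4-byte little-endian length prefix,
    # mirroring the framing that parse() expects.
    with open(filepath, 'wb') as f:
        for packet in packets:
            payload = packet.SerializeToString()
            f.write(struct.pack('<L', len(payload)))
            f.write(payload)

write_packets('test.pb', [pb.MyPacket() for _ in range(3)])  # placeholder packets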

The snippet runs fine standalone, but fails on Dask when used with a simple distributed.Client.submit()/map(), raising: Exception: cannot pickle 'google.protobuf.pyext._message.MessageDescriptor' object.
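My understanding is that the failure happens when the task's return value is pickled: the list of MyPacket objects drags the underlying C-extension descriptors along with it. One workaround I've been sketching (not yet validated) is to convert each packet to plain Python types inside the task, e.g. with google.protobuf.json_format.MessageToDict, so only picklable objects ever cross the wire. The file paths below are placeholders:

import struct
from dask.distributed import Client
from google.protobuf.json_format import MessageToDict

import mystream.MyPacket_pb2 as pb

def parse_to_dicts(filepath):
    # Same length-prefixed parsing as above, but each packet is
    # converted to a plain dict before returning, so nothing
    # unpicklable ever leaves the worker.
    dicts = []
    prefix = 4
    with open(filepath, 'rb') as f:
        data = f.read()
    pos = 0
    while pos < len(data):
        size = struct.unpack('<L', data[pos:pos + prefix])[0]
        packet = pb.MyPacket()
        packet.ParseFromString(data[pos + prefix:pos + prefix + size])
        dicts.append(MessageToDict(packet))
        pos += prefix + size
    return dicts

client = Client()
futures = client.map(parse_to_dicts, ['a.pb', 'b.pb'])  # placeholder paths
results = client.gather(futures)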

I also left a comment on the only GitHub discussion I could find on Protobuf & Dask. It's a pretty interesting discussion, but it's already six years old.

I'd love to hear if anyone has ideas on how to get something like this to work, if it's possible at all.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
