===================
== Nathan Matare ==
===================
🌎

Streaming petabytes (no relation to uber/petastorm)

Using protobufs to connect to BigQuery’s streaming v2 API

Earlier last year, Google deprecated their old BigQuery “InsertAll” API and bequeathed to the world a gRPC based streaming service that gives you the ability to stream data directly from BQ with Cloud Storage-like network latency. Pretty impressive.

This is important if you’re coming from the TensorFlow machine learning world because now you can stream data directly from BiqQuery tables directly into your graph session. I’m sure there are also some other neat tricks one can deploy if you’re coming from the PyTorch world…

Check-out the tensorflow-io implementation here to give you an idea.

Previously, if you needed to upload/download data to/from BigQuery you had one of two options:

  1. Stream it via the cost prohibitive legacy streaming API; or
  2. Batch upload it via a scheduled job or from multiple flat-files.

Unfortunately, there’s not much in the way of documentation for how exactly to do this, and–as of this writing, it’s still in the early beta-phases.

So to get familiar with this system, I hacked together some scripts to give the API a whirl.

Be warned, this code is fairly dense as I ripped it out of a production codebase, but it’ll give you a high-level understanding of how to use the protobufs with the streaming API.


For this bit, I’m going to write data from a Kafka consumer and sync it to the BQ tables. It all happens in-memory within the Python VM.

The first thing one has to do is figure out which “stream” to use, and build an appropriate protobuf message.

Here, I’m using the _default stream and writing a proto message that matches the request. But there are multiple types of streams you can deploy for your use-case. I refer you to the actual documentation for more details.

An AppendRowsRequest template defining the write stream.

def _build_proto_request_template(
    self, proto_data: bigquery_types.AppendRowsRequest.ProtoData,
) -> bigquery_types.AppendRowsRequest:
    request_template = bigquery_types.AppendRowsRequest(
        write_stream=(
            f'projects/{self._project_id}/datasets/{self._dataset}/'
            f'tables/{self._table_name}/_default'
        )
    )

    proto_schema = bigquery_types.ProtoSchema()
    proto_descriptor = descriptor_pb2.DescriptorProto()
    record_proto_lib.RowRecord.DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema.proto_descriptor = proto_descriptor

    proto_data.writer_schema = proto_schema
    request_template.proto_rows = proto_data
    return request_template

Cool. The above is the request. And now I need to generate a protobuf schema. This is a little opaque as the schema is dynamically generated from a list of fields. But essentially, you need to define the schema of your table in each request and ship that schema and the message over the wire.

def write_protobuf(
    fields: List, product: str, header: str = 'RowRecord',
) -> Dict:

    _machine_generated_prefix = (
        '// -*- coding: utf-8 -*-\n'
        '// Generated by the fabric system.  DO NOT EDIT BY HAND!\n\n'
        'syntax = "proto2";\n'
    )

    location = os.getenv(
        'ENV_SETUP_FILE', pathlib.Path(__file__).parents[1] / 'schema',
    )

    entry = ''
    for index, field in enumerate(fields, start=1):

        lower = BIGQUERY_TO_BIGQUERY_STORAGE_DTYPE_MAPPING[
            field.field_type
        ].lower()

        if field.mode == 'NULLABLE':
            mode = 'optional'
        else:
            mode = 'required'

        entry += f'\n\t{mode} {lower} {field.name} = {index};\n'

    message_proto = os.path.join(location, 'storage_record.proto')

    if not os.path.exists(message_proto):

        current_dir = os.getcwd()

        os.chdir(location)

        sys.stdout.write(
            f'Writing protobuf record for product "{product}" '
            f'to {location} \n'
        )

        with open(message_proto, mode='a+') as file:
            file.write(_machine_generated_prefix)
            file.write('\nmessage %s {\n' % header)
            file.write(entry)
            file.write('}\n')

        cmd = subprocess.run(
            [
                'protoc',
                f'--python_out=.',
                os.path.relpath(message_proto),
            ]
        )

        os.chdir(current_dir)

        if cmd.returncode:
            raise IOError(
                'Could not compile the "%s" message proto. Errors: %s-%s'
                % (message_proto, cmd.stderr, cmd.stdout)
            )

    spec = importlib.util.spec_from_file_location(
        'module.name', os.path.join(location, 'storage_record_pb2.py'),
    )

    record_lib = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(record_lib)

    global record_proto_lib

    record_proto_lib = record_lib

Here’s the defined schema, protobuf, and finally the client.

table_schema = schema_lib.build_table_from_schema(  # blocking network call
    schema=self.schema,
    product=self.product,
    environment=self._environment,
    partition_type=self._BIGQUERY_TABLE_PARTITION_TYPE,
    expiration_ms=self._BIGQUERY_TABLE_ROLLING_EXPIRATION_MS,
    require_filter=self._BIGQUERY_TABLE_REQUIRE_FILTER,
    description=self._BIGQUERY_TABLE_DESCRIPTION,
    time_partition_field=c.EXCHANGE_TIME,
)

self._write_client = (
    bigquery_storage.big_query_write.BigQueryWriteAsyncClient()
)

write_protobuf(table_schema, self.product)

For each row, you’ll need to define a request, serialize the data, and ship it over the wire:

async def _make_stream_records_request(self, records: List):
    def create_proto_row(row: Dict):
        proto = record_proto_lib.RowRecord(**row)
        return proto.SerializeToString()

    serialized_rows = []
    for c_record in records:

        assert c_record.key.decode('utf-8') == self.product

        serialized_rows.append(
            create_proto_row(c_record.value.to_storage()[0])
        )

    proto_data = bigquery_types.AppendRowsRequest.ProtoData(
        rows=bigquery_types.ProtoRows(serialized_rows=serialized_rows,)
    )

    request = self._build_proto_request_template(proto_data)

    return await self._write_client.append_rows(iter([request]))

And finally, I have an application that batch reads data from the consumer, and ships the data asynchronously.

There’s also an AsyncBigQueryStorageReadClient for the interested reader, but for this example, the data is simply uploaded via the sync client.

@core.service.catch_exceptions
async def on_main(self) -> None:

    await self.log(
        core.service.msg(self)
        % (
            f'Writing records to {self._project_id}:{self._dataset}:'
            f'{self._table_name} via the BigQuery Storage Streaming API'
        )
    )

    tasks = []
    with contextlib.suppress(aiokafka.errors.ConsumerStoppedError):
        while 1:

            batch = await asyncio.wait_for(
                self.consumer.getmany(timeout_ms=self.timeout * 1000),
                self.timeout + 1,
            )

            for records in batch.values():
                if records:
                    tasks += [self._make_stream_records_request(records)]

            await asyncio.gather(*tasks)
            tasks.clear()

Woot: and there you have it. Realtime streaming data from Kafka directly into BigQuery!

The same principle can be used in the opposite direction. You can stream data directly from BiqQuery into your program. Neat.