Building Data Pipelines With Vitess


Vitess is a popular CNCF project that is used to scale some of the largest MySQL installations in the world — by companies like Slack, Square, Shopify, and GitHub. It provides sharding, connection pooling, and many other features that make it easy to scale MySQL horizontally.

Vitess and MySQL are ideally suited for use as an Online Transaction Processing (OLTP) system, where end users interact directly with the system and fast response times are essential as they look up product and service information and generate critical business records such as orders and user profiles. They are not optimized for Online Analytical Processing (OLAP) workloads or for the other needs you will encounter as your product, company, and data grow.

This is where Change Data Capture (CDC), Extract-Transform-Load (ETL), and data pipelines more generally come into play: they let you maintain in-sync copies of data across systems that each serve a specific need. CDC is a technique for tracking changes in a database and propagating them to other systems, which is useful for data replication, data warehousing, and data integration. For example, you can maintain a data warehouse or data lake for analytics and reporting (such as quarterly and yearly sales reports), or feed other systems that need to work with data that is initially created and updated by your OLTP system.

Vitess Primitives

Vitess has a number of primitives or building blocks that make it easy to build your data pipelines. These are features of VReplication, a powerful system that allows for various types of data replication and transformation. For CDC and similar use cases, VReplication provides the VStream API in VTGates (Vitess Gateways) that allows you to stream changes from a Vitess cluster in real-time.

This low-level VStream primitive is then used by popular CDC tools like Debezium to capture changes in Vitess and propagate them to other systems. PlanetScale also uses the VStream API to build the Connect feature, using additional open source drivers for popular CDC/ETL services such as Airbyte and Fivetran.

A Look Under the Hood at VStream

VStream is a low-level component, exposed via gRPC, that VReplication uses internally to replicate data within Vitess for workflow types such as MoveTables and Reshard, typically from one VTTablet to another. The VTGate VStream RPC builds on this component to stream data from the shards within a Vitess keyspace, providing a single unified change stream that spans the logical database, which may consist of hundreds or even thousands of shards. You can see a simple example client that uses the VStream API directly here.

If you are interested in the lower-level aspects, you can run the example client yourself with the commands below, which build Vitess, bring up a local sharded cluster, start the client, and insert some rows. None of this is necessary if you're going to use an existing connector/driver such as the Debezium Connector for Vitess:

git clone git@github.com:vitessio/vitess.git
cd vitess
git checkout main
make build
cd examples/local

./101_initial_cluster.sh
mysql < ../common/insert_commerce_data.sql
./201_customer_tablets.sh
./202_move_tables.sh
./203_switch_reads.sh
./204_switch_writes.sh
./205_clean_commerce.sh
./301_customer_sharded.sh
./302_new_shards.sh
./303_reshard.sh
./304_switch_reads.sh
./305_switch_writes.sh
./306_down_shard_0.sh
./307_delete_shard_0.sh

go run vstream_client.go

# In another terminal, connecting to the VTGate that was started
for i in {1..10}; do
  command mysql --no-defaults -h 127.0.0.1 -P 15306 customer -e "insert into customer (email) values ('${i}@foo.com')"
done

# Cleanup whenever you're done testing
./401_teardown.sh

The VStream client prints the events it receives from the VTGate. It first snapshots the current state of the customer table in the sharded customer keyspace (one shard at a time, each ending with a COPY_COMPLETED event), then streams subsequent changes to the table as they happen in real time. The output looks like this:

$ go run vstream_client.go
[type:BEGIN keyspace:"customer" shard:"80-" type:FIELD field_event:{table_name:"customer.customer" fields:{name:"customer_id" type:INT64 table:"customer" org_table:"customer" database:"vt_customer" org_name:"customer_id" column_length:20 charset:63 flags:53251 column_type:"bigint"} fields:{name:"email" type:VARBINARY table:"customer" org_table:"customer" database:"vt_customer" org_name:"email" column_length:128 charset:63 flags:128 column_type:"varbinary(128)"} keyspace:"customer" shard:"80-" enum_set_string_values:true} keyspace:"customer" shard:"80-"]
[type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-58"}} keyspace:"customer" shard:"80-"]
[type:ROW row_event:{table_name:"customer.customer" row_changes:{after:{lengths:1 lengths:14 values:"4dan@domain.com"}} keyspace:"customer" shard:"80-"} keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-58" table_p_ks:{table_name:"customer" lastpk:{fields:{name:"customer_id" type:INT64 charset:63 flags:53251} rows:{lengths:1 values:"4"}}}}} keyspace:"customer" shard:"80-" type:COMMIT keyspace:"customer" shard:"80-"]
[type:BEGIN keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-58"}} keyspace:"customer" shard:"80-" type:COMMIT keyspace:"customer" shard:"80-"]
[type:COPY_COMPLETED keyspace:"customer" shard:"80-"]
[type:BEGIN keyspace:"customer" shard:"-80" type:FIELD field_event:{table_name:"customer.customer" fields:{name:"customer_id" type:INT64 table:"customer" org_table:"customer" database:"vt_customer" org_name:"customer_id" column_length:20 charset:63 flags:53251 column_type:"bigint"} fields:{name:"email" type:VARBINARY table:"customer" org_table:"customer" database:"vt_customer" org_name:"email" column_length:128 charset:63 flags:128 column_type:"varbinary(128)"} keyspace:"customer" shard:"-80" enum_set_string_values:true} keyspace:"customer" shard:"-80"]
[type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-58"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-58"}} keyspace:"customer" shard:"-80"]
[type:ROW row_event:{table_name:"customer.customer" row_changes:{after:{lengths:1 lengths:16 values:"1alice@domain.com"}} keyspace:"customer" shard:"-80"} keyspace:"customer" shard:"-80" type:ROW row_event:{table_name:"customer.customer" row_changes:{after:{lengths:1 lengths:14 values:"2bob@domain.com"}} keyspace:"customer" shard:"-80"} keyspace:"customer" shard:"-80" type:ROW row_event:{table_name:"customer.customer" row_changes:{after:{lengths:1 lengths:18 values:"3charlie@domain.com"}} keyspace:"customer" shard:"-80"} keyspace:"customer" shard:"-80" type:ROW row_event:{table_name:"customer.customer" row_changes:{after:{lengths:1 lengths:14 values:"5eve@domain.com"}} keyspace:"customer" shard:"-80"} keyspace:"customer" shard:"-80" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-58" table_p_ks:{table_name:"customer" lastpk:{fields:{name:"customer_id" type:INT64 charset:63 flags:53251} rows:{lengths:1 values:"5"}}}} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-58"}} keyspace:"customer" shard:"-80" type:COMMIT keyspace:"customer" shard:"-80"]
[type:BEGIN keyspace:"customer" shard:"-80" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-58"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-58"}} keyspace:"customer" shard:"-80" type:COMMIT keyspace:"customer" shard:"-80"]
[type:COPY_COMPLETED keyspace:"customer" shard:"-80" type:COPY_COMPLETED]
[type:BEGIN timestamp:1720544456 current_time:1720544456166536000 keyspace:"customer" shard:"-80" type:FIELD timestamp:1720544456 field_event:{table_name:"customer.customer" fields:{name:"customer_id" type:INT64 table:"customer" org_table:"customer" database:"vt_customer" org_name:"customer_id" column_length:20 charset:63 flags:53251 column_type:"bigint"} fields:{name:"email" type:VARBINARY table:"customer" org_table:"customer" database:"vt_customer" org_name:"email" column_length:128 charset:63 flags:128 column_type:"varbinary(128)"} keyspace:"customer" shard:"-80"} current_time:1720544456168488000 keyspace:"customer" shard:"-80" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"10001@foo.com"}} keyspace:"customer" shard:"-80" flags:1} current_time:1720544456168646000 keyspace:"customer" shard:"-80" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-59"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-58"}} keyspace:"customer" shard:"-80" type:COMMIT timestamp:1720544456 current_time:1720544456168652000 keyspace:"customer" shard:"-80"]
[type:BEGIN timestamp:1720544456 current_time:1720544456182035000 keyspace:"customer" shard:"80-" type:FIELD timestamp:1720544456 field_event:{table_name:"customer.customer" fields:{name:"customer_id" type:INT64 table:"customer" org_table:"customer" database:"vt_customer" org_name:"customer_id" column_length:20 charset:63 flags:53251 column_type:"bigint"} fields:{name:"email" type:VARBINARY table:"customer" org_table:"customer" database:"vt_customer" org_name:"email" column_length:128 charset:63 flags:128 column_type:"varbinary(128)"} keyspace:"customer" shard:"80-"} current_time:1720544456183630000 keyspace:"customer" shard:"80-" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"10012@foo.com"}} keyspace:"customer" shard:"80-" flags:1} current_time:1720544456183642000 keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-59"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-59"}} keyspace:"customer" shard:"80-" type:COMMIT timestamp:1720544456 current_time:1720544456183649000 keyspace:"customer" shard:"80-"]
[type:BEGIN timestamp:1720544456 current_time:1720544456197796000 keyspace:"customer" shard:"-80" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"10023@foo.com"}} keyspace:"customer" shard:"-80" flags:1} current_time:1720544456197810000 keyspace:"customer" shard:"-80" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-60"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-59"}} keyspace:"customer" shard:"-80" type:COMMIT timestamp:1720544456 current_time:1720544456197814000 keyspace:"customer" shard:"-80"]
[type:BEGIN timestamp:1720544456 current_time:1720544456211383000 keyspace:"customer" shard:"80-" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"10034@foo.com"}} keyspace:"customer" shard:"80-" flags:1} current_time:1720544456211392000 keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-60"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-60"}} keyspace:"customer" shard:"80-" type:COMMIT timestamp:1720544456 current_time:1720544456211398000 keyspace:"customer" shard:"80-"]
[type:BEGIN timestamp:1720544456 current_time:1720544456224248000 keyspace:"customer" shard:"80-" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"10045@foo.com"}} keyspace:"customer" shard:"80-" flags:1} current_time:1720544456224258000 keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-60"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-61"}} keyspace:"customer" shard:"80-" type:COMMIT timestamp:1720544456 current_time:1720544456224261000 keyspace:"customer" shard:"80-"]
[type:BEGIN timestamp:1720544456 current_time:1720544456237018000 keyspace:"customer" shard:"80-" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"10056@foo.com"}} keyspace:"customer" shard:"80-" flags:1} current_time:1720544456237029000 keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-60"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-62"}} keyspace:"customer" shard:"80-" type:COMMIT timestamp:1720544456 current_time:1720544456237031000 keyspace:"customer" shard:"80-"]
[type:BEGIN timestamp:1720544456 current_time:1720544456249777000 keyspace:"customer" shard:"80-" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"10067@foo.com"}} keyspace:"customer" shard:"80-" flags:1} current_time:1720544456250142000 keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-60"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-63"}} keyspace:"customer" shard:"80-" type:COMMIT timestamp:1720544456 current_time:1720544456250150000 keyspace:"customer" shard:"80-"]
[type:BEGIN timestamp:1720544456 current_time:1720544456263391000 keyspace:"customer" shard:"80-" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"10078@foo.com"}} keyspace:"customer" shard:"80-" flags:1} current_time:1720544456263407000 keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-60"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-64"}} keyspace:"customer" shard:"80-" type:COMMIT timestamp:1720544456 current_time:1720544456263411000 keyspace:"customer" shard:"80-"]
[type:BEGIN timestamp:1720544456 current_time:1720544456276388000 keyspace:"customer" shard:"-80" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"10089@foo.com"}} keyspace:"customer" shard:"-80" flags:1} current_time:1720544456276398000 keyspace:"customer" shard:"-80" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-61"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-64"}} keyspace:"customer" shard:"-80" type:COMMIT timestamp:1720544456 current_time:1720544456276402000 keyspace:"customer" shard:"-80"]
[type:BEGIN timestamp:1720544456 current_time:1720544456289697000 keyspace:"customer" shard:"-80" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:10 values:"100910@foo.com"}} keyspace:"customer" shard:"-80" flags:1} current_time:1720544456289711000 keyspace:"customer" shard:"-80" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-62"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-64"}} keyspace:"customer" shard:"-80" type:COMMIT timestamp:1720544456 current_time:1720544456289714000 keyspace:"customer" shard:"-80"]
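
The ROW events above can be hard to read because each row image packs all of its column values into a single `values` byte string, with `lengths` giving the byte length of each column in order (a length of -1 marks a NULL column). The following is a minimal sketch of how a consumer might split such a row back into columns. The function name `split_row` is hypothetical and not part of any Vitess client library; it only mirrors the layout visible in the output.

```python
from typing import List, Optional


def split_row(lengths: List[int], values: bytes) -> List[Optional[bytes]]:
    """Split a packed row image into per-column byte strings.

    `lengths` holds one entry per column; -1 is treated as NULL and
    consumes no bytes from `values`.
    """
    columns: List[Optional[bytes]] = []
    offset = 0
    for length in lengths:
        if length < 0:
            columns.append(None)  # NULL column, nothing stored in `values`
            continue
        columns.append(values[offset:offset + length])
        offset += length
    if offset != len(values):
        # The lengths should account for every byte in the packed values.
        raise ValueError("lengths do not add up to the size of values")
    return columns


if __name__ == "__main__":
    # Row images copied from the output above: (customer_id, email).
    print(split_row([1, 14], b"4dan@domain.com"))
    print(split_row([4, 9], b"10001@foo.com"))
```

Applied to the first inserted row, `lengths:4 lengths:9 values:"10001@foo.com"` splits into a customer_id of 1000 and an email of 1@foo.com. The column names and types come from the preceding FIELD event for the same table.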

If you are interested in additional lower-level details, you can check out the VStream API documentation.
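
One detail in the output worth a closer look is the VGTID event. In the output above, each VGTID event lists a GTID position for every shard in the stream, and a client that remembers the most recent one can use it as the position to resume from. The sketch below shows that bookkeeping under simplifying assumptions: events are represented as plain Python dicts shaped like the printed output (real clients receive protobuf messages), and the function name `latest_position` is hypothetical.

```python
from typing import Dict, Iterable, Tuple

ShardKey = Tuple[str, str]  # (keyspace, shard)


def latest_position(events: Iterable[dict]) -> Dict[ShardKey, str]:
    """Return the per-shard GTID positions from the last VGTID event seen."""
    position: Dict[ShardKey, str] = {}
    for event in events:
        if event.get("type") != "VGTID":
            continue
        # Assumption based on the output above: every VGTID event carries
        # all shards, so the newest event replaces the old position wholesale.
        position = {
            (sg["keyspace"], sg["shard"]): sg.get("gtid", "")
            for sg in event["vgtid"]["shard_gtids"]
        }
    return position


if __name__ == "__main__":
    # Two VGTID events abbreviated from the output above.
    uuid_low = "MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6"
    uuid_high = "MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10"
    events = [
        {"type": "BEGIN"},
        {"type": "VGTID", "vgtid": {"shard_gtids": [
            {"keyspace": "customer", "shard": "-80", "gtid": uuid_low + ":1-59"},
            {"keyspace": "customer", "shard": "80-", "gtid": uuid_high + ":1-58"},
        ]}},
        {"type": "COMMIT"},
        {"type": "VGTID", "vgtid": {"shard_gtids": [
            {"keyspace": "customer", "shard": "-80", "gtid": uuid_low + ":1-59"},
            {"keyspace": "customer", "shard": "80-", "gtid": uuid_high + ":1-59"},
        ]}},
    ]
    for shard, gtid in sorted(latest_position(events).items()):
        print(shard, gtid)
```

A real consumer would likely persist this position only after it has durably applied the transaction it belongs to, so that a restart does not skip changes.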

An Example Setup

You could use a similar setup to the one described here, but with the Debezium Connector for Vitess rather than the Debezium Connector for MySQL, and an Amazon Redshift instance rather than PostgreSQL as the target (Redshift being based on PostgreSQL). This also demonstrates the general rule for setting up these kinds of systems: use the Vitess variant of the connector/driver rather than the MySQL one, and keep everything else the same.
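
To make the connector swap concrete, here is a rough sketch of building a registration payload for the Debezium Connector for Vitess, the kind of JSON document you would POST to a Kafka Connect worker's `/connectors` endpoint. Treat every value as an assumption: the helper name `vitess_connector_config` is hypothetical, the host and port assume the VTGate gRPC endpoint of the local example cluster, and the property names are written from memory of the Debezium documentation, so check them against the documentation for the Debezium version you run.

```python
import json


def vitess_connector_config(name: str, vtgate_host: str, vtgate_grpc_port: int,
                            keyspace: str, topic_prefix: str) -> dict:
    """Build a Kafka Connect registration payload for a Vitess source."""
    return {
        "name": name,
        "config": {
            # The Vitess connector class, in place of the MySQL one.
            "connector.class": "io.debezium.connector.vitess.VitessConnector",
            "tasks.max": "1",
            # The connector talks to VTGate over gRPC, not to MySQL directly.
            "database.hostname": vtgate_host,
            "database.port": str(vtgate_grpc_port),
            # The keyspace (logical database) whose changes are streamed.
            "vitess.keyspace": keyspace,
            # Prefix for the Kafka topics the connector writes to.
            "topic.prefix": topic_prefix,
        },
    }


if __name__ == "__main__":
    payload = vitess_connector_config(
        name="vitess-customer",      # hypothetical connector name
        vtgate_host="127.0.0.1",     # assumed local VTGate
        vtgate_grpc_port=15991,      # assumed gRPC port of the local example
        keyspace="customer",
        topic_prefix="vitess",
    )
    print(json.dumps(payload, indent=2))
```

The sink side of the pipeline (for example, a JDBC sink pointed at the target database) would be configured the same way it is for a MySQL source.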

Happy streaming!