A Modern C++ Kafka API | Kenneth Jia, Morgan Stanley

HostedbyConfluent · 20 slides · Jun 08, 2021

About This Presentation

We wanted to embed a Kafka producer/consumer in C++ and decided to use “librdkafka”, a robust C/C++ library that is open source, well-maintained, and widely used.
The C++ interface of “librdkafka” is confined to C++98 for compatibility, which makes it les...


Slide Content

A Modern C++ Kafka API
Kenneth Jia, Kafka Summit EMEA 2021

New Requirements for Our Pub/Sub System (The Story)
About our team. The messaging HUB. New requirements coming.
Diagrams: the HUB relaying messages from a publisher to subscribers; the new requirements add persisting messages to Kafka and replaying them from Kafka.

What Does a Kafka Client Look Like? (The Story)
The Java client is built from a few pieces: Properties, Producer, ProducerRecord, producer.send(…), producer.close().
Reference: https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

We Came to librdkafka (The Story)
https://github.com/edenhill/librdkafka

An Example of a “librdkafka” Producer (The Story)
Source: https://github.com/edenhill/librdkafka/blob/master/examples/producer.cpp, stripped for brevity. The code is annotated with the Properties and Producer steps.

An Example of a “librdkafka” Producer, continued (The Story)
The code is annotated with the producer.send(…) and producer.close() steps.

We Worked Out a Better Choice: the modern-cpp-kafka API (The Story)
- New C++ features (no C++98 compatibility)
- Header-only
- Ease of use: naming matches the Java API; RAII is used for lifetime management; polling and queue management are hidden
- Efficient: no deep copy introduced
- Robust: unit, integration, and robustness test cases

Properties (A Modern Kafka Producer)
“Java style”:
    kafka::Properties props;
    props.put("bootstrap.servers", brokers);
    props.put("enable.idempotence", "true");
Initialization list:
    kafka::Properties props({
        {"bootstrap.servers", brokers},
        {"enable.idempotence", "true"},
    });
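Both construction styles can be supported by one small type. The sketch below is a hypothetical, simplified stand-in (the class name `Properties` and its `getProperty` and `size` members are assumptions of this sketch, not the modern-cpp-kafka implementation): a string-to-string map with a chainable `put` and an `std::initializer_list` constructor.

```cpp
#include <cstddef>
#include <initializer_list>
#include <map>
#include <string>
#include <utility>

// Hypothetical Properties stand-in: NOT the library's class, just an
// illustration of supporting both the "Java style" and the init-list style.
class Properties {
public:
    using KeyValue = std::pair<const std::string, std::string>;

    Properties() = default;

    // Initialization-list style: Properties props({{"k1", "v1"}, {"k2", "v2"}});
    Properties(std::initializer_list<KeyValue> kvs) : kv_(kvs) {}

    // "Java style": overwrites any previous value; returns *this so calls chain.
    Properties& put(const std::string& key, const std::string& value) {
        kv_[key] = value;
        return *this;
    }

    // Returns the stored value, or an empty string if the key was never set.
    std::string getProperty(const std::string& key) const {
        auto it = kv_.find(key);
        return it == kv_.end() ? std::string{} : it->second;
    }

    std::size_t size() const { return kv_.size(); }

private:
    std::map<std::string, std::string> kv_;
};
```

Because `std::initializer_list` constructors win overload resolution for braced lists, `Properties props({...})` picks the map-style constructor rather than the copy constructor, which is what makes the second style read naturally.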

ProducerRecord (A Modern Kafka Producer)
The record type for the producer to send:
    auto record = ProducerRecord(topic, key, value);
or, with an explicit partition:
    auto record = ProducerRecord(topic, partition, key, value);
Key and Value are only “thin wrappers” (const_buffer)! The lifetime of the underlying data has to be managed by the caller.
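A “thin wrapper” here means a non-owning view: a pointer plus a size, with no copy of the bytes. The following is a minimal sketch under that assumption; `ConstBuffer`, this `ProducerRecord` struct, and the `-1` “no explicit partition” convention are hypothetical names chosen for illustration, not the library's definitions.

```cpp
#include <cstddef>
#include <string>
#include <utility>

// Hypothetical non-owning byte view: stores only a pointer and a size.
class ConstBuffer {
public:
    ConstBuffer() = default;
    ConstBuffer(const void* data, std::size_t size) : data_(data), size_(size) {}

    const void* data() const { return data_; }
    std::size_t size() const { return size_; }

private:
    const void* data_ = nullptr;
    std::size_t size_ = 0;
};

using Key = ConstBuffer;
using Value = ConstBuffer;

// Hypothetical record: copying it copies two small views, never the payload.
struct ProducerRecord {
    std::string topic;
    int partition = -1;  // -1: no explicit partition (a convention of this sketch)
    Key key;
    Value value;

    ProducerRecord(std::string t, Key k, Value v)
        : topic(std::move(t)), key(k), value(v) {}
    ProducerRecord(std::string t, int p, Key k, Value v)
        : topic(std::move(t)), partition(p), key(k), value(v) {}
};
```

The price of avoiding the deep copy is visible in the types: if the `std::string` holding the payload is destroyed before the record is delivered, the view dangles. That is why the slide calls out lifetime management.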

Producer (A Modern Kafka Producer)
KafkaSyncProducer:
    auto producer = kafka::KafkaSyncProducer(props);
    …
    try {
        auto metadata = producer.send(record);
        …
    } catch (const kafka::KafkaException& e) {
        …
    }
KafkaAsyncProducer:
    auto producer = kafka::KafkaAsyncProducer(props);
    …
    producer.send(record,
                  [](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
                      …
                  });
close(): no need to care about the internal queue management, and no need even to call it explicitly. RAII takes care of it.
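To see why an explicit close() becomes optional, here is a toy model, assuming only that an async send enqueues the record and that the destructor drains the queue before the object goes away. `ToyAsyncProducer`, `RecordMetadata`, and the simulated offsets are hypothetical; nothing here talks to a broker.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <system_error>
#include <utility>

// Hypothetical delivery metadata for this sketch only.
struct RecordMetadata {
    std::string topic;
    long long offset = -1;
};

using Callback = std::function<void(const RecordMetadata&, std::error_code)>;

// Toy async producer: send() only enqueues; delivery callbacks fire when the
// queue is drained. The destructor drains it, so close() is optional.
class ToyAsyncProducer {
public:
    ToyAsyncProducer() = default;
    ToyAsyncProducer(const ToyAsyncProducer&) = delete;             // one owner of the queue
    ToyAsyncProducer& operator=(const ToyAsyncProducer&) = delete;

    ~ToyAsyncProducer() { close(); }  // RAII: pending records are never silently dropped

    void send(std::string topic, Callback cb) {
        pending_.push_back({std::move(topic), std::move(cb)});
    }

    // Simulates successful delivery of everything still queued, in send order.
    void close() {
        while (!pending_.empty()) {
            auto item = std::move(pending_.front());
            pending_.pop_front();
            RecordMetadata md{item.first, nextOffset_++};
            item.second(md, std::error_code{});  // an empty error_code means success
        }
    }

    std::size_t pendingCount() const { return pending_.size(); }

private:
    std::deque<std::pair<std::string, Callback>> pending_;
    long long nextOffset_ = 0;
};
```

In this toy, leaving the scope that owns the producer is enough to flush it. A real client also has to wait for broker acknowledgements and handle failures, and a callback that throws from inside the destructor would terminate the program, so real callbacks should not throw.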

An Example (A Modern Kafka Producer)

Transaction Support (A Modern Kafka Producer)
Note: error handling is simple. You only need to deal with exceptions thrown from the transaction API.
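The “only exceptions” error-handling shape can be sketched as a single try/catch: begin, send everything, commit, and abort if anything throws. The toy below borrows the method names from Kafka's Java transactional producer (`beginTransaction`, `commitTransaction`, `abortTransaction`), but `ToyTxnProducer`, `KafkaError`, `sendAtomically`, and the simulated failure (an empty message is rejected) are all hypothetical.

```cpp
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical exception type standing in for a Kafka client exception.
struct KafkaError : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Toy transactional producer: sends are buffered and become visible only on commit.
class ToyTxnProducer {
public:
    void beginTransaction() { buffer_.clear(); inTxn_ = true; }

    void send(const std::string& msg) {
        if (!inTxn_) throw KafkaError("send outside a transaction");
        if (msg.empty()) throw KafkaError("empty message rejected");  // simulated failure
        buffer_.push_back(msg);
    }

    void commitTransaction() {
        committed_.insert(committed_.end(), buffer_.begin(), buffer_.end());
        buffer_.clear();
        inTxn_ = false;
    }

    void abortTransaction() { buffer_.clear(); inTxn_ = false; }

    const std::vector<std::string>& committed() const { return committed_; }

private:
    std::vector<std::string> buffer_;
    std::vector<std::string> committed_;
    bool inTxn_ = false;
};

// All-or-nothing send: one catch block covers every transactional call,
// including the commit itself.
bool sendAtomically(ToyTxnProducer& producer, const std::vector<std::string>& msgs) {
    producer.beginTransaction();
    try {
        for (const auto& m : msgs) producer.send(m);
        producer.commitTransaction();
        return true;
    } catch (const KafkaError&) {
        producer.abortTransaction();  // none of this batch becomes visible
        return false;
    }
}
```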

An Example of a “librdkafka” Consumer (A Modern Kafka Consumer)
Source: https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_consume_batch.cpp, stripped for brevity. Note: the consume_batch() helper.

An Example of a “librdkafka” Consumer, continued (A Modern Kafka Consumer)
Note: the “batch” is bounded by both a message count and a timeout.

consumer.subscribe(…) (A Modern Kafka Consumer)
Rebalance event callbacks can be registered as well. The subscribe(…) call waits for the partitions-assigned event.
    consumer.subscribe({topic},
                       [](Consumer::RebalanceEventType et, const TopicPartitions& tps) {
                           if (et == Consumer::RebalanceEventType::PartitionsAssigned) {
                               std::cout << "Assigned partitions: " << toString(tps) << std::endl;
                           } else {
                               std::cout << "Revoked partitions: " << toString(tps) << std::endl;
                           }
                       });
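A common use of the rebalance callback is to keep an up-to-date view of which partitions this consumer currently owns. A minimal sketch, assuming a topic-partition is modeled as a `(topic, partition)` pair kept in a `std::set`; `AssignmentTracker` and these type aliases are hypothetical and only mirror the names used on the slide.

```cpp
#include <set>
#include <string>
#include <utility>

// Hypothetical mirrors of the slide's names, defined here for a self-contained sketch.
enum class RebalanceEventType { PartitionsAssigned, PartitionsRevoked };
using TopicPartition = std::pair<std::string, int>;
using TopicPartitions = std::set<TopicPartition>;

// Applies rebalance events to the current assignment: assigned partitions are
// added, revoked ones removed.
class AssignmentTracker {
public:
    void onRebalance(RebalanceEventType et, const TopicPartitions& tps) {
        if (et == RebalanceEventType::PartitionsAssigned) {
            assigned_.insert(tps.begin(), tps.end());
        } else {
            for (const auto& tp : tps) assigned_.erase(tp);
        }
    }

    const TopicPartitions& assigned() const { return assigned_; }

private:
    TopicPartitions assigned_;
};
```

A lambda like the one on the slide could forward its two arguments to `onRebalance`. The revoke branch is also where manual-commit code typically commits pending offsets before ownership moves to another consumer.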

consumer.poll(…) (A Modern Kafka Consumer)
Message count for the “batch”: configured in Properties via max.poll.records (default value: 500):
    props.put("max.poll.records", 1000);
Timeout for the “batch”:
    auto records = consumer.poll(std::chrono::milliseconds(100));
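The two bounds on a batch, a record count and a timeout, can be modeled with an in-memory queue. This is a single-threaded sketch, assuming a poll that stops as soon as the count cap is reached and otherwise waits out the timeout. `ToyConsumer` and `deliver` are hypothetical, and the default of 500 simply mirrors the slide's `max.poll.records` default.

```cpp
#include <chrono>
#include <cstddef>
#include <deque>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Toy consumer: poll() returns at most maxPollRecords messages and waits at
// most `timeout`. The in-memory queue replaces a real broker fetch.
class ToyConsumer {
public:
    explicit ToyConsumer(std::size_t maxPollRecords = 500)
        : maxPollRecords_(maxPollRecords) {}

    // Test hook: makes a message available to the next poll().
    void deliver(std::string msg) { queue_.push_back(std::move(msg)); }

    std::vector<std::string> poll(std::chrono::milliseconds timeout) {
        const auto deadline = std::chrono::steady_clock::now() + timeout;
        std::vector<std::string> batch;
        while (batch.size() < maxPollRecords_) {  // bound 1: record count
            if (queue_.empty()) {
                // Nothing buffered. A real client would block on the network;
                // in this single-threaded toy nothing can arrive, so it just
                // waits out the remaining time (bound 2: timeout).
                std::this_thread::sleep_until(deadline);
                break;
            }
            batch.push_back(std::move(queue_.front()));
            queue_.pop_front();
        }
        return batch;
    }

private:
    std::size_t maxPollRecords_;
    std::deque<std::string> queue_;
};
```

Records beyond the cap stay queued for the next poll, so raising `max.poll.records` trades larger batches (better throughput) for more work per poll loop iteration.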

Auto-Commit Consumer (A Modern Kafka Consumer)
Note: it commits its position before each poll (not after the poll!), effectively acknowledging the messages received during the previous poll.
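The “commit before each poll” rule is easiest to see on a single partition. A simplified sketch, assuming offsets `0 .. logEnd-1` in one partition and a commit on every poll (real clients usually commit on an interval); `ToyAutoCommitConsumer` is hypothetical.

```cpp
#include <cstddef>
#include <vector>

// Toy single-partition consumer over offsets [0, logEnd).
// Auto-commit rule modeled here: poll() first commits the position reached by
// the previous poll, and only then fetches new records.
class ToyAutoCommitConsumer {
public:
    ToyAutoCommitConsumer(long long logEnd, std::size_t maxPollRecords)
        : logEnd_(logEnd), max_(maxPollRecords) {}

    std::vector<long long> poll() {
        committed_ = position_;  // acknowledges everything the previous poll returned
        std::vector<long long> batch;
        while (position_ < logEnd_ && batch.size() < max_) {
            batch.push_back(position_++);
        }
        return batch;
    }

    long long committed() const { return committed_; }  // where a restart would resume
    long long position() const { return position_; }    // next offset to fetch

private:
    long long logEnd_;
    std::size_t max_;
    long long position_ = 0;
    long long committed_ = 0;
};
```

While the application is still processing the second batch, the committed offset only covers the first batch. A crash at that point means a restarted consumer re-reads the second batch: messages may be redelivered but are not skipped (at-least-once).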

Manual-Commit Consumer (A Modern Kafka Consumer)
Note: this gives more control over when commits happen, via commitSync and commitAsync.
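With manual commits, the application decides what to commit once a batch is processed. In Kafka the committed offset is the offset of the next record to read, so for each partition it is the highest processed offset plus one. The helper below computes that map; `ConsumedRecord` and `offsetsToCommit` are hypothetical names, and the result is what one would hand to a synchronous commit (blocks until acknowledged) or an asynchronous one (returns at once, reports later).

```cpp
#include <map>
#include <vector>

// Hypothetical minimal view of a consumed record, for this sketch only.
struct ConsumedRecord {
    int partition;
    long long offset;
};

// For each partition: the offset of the *next* record to read,
// i.e. the highest processed offset + 1.
std::map<int, long long> offsetsToCommit(const std::vector<ConsumedRecord>& processed) {
    std::map<int, long long> next;
    for (const auto& r : processed) {
        auto it = next.find(r.partition);
        if (it == next.end() || r.offset + 1 > it->second) {
            next[r.partition] = r.offset + 1;
        }
    }
    return next;
}
```

Committing `offset` instead of `offset + 1` is a classic off-by-one: after a restart, the last processed record of every partition would be consumed again.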

A Modern Admin Client (AdminClient)
Create topics:
    adminClient.createTopics({topic1, topic2}, partitions, replicationFactor,
                             kafka::Properties{{ {"message.timestamp.type", "CreateTime"} }});
Delete topics:
    adminClient.deleteTopics({topic1, topic2});
List topics:
    auto listResult = adminClient.listTopics({topic1, topic2});
    if (listResult.errorCode()) {
        std::cerr << "Error: " << listResult.message() << std::endl;
    } else {
        std::cout << "Topics: " << kafka::toString(listResult.topics) << std::endl;
    }

The End
The modern-cpp-kafka API provides a safe, efficient, and easy-to-use way of producing and consuming Kafka messages. After we replaced a legacy implementation with it, throughput for a key middleware system improved by 26%! It is now open source: https://github.com/Morgan-Stanley/modern-cpp-kafka. We are actively maintaining and improving the project, and new components, such as a streamer and a connector, are on the roadmap. To get there, we'd like more developers to get involved. You are welcome to join the project and contribute!