Apache Samza: Reliable Stream Processing Atop Apache Kafka and Hadoop YARN

blueboxtraveler, 29 slides, Oct 22, 2013

About This Presentation

Overview of Apache Samza presented to the London HUG, October 22, 2013


Slide Content

Apache Samza: Reliable Stream Processing Atop Apache Kafka and Hadoop YARN
Jakob Homan, London HUG

Who I am
Samza for five months. Before that: Hadoop, Hive, Giraph.
Say hi: @blueboxtraveler

Things we would like to do (better)

Provide timely, relevant updates to your newsfeed

Update search results with new information as it appears

Sculpt metrics and logs into useful shapes

Tools? By response latency: RPC is synchronous; Samza takes milliseconds to minutes; and then there is later. Possibly much later.

Frame(work) of reference

                    Classic Hadoop                  Samza
Storage layer       HDFS                            Kafka
Execution engine    Map-Reduce                      YARN
API                 map(k, v) => (k, v)             process(msg(k, v)) => msg(k, v)
                    reduce(k, list(v)) => (k, v)
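To make the API row concrete, here is a small Java sketch (class and method names are hypothetical, not Hadoop or Samza code) that counts status updates per user both ways. The batch version can only answer once all input has been grouped by key, while the process()-style version updates its running count and emits a result for every message as it arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the two API shapes; not taken from Hadoop or Samza.
final class ApiContrast {
    // Classic Hadoop style over a finished batch: map emits (user, 1),
    // the shuffle groups values by key, and reduce(k, list(v)) sums them.
    static Map<String, Integer> batchCountByUser(List<String> userIds) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String user : userIds) {
            grouped.computeIfAbsent(user, k -> new ArrayList<>()).add(1); // map + group
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) { // reduce
            int sum = 0;
            for (int one : e.getValue()) sum += one;
            result.put(e.getKey(), sum);
        }
        return result;
    }

    // Samza style: process(msg(k, v)) => msg(k, v), called once per incoming message.
    // The running count is kept as task state between calls.
    private final Map<String, Integer> counts = new HashMap<>();

    String[] process(String userId) {
        int updated = counts.merge(userId, 1, Integer::sum);
        return new String[] {userId, Integer.toString(updated)}; // output msg(k, v)
    }
}
```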

Storage layer: Kafka

Apache Kafka: persistent, reliable, distributed message queue. Shiny new logo!

At LinkedIn:
- 10+ billion writes per day
- 172k messages per second (average)
- 55+ billion messages per day to real-time consumers
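As a rough consistency check, assuming the 172k per second average counts the same writes as the first figure, a full day at that rate comes to:

```latex
172{,}000\ \frac{\text{messages}}{\text{s}} \times 86{,}400\ \frac{\text{s}}{\text{day}}
  = 14{,}860{,}800{,}000\ \frac{\text{messages}}{\text{day}}
  \approx 1.5 \times 10^{10}\ \frac{\text{messages}}{\text{day}}
```

That is about 15 billion per day, in line with "10+ billion writes per day".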

Quick aside… Kafka: first among (pluggable) equals
- LinkedIn: Espresso and Databus
- Coming soon? HDFS, ActiveMQ, Amazon SQS
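Pluggability means the processing code talks to an abstract message source rather than to Kafka directly. A minimal Java sketch of the idea, using a hypothetical MessageSource interface (Samza's actual system interfaces are richer and named differently):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical interface; not Samza's real system API.
interface MessageSource {
    // Returns the next available message, or null if none is waiting.
    String poll();
}

// Stand-in for any backing system (Kafka, Databus, a queue...): an in-memory list.
final class InMemorySource implements MessageSource {
    private final Deque<String> pending;

    InMemorySource(List<String> messages) {
        this.pending = new ArrayDeque<>(messages);
    }

    public String poll() {
        return pending.pollFirst(); // null once the source is empty
    }
}

final class SourceDrainer {
    // Depends only on MessageSource, so swapping the backing system
    // does not change this code.
    static List<String> drain(MessageSource source) {
        List<String> seen = new ArrayList<>();
        for (String m = source.poll(); m != null; m = source.poll()) {
            seen.add(m);
        }
        return seen;
    }
}
```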

Kafka in four bullet points
- Producers send messages to brokers
- Messages are key, value pairs
- Brokers store messages in topics for consumers
- Consumers pull messages from brokers
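A toy, single-process Java model of those four points (hypothetical class; no networking, replication or partitions). Messages stay in the topic's log after being read, and the consumer, not the broker, keeps track of the offset it has reached, as Kafka consumers do:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory broker; class and method names are hypothetical.
final class ToyBroker {
    // One append-only log of (key, value) messages per topic.
    private final Map<String, List<Map.Entry<String, String>>> topics = new HashMap<>();

    // Producers send key/value messages; the broker appends them to the topic's log.
    void send(String topic, String key, String value) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(Map.entry(key, value));
    }

    // Consumers pull up to maxMessages starting at an offset they track themselves.
    // Reading does not remove anything from the log.
    List<Map.Entry<String, String>> pull(String topic, int fromOffset, int maxMessages) {
        List<Map.Entry<String, String>> log = topics.getOrDefault(topic, List.of());
        int start = Math.min(fromOffset, log.size());
        int end = Math.min(start + maxMessages, log.size());
        return new ArrayList<>(log.subList(start, end));
    }
}
```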

A Kafka topic
Topic: StatusUpdateEvent
Key: user ID of the user who updated the status
Value: timestamp, new status, geolocation, etc.
Example messages (key: value):
534: “Very sleepy”
755: “Car nicked!”
234: “The ref’s blind!”
534: “Nicked a car!”

Kafka topics are partitioned
[Diagram: a topic's messages, each a key plus message contents, spread across Partition 0, Partition 1 and Partition 2.]
For our purposes, hash partitioned on the key!
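A minimal sketch of hash partitioning on the key, assuming Java's hashCode() as the hash. Kafka's own producer partitioner uses a different hash (recent clients apply murmur2 to the serialized key bytes), so real partition numbers will differ; the property that matters is that equal keys always land in the same partition:

```java
// Simplified key-hash partitioner; an illustration, not Kafka's implementation.
final class KeyPartitioner {
    static int partitionFor(Object key, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

With three partitions and the user IDs above, both messages from user 534 ("Very sleepy" and "Nicked a car!") go to partition 0, user 755 goes to partition 2, and user 234 to partition 0.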

A Samza job
Input topics: StatusUpdateEvent, NewConnectionEvent, LikeUpdateEvent
Some code: MyStreamTask implements StreamTask { … }
Output topics: NewsUpdatePost, UpdatesPerHourMetric
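The UpdatesPerHourMetric output suggests a task that counts status updates by hour. A hypothetical Java sketch of just that counting logic, assuming each StatusUpdateEvent carries an epoch-millisecond timestamp; sending the counts to the output topic is left out:

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical counter; the timestamp format and class name are assumptions.
final class UpdatesPerHourCounter {
    private static final long MILLIS_PER_HOUR = 60L * 60 * 1000;

    // Start of hour (epoch ms) -> number of status updates seen in that hour.
    private final SortedMap<Long, Long> countsByHour = new TreeMap<>();

    // Called once per StatusUpdateEvent.
    void onStatusUpdate(long timestampMillis) {
        // floorDiv rounds down correctly even for timestamps before the epoch.
        long hourStart = Math.floorDiv(timestampMillis, MILLIS_PER_HOUR) * MILLIS_PER_HOUR;
        countsByHour.merge(hourStart, 1L, Long::sum);
    }

    // Copy of the current counts, e.g. to emit downstream.
    SortedMap<Long, Long> snapshot() {
        return new TreeMap<>(countsByHour);
    }
}
```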

Execution engine: YARN

What we use YARN for
- Distributing our tasks across multiple machines
- Letting us know when one has died
- Distributing a replacement
- Isolating our tasks from each other
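The "notice a death, launch a replacement" loop from the second and third bullets can be modeled in a few lines of Java. This is a toy with hypothetical class names: in a real deployment YARN's ResourceManager and NodeManagers, together with the Samza App Master, do this work, and the isolation in the fourth bullet is not modeled at all.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy supervisor; not YARN or Samza code.
final class ToySupervisor {
    private final Map<Integer, String> containerForPartition = new TreeMap<>();
    private final Set<String> alive = new HashSet<>();
    private int nextContainerId = 0;

    ToySupervisor(int numPartitions) {
        for (int p = 0; p < numPartitions; p++) launch(p);
    }

    // Start a fresh container for the given partition's task.
    private void launch(int partition) {
        String id = "container-" + nextContainerId++;
        containerForPartition.put(partition, id);
        alive.add(id);
    }

    // Simulates the cluster reporting that a container has died.
    void reportDead(String containerId) {
        alive.remove(containerId);
    }

    // Replaces every dead container; returns the partitions that were restarted.
    List<Integer> checkAndReplace() {
        List<Integer> restarted = new ArrayList<>();
        for (int partition : new ArrayList<>(containerForPartition.keySet())) {
            if (!alive.contains(containerForPartition.get(partition))) {
                launch(partition);
                restarted.add(partition);
            }
        }
        return restarted;
    }

    String containerFor(int partition) {
        return containerForPartition.get(partition);
    }
}
```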

YARN: Execution and reliability
[Diagram: two machines, each running a Kafka broker and a YARN Node Manager (Node Manager 1 and Node Manager 2). The Samza App Master and two Samza TaskRunners, one for Partition 0 and one for Partition 1, are placed on these Node Managers; each TaskRunner calls MyStreamTask: process().]

Co-partitioning of topics
An instance of StreamTask is responsible for a specific partition.
[Diagram: the Samza TaskRunner for Partition 0 runs MyStreamTask: process() over StatusUpdateEvent partition 0 and NewConnectionEvent partition 0, and writes to NewsUpdatePost.]
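Co-partitioning works because every input topic is partitioned the same way on the same key, so the task for partition N reads partition N of each topic and sees all events for its users. A Java sketch, assuming equal partition counts across topics and a hashCode-based partitioner (both illustrative choices, and the class is hypothetical):

```java
// Hypothetical router illustrating co-partitioned input topics.
final class CoPartitionRouter {
    private final int numPartitions; // assumed identical for every input topic

    CoPartitionRouter(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Same rule for every topic: the partition depends only on the key,
    // which is why the topic argument is deliberately unused.
    int partitionFor(String topic, Object key) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // The task instance responsible for (topic, partition) is the partition number itself.
    int taskFor(String topic, int partition) {
        return partition;
    }
}
```

For user 534, a StatusUpdateEvent and a NewConnectionEvent both map to the same task, so that task's local state can hold the user's connections.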

API: process()

public interface StreamTask {
  void process(IncomingMessageEnvelope envelope,
               MessageCollector collector,
               TaskCoordinator coordinator);
}

envelope: getKey(), getMsg()
collector: sendMsg(topic, key, value)
coordinator: commit(), shutdown()
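To show the shape of a task written against this interface, here is a self-contained Java sketch. The Envelope, Collector and Coordinator interfaces are simplified stand-ins that mirror the slide's method names; they are not Samza's real classes, whose signatures differ in detail (for example, the released envelope exposes getMessage(), and the collector's send() takes an outgoing envelope object). The output topic name ShoutedUpdates is also hypothetical.

```java
import java.util.Locale;

// Simplified stand-ins mirroring the slide; NOT Samza's actual classes.
interface Envelope {
    Object getKey();
    Object getMsg();
}

interface Collector {
    void sendMsg(String topic, Object key, Object value);
}

interface Coordinator {
    void commit();
    void shutdown();
}

interface StreamTask {
    void process(Envelope envelope, Collector collector, Coordinator coordinator);
}

// Example task: forwards each message, upper-cased, to a hypothetical output topic,
// keeping the original key.
final class ShoutTask implements StreamTask {
    public void process(Envelope envelope, Collector collector, Coordinator coordinator) {
        String text = String.valueOf(envelope.getMsg());
        collector.sendMsg("ShoutedUpdates", envelope.getKey(), text.toUpperCase(Locale.ROOT));
    }
}
```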

Awesome feature: State
- Generic data store interface
- Key-value out of the box
- More soon? Bloom filter, Lucene, etc.
- Restored by Samza upon task crash
[Diagram: the Samza TaskRunner for Partition 0 runs MyStreamTask: process() alongside its local store state.]
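The slide doesn't say how state is restored. One common approach, and the one Samza uses for its key-value stores, is a changelog: every write to the local store is also appended to a durable log (typically a Kafka topic), and a restarted task rebuilds its store by replaying that log. A Java sketch with a hypothetical class and an in-memory list standing in for the durable log:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical changelog-backed store; the List stands in for a durable, replicated log.
final class ChangelogKeyValueStore {
    private final Map<String, String> local = new HashMap<>();
    private final List<String[]> changelog; // entries are {key, value}; value null = delete

    ChangelogKeyValueStore(List<String[]> changelog) {
        this.changelog = changelog;
        // Restore after a crash: replay every logged write in order; later writes win.
        for (String[] entry : changelog) {
            if (entry[1] == null) {
                local.remove(entry[0]);
            } else {
                local.put(entry[0], entry[1]);
            }
        }
    }

    String get(String key) {
        return local.get(key);
    }

    // Log first, then apply locally, so the log never misses a write the task has seen.
    void put(String key, String value) {
        changelog.add(new String[] {key, value});
        local.put(key, value);
    }

    void delete(String key) {
        changelog.add(new String[] {key, null});
        local.remove(key);
    }
}
```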

(Pseudo)code snippet: Newsfeed
- Consume StatusUpdateEvent; send those updates to all your connections via the NewsUpdatePost topic
- Consume NewConnectionEvent; maintain state of connections to know who to send to

public class NewsFeed implements StreamTask {
  void process(envelope, collector, coordinator) {
    msg = envelope.getMsg()
    userId = msg.get("userID")
    if (msg.get("type") == STATUS_UPDATE) {
      // fan the new status out to every stored connection
      foreach (conn : kvStore.get(userId)) {
        collector.send("NewsUpdatePost", new Msg(conn, msg.get("newStatus")))
      }
    } else {
      // NewConnectionEvent: add to this user's stored connections
      newConn = msg.get("newConnection")
      connections = kvStore.get(userId)
      kvStore.put(userId, connections ++ newConn)
    }
  }
}
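A runnable Java version of the same logic, using hypothetical minimal stand-ins (Map-based messages, an in-memory map for the store, a callback collector) instead of Samza's classes. It also handles a user with no connections yet, where the pseudocode's kvStore.get(userId) would return nothing to iterate over:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the newsfeed task; all types here are hypothetical stand-ins.
final class NewsFeed {
    interface Collector {
        void send(String topic, String key, String value);
    }

    static final String STATUS_UPDATE = "STATUS_UPDATE";

    // userId -> that user's connections; stands in for the task's key-value store.
    private final Map<String, List<String>> kvStore = new HashMap<>();

    void process(Map<String, String> msg, Collector collector) {
        String userId = msg.get("userID");
        if (STATUS_UPDATE.equals(msg.get("type"))) {
            // Fan the status out to each known connection; no connections -> nothing sent.
            for (String conn : kvStore.getOrDefault(userId, List.of())) {
                collector.send("NewsUpdatePost", conn, msg.get("newStatus"));
            }
        } else {
            // NewConnectionEvent: append the new connection to the stored list.
            kvStore.computeIfAbsent(userId, k -> new ArrayList<>())
                   .add(msg.get("newConnection"));
        }
    }
}
```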

Current status

Hello, Samza! Cool, eh? bit.ly/hello-samza
- Consume Wikipedia edits live
- Up and running in 3 minutes
- Generate stats on those edits

samza.incubator.apache.org
bit.ly/samza_newbie_issues

Cheers!
- Quick start: bit.ly/hello-samza
- Project homepage: samza.incubator.apache.org
- Newbie issues: bit.ly/samza_newbie_issues
- Detailed Samza and YARN talk: bit.ly/samza_and_yarn
- Twitter: @samzastream