What's New in the Flink 1.9.0 Blink Planner - Kurt Young, Alibaba
FlinkForward
28 slides
Oct 23, 2019
About This Presentation
Flink 1.9.0 added the ability to support multiple SQL planners under the same API. With this, we successfully merged many features from Blink, Alibaba's internal version of Flink. In this talk, I will introduce the architecture of the Blink planner and share the functionality and performance enhancements we added.
Slide Content
What's New in the Blink Planner
Kurt Young
Staff Engineer at Alibaba, Apache Flink PMC
Agenda
- Background
- Introduction of the Blink Planner
- Features of the Blink Planner
- Future Plans
This is a joint effort with the entire Apache Flink community!
Flink Architecture
A Step Closer
- Different APIs for streaming and batch
- Different translation paths
- Different code for streaming and batch
What we want: [diagram]
What Does It Look Like?
[Diagram, top to bottom: Table API & SQL → Planner → Stream Transformation → Stream Operator → Flink Task Runtime (stream & batch)]
Major Blink Planner Features
Common: New Type System, Binary Format, Aggregation Skew Handling
Streaming: Bundle Processing, Dimension Join, Top N, Streaming De-duplication
Batch: Hash-based Algorithms, Sort-based Algorithms, Full TPC-H Support
Common Improvements: New Type System, Binary Format, Aggregation Skew Handling
New Type System
[FLIP-37] Rework of the Table API Type System
- The Blink planner uses the new type system instead of TypeInformation
- Some new features, but there is still a lot to do:
  - Support DECIMAL(p, s)
  - Support nullability
  - Support TIMESTAMP WITH LOCAL TIME ZONE
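To make the DECIMAL(p, s) and nullability bullets concrete, here is a minimal Python sketch of what coercing a value into DECIMAL(p, s) means: p is the total number of digits and s the number of digits after the decimal point. The function name `to_decimal`, the half-up rounding, and returning None (NULL) on overflow are assumptions for illustration, not necessarily Flink's exact cast semantics.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional


def to_decimal(value: Optional[str], precision: int, scale: int) -> Optional[Decimal]:
    """Hypothetical coercion of a string into DECIMAL(precision, scale).

    Assumptions: round half-up to `scale` fractional digits, and map values
    that need more than `precision - scale` integer digits to NULL (None).
    """
    if value is None:
        return None  # nullability: NULL passes through a nullable column
    quantum = Decimal(1).scaleb(-scale)  # scale=2 -> Decimal('0.01')
    rounded = Decimal(value).quantize(quantum, rounding=ROUND_HALF_UP)
    if abs(rounded) >= Decimal(10) ** (precision - scale):
        return None  # overflow: too many integer digits for DECIMAL(p, s)
    return rounded


print(to_decimal("123.456", 5, 2))  # 123.46
print(to_decimal("1234.5", 5, 2))   # None: needs 4 integer digits, only 3 allowed
```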
Row
Row(2019, "Flink", "Forward") is stored as an Object[] holding Integer(2019), String("Flink"), String("Forward"):
- Space inefficiency (object headers)
- Boxing and unboxing
- Serialization and deserialization cost, especially when we want to access fields randomly
Binary Formats (BinaryRow)
- Deeply integrated with memory segments
- No need to deserialize, compact layout, random access
- Also BinaryString, BinaryArray, BinaryMap
[Diagram: a memory segment holding a header (row type) and null info, a fixed-length part (2019, pointer, pointer), and a variable-length part (5 "Flink", 7 "Forward")]
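The layout above can be sketched in Python with the standard `struct` module, assuming a simplified row containing only integer and string fields: a header byte, a null bitset, one 8-byte slot per field, and a variable-length part that string slots point into (offset in the high 32 bits, length in the low 32 bits). `encode_row`, `read_field`, and the exact byte layout are hypothetical; Flink's real BinaryRow likely differs in details, but the sketch shows why reading one field needs no deserialization of the others.

```python
import struct
from typing import List, Optional, Union

Field = Optional[Union[int, str]]
HEADER_BYTES = 1  # assumed 1-byte header; the real layout may differ


def _null_bytes(arity: int) -> int:
    return (arity + 7) // 8


def encode_row(fields: List[Field]) -> bytes:
    """Pack int/str fields into one compact buffer (fixed part + variable part)."""
    arity = len(fields)
    fixed_len = HEADER_BYTES + _null_bytes(arity) + 8 * arity
    null_bits = bytearray(_null_bytes(arity))
    slots = bytearray()
    var_part = bytearray()
    for i, f in enumerate(fields):
        if f is None:
            null_bits[i // 8] |= 1 << (i % 8)  # mark field i as NULL
            slots += struct.pack("<q", 0)
        elif isinstance(f, int):
            slots += struct.pack("<q", f)  # fixed-length value stored inline
        else:
            data = f.encode("utf-8")
            offset = fixed_len + len(var_part)
            # "pointer": offset in the high 32 bits, length in the low 32 bits
            slots += struct.pack("<Q", (offset << 32) | len(data))
            var_part += data
    return bytes(HEADER_BYTES) + bytes(null_bits) + bytes(slots) + bytes(var_part)


def read_field(buf: bytes, arity: int, i: int, kind: str) -> Field:
    """Random access to field i without decoding any other field."""
    if (buf[HEADER_BYTES + i // 8] >> (i % 8)) & 1:
        return None
    slot = HEADER_BYTES + _null_bytes(arity) + 8 * i
    if kind == "int":
        return struct.unpack_from("<q", buf, slot)[0]
    pointer = struct.unpack_from("<Q", buf, slot)[0]
    offset, length = pointer >> 32, pointer & 0xFFFFFFFF
    return buf[offset:offset + length].decode("utf-8")


row = encode_row([2019, "Flink", "Forward"])
print(len(row), read_field(row, 3, 2, "str"))  # 38 Forward
```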
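Aggregation Skew Handling
SELECT SUM(num) FROM T GROUP BY color
table.optimizer.agg-phase-strategy = TWO_PHASE
Two-phase aggregation: a local combine pre-aggregates on each upstream task before the shuffle, and a global aggregation merges the partial results.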
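A minimal Python sketch of the two-phase strategy for the query above, assuming the input is already split into upstream partitions held in memory: each task pre-aggregates locally, and only one partial sum per (task, color) is shuffled to the global phase. The function names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Record = Tuple[str, int]  # (color, num)


def local_agg(partition: List[Record]) -> Dict[str, int]:
    """Phase 1: pre-aggregate inside one upstream task, before the shuffle."""
    partial: Dict[str, int] = defaultdict(int)
    for color, num in partition:
        partial[color] += num
    return dict(partial)


def two_phase_sum(partitions: List[List[Record]]) -> Tuple[Dict[str, int], int]:
    """Phase 2: merge partial sums by key; also report how many records were shuffled."""
    partials = [local_agg(p) for p in partitions]
    shuffled = sum(len(p) for p in partials)  # one partial record per (task, color)
    final: Dict[str, int] = defaultdict(int)
    for partial in partials:
        for color, subtotal in partial.items():
            final[color] += subtotal
    return dict(final), shuffled


skewed = [[("red", 1)] * 1000 + [("blue", 2)], [("red", 3)] * 1000]
print(two_phase_sum(skewed))  # ({'red': 4000, 'blue': 2}, 3)
```

With a hot key like "red", the 2,001 input records shrink to 3 shuffled partial records, which is what relieves the skewed global task.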
Distinct Aggregation Skew Handling
Local combine doesn't work for distinct aggregation:
SELECT color, COUNT(DISTINCT id) FROM T GROUP BY color
Optimized by rewriting the query:
SELECT color, SUM(cnt) FROM (
  SELECT color, COUNT(DISTINCT id) AS cnt
  FROM T
  GROUP BY color, MOD(HASH_CODE(id), 1024)
) GROUP BY color
table.optimizer.distinct-agg.split.enabled = true
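The rewrite can be checked with a small Python sketch, assuming an in-memory list of (color, id) rows and Python's built-in `hash()` as a stand-in for HASH_CODE: the two-level version spreads a hot color across up to 1024 groups yet returns the same counts, because every id falls into exactly one bucket. Both function names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple

NUM_BUCKETS = 1024  # mirrors MOD(HASH_CODE(id), 1024) in the rewritten query

Row = Tuple[str, int]  # (color, id)


def count_distinct_direct(rows: List[Row]) -> Dict[str, int]:
    """SELECT color, COUNT(DISTINCT id) FROM T GROUP BY color"""
    ids: Dict[str, Set[int]] = defaultdict(set)
    for color, id_ in rows:
        ids[color].add(id_)
    return {color: len(s) for color, s in ids.items()}


def count_distinct_split(rows: List[Row]) -> Dict[str, int]:
    """The rewritten two-level query."""
    # Inner query: GROUP BY color, bucket -> COUNT(DISTINCT id) AS cnt.
    # A hot color is now spread over up to NUM_BUCKETS groups.
    inner: Dict[Tuple[str, int], Set[int]] = defaultdict(set)
    for color, id_ in rows:
        bucket = hash(id_) % NUM_BUCKETS  # Python's hash() stands in for HASH_CODE
        inner[(color, bucket)].add(id_)
    # Outer query: GROUP BY color -> SUM(cnt). Each id lands in exactly one
    # bucket, so per-bucket distinct counts never overlap and can be summed.
    outer: Dict[str, int] = defaultdict(int)
    for (color, _bucket), bucket_ids in inner.items():
        outer[color] += len(bucket_ids)
    return dict(outer)


rows = [("red", i % 5000) for i in range(20000)] + [("blue", 1), ("blue", 1), ("blue", 2)]
print(count_distinct_split(rows) == count_distinct_direct(rows))  # True
```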
Streaming Improvements: Bundle Processing, Dimension Join, Top N, Streaming De-duplication
Bundle Processing
Normal aggregation: SELECT SUM(num) FROM T GROUP BY color
Each record costs:
- One state read and one state write
- One deserialization and one serialization
- One output
Bundle Processing
Bundled aggregation: SELECT SUM(num) FROM T GROUP BY color
table.exec.mini-batch.enabled = true
table.exec.mini-batch.allow-latency = "5000 ms"
table.exec.mini-batch.size = 1000
- Use heap memory to hold the bundle
- Aggregate in memory before accessing state and doing serde operations
- Also eases the load on downstream operators
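A minimal Python sketch of the bundled aggregation above, assuming a plain dict as the state backend and a size-only trigger (the timer implied by `table.exec.mini-batch.allow-latency` is omitted). The class `MiniBatchSum` and its counters are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class MiniBatchSum:
    """Hypothetical bundled operator for SELECT SUM(num) FROM T GROUP BY color."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size  # plays the role of table.exec.mini-batch.size
        self.bundle: Dict[str, int] = defaultdict(int)  # heap-memory bundle
        self.buffered = 0
        self.state: Dict[str, int] = {}  # stand-in for the keyed state backend
        self.state_accesses = 0  # state reads + writes
        self.outputs: List[Tuple[str, int]] = []

    def process(self, color: str, num: int) -> None:
        self.bundle[color] += num  # in-memory pre-aggregation, no state access
        self.buffered += 1
        if self.buffered >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        """Touch state once per distinct key in the bundle, not once per record."""
        for color, partial in self.bundle.items():
            total = self.state.get(color, 0) + partial  # one state read
            self.state[color] = total  # one state write
            self.state_accesses += 2
            self.outputs.append((color, total))  # one output per key per bundle
        self.bundle.clear()
        self.buffered = 0


op = MiniBatchSum(batch_size=1000)
for i in range(3000):
    op.process("red" if i % 2 == 0 else "blue", 1)
print(op.state, op.state_accesses)  # {'red': 1500, 'blue': 1500} 12
```

Without bundling, the same 3,000 records would cost 6,000 state accesses (one read and one write each); with a bundle of 1,000 and two distinct keys, the sketch performs 12.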
Dimension Join
- Dimension and fact tables are popular concepts in data warehousing as well as stream processing
- Frequently asked scenario:
  - Read facts from a message queue while dimension data is stored in a database or key-value store
  - Enrich the facts with the latest dimension data
  - The dimension table itself might also be changing
- Different from a regular streaming join: changes to the dimension table don't trigger the join
Processing Time Dimension Join
- Model the dimension table as a time-varying relation (TVR) [1], a relation that changes over time
- Temporal tables introduce the FOR SYSTEM_TIME AS OF clause to access the table at any point in time
- No need to store the whole dimension table in state if it's a LookupableTableSource
SELECT o.*, p.*
FROM Orders AS o
JOIN Products FOR SYSTEM_TIME AS OF PROCTIME() AS p
ON o.productId = p.productId
[1] E. Begoli et al., "One SQL to Rule Them All: An Efficient and Syntactically Idiomatic Approach to Management of Streams and Tables", SIGMOD 2019
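A Python sketch of the processing-time lookup join semantics, assuming the dimension table is a dict standing in for an external store that supports point lookups: each order is enriched with the product row as it is at that moment, and updating a product emits nothing by itself. `ProcTimeLookupJoin` is a hypothetical name, not a Flink class.

```python
from typing import Any, Dict, Optional

Record = Dict[str, Any]


class ProcTimeLookupJoin:
    """Hypothetical sketch: enrich each order with the product row as it is now."""

    def __init__(self, products: Dict[int, Record]) -> None:
        # Stand-in for an external store (DB / key-value store) queried on demand;
        # the join keeps no copy of the dimension table in its own state.
        self.products = products

    def on_order(self, order: Record) -> Optional[Record]:
        product = self.products.get(order["productId"])  # lookup at processing time
        if product is None:
            return None  # inner join: unmatched orders produce nothing
        return {**order, **product}


products = {1: {"productId": 1, "price": 10}}
join = ProcTimeLookupJoin(products)
first = join.on_order({"orderId": 100, "productId": 1})
products[1] = {"productId": 1, "price": 12}  # dimension update: emits nothing by itself
second = join.on_order({"orderId": 101, "productId": 1})
print(first["price"], second["price"])  # 10 12
```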
Top N
- It's impractical to do a global streaming sort
- But it becomes possible if the user only cares about the top N elements
- E.g. calculate the top 3 sellers for each category
SELECT * FROM (
  SELECT *, -- you can get sellerId or other information from here
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rowNum
  FROM shop_sales
) WHERE rowNum <= 3
Top N
[Diagram: the original plan (OverAggregate followed by Calc) is rewritten into an optimized plan with a single Rank operator]
Some other optimization factors:
- Whether the rank operator has to deal with retraction
- Whether the partition key is part of the primary key
- Whether the ordered field is monotonic
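A minimal Python sketch of why Top N is tractable in streaming: per category, only a min-heap of the current n best rows is kept instead of a global sort. It assumes an append-only input (no retraction, one of the factors listed above), and `StreamingTopN` is a hypothetical name; a real rank operator would also emit update messages when the top n changes.

```python
import heapq
from collections import defaultdict
from typing import Dict, List, Tuple


class StreamingTopN:
    """Top-n rows per category for an append-only stream (no retraction)."""

    def __init__(self, n: int) -> None:
        self.n = n
        # Min-heap per category: heap[0] is the weakest row still in the top n.
        self.heaps: Dict[str, List[Tuple[int, str]]] = defaultdict(list)

    def process(self, category: str, seller: str, sales: int) -> bool:
        """Return True if the category's top n changed (downstream needs an update)."""
        heap = self.heaps[category]
        if len(heap) < self.n:
            heapq.heappush(heap, (sales, seller))
            return True
        if (sales, seller) > heap[0]:
            heapq.heapreplace(heap, (sales, seller))  # evict the weakest row
            return True
        return False  # cannot enter the top n: no state change, nothing to emit

    def top(self, category: str) -> List[Tuple[str, int]]:
        """Current ranking, like rowNum 1..n under ORDER BY sales DESC."""
        ranked = sorted(self.heaps[category], reverse=True)
        return [(seller, sales) for sales, seller in ranked]


t = StreamingTopN(3)
for seller, sales in [("a", 10), ("b", 30), ("c", 20), ("d", 5), ("e", 25)]:
    t.process("books", seller, sales)
print(t.top("books"))  # [('b', 30), ('e', 25), ('c', 20)]
```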
Streaming De-duplication
A primary key is needed, and there are basically two scenarios:
- Upstream emits repeated data due to recovery, so only the first row is meaningful
- Upstream keeps updating its output per key, so only the latest row is meaningful
Similar to Top 1.
Keep first row:
SELECT primary_key, a, b, c FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY primary_key ORDER BY proctime ASC) AS rowNum
  FROM T
) WHERE rowNum = 1
Keep last row:
SELECT primary_key, a, b, c FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY primary_key ORDER BY proctime DESC) AS rowNum
  FROM T
) WHERE rowNum = 1
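Both de-duplication modes can be sketched in Python over an in-memory list of (primary_key, value) rows, assumed to arrive in processing-time order. Keep-first only needs the set of seen keys; keep-last must remember the previous row per key so it can retract it, shown here as a simple ('-', row) / ('+', row) changelog of our own design rather than Flink's actual change types.

```python
from typing import Dict, List, Set, Tuple

Row = Tuple[str, int]  # (primary_key, value), in processing-time order


def dedup_keep_first(rows: List[Row]) -> List[Row]:
    """ORDER BY proctime ASC ... rowNum = 1: later duplicates are dropped."""
    seen: Set[str] = set()
    out: List[Row] = []
    for key, value in rows:
        if key not in seen:
            seen.add(key)
            out.append((key, value))
    return out


def dedup_keep_last(rows: List[Row]) -> List[Tuple[str, Row]]:
    """ORDER BY proctime DESC ... rowNum = 1: each new row replaces the previous one."""
    latest: Dict[str, Row] = {}
    changelog: List[Tuple[str, Row]] = []
    for row in rows:
        previous = latest.get(row[0])
        if previous is not None:
            changelog.append(("-", previous))  # retract the outdated row
        latest[row[0]] = row
        changelog.append(("+", row))
    return changelog


rows = [("k1", 1), ("k2", 2), ("k1", 3)]
print(dedup_keep_first(rows))  # [('k1', 1), ('k2', 2)]
print(dedup_keep_last(rows))   # [('+', ('k1', 1)), ('+', ('k2', 2)), ('-', ('k1', 1)), ('+', ('k1', 3))]
```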
Batch Improvements: Hash-based Algorithms, Sort-based Algorithms, Full TPC-H Support
Sort-based Algorithms
- Finiteness makes sorting practical and efficient
- Push-based multi-threaded sorter
  - Mainly borrowed from Flink's original UnilateralSortMerger
  - Changed from pull-based to push-based
  - Adapted to binary formats, with hotspot code generated
- Sort aggregation
- Sort-merge join
  - Supports inner/left/right/full/semi/anti joins
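A minimal Python sketch of an inner sort-merge join, assuming both inputs fit in memory as (key, value) pairs: sort each side by key, then advance two cursors, emitting the cross product of each run of equal keys. It only illustrates the algorithm; the Blink planner's operator works on binary formats with the push-based sorter described above.

```python
from typing import List, Tuple

Pair = Tuple[int, str]  # (join key, payload)


def sort_merge_inner_join(left: List[Pair], right: List[Pair]) -> List[Tuple[int, str, str]]:
    left = sorted(left, key=lambda p: p[0])  # sort phase (stable)
    right = sorted(right, key=lambda p: p[0])
    out: List[Tuple[int, str, str]] = []
    i = j = 0
    while i < len(left) and j < len(right):  # merge phase
        lkey, rkey = left[i][0], right[j][0]
        if lkey < rkey:
            i += 1
        elif lkey > rkey:
            j += 1
        else:
            # Find the run of equal keys on each side, then emit their cross product.
            i_end = i
            while i_end < len(left) and left[i_end][0] == lkey:
                i_end += 1
            j_end = j
            while j_end < len(right) and right[j_end][0] == rkey:
                j_end += 1
            for _, lval in left[i:i_end]:
                for _, rval in right[j:j_end]:
                    out.append((lkey, lval, rval))
            i, j = i_end, j_end
    return out


print(sort_merge_inner_join([(2, "l2"), (1, "l1"), (2, "l2b")], [(2, "r2"), (3, "r3"), (1, "r1")]))
# [(1, 'l1', 'r1'), (2, 'l2', 'r2'), (2, 'l2b', 'r2')]
```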
Hash-based Algorithms
- Hash aggregate
  - Hash map based on binary formats
  - Automatically detects hash set mode (SELECT DISTINCT)
  - Falls back to sort aggregation in a zero-copy way
- Hash join
  - Mainly borrowed from Flink's original MutableHashTable
  - Changed from pull-based to push-based
  - Supports inner/left/right/full/semi/anti joins
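A Python sketch of the build/probe structure of a hash join, assuming an in-memory build side of (key, value) pairs; inner, semi, and anti variants are shown. The function is hypothetical, and spilling to disk, which a MutableHashTable-style join needs when the build side exceeds memory, is not modeled.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Pair = Tuple[int, str]  # (join key, payload)


def hash_join(build: List[Pair], probe: List[Pair], join_type: str = "inner") -> List[tuple]:
    if join_type not in ("inner", "semi", "anti"):
        raise ValueError(f"unsupported join type: {join_type}")
    # Build phase: hash the (ideally smaller) build side by join key.
    table: Dict[int, List[str]] = defaultdict(list)
    for key, value in build:
        table[key].append(value)
    out: List[tuple] = []
    # Probe phase: stream the other side once and look up each key.
    for key, value in probe:
        matches = table.get(key, [])  # .get() does not insert into the defaultdict
        if join_type == "inner":
            out.extend((key, value, b) for b in matches)
        elif join_type == "semi" and matches:
            out.append((key, value))  # emit the probe row once if any match exists
        elif join_type == "anti" and not matches:
            out.append((key, value))  # emit the probe row only if nothing matches
    return out


build = [(1, "b1"), (1, "b1x"), (2, "b2")]
probe = [(1, "p1"), (3, "p3")]
print(hash_join(build, probe))          # [(1, 'p1', 'b1'), (1, 'p1', 'b1x')]
print(hash_join(build, probe, "anti"))  # [(3, 'p3')]
```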
More Functionality Coverage
- More data types
- Support for all types of joins
- Subquery decorrelation
- Over window enhancements
- Full TPC-H support
Summary & Future
- Flink took a big step towards a truly unified architecture
- The Blink planner is a state-of-the-art query processor for both batch and streaming
Future (Flink 1.10+):
- Finalize the new type system
- Finalize the Blink merge
- Full TPC-DS support
- Hopefully some feedback to help us improve