What's new in 1.9.0 blink planner - Kurt Young, Alibaba

Flink Forward, Oct 23, 2019

About This Presentation

Flink 1.9.0 added the ability to support multiple SQL planners under the same API. With this, we successfully merged many features from Alibaba's internal Flink version, called Blink. In this talk, I will give an introduction to the architecture of the Blink planner, and also...


Slide Content

What's new in blink planner: Kurt Young, Staff Engineer at Alibaba, Apache Flink PMC

Agenda: Background / Introduction of Blink Planner / Features of Blink Planner / Future Plans

This is a joint effort with the entire Apache Flink community!

Flink Architecture

A Step Closer: Different APIs for streaming and batch / Different translation paths / Different code for streaming and batch. What we want: a single stack shared by both.

What Does It Look Like: Table API & SQL -> Planner -> Stream Transformation -> Stream Operator -> Flink Task Runtime, with one translation path serving both stream & batch.

Backward Compatibility: Table API & SQL programs can run on either planner on top of the Flink Task Runtime. The Blink Planner translates both stream and batch jobs into Stream Transformations / Stream Operators, while the old Flink Planner keeps its existing paths: Stream Transformations / Stream Operators for streaming and DataSet Drivers for batch.
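For orientation, a minimal sketch (assuming the Flink 1.9 Table API) of how a program selects the Blink planner or the legacy Flink planner for the same Table API & SQL code:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class PlannerSelection {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Blink planner, streaming mode (batch is available via inBatchMode()).
        EnvironmentSettings blinkSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment blinkTableEnv = StreamTableEnvironment.create(env, blinkSettings);

        // The old Flink planner is still available for backward compatibility
        // (requires the legacy planner dependency on the classpath); in practice
        // a job picks one of the two.
        EnvironmentSettings oldSettings = EnvironmentSettings.newInstance()
                .useOldPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment oldTableEnv = StreamTableEnvironment.create(env, oldSettings);
    }
}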

Major Blink Planner Features: Common (New Type System, Binary Format, Aggregation Skew Handling) / Streaming (Bundle Processing, Dimension Join, Top N, Streaming De-duplication) / Batch (Hash-based Algorithms, Sort-based Algorithms, Full TPC-H Support)

Common Improvements: New Type System / Binary Format / Aggregation Skew Handling

New Type System: [FLIP-37] Rework of the Table API Type System. The Blink planner uses the new type system instead of TypeInformation. There are some new features, but still lots to do: support for DECIMAL(p, s), nullability, and TIMESTAMP WITH LOCAL TIME ZONE.
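A small illustrative sketch, assuming the FLIP-37 DataTypes API shipped with Flink 1.9, showing the new precision, scale and nullability support (the field names are only examples):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class NewTypeSystemExample {
    public static void main(String[] args) {
        // DECIMAL with explicit precision and scale.
        DataType amount = DataTypes.DECIMAL(10, 2);

        // Nullability is now part of the type itself.
        DataType requiredName = DataTypes.STRING().notNull();

        // Timestamp interpreted in the session's local time zone.
        DataType eventTime = DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3);

        System.out.println(amount + ", " + requiredName + ", " + eventTime);
    }
}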

Row: the existing format, e.g. Row(2019, "Flink", "Forward"), is backed by an Object[] of boxed values (Integer(2019), String("Flink"), String("Forward")). Space inefficiency (object headers), boxing and unboxing, and serialization and deserialization cost, especially when we want to access fields randomly.

Binary Formats (BinaryRow): Deeply integrated with memory segments. No need to deserialize / Compact layout / Random accessible. Also have BinaryString, BinaryArray, BinaryMap. [Figure: BinaryRow layout on a memory segment: a fixed-length part (header with row type and null info, inline values such as 2019, and pointers to variable-length fields) followed by a variable-length part holding the lengths and bytes of "Flink" (5) and "Forward" (7).]

Aggregation Skew Handling: for a query like SELECT SUM(num) FROM T GROUP BY color, setting table.optimizer.agg-phase-strategy = TWO_PHASE enables local/global (two-phase) aggregation, so skewed group keys are locally combined before the shuffle.
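A hedged sketch of setting this option from code, assuming TableConfig#getConfiguration() as documented for Flink 1.9; the table T is just the placeholder from the slide:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class TwoPhaseAggExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Enable local/global (two-phase) aggregation so skewed keys are
        // pre-aggregated before the shuffle.
        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

        // The skewed query from the slide; T must be a registered table.
        // tEnv.sqlQuery("SELECT SUM(num) FROM T GROUP BY color");
    }
}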

Distinct Aggregation Skew Handling: Local combine doesn't work for distinct aggregation, e.g. SELECT COUNT(DISTINCT id) FROM T GROUP BY color. Optimize as query rewriting (enabled via table.optimizer.distinct-agg.split.enabled = true):
SELECT color, SUM(cnt)
FROM (
  SELECT color, COUNT(DISTINCT id) AS cnt
  FROM T
  GROUP BY color, MOD(HASH_CODE(id), 1024))
GROUP BY color
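Similarly, a sketch of enabling the automatic split, so the planner applies the rewrite above while the user keeps writing the simple query (same assumptions as the previous sketch):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class SplitDistinctAggExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Let the optimizer split COUNT(DISTINCT ...) into the bucketed two-level
        // aggregation shown in the rewritten query above.
        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("table.optimizer.distinct-agg.split.enabled", "true");

        // The user-facing query stays unchanged; T must be a registered table.
        // tEnv.sqlQuery("SELECT COUNT(DISTINCT id) FROM T GROUP BY color");
    }
}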

Streaming Improvements: Bundle Processing / Dimension Join / Top N / Streaming De-duplication

Bundle Processing: with normal aggregation, e.g. SELECT SUM(num) FROM T GROUP BY color, each record would cost one state reading and writing, one deserialization and serialization, and one output.

Bundle Processing (bundled aggregation): Use heap memory to hold a bundle, do in-memory aggregation before accessing state and serde operations, and also ease the downstream load. Configuration: table.exec.mini-batch.enabled = true, table.exec.mini-batch.allow-latency = "5000 ms", table.exec.mini-batch.size = 1000, for the same query SELECT SUM(num) FROM T GROUP BY color.
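A sketch of enabling mini-batch (bundle) processing with the option keys from the slide, again assuming TableConfig#getConfiguration():

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class MiniBatchExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        Configuration conf = tEnv.getConfig().getConfiguration();
        // Buffer records on the heap and aggregate them in memory before touching
        // state, trading up to 5 seconds of latency for far fewer state accesses.
        conf.setString("table.exec.mini-batch.enabled", "true");
        conf.setString("table.exec.mini-batch.allow-latency", "5000 ms");
        conf.setString("table.exec.mini-batch.size", "1000");

        // tEnv.sqlQuery("SELECT SUM(num) FROM T GROUP BY color");
    }
}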

Dimension Join: Dimension and fact tables are popular concepts in data warehousing as well as in stream processing. Frequently asked scenario: facts are read from a message queue while dimension data is stored in a DB or key-value store; enrich the facts with the latest dimension data; the dimension table itself might also be changing. Different from a regular streaming join: changes of the dimension table don't trigger the join.

Processing Time Dimension Join: Model the dimension table as a time-varying relation (TVR) [1], a relation that changes over time. Temporal tables introduce a new FOR SYSTEM_TIME keyword to access any point in time of the table. There is no need to store the whole dimension table in state if it is a LookupableTableSource. Example:
SELECT o.*, p.*
FROM Orders AS o
JOIN Products FOR SYSTEM_TIME AS OF PROCTIME() AS p
  ON o.productId = p.productId
[1] One SQL to Rule Them All
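For reference, the lookup contract mentioned above looks roughly like this in Flink 1.9 (a simplified paraphrase of org.apache.flink.table.sources.LookupableTableSource, shown for illustration only):

import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.sources.TableSource;

// Sketch: a source that can be queried by key at join time. The planner invokes
// the returned (async) table function with the join key values, so the whole
// dimension table never has to be held in Flink state.
public interface LookupableTableSource<T> extends TableSource<T> {
    TableFunction<T> getLookupFunction(String[] lookupKeys);
    AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);
    boolean isAsyncEnabled();
}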

Top N: It's impractical to do a global streaming sort, but it becomes possible if the user only cares about the top N elements. E.g. calculate the top 3 sellers for each category:
SELECT *
FROM (
  SELECT *,  -- sellerId and other columns are also available here
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rowNum
  FROM shop_sales)
WHERE rowNum <= 3

Top N plans: the original plan expresses this as an OverAggregate followed by a Calc filter; the optimized plan replaces them with a dedicated Rank operator. Some other optimization factors: whether the rank operator has to deal with retraction, whether the partition key is part of the primary key, and whether the ordered field is monotonic.
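One way to see which plan was chosen is to print the optimized plan, e.g. with TableEnvironment#explain. The following is a hedged, self-contained sketch that fakes the shop_sales table from the previous slide with a tiny in-memory stream:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ExplainTopN {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // A tiny in-memory stand-in for the shop_sales table.
        DataStream<Tuple2<String, Integer>> sales = env.fromElements(
                Tuple2.of("books", 10), Tuple2.of("books", 7), Tuple2.of("toys", 12));
        tEnv.registerTable("shop_sales", tEnv.fromDataStream(sales, "category, sales"));

        Table topN = tEnv.sqlQuery(
                "SELECT * FROM ("
                + " SELECT *, ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rowNum"
                + " FROM shop_sales"
                + ") WHERE rowNum <= 3");

        // Prints the AST, the optimized logical plan and the physical plan;
        // the optimized plan should contain a Rank node rather than an
        // OverAggregate followed by a Calc filter.
        System.out.println(tEnv.explain(topN));
    }
}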

Streaming De-duplication: A primary key is needed, and there are basically 2 scenarios: the upstream emits repeated data due to recovery, so only the first row is meaningful; or the upstream keeps updating the output per key, so only the latest row is meaningful. Similar to Top 1.
Keep first row:
SELECT primary_key, a, b, c
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY primary_key ORDER BY proctime ASC) AS rowNum
  FROM T)
WHERE rowNum = 1
Keep last row:
SELECT primary_key, a, b, c
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY primary_key ORDER BY proctime DESC) AS rowNum
  FROM T)
WHERE rowNum = 1

Batch Improvements: Hash-based Algorithms / Sort-based Algorithms / Full TPC-H Support

Sort-based Algorithms: Finiteness makes sorting practical and efficient. Push-based multi-threaded sorter: mainly borrowed from Flink's original UnilateralSortMerger, changed from pull-based to push-based, adapted to the binary formats, with hotspot code generation. Sort aggregation. Sort merge join: supports inner/left/right/full/semi/anti joins.

Hash-based Algorithms: Hash aggregate: hash map based on the binary formats, auto-detects hash-set mode (SELECT DISTINCT), falls back to sort aggregation in a zero-copy way. Hash join: mainly borrowed from Flink's original MutableHashTable, changed from pull-based to push-based, supports inner/left/right/full/semi/anti joins.
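A minimal sketch of running batch SQL on the Blink planner, where these hash and sort based operators are used (assuming the unified TableEnvironment from Flink 1.9; the Orders/Customers tables are placeholders and would need to be registered first):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BlinkBatchExample {
    public static void main(String[] args) {
        // Batch mode on the Blink planner.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Placeholder join between two previously registered tables; depending on
        // statistics and configuration it is planned as a hash join or a sort
        // merge join.
        String query =
            "SELECT o.orderId, c.name "
            + "FROM Orders AS o JOIN Customers AS c ON o.customerId = c.customerId";
        // tEnv.sqlQuery(query) once Orders and Customers are registered.
    }
}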

More Functionality Coverage: More data types / Support for all types of joins / Subquery decorrelation / Over window enhancements / Full TPC-H support

Summary & Future: Flink took a big step towards a truly unified architecture, and the Blink planner is a state-of-the-art query processor for both batch and streaming. Future (Flink 1.10+): finalize the new type system, finalize the Blink merge, full TPC-DS support. Hopefully we get some feedback to help us improve.

Thank You! Questions?