FailureEnrichers - Flink Meetup Bay Area.pptx

Panagiotis Garefalakis, 36 slides, Feb 25, 2025

About This Presentation

In this talk, we’ll explore the types of production failures and the unique impact that each has on
your pipeline’s SLAs. We’ll then see how FLIP-304 and the new Pluggable Failure Handling
Interface enable users to implement custom failure handlers using Flink’s generic plugin
framework....


Slide Content

Flink’s Pluggable Failure Handling: deal with streaming errors the smart way!

About me: Panagiotis (Panos) Garefalakis (@pgaref)
- Software Engineer, SPA, Confluent Flink runtime team
- Apache Flink contributor
- Apache Hive committer and Apache ORC PMC member
- PhD in Distributed Systems, Imperial College London, 2020

Overview
1. Intro: the Flink framework and how users leverage it to implement streaming applications
2. Background: main components of Flink’s distributed execution runtime and failure handling
3. Implementation: introduce the Pluggable Failure Enrichers component as part of the JobMaster
4. Demo: custom Failure Enrichers in just 4 steps, and a Confluent Cloud demo!
5. Lessons Learned: mistakes to avoid when running your own Failure Enrichers
6. Summary: key points and useful links

Stream Processing with Flink
[diagram: Sources (Kafka, Databases, Key/Value Stores, Files, Apps) → Real-time Stream Processing → Sinks; a second build shows the processing step as a Job, i.e., a Job Graph of Operators connected by Edges]

Writing Streaming Apps
```sql
INSERT INTO results
SELECT color, COUNT(*)
FROM events
WHERE color <> 'orange'
GROUP BY color;
```
[diagram, built up over five slides: events → FILTER (WHERE color <> 'orange') → GROUP BY color → COUNT → results, with the running count per color updating as events arrive]
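The query above is continuous: every event that passes the filter updates the count for its color. The plain-Java sketch below only emulates that behavior to make the filter, grouping, and per-key state concrete; it is not how Flink executes the query, and `ColorCounter` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java emulation of the continuous query
//   INSERT INTO results SELECT color, COUNT(*) FROM events
//   WHERE color <> 'orange' GROUP BY color;
final class ColorCounter {
    // Per-color running count: the state the GROUP BY/COUNT step keeps.
    private final Map<String, Long> counts = new HashMap<>();

    // Processes one event and returns the updated "color=count" row,
    // or null when the FILTER drops it (SQL also drops NULL colors here).
    String onEvent(String color) {
        if (color == null || color.equals("orange")) {
            return null;
        }
        long updated = counts.merge(color, 1L, Long::sum);
        return color + "=" + updated;
    }

    // Feeds events in arrival order and collects every emitted update.
    static List<String> run(List<String> events) {
        ColorCounter counter = new ColorCounter();
        List<String> emitted = new ArrayList<>();
        for (String color : events) {
            String row = counter.onEvent(color);
            if (row != null) {
                emitted.add(row);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("blue", "orange", "red", "blue")));
        // prints [blue=1, red=1, blue=2]
    }
}
```

Each surviving event produces a new count for its color rather than a final answer; Flink SQL represents such results as an updating changelog that replaces the previous count for that color.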

Running Streaming Apps
[diagram, built up over five slides: a Client submits the job to the Job Manager (Dispatcher, Resource Manager, REST Endpoint, and a Job Master containing the Scheduler, Execution Graph, Checkpoint Coordinator, and Slot Allocator). Task Slots are assigned on the Task Managers (each with a State Backend), the Job Master submits, stops, and cancels tasks and triggers checkpoints, tasks exchange data via a Data Shuffle, and results flow back to the Client.]

Local Failures
[diagram: an exception is raised by a task running on a Task Manager, e.g., permissions errors, serialization errors, ClassCast errors, etc.]

Global Failures
[diagram: failures raised inside the Job Manager rather than by a single task, e.g., checkpoint errors, operator coordinator errors, etc.]

Failure Handling
[diagram: failures reach a Failure Handler in the Job Master, which decides how to react, e.g., by restarting the task. Open questions: should a permissions exception trigger a task restart? Should OOM errors be exposed to users?]

Extending Failure Handling
- Enrich failures with extra metadata (e.g., type of failure)
- Expose failures to downstream consumers (e.g., notification systems)
- Support custom logic (pluggable interface)

Pluggable Failure Enrichers (FLIP-304, Flink 1.18)
[diagram, built up over four slides: the Failure Handler in the Job Master calls a set of pluggable Failure Enrichers, such as a Type Classifier. When a task fails with a ClassCastException, the Job Master handles the task failure, the Type Classifier labels it, and the failure is exposed together with its labels:]
  exceptionName: ClassCastException
  "failureLabels": { "type": "USER" }

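Failure Enricher Implementation (FLIP-304, Flink 1.18)
Step 1: Implement your enricher
```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.util.ExceptionUtils;

public class TypeClassifier implements FailureEnricher {
    // The only label key this enricher emits.
    private static final String TYPE_KEY = "TYPE";

    @Override
    public Set<String> getOutputKeys() {
        return Stream.of(TYPE_KEY).collect(Collectors.toSet());
    }

    @Override
    public CompletableFuture<Map<String, String>> processFailure(
            final Throwable cause, final Context ctx) {
        final Map<String, String> labels = new HashMap<>();
        // A ClassCastException anywhere in the cause chain counts as a user error.
        if (ExceptionUtils.findThrowable(cause, ClassCastException.class).isPresent()) {
            labels.put(TYPE_KEY, "USER");
        } else {
            labels.put(TYPE_KEY, "SYSTEM");
        }
        return CompletableFuture.completedFuture(labels);
    }
}
```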
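Step 1 relies on `ExceptionUtils.findThrowable` to look for a `ClassCastException` anywhere in the failure's cause chain, so a `ClassCastException` wrapped in another exception is still labeled `USER`. The JDK-only sketch below approximates that search, assuming it simply follows `getCause()` links until a match; `CauseChain`, `findThrowable`, and `classify` here are illustrative names, not Flink's implementation.

```java
import java.util.Optional;

// Illustrative JDK-only approximation of a cause-chain search; not Flink code.
final class CauseChain {
    // Follows t -> t.getCause() -> ... and returns the first instance of type.
    // Like a plain loop, it does not guard against cyclic cause chains.
    static <T extends Throwable> Optional<T> findThrowable(Throwable t, Class<T> type) {
        while (t != null) {
            if (type.isInstance(t)) {
                return Optional.of(type.cast(t));
            }
            t = t.getCause();
        }
        return Optional.empty();
    }

    // Same rule as TypeClassifier: ClassCastException anywhere in the chain => USER.
    static String classify(Throwable cause) {
        return findThrowable(cause, ClassCastException.class).isPresent() ? "USER" : "SYSTEM";
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("task failed", new ClassCastException("bad cast"));
        System.out.println(classify(wrapped)); // USER
        System.out.println(classify(new IllegalStateException("disk full"))); // SYSTEM
    }
}
```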

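Failure Enricher Implementation (FLIP-304, Flink 1.18)
Step 2: Create an enricher factory
```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.failure.FailureEnricherFactory;

public class TypeClassifierFactory implements FailureEnricherFactory {
    @Override
    public FailureEnricher createFailureEnricher(Configuration conf) {
        return new TypeClassifier();
    }
}
```
Step 3: Package the enricher and its factory in a jar, registering the factory in the service file META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory (which contains the factory's fully qualified class name), and place the jar in its own subfolder of Flink's plugins directory.
Step 4: Modify the Flink configuration to enable the enricher:
```yaml
jobmanager.failure-enrichers: org.apache.flink.test.plugin.jar.failure.TypeClassifier
```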
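Note that Step 4 lists the enricher class, not the factory. As we understand the option, Flink only starts discovered enrichers whose name appears in `jobmanager.failure-enrichers`; the JDK-only sketch below illustrates that selection step, assuming a comma-separated value matched against fully qualified class names. `EnricherSelection` and its methods are hypothetical, and Flink's actual matching logic may differ.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not a Flink API: selects discovered enrichers by the
// class names listed in a comma-separated jobmanager.failure-enrichers value.
final class EnricherSelection {
    // Splits "a.B, c.D" into {"a.B", "c.D"}; an empty or missing value selects nothing.
    static Set<String> parseConfigured(String value) {
        if (value == null || value.isBlank()) {
            return Set.of();
        }
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toSet());
    }

    // Keeps only instances whose fully qualified class name was configured.
    static <E> List<E> select(List<E> discovered, String configValue) {
        Set<String> wanted = parseConfigured(configValue);
        return discovered.stream()
                .filter(enricher -> wanted.contains(enricher.getClass().getName()))
                .collect(Collectors.toList());
    }
}
```

Matching on the enricher's class name explains why listing `TypeClassifierFactory` in the configuration would select nothing in this sketch.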

Confluent Cloud

Flink UI

Lessons Learned
- Failure Enrichers might throw exceptions too; make sure they are properly handled! There is no way to enforce that a pluggable component throws no exceptions, and an unhandled exception could result in labels being thrown away.
- Bundle Failure Enrichers’ dependencies when you are using third-party libraries! The PluginLoader only allows whitelisted classes from the parent/system classloader.
- Logs and system tests are your friends!
Links:
- Documentation: https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/advanced/failure_enrichers
- FLIP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+Failure+Enrichers
- Apache Flink 1.18: https://www.confluent.io/blog/announcing-apache-flink-1-18
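One way to follow the first lesson is to wrap the labelling logic so that both a synchronous throw and an exceptionally completed future fall back to an empty label map. This is a minimal JDK-only sketch; `SafeLabels.guard` is a hypothetical helper, and falling back to empty labels (rather than rethrowing) is a design choice, not Flink's documented behavior.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper, not a Flink API: guards a labelling function so a buggy
// enricher degrades to "no labels" instead of breaking failure handling.
final class SafeLabels {
    static Function<Throwable, CompletableFuture<Map<String, String>>> guard(
            Function<Throwable, CompletableFuture<Map<String, String>>> enrich) {
        return cause -> {
            CompletableFuture<Map<String, String>> future;
            try {
                future = enrich.apply(cause); // the enricher may throw synchronously...
            } catch (RuntimeException e) {
                return CompletableFuture.completedFuture(Map.of());
            }
            if (future == null) { // ...or (incorrectly) return no future at all
                return CompletableFuture.completedFuture(Map.of());
            }
            // ...or complete its future exceptionally later on.
            return future.exceptionally(error -> Map.of());
        };
    }
}
```

The synchronous path only catches `RuntimeException`, so `Error`s such as `OutOfMemoryError` still propagate; the asynchronous path (`exceptionally`) absorbs whatever the future failed with.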

Summary
- Flink service providers deal with a plethora of failure types, coming from different sources and calling for a variety of corrective actions.
- Pluggable Failure Enrichers allow for:
  - custom logic (classification, tagging, alerting, etc.)
  - custom metadata labels
  - asynchronous execution
  - simple implementation and packaging (independent jars)
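Because `processFailure` returns a `CompletableFuture`, slow work such as lookups or alerting calls can run off the calling thread. A minimal JDK-only sketch, assuming the caller supplies an executor and that a 5-second bound with an empty-label fallback is acceptable; `AsyncLabels` is hypothetical, and it checks only the top-level exception type for brevity.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not a Flink API: computes labels asynchronously on a
// caller-supplied executor and bounds the wait with a timeout.
final class AsyncLabels {
    static CompletableFuture<Map<String, String>> label(Throwable cause, ExecutorService io) {
        return CompletableFuture
                // Potentially slow work runs on the I/O executor, not the caller's thread.
                .supplyAsync(() -> Map.of("TYPE", cause instanceof ClassCastException ? "USER" : "SYSTEM"), io)
                // If labelling takes longer than 5 seconds, complete with no labels instead.
                .completeOnTimeout(Map.of(), 5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            System.out.println(label(new ClassCastException("bad cast"), io).join()); // {TYPE=USER}
        } finally {
            io.shutdown();
        }
    }
}
```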
