End-to-end pipeline agility - Berlin Buzzwords 2024
lallea
55 slides
Jun 11, 2024
About This Presentation
We describe how we achieve high change agility in data engineering by eliminating the fear of breaking downstream data pipelines through end-to-end pipeline testing, and by using schema metaprogramming to safely eliminate boilerplate involved in changes that affect whole pipelines.
A quick poll on agility in changing pipelines from end to end indicated a huge span in capabilities. For the question "How long does it take for all downstream pipelines to be adapted to an upstream change?", the median response was 6 months, but some respondents could do it in less than a day. When quantitative data engineering differences between the best and worst are measured, the span is often 100x-1000x, sometimes even more.
A long time ago, we suffered at Spotify from fear of changing pipelines due to not knowing what the impact might be downstream. We made plans for a technical solution to test pipelines end-to-end to mitigate that fear, but the effort failed for cultural reasons. We eventually solved this challenge, but in a different context. In this presentation we will describe how we test full pipelines effectively by manipulating workflow orchestration, which enables us to make changes in pipelines without fear of breaking downstream.
Making schema changes that affect many jobs also involves a lot of toil and boilerplate. Using schema-on-read mitigates some of it, but has drawbacks since it makes it more difficult to detect errors early. We will describe how we have rejected this tradeoff by applying schema metaprogramming, eliminating boilerplate but keeping the protection of static typing, thereby further improving agility to quickly modify data pipelines without fear.
Size: 5.4 MB
Language: en
Added: Jun 11, 2024
Slides: 55 pages
Slide Content
www.scling.com
End-to-end pipeline agility
Berlin Buzzwords, 2024-06-10
Lars Albertsson
Scling - data factory as a service
1
The great capability divide
3
Myth:
●We are all doing quite ok
●2-10x leader-to-rear span
Reality:
●Few leaders in each area
●100-10000x leader-to-rear span
[Figure: two histograms of # orgs over capability in X, one for the myth and one for reality]
Capability KPIs
DORA research / State of DevOps report:
●Deployment frequency
●Lead time for changes
●Change failure rate
●Time to restore service
Small elite
~1000x span
4
Observed differences in data organisations:
●Lead time from idea to production
●Time to mend / change pipeline
●Number of pipelines / developer
●Number of datasets / day / developer
Small elite
100 - 10000x span (or more)
Efficiency gap, data cost & value
●Data processing produces datasets
○Each dataset has business value
●Proxy value/cost metric: datasets / day
○S-M traditional: < 10
○Bank, telecom, media: 100-1000
5
2014: 6500 datasets / day
2016: 20000 datasets / day
2018: 100000+ datasets / day, 25% of staff use BigQuery
2021: 500B events collected / day
2016: 1 600 000 000 datasets / day
Disruptive value of data, machine learning
Financial, reporting
Insights, data-fed features
effort
value
Enabling innovation
6
"The actual work that went into Discover Weekly was very little, because we're reusing things we already had."
"Discover Weekly wasn't a great strategic plan and 100 engineers. It was 3 engineers that decided to build something."
"I would have killed it. All of a sudden, they shipped it. It’s one of the most loved product features that we have."
-Daniel Ek, CEO
https://youtu.be/A259Yo8hBRs
https://youtu.be/ZcmJxli8WS8
https://musically.com/2018/08/08/daniel-ek-would-have-killed-discover-weekly-before-launch/
Enterprise innovation
7
Swedish BigCorp 1:
Before we could build this internal data tool, we had to submit an application and get it approved by a committee at the headquarters.
Swedish BigCorp 2:
Only one committee? Hold my beer.
Swedish municipality:
We started the AI project with a design phase for a few months, where we did not write any code!
Data factory track record
8
Organisation | Time to first flow | Staff size | 1st flow effort, weeks | 1st flow cost SEK | Time to innovation | # Datasets / day after 1y | # Flows after 1y
Spotify (new gen) | weeks | ~30-50 | 60? | 2M | - | 10000s | 100s
Media | 1+ years | 10-30 | 1500? | 100M (0.5-1B) | 1+ year | ~100 | ~10
Finance | 2 years | 10-50 | 2000? | 100M? | Years | < 100 | < 10
Media | 3 weeks | 4.5 - 8 | 15 | 750K | 3 months | ~2000 | 30
Retail | 7 weeks | 1-3 | 7 | 500K | 6 months | ~3500 | 70
Telecom | 12 weeks | 2-5 | 30 | 1500K | 6 months | ~500 | 50
Consumer products | 20+ weeks | 1.5 | 30+ | 1200+K | 6+ months | ~200 | 20
Construction | 8 weeks | 0.5 | 4 | 150K | 7 months | 10-100 * | 10
Manufacturing | 8 weeks | 0.5 | 4 | 200K | 6 months | ? | ?
* External bottlenecks
14
Potential test scopes
●Unit/component
●Single job
●Multiple jobs
●Pipeline, including service
●Full system, including client
Choose stable interfaces
Each scope has a cost
[Diagram: App, Service, Job and Storage components, with the candidate test scopes marked]
15
Recommended test scopes
●Single job
●Multiple jobs
●Pipeline, including service
[Diagram: App, Service, Job and Storage components, with the recommended test scopes marked]
Testing single batch job
16
Standard Scalatest/JUnit harness around the job:
1. Generate input, f(), in file://test_input/
2. Run in local mode
3. Verify output, p(), in file://test_output/
Runs well in CI / from IDE
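The three steps above can be sketched as follows. This is a minimal illustration in Python rather than Scalatest/JUnit; the job `count_orders_job`, the file names, and the record layout are hypothetical and not taken from the talk.

```python
import json
import tempfile
from pathlib import Path


def count_orders_job(input_dir: Path, output_dir: Path) -> None:
    """Hypothetical batch job: count orders per customer."""
    counts = {}
    for path in sorted(input_dir.glob("*.json")):
        for line in path.read_text().splitlines():
            order = json.loads(line)
            counts[order["customer"]] = counts.get(order["customer"], 0) + 1
    output_dir.mkdir(parents=True, exist_ok=True)
    (output_dir / "part-0.json").write_text(json.dumps(counts, sort_keys=True))


def run_single_job_test() -> dict:
    with tempfile.TemporaryDirectory() as tmp:
        test_input = Path(tmp) / "test_input"
        test_output = Path(tmp) / "test_output"
        # 1. Generate input
        test_input.mkdir()
        orders = [{"customer": "alice"}, {"customer": "bob"}, {"customer": "alice"}]
        (test_input / "orders.json").write_text(
            "\n".join(json.dumps(o) for o in orders))
        # 2. Run in local mode: a plain function call, no cluster involved
        count_orders_job(test_input, test_output)
        # 3. Verify output
        return json.loads((test_output / "part-0.json").read_text())
```

Because the job only sees directories, the same test runs unchanged in CI and from an IDE.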
Cognitive waste
●Why do we have 25 time formats?
○ISO 8601, UTC assumed
○ISO 8601 + timezone
○Millis since epoch, UTC
○Nanos since epoch, UTC
○Millis since epoch, user local time
○…
○Float of seconds since epoch, as string.
WTF?!?
●my-kafka-topic-name, your_topic_name
17
"I don't know what will break downstream"
paralyses many data organisations.
Testing batch pipelines - two options
18
A: Standard Scalatest harness
1. Generate input, f(), in file://test_input/
2. Run custom multi-job: test job with sequence of jobs
3. Verify output, p(), in file://test_output/
+ Runs in CI
+ Runs in IDE
+ Quick setup
- Multi-job maintenance
B: Customised workflow manager setup, verified with p()
+ Tests workflow logic
+ More authentic
- Workflow mgr setup for testability
- Difficult to debug
- Dataset handling with Python
●Both can be extended with ingress (Kafka), egress DBs
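Option A, a test-only multi-job that runs a fixed sequence of jobs, might look roughly like this. The two jobs, the directory layout, and the `PIPELINE` list are hypothetical; the point is only that the test owns the job sequence, which is also the maintenance cost listed above.

```python
import json
import tempfile
from pathlib import Path


def read_records(path: Path) -> list:
    return [json.loads(line) for line in path.read_text().splitlines() if line]


def write_records(path: Path, records: list) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("\n".join(json.dumps(r, sort_keys=True) for r in records))


def clean_orders(root: Path) -> None:
    """Hypothetical job 1: drop orders without a customer."""
    orders = read_records(root / "raw" / "orders.json")
    write_records(root / "clean" / "orders.json",
                  [o for o in orders if o.get("customer")])


def order_counts(root: Path) -> None:
    """Hypothetical job 2: count clean orders per customer."""
    counts = {}
    for order in read_records(root / "clean" / "orders.json"):
        counts[order["customer"]] = counts.get(order["customer"], 0) + 1
    write_records(root / "stats" / "order_counts.json",
                  [{"customer": c, "orders": n} for c, n in sorted(counts.items())])


# The sequence a workflow manager would otherwise derive from dependencies.
PIPELINE = [clean_orders, order_counts]


def run_pipeline_test() -> list:
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        # Generate input only for the first job; later jobs read upstream output.
        write_records(root / "raw" / "orders.json",
                      [{"customer": "alice"}, {"customer": None}, {"customer": "alice"}])
        for job in PIPELINE:
            job(root)
        return read_records(root / "stats" / "order_counts.json")
```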
What about a layer of indirection?
20
file://test_data/
Testing?
Prod
Data access layer
-In conflict with autonomous culture
-Adds complexity to experimental cycle
-In conflict with data component ecosystems
+Explicit
+Debuggable
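A data access layer of this kind could be as small as the sketch below: jobs ask for datasets by name and the layer decides whether that resolves to test data or production. The class, the `DATA_ROOT` variable, and the `s3://prod-lake` root used in the usage note are assumptions for illustration.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class DataAccess:
    """Hypothetical data access layer: jobs name datasets, never full URLs."""
    root: str

    def dataset_url(self, name: str, date: str) -> str:
        return f"{self.root.rstrip('/')}/{name}/{date}/"


def data_access_from_env(env=os.environ) -> DataAccess:
    # DATA_ROOT is an assumed variable name; without it we fall back to test data.
    return DataAccess(env.get("DATA_ROOT", "file://test_data/"))
```

With `DATA_ROOT=s3://prod-lake`, `dataset_url("orders", "2024-06-10")` yields a production location; with the variable unset the same job reads from `file://test_data/`. The indirection is explicit and debuggable, but every job has to adopt it, which is the coordination cost the slide points at.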
Aspect-oriented testing
Aka hideous monkey patching.
Packaged in prod image.
Triggered by environment variable.
Rules for DAG transformation
21
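The mechanism described on this slide can be approximated as follows: patching code ships with the production image, does nothing by default, and rewrites dataset locations when an environment variable is set. `Dataset`, `redirect_rule`, and the variable name `PIPELINE_TEST_ROOT` are hypothetical stand-ins, not the actual implementation from the talk.

```python
import os


class Dataset:
    """Stand-in for a workflow target class that the test code does not own."""

    def __init__(self, url: str):
        self.url = url

    def location(self) -> str:
        return self.url


def redirect_rule(url: str, test_root: str) -> str:
    """DAG transformation rule: relocate any dataset under the test root."""
    _scheme, sep, rest = url.partition("://")
    return f"{test_root.rstrip('/')}/{rest if sep else url}"


def install_test_patches(env=os.environ) -> bool:
    """Monkey patch Dataset.location if PIPELINE_TEST_ROOT (assumed name) is set."""
    test_root = env.get("PIPELINE_TEST_ROOT")
    if not test_root:
        return False  # production behaviour is left untouched
    original = Dataset.location

    def patched(self) -> str:
        return redirect_rule(original(self), test_root)

    Dataset.location = patched
    return True
```

No job code changes, which is why pipelines owned by other teams can be tested too; the price is that the redirection is implicit.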
Monkey patching extracts a price
Better than indirection? Depends on cultural cost of coordination.
Our plans ahead: Keep warp zone, replace monkey patching with layer of indirection
22
-Implicit
-Difficult to debug
framework
-Disharmony with higher workflow abstractions
+Minimal code adaptation
+Can test pipelines owned by other teams
+Seams in third-party components
Great value from full pipeline testing!
Performance is essential. No, not that performance.
●Bitten by bug in org.apache.spark.sql.KeyValueGroupedDataset.flatMapGroups() .
○Method is covered by one test case. WTF?
●Spark startup is slow. 10-30 seconds.
●What about plain Scala collections + minimal data DSL wrapper?
○Up to 10x faster on join benchmark for < 10 GB data.
○10x faster in production on small datasets.
○10x faster tests.
○90% saved CPU resources.
○Easier to program + debug.
●Performance dimensions that actually matter for agility & value creation.
23
What conclusion from this graph?
COVID-19 fatalities / day in Sweden
24
Fatalities collected during 2 days
Fatalities collected during 4 days
Fatalities collected during 10 days
Normalise data collection to compare
25
Graph by Adam Altmejd, @adamaltmejd
Build current state from immutable events + dumps
Address cumulative state at arbitrary time
Data flow / ops paradigms
26
Immutable
Functional
Democratised
Mutable
Object-oriented
Exclusive
Microservices
Shared DBs
Data
warehousing
Modern data
warehousing
Data lake
Frozen lake
Mutate current
state
Stream
processing
Lakehouse
Incompleteness recovery
27
Fast data or complete data
28
Delay: 0
Delay: 4
Delay: 12
Life of an error, batch pipelines
29
●My processing job, bad code!
1. Revert serving datasets to old
2. Fix bug
3. Remove faulty datasets
4. Deploy
5. Backfill is automatic (Luigi)
Done!
●Low cost of error
○Reactive QA
○Production environment sufficient
Life of an error, frozen lake
30
●My processing job, bad code!
1. Revert serving datasets to old
2. Fix bug
3. Bump pipeline version
4. Deploy
5. Backfill is automatic (Luigi)
Done!
●Low cost of error
○Reactive QA
○Production environment sufficient
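Why bumping the pipeline version triggers a backfill can be illustrated without Luigi. By default Luigi considers a task complete when its outputs exist, so the sketch below assumes the version is part of the output path: after a bump, every output is missing and gets rebuilt, while the old versions stay in place. The path layout and function names are hypothetical.

```python
from datetime import date, timedelta


def output_path(dataset: str, version: int, day: date) -> str:
    # Assumed layout: the pipeline version is part of the dataset path.
    return f"lake/{dataset}/v{version}/{day.isoformat()}"


def missing_outputs(existing: set, dataset: str, version: int,
                    first: date, last: date) -> list:
    """Outputs a scheduler would need to build: every wanted day not yet present."""
    days = (last - first).days + 1
    wanted = [output_path(dataset, version, first + timedelta(days=d))
              for d in range(days)]
    return [path for path in wanted if path not in existing]
```

Compared with the previous slide, nothing is removed: faulty datasets remain addressable under the old version, which fits the immutable "frozen lake" idea.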
Life of a change, batch pipelines
31
●Forgiving environment
○Machine errors
○Human errors
●Friendly to experiments
○"Dark pipelines" run in parallel
●Operationally efficient
○Separate dev, test, staging environments not necessary
○Self-healing
∆?
Dynamic version rollout
CredibilityScore
FraudCandidate
Order
32
Dynamic DAGs
CredibilityScore
FraudCandidate
Order
33
Schema on write
34
●Schema defined by writer
●Destination (table / dataset / stream topic) has defined schema
○Technical definition with metadata (e.g. RDBMS, Kafka + registry)
○By convention
●Writes not in compliance are not accepted
○Technically aborted (e.g. RDBMS)
○In violation of intent (e.g. HDFS datasets)
●Can be technically enforced by producer driver
○Through ORM / code generation
○Schema registry lookup
Strict checking philosophy
Schema on read
35
●Anything (technically) accepted when writing
●Schema defined by reader, at consumption
○Reader may impose requirements on type & value
●In dynamic languages, fields propagate implicitly
○E-shopping example:
i.Join order + customer.
ii.Add device_type to order schema
iii.device_type becomes available in downstream datasets
●Violations of constraints are detected at read
○Perhaps long after production?
○By team not owning producer?
Loose checking philosophy
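The e-shopping example can be made concrete with a small contrast. In the schema-on-read variant, records are dicts and `device_type` flows downstream without any code change; in the schema-on-write variant, the output has a fixed set of fields and the new field is dropped until the schema is extended. The record layouts and function names are hypothetical.

```python
from dataclasses import dataclass


def join_on_read(orders: list, customers: dict) -> list:
    """Schema on read: dict records, so unknown fields propagate implicitly."""
    return [{**order, **customers[order["customer_id"]]} for order in orders]


@dataclass(frozen=True)
class OrderWithCustomer:
    """Schema on write: the output has an explicit, fixed set of fields."""
    order_id: str
    customer_id: str
    country: str


def join_on_write(orders: list, customers: dict) -> list:
    return [
        OrderWithCustomer(
            order_id=o["order_id"],
            customer_id=o["customer_id"],
            country=customers[o["customer_id"]]["country"],
        )
        for o in orders
    ]
```

The first variant is agile but lets a malformed `device_type` travel far before anyone notices; the second catches surprises early but needs an edit in every job along the way.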
Schema on read or write?
36
DB
DB
DB
Service
Service
Export
Business intelligence
Change agility important here
Production stability important here
●Expressive
●Custom types
●IDE support
●Avro for data lake storage
Schema definition choice
37
●RDBMS: Table metadata
●Avro: JSON/DSL definition
○Definition is bundled with avro data files
●Parquet
●pyschema / dataclass
●Scala case classes
●JSON-schema
●JSON: Each record
○One record insufficient to deduce schema
case class User(id: String, name: String, age: Int,
phone: Option[String] = None)
val users = Seq(User("1", "Alice", 32),
User("2", "Bob", 43, Some("08-123456")))
Schema offspring
38
Derived from case classes:
●Test record difference render type classes
●Test equality type classes
●Avro definitions
●Java Avro codec classes
●Java <-> Scala converters
●Avro type annotations
●MySQL schemas
●CSV codecs
●Privacy by design machinery
●Python
●Logical types
●Offline code generation
40
// Example from scio 0.7 -> 0.8 upgrade rules
import scalafix.v1._
import scala.meta._

final class FixTensorflow extends SemanticRule("FixTensorflow") {
  override def fix(implicit doc: SemanticDocument): Patch =
    doc.tree.collect {
      // Rewrite every call to saveAsTfExampleFile into saveAsTfRecordFile
      case t @ Term.Select(s, Term.Name("saveAsTfExampleFile")) =>
        Patch.replaceTree(t, q"$s.saveAsTfRecordFile".syntax)
    }.asPatch
}
Test equality
Test record
difference render
type classes
41
case classes
test equality
type classes
case class User(
  age: Int,
  @AvroProp("sqlType", "varchar(1012)")
  phone: Option[String] = None)
Python + RDBMS
42
case classes
Avro
definitions
Avro type
annotations
MySQL
schemas
Python
{
  "name": "User",
  "type": "record",
  "fields": [
    { "name": "age", "type": "int" },
    { "name": "phone",
      "type": [ "null", "string" ],
      "sqlType": "varchar(1012)" }
  ]
}
class UserEgressJob(CopyToTable):
    columns = [
        ("age", "int"),
        ("phone", "varchar(1012)"),
    ]
    ...
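The chain from one schema definition to an Avro definition and to egress table columns can be sketched in Python with dataclass field metadata standing in for the `@AvroProp` annotation. This is an illustration of the idea only, not the Scala metaprogramming used in the talk; the type mappings and the `sql_type` metadata key are assumptions.

```python
from dataclasses import dataclass, field, fields
from typing import Optional, Union, get_args, get_origin

AVRO_TYPES = {int: "int", str: "string"}
SQL_TYPES = {int: "int", str: "text"}  # assumed defaults when no annotation is given


@dataclass
class User:
    age: int
    # The "sql_type" metadata plays the role of @AvroProp("sqlType", ...).
    phone: Optional[str] = field(default=None, metadata={"sql_type": "varchar(1012)"})


def base_type(tp):
    """Return (python_type, nullable) for plain types and Optional[...]."""
    if get_origin(tp) is Union:
        args = [a for a in get_args(tp) if a is not type(None)]
        return args[0], True
    return tp, False


def avro_schema(cls) -> dict:
    """Derive an Avro record definition from the dataclass."""
    out = []
    for f in fields(cls):
        tp, nullable = base_type(f.type)
        entry = {"name": f.name,
                 "type": ["null", AVRO_TYPES[tp]] if nullable else AVRO_TYPES[tp]}
        if "sql_type" in f.metadata:
            entry["sqlType"] = f.metadata["sql_type"]
        out.append(entry)
    return {"type": "record", "name": cls.__name__, "fields": out}


def sql_columns(cls) -> list:
    """Derive (name, SQL type) pairs, e.g. for an egress job's column list."""
    return [(f.name, f.metadata.get("sql_type", SQL_TYPES[base_type(f.type)[0]]))
            for f in fields(cls)]
```

Adding a field to `User` then changes both derived artifacts at once, which is the boilerplate saving the slide is after.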
Logical types
43
case classes
Logical types
case t"""Instant""" =>
  JObject(List(JField("type", JString("long")),
    JField("logicalType", JString("timestamp-micros"))))
case t"""LocalDate""" =>
  JObject(List(JField("type", JString("int")),
    JField("logicalType", JString("date"))))
case t"""YearMonth""" => JObject(List(JField("type", JString("int"))))
case t"""JObject""" => JString("string")
●Avro logical types
○E.g. date → int, timestamp → long
○Default is timestamp-millis
■Great for year > 294441 (!)
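A Python analogue of the type-to-logical-type mapping above, including how values are encoded: Avro's `date` is an int counting days since 1970-01-01 and `timestamp-micros` is a long counting microseconds since the epoch. The function names are hypothetical, and treating naive datetimes as UTC is an assumption of this sketch.

```python
from datetime import date, datetime, timezone

# Primitive type + logicalType pairs as defined by the Avro specification.
LOGICAL_TYPES = {
    datetime: {"type": "long", "logicalType": "timestamp-micros"},
    date: {"type": "int", "logicalType": "date"},
}
PRIMITIVES = {int: "int", str: "string", float: "double", bool: "boolean"}
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def avro_type(py_type):
    """Avro type for a Python type, preferring logical types where they exist."""
    if py_type in LOGICAL_TYPES:
        return dict(LOGICAL_TYPES[py_type])
    return PRIMITIVES[py_type]


def to_avro_value(value):
    """Encode a value the way its logical type prescribes."""
    if isinstance(value, datetime):  # checked first: datetime subclasses date
        if value.tzinfo is None:     # assumption: naive timestamps are UTC
            value = value.replace(tzinfo=timezone.utc)
        delta = value - EPOCH
        return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds
    if isinstance(value, date):
        return (value - date(1970, 1, 1)).days
    return value
```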
Schema on read or write?
44
DB
DB
DB
Service
Service
Export
Business intelligence
Change agility important here
Production stability important here
Hydration boilerplate
45
Chimney - case class transformer
46
●Commonality in schema classes
○Copy + a few more fields
○Drop fields
●Statically typed
○Forgot a field - error
○Wrong type - error
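The idea behind a case class transformer can be sketched in Python with dataclasses: copy same-named fields, accept a few extra ones, drop the rest, and fail if the target is left incomplete. Chimney does this at compile time in Scala; the sketch below only checks at run time, and `transform` and the example classes are hypothetical.

```python
from dataclasses import dataclass, fields


def transform(source, target_cls, **extra):
    """Copy same-named fields from source into target_cls; extra supplies the rest."""
    wanted = {f.name for f in fields(target_cls)}
    unknown = set(extra) - wanted
    if unknown:
        raise TypeError(f"not fields of {target_cls.__name__}: {sorted(unknown)}")
    values = {f.name: getattr(source, f.name)
              for f in fields(source) if f.name in wanted}
    values.update(extra)
    # The dataclass constructor raises TypeError if a required field is missing.
    return target_cls(**values)


@dataclass(frozen=True)
class Order:
    order_id: str
    customer_id: str
    amount: int


@dataclass(frozen=True)
class EnrichedOrder:  # copy + one more field
    order_id: str
    customer_id: str
    amount: int
    device_type: str


@dataclass(frozen=True)
class OrderAmount:  # drop fields
    order_id: str
    amount: int
```

Usage: `transform(order, EnrichedOrder, device_type="mobile")` copies three fields and adds one; `transform(order, EnrichedOrder)` fails because `device_type` was forgotten.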
Chimney in real code
47
Data lake
Private pond
Cold store
Ingest prepared for deletion
48
Mutation
●PII fields encrypted
●Per-user decryption key table
●Clear single user key => oblivion
-Extra join + decrypt
-Decryption (user) id needed
+Multi-field oblivion
+Single dataset leak → no PII leak
+Handles transformed PII fields
Lost key pattern
49
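The lost key pattern can be illustrated as below: PII fields are stored encrypted with a per-user key, and clearing that single key makes every copy of the user's PII unreadable. The cipher here is a toy XOR stream built from `hashlib` so the sketch stays within the standard library; it is NOT secure and stands in for a real cipher. `KeyTable` and its methods are hypothetical.

```python
import hashlib
import secrets


def _keystream(key: bytes, length: int) -> bytes:
    out = b""
    counter = 0
    while len(out) < length:
        out += hashlib.sha256(key + counter.to_bytes(8, "big")).digest()
        counter += 1
    return out[:length]


def toy_crypt(key: bytes, data: bytes) -> bytes:
    """Toy XOR stream cipher (encrypt == decrypt). NOT secure; illustration only."""
    return bytes(a ^ b for a, b in zip(data, _keystream(key, len(data))))


class KeyTable:
    """Per-user decryption keys; forgetting a key means oblivion for that user."""

    def __init__(self):
        self._keys = {}

    def key_for(self, user_id: str) -> bytes:
        if user_id not in self._keys:
            self._keys[user_id] = secrets.token_bytes(32)
        return self._keys[user_id]

    def forget(self, user_id: str) -> None:
        self._keys.pop(user_id, None)

    def decrypt(self, user_id: str, ciphertext: bytes):
        # The extra lookup is the "extra join + decrypt" cost noted above.
        key = self._keys.get(user_id)
        return None if key is None else toy_crypt(key, ciphertext).decode()
```

Usage: encrypt with `toy_crypt(keys.key_for("u1"), b"...")` at ingest; after `keys.forget("u1")`, `decrypt` returns `None` for every dataset that still holds the ciphertext, so no dataset needs rewriting.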
Shieldformation
50
@PrivacyShielded
case class Sale(
  @PersonalId customerClubId: Option[String],
  @PersonalData storeId: Option[String],
  item: Option[String],
  timestamp: String
)
case class SaleShielded(
  shieldId: Option[String],
  customerClubIdEncrypted: Option[String],
  storeIdEncrypted: Option[String],
  item: Option[String],
  timestamp: String
)
case class SaleAnonymous(
  item: Option[String],
  timestamp: String
)
object SaleAnonymize extends SparkJob {
  ...
}
ShieldForm
object SaleExpose extends SparkJob {
  ...
}
object SaleShield extends SparkJob {
  ...
}
case class Shield(
  shieldId: String,
  personId: Option[String],
  keyStr: Option[String],
  encounterDate: String
)
Shieldformation & lost key
51
[Diagram: data flow between Sale, SaleShield, SaleShielded, Shield, Deletion requests, SaleExpose, CustomerHistory (exposed egress, limited retention), SaleAnonymize, SaleAnonymous and SaleStats]
Schema on write!
52
DB
DB
DB
Service
Service
Export
Business intelligence
Change agility important here
Production stability important here
53
Agility is essential
●Innovation, cost of operations
54
How long from idea to pipeline in production?
6-12 weeks
A manager heard that some companies do it in hours. What would you do to get there?
I push back on unrealistic management requirements.
Agility is essential
●Innovation, cost of operations