End-to-end pipeline agility - Berlin Buzzwords 2024

lallea · 55 slides · Jun 11, 2024

About This Presentation

We describe how we achieve high change agility in data engineering by eliminating the fear of breaking downstream data pipelines through end-to-end pipeline testing, and by using schema metaprogramming to safely eliminate boilerplate involved in changes that affect whole pipelines.



Slide Content

www.scling.com
End-to-end pipeline agility

Berlin Buzzwords, 2024-06-10
Lars Albertsson
Scling - data factory as a service

1

www.scling.com
The great capability divide
2
Myth:
●We are all doing quite ok
●2-10x leader-to-rear span
Reality:
●Few leaders in each area
●100-10000x leader-to-rear span
(chart: # orgs vs capability in X)

www.scling.com
Capability KPIs
4
DORA research / State of DevOps report:
●Deployment frequency
●Lead time for changes
●Change failure rate
●Time to restore service
Small elite, ~1000x span

Observed differences in data organisations:
●Lead time from idea to production
●Time to mend / change pipeline
●Number of pipelines / developer
●Number of datasets / day / developer
Small elite, 100-10000x span (or more)

www.scling.com
Efficiency gap, data cost & value
5
●Data processing produces datasets
○Each dataset has business value
●Proxy value/cost metric: datasets / day
○S-M traditional: < 10
○Bank, telecom, media: 100-1000

2014: 6 500 datasets / day
2016: 20 000 datasets / day
2018: 100 000+ datasets / day, 25% of staff use BigQuery
2021: 500B events collected / day
2016: 1 600 000 000 datasets / day

(chart, value vs effort: financial, reporting → insights, data-fed features → disruptive value of data, machine learning)

www.scling.com
Enabling innovation
6
"The actual work that went into
Discover Weekly was very little,
because we're reusing things we
already had."
https://youtu.be/A259Yo8hBRs
https://youtu.be/ZcmJxli8WS8
https://musically.com/2018/08/08/daniel-ek-would-have-killed-discover-weekly-before-launch/
"Discover Weekly wasn't a great
strategic plan and 100 engineers.
It was 3 engineers that decided to
build something."
"I would have killed it. All of a sudden,
they shipped it. It’s one of the most
loved product features that we have."
-Daniel Ek, CEO

www.scling.com
Enterprise innovation
7
Swedish BigCorp 1:
"Before we could build this internal data tool, we had to submit an application and get it approved by a committee at the headquarters."

Swedish BigCorp 2:
Only one committee? Hold my beer.

Swedish municipality:
"We started the AI project with a design phase for a few months, where we did not write any code!"

www.scling.com
Data factory track record
8

| | Time to first flow | Staff size | 1st flow effort, weeks | 1st flow cost, SEK | Time to innovation | # Datasets / day after 1y | # Flows after 1y |
|---|---|---|---|---|---|---|---|
| Spotify (new gen) | weeks | ~30-50 | 60? | 2M | - | 10000s | 100s |
| Media | 1+ years | 10-30 | 1500? | 100M (0.5-1B) | 1+ year | ~100 | ~10 |
| Finance | 2 years | 10-50 | 2000? | 100M? | Years | < 100 | < 10 |
| Media | 3 weeks | 4.5-8 | 15 | 750K | 3 months | ~2000 | 30 |
| Retail | 7 weeks | 1-3 | 7 | 500K | 6 months | ~3500 | 70 |
| Telecom | 12 weeks | 2-5 | 30 | 1500K | 6 months | ~500 | 50 |
| Consumer products | 20+ weeks | 1.5 | 30+ | 1200+K | 6+ months | ~200 | 20 |
| Construction | 8 weeks | 0.5 | 4 | 150K | 7 months | 10-100 * | 10 |
| Manufacturing | 8 weeks | 0.5 | 4 | 200K | 6 months | ? | ? |

* External bottlenecks

www.scling.com
Data agility
9
●Siloed: 6+ months
(cultural work)
●Autonomous: 1 month
(technical work)
●Coordinated: days
(diagram: data lake; latency?)

www.scling.com
Workflow orchestration - key to DataOps success
10
●Oozie (~2007)
●Luigi (2010 / 2012)
○Asset-based
●Airflow, Pinball (2015)
○Task-based
●Dagster (asset-based), Prefect, Argo, …

Luigi concepts:
●Data assets (Target)
●Jobs that build assets (Task)
●Data sensors (ExternalTask)
●Asset & job parameters
●Job dependencies (requires())
●(State)

●Data management as code
○No ClickOps
●Simple, debuggable tools
●Industrial, not craft. Work with process, not with data.
●Build higher abstraction layers
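The asset-based model above can be sketched in a few lines: a task declares the asset it produces and the assets it requires, and the scheduler only runs it when its asset is missing. This is a minimal illustration in plain Python; real Luigi provides Task, LocalTarget, and requires()/output()/run(), and the task and file names here are invented:

```python
# Minimal sketch of asset-based orchestration in the style of Luigi.
import tempfile
from pathlib import Path

BASE = Path(tempfile.mkdtemp())

class Task:
    def requires(self): return []
    def output(self) -> Path: raise NotImplementedError
    def run(self): raise NotImplementedError
    def complete(self) -> bool: return self.output().exists()

def build(task: Task):
    """Materialise upstream assets depth-first, then this task's asset."""
    for dep in task.requires():
        build(dep)
    if not task.complete():   # idempotent: existing assets are not rebuilt
        task.run()

class RawEvents(Task):
    def output(self): return BASE / "raw_events.txt"
    def run(self): self.output().write_text("click\nview\nclick\n")

class ClickCount(Task):
    def requires(self): return [RawEvents()]
    def output(self): return BASE / "click_count.txt"
    def run(self):
        events = RawEvents().output().read_text().splitlines()
        self.output().write_text(str(events.count("click")))

build(ClickCount())
print(ClickCount().output().read_text())  # → 2
```

Because completion is defined by the existence of the output asset, re-running `build` after a failure resumes where it stopped, which is what makes backfills automatic.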

www.scling.com
On-prem pipeline deployment pipeline
11
(diagram: source repo → Luigi DSL, jars, config → my-pipe-7.tar.gz → standard artifact store → Luigi daemon + workers)
●Standard deployment artifact, standard artifact store
●All that a pipeline needs, installed atomically:
> pip install my-pipe-7.tar.gz
●Redundant cron schedule, higher frequency:
10 * * * * luigi --module mymodule MyDaily

www.scling.com
Cloud native deployment
12
(diagram: source repo → Luigi DSL, jars, config → Docker image my-pipe:7 → Docker registry → Luigi daemon + workers; data on S3 / GCS; jobs on Dataproc / EMR)
●Redundant cron schedule, higher frequency:
kind: CronJob
spec:
  schedule: "10 * * * *"
  command: "luigi --module mymodule MyDaily"

www.scling.com
Simpler cloud native deployment
13
(diagram: as above, but jobs run inside the workers with spark-submit --master=local; data on S3 / GCS)
●Redundant cron schedule, higher frequency:
kind: CronJob
spec:
  schedule: "10 * * * *"
  command: "luigi --module mymodule MyDaily"

www.scling.com 14
Potential test scopes
●Unit/component
●Single job
●Multiple jobs
●Pipeline, including service
●Full system, including client

Choose stable interfaces
Each scope has a cost
(diagram: app → service → storage → job → storage → job → storage)

www.scling.com 15
Recommended test scopes
●Single job
●Multiple jobs
●Pipeline, including service
(diagram: app → service → storage → job → storage → job → storage)

www.scling.com
Testing single batch job
16
Standard Scalatest/JUnit harness; runs well in CI / from IDE:
1. Generate input (file://test_input/)
2. Run job in local mode
3. Verify output (file://test_output/)
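The three steps translate directly into a plain test harness. A sketch, where the word-count job and the file layout are invented for illustration:

```python
# Three-step batch-job test: generate input, run in local mode, verify output.
import tempfile
from pathlib import Path

def word_count_job(in_dir: Path, out_dir: Path):
    """Toy stand-in for a batch job: counts words across input files."""
    counts = {}
    for f in in_dir.glob("*.txt"):
        for w in f.read_text().split():
            counts[w] = counts.get(w, 0) + 1
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "part-00000").write_text(
        "\n".join(f"{w}\t{n}" for w, n in sorted(counts.items())))

def test_word_count():
    with tempfile.TemporaryDirectory() as d:
        in_dir, out_dir = Path(d, "input"), Path(d, "output")
        in_dir.mkdir()
        (in_dir / "a.txt").write_text("ham spam ham")       # 1. generate input
        word_count_job(in_dir, out_dir)                      # 2. run in local mode
        assert (out_dir / "part-00000").read_text() == "ham\t2\nspam\t1"  # 3. verify

test_word_count()
```

The same shape scales from one job to a whole pipeline: only step 2 changes.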

www.scling.com
Cognitive waste
17
●Why do we have 25 time formats?
○ISO 8601, UTC assumed
○ISO 8601 + timezone
○Millis since epoch, UTC
○Nanos since epoch, UTC
○Millis since epoch, user local time
○…
○Float of seconds since epoch, as string. WTF?!?
●my-kafka-topic-name, your_topic_name

"I don't know what will break downstream" paralyses many data organisations.
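The usual remedy is to normalise at ingestion so downstream jobs see one canonical format. A hedged sketch covering a few of the formats on the slide (the dispatch rules are illustrative, not an exhaustive parser):

```python
# Normalise a handful of ad-hoc time representations to ISO 8601 UTC.
from datetime import datetime, timezone

def to_iso8601_utc(value) -> str:
    if isinstance(value, (int, float)):
        # assume millis since epoch, UTC
        dt = datetime.fromtimestamp(value / 1000, tz=timezone.utc)
    elif isinstance(value, str) and value.replace(".", "").isdigit():
        # "float of seconds since epoch, as string" - the WTF case
        dt = datetime.fromtimestamp(float(value), tz=timezone.utc)
    else:
        dt = datetime.fromisoformat(value)        # ISO 8601, with or without offset
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)  # ISO 8601, UTC assumed
    return dt.astimezone(timezone.utc).isoformat()

print(to_iso8601_utc(1718000000000))              # → 2024-06-10T06:13:20+00:00
print(to_iso8601_utc("2024-06-10T08:00:00+02:00"))  # → 2024-06-10T06:00:00+00:00
```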

www.scling.com
Testing batch pipelines - two options
18
Both follow the same pattern: 1. generate input (file://test_input/), 2. run the jobs, 3. verify output (file://test_output/).

A: Test job with sequence of jobs, standard Scalatest harness
+ Runs in CI
+ Runs in IDE
+ Quick setup
- Multi-job maintenance

B: Customised workflow manager setup
+ Tests workflow logic
+ More authentic
- Workflow mgr setup for testability
- Difficult to debug
- Dataset handling with Python

●Both can be extended with ingress (Kafka), egress DBs

www.scling.com
Warped pipeline testing
19
Workflow manager DAG "warp" rewrite:
1. Generate input (file://test_input/)
2. Run the production DAG, with its stream endpoints warped to local files
3. Verify output (file://test_output/)

www.scling.com
What about a layer of indirection?
20
(diagram: a data access layer switches between prod storage and file://test_data/)
+Explicit
+Debuggable
-In conflict with autonomous culture
-Adds complexity to experimental cycle
-In conflict with data component ecosystems

www.scling.com
Aspect-oriented testing
21
Aka hideous monkey patching.
●Packaged in prod image
●Triggered by environment variable
●Rules for DAG transformation
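The mechanism can be sketched as follows: a warp rule, shipped in the production image, patches a task's output path to a file:// test location when a test environment variable is set. Class, attribute, and variable names here are invented, not from a real framework:

```python
# Sketch of env-var-triggered DAG "warp" via monkey patching.
import os
os.environ["PIPELINE_TEST_MODE"] = "1"   # normally set by the test runner

class SaleReport:
    """Stand-in for a production task writing to durable storage."""
    def output_path(self) -> str:
        return "s3://prod-bucket/reports/sales"

def warp_for_test(task_cls):
    """Warp rule: redirect the task's output to local test storage."""
    original = task_cls.output_path
    def patched(self):
        return "file://test_output/" + original(self).rsplit("/", 1)[-1]
    task_cls.output_path = patched

# Packaged in the prod image, but only applied when the env var is set:
if os.environ.get("PIPELINE_TEST_MODE"):
    warp_for_test(SaleReport)

print(SaleReport().output_path())  # → file://test_output/sales
```

The appeal is that the production task code is untouched; the cost, as the next slide notes, is that the redirection is implicit and hard to debug.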

www.scling.com
Monkey patching exacts a price
22
+Minimal code adaptation
+Can test pipelines owned by other teams
+Seams in third-party components
-Implicit
-Difficult to debug framework
-Disharmony with higher workflow abstractions

Better than indirection? Depends on cultural cost of coordination.
Our plans ahead: keep the warp zone, replace monkey patching with a layer of indirection.
Great value from full pipeline testing!

www.scling.com
Performance is essential. No, not that performance.
●Bitten by bug in org.apache.spark.sql.KeyValueGroupedDataset.flatMapGroups().
○Method is covered by one test case. WTF?
●Spark startup is slow: 10-30 seconds.
●What about plain Scala collections + minimal data DSL wrapper?
○Up to 10x faster on join benchmark for < 10 GB data.
○10x faster in production on small datasets.
○10x faster tests.
○90% saved CPU resources.
○Easier to program + debug.
●These are the performance dimensions that actually matter for agility & value creation.
23
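The slide's point is that for small datasets, an in-memory join over plain collections beats spinning up a cluster framework. A sketch of the idea in Python (record shapes invented; the deck's actual wrapper is a Scala DSL over plain collections):

```python
# Hash join over in-memory records: no cluster, no startup cost.
def join(left, right, key):
    index = {}
    for r in right:                       # build side: index by join key
        index.setdefault(r[key], []).append(r)
    return [{**l, **r}                    # probe side: emit merged records
            for l in left for r in index.get(l[key], [])]

orders = [{"user_id": 1, "item": "book"}, {"user_id": 2, "item": "pen"}]
users = [{"user_id": 1, "name": "Alice"}]

print(join(orders, users, "user_id"))
# → [{'user_id': 1, 'item': 'book', 'name': 'Alice'}]
```

For data under a few GB this runs in milliseconds, which is where the 10x faster tests and production runs come from.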

www.scling.com
What conclusion from this graph?
24
COVID-19 fatalities / day in Sweden
(chart legend: fatalities collected during 2 days / 4 days / 10 days)

www.scling.com
Normalise data collection to compare
25
Graph by Adam Altmejd, @adamaltmejd

www.scling.com
Data flow / ops paradigms
26
(quadrant diagram; one axis runs from mutable / object-oriented / exclusive to immutable / functional / democratised, the other from "mutate current state" via "build current state from immutable events + dumps" to "address cumulative state at arbitrary time"; placed paradigms: shared DBs, microservices, data warehousing, modern data warehousing, data lake, frozen lake, stream processing, lakehouse)

www.scling.com
Incompleteness recovery
27

www.scling.com
Fast data or complete data
28
Delay: 0
Delay: 4
Delay: 12

www.scling.com
Life of an error, batch pipelines
29
●My processing job, bad code!

1. Revert serving datasets to old
2. Fix bug
3. Remove faulty datasets
4. Deploy
5. Backfill is automatic (Luigi)
Done!

●Low cost of error

○Reactive QA

○Production environment sufficient

www.scling.com
Life of an error, frozen lake
30
●My processing job, bad code!

1. Revert serving datasets to old
2. Fix bug
3. Bump pipeline version
4. Deploy
5. Backfill is automatic (Luigi)
Done!

●Low cost of error

○Reactive QA

○Production environment sufficient

www.scling.com
Life of a change, batch pipelines
31
●Forgiving environment
○Machine errors
○Human errors

●Friendly to experiments
○"Dark pipelines" run in parallel

●Operationally efficient
○Separate dev, test, staging environments
not necessary
○Self-healing

∆?

www.scling.com
Dynamic version rollout
CredibilityScore
FraudCandidate
Order
32

www.scling.com
Dynamic DAGs
CredibilityScore
FraudCandidate
Order
33

www.scling.com
Schema on write
34
●Schema defined by writer

●Destination (table / dataset / stream topic) has defined schema
○Technical definition with metadata (e.g. RDBMS, Kafka + registry)
○By convention

●Writes not in compliance are not accepted
○Technically aborted (e.g. RDBMS)
○In violation of intent (e.g. HDFS datasets)

●Can be technically enforced by producer driver
○Through ORM / code generation
○Schema registry lookup

Strict checking philosophy
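The strict philosophy can be sketched as producer-side enforcement: the destination's schema is known, and non-compliant writes are aborted before they land. Schema and field names below are invented for illustration:

```python
# Schema on write: the producer driver rejects non-compliant records.
SCHEMA = {"order_id": str, "amount": int}   # illustrative registry entry

def write(record: dict, sink: list):
    if set(record) != set(SCHEMA):
        raise ValueError(f"fields do not match schema: {sorted(record)}")
    for name, typ in SCHEMA.items():
        if not isinstance(record[name], typ):
            raise TypeError(f"{name} must be {typ.__name__}")
    sink.append(record)                      # write accepted

sink = []
write({"order_id": "o-1", "amount": 100}, sink)
try:
    write({"order_id": "o-2", "amount": "high"}, sink)   # technically aborted
except TypeError as e:
    print("rejected:", e)
print(len(sink))  # → 1
```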

www.scling.com
Schema on read
35
●Anything (technically) accepted when writing

●Schema defined by reader, at consumption
○Reader may impose requirements on type & value

●In dynamic languages, fields propagate implicitly
○E-shopping example:
i.Join order + customer.
ii.Add device_type to order schema
iii.device_type becomes available in downstream datasets

●Violations of constraints are detected at read
○Perhaps long after production?
○By team not owning producer?

Loose checking philosophy
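The loose philosophy is the mirror image: anything is accepted at write, and the reader imposes its requirements at consumption time. A sketch, with field names following the slide's e-shopping example:

```python
# Schema on read: the reader defines and checks the schema at consumption.
import json

raw = '{"order_id": "o-1", "customer_id": "c-9", "device_type": "mobile"}'

def read_order(line: str) -> dict:
    rec = json.loads(line)                      # anything was accepted at write
    assert isinstance(rec["order_id"], str)     # reader-imposed type requirement
    assert rec.get("device_type") in (None, "mobile", "desktop")  # value requirement
    return rec

print(read_order(raw)["device_type"])  # → mobile
```

Note that the checks fire only when this reader runs, possibly long after the record was produced and in a team that does not own the producer, which is exactly the risk the slide names.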

www.scling.com
Schema on read or write?
36
(diagram: DBs → services → export → business intelligence)
Production stability important here: services and their DBs.
Change agility important here: export and business intelligence.

www.scling.com
Schema definition choice
37
●RDBMS: Table metadata
●Avro: JSON/DSL definition
○Definition is bundled with Avro data files
●Parquet
●pyschema / dataclass
●Scala case classes
○Expressive, custom types, IDE support
●JSON-schema
●JSON: Each record
○One record insufficient to deduce schema

●Avro for data lake storage

case class User(id: String, name: String, age: Int,
                phone: Option[String] = None)

val users = Seq(User("1", "Alice", 32),
                User("2", "Bob", 43, Some("08-123456")))

www.scling.com
Schema offspring
38
(diagram: from case classes, generate:)
●test equality type classes
●test record difference render type classes
●Avro definitions
●Java Avro codec classes
●Java <-> Scala converters
●Avro type annotations
●MySQL schemas
●CSV codecs
●Privacy by design machinery
●Python
●Logical types
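The deck does this with Scalameta over Scala case classes; as a hedged analogue of the same metaprogramming idea, one can derive an Avro-style schema from a Python dataclass by walking its type hints (the mapping table and class are invented for illustration):

```python
# Derive an Avro-style record schema from a dataclass definition.
from dataclasses import dataclass, fields
from typing import Optional, Union, get_args, get_origin

AVRO = {str: "string", int: "int"}   # illustrative primitive mapping

def avro_schema(cls) -> dict:
    out = []
    for f in fields(cls):
        t = f.type
        if get_origin(t) is Union and type(None) in get_args(t):  # Optional[T]
            inner = [a for a in get_args(t) if a is not type(None)][0]
            out.append({"name": f.name, "type": ["null", AVRO[inner]]})
        else:
            out.append({"name": f.name, "type": AVRO[t]})
    return {"type": "record", "name": cls.__name__, "fields": out}

@dataclass
class User:
    id: str
    age: int
    phone: Optional[str] = None

print(avro_schema(User))
```

One definition, many generated artefacts: the same walk could emit SQL DDL or codecs, which is the "schema offspring" idea.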

www.scling.com
Scalameta: schema → syntax tree
39
Defn.Class(
  List(Mod.Annot(Init(Type.Name("PrivacyShielded"), Name(""), List())), Mod.Case()),
  Type.Name("SaleTransaction"),
  List(),
  Ctor.Primary(
    List(),
    Name(""),
    List(
      List(
        Term.Param(
          List(Mod.Annot(Init(Type.Name("PersonalId"), Name(""), List()))),
          Term.Name("customerClubId"),
          Some(Type.Apply(Type.Name("Option"), List(Type.Name("String")))),
          None
        ),
        Term.Param(
          List(Mod.Annot(Init(Type.Name("PersonalData"), Name(""), List()))),
          Term.Name("storeId"),
          Some(Type.Apply(Type.Name("Option"), List(Type.Name("String")))),
          None
        ),
        Term.Param(
          List(),
          Term.Name("item"),
          Some(Type.Apply(Type.Name("Option"), List(Type.Name("String")))),
          None
        ),
        Term.Param(List(), Term.Name("timestamp"), Some(Type.Name("String")), None)
      )
    )
  ),
  Template(List(), List(), Self(Name(""), None), List())
)

@PrivacyShielded
case class SaleTransaction(
  @PersonalId customerClubId: Option[String],
  @PersonalData storeId: Option[String],
  item: Option[String],
  timestamp: String
)

www.scling.com
Scalameta use cases
●Scalafmt

●Scalafix
○Static analysis
○Code transformation

●Online code generation - macros

●Offline code generation
40
// Example from scio 0.7 -> 0.8 upgrade rules
final class FixTensorflow extends SemanticRule("FixTensorflow") {
  override def fix(implicit doc: SemanticDocument): Patch =
    doc.tree.collect {
      case t @ Term.Select(s, Term.Name("saveAsTfExampleFile")) =>
        Patch.replaceTree(t, q"$s.saveAsTfRecordFile".syntax)
    }.asPatch
}

www.scling.com
Test equality
Test record
difference render
type classes
41
case classes
test equality
type classes

www.scling.com
Python + RDBMS
42
(diagram: case classes → Avro definitions → MySQL schemas + Python, via Avro type annotations)

case class User(
  age: Int,
  @AvroProp("sqlType", "varchar(1012)")
  phone: Option[String] = None)

{
  "name": "User",
  "type": "record",
  "fields": [
    { "name": "age", "type": "int" },
    { "name": "phone",
      "type": [ "null", "string" ],
      "sqlType": "varchar(1012)"
    }
  ]
}

class UserEgressJob(CopyToTable):
    columns = [
        ("age", "int"),
        ("name", "varchar(1012)"),
    ]

    ...

www.scling.com
Logical types
43
●Avro logical types
○E.g. date → int, timestamp → long
○Default is timestamp-millis
■Great for year > 294441 (!)
●Custom logical types
○Time
○Collections
○Physical

case t"""Instant""" =>
  JObject(List(JField("type", JString("long")),
               JField("logicalType", JString("timestamp-micros"))))

case t"""LocalDate""" =>
  JObject(List(JField("type", JString("int")),
               JField("logicalType", JString("date"))))

case t"""YearMonth""" => JObject(List(JField("type", JString("int"))))

case t"""JObject""" => JString("string")

www.scling.com
Schema on read or write?
44
(diagram: DBs → services → export → business intelligence)
Production stability important here: services and their DBs.
Change agility important here: export and business intelligence.

www.scling.com
Hydration boilerplate
45

www.scling.com
Chimney - case class transformer
46
●Commonality in schema classes
○Copy + a few more fields
○Drop fields

●Statically typed
○Forgot a field - error
○Wrong type - error
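Chimney does this derivation at compile time in Scala, so a forgotten or mistyped field fails the build. As an illustration of the same "copy + a few more fields / drop fields" pattern, a runtime Python sketch (function and dataclass names invented here; unlike Chimney, errors surface only at runtime):

```python
# Runtime analogue of a case-class transformer: copy matching fields,
# drop the rest, supply the new ones explicitly.
from dataclasses import dataclass, asdict, fields

def transform_into(src, target_cls, **extra):
    names = {f.name for f in fields(target_cls)}
    data = {k: v for k, v in asdict(src).items() if k in names}  # drop fields
    data.update(extra)                                           # add fields
    missing = names - data.keys()
    if missing:                     # Chimney catches this at compile time
        raise TypeError(f"unmapped fields: {missing}")
    return target_cls(**data)

@dataclass
class Sale:
    item: str
    store_id: str

@dataclass
class SaleEnriched:
    item: str
    store_id: str
    region: str

print(transform_into(Sale("pen", "s1"), SaleEnriched, region="EU"))
```

Moving the check to compile time is the whole point of the static approach: hydration boilerplate disappears without losing safety.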

www.scling.com
Chimney in real code
47

www.scling.com
Ingest prepared for deletion
48
(diagram: ingest lands in a landing pond with mutation, append + delete; from there data flows to the data lake, a private pond, and a cold store; the immutable side has limited retention)

www.scling.com
Lost key pattern
49
●PII fields encrypted
●Per-user decryption key table
●Clear single user key => oblivion

+Multi-field oblivion
+Single dataset leak → no PII leak
+Handles transformed PII fields
-Extra join + decrypt
-Decryption (user) id needed
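The pattern can be sketched in a few lines: PII values are encrypted with a per-user key, and deleting that single key makes every copy of that user's PII unreadable. The XOR-with-SHA-256-keystream cipher below is a toy stand-in for a real cipher, and all names are invented:

```python
# Lost key pattern: per-user keys; clearing a key = oblivion.
import hashlib, secrets

keys = {}  # per-user decryption key table

def _keystream(key: bytes, n: int) -> bytes:
    out, counter = b"", 0
    while len(out) < n:
        out += hashlib.sha256(key + counter.to_bytes(4, "big")).digest()
        counter += 1
    return out[:n]

def encrypt(user_id: str, value: str) -> bytes:
    key = keys.setdefault(user_id, secrets.token_bytes(32))
    data = value.encode()
    return bytes(a ^ b for a, b in zip(data, _keystream(key, len(data))))

def decrypt(user_id: str, blob: bytes) -> str:
    key = keys[user_id]  # KeyError once the key is cleared: oblivion
    return bytes(a ^ b for a, b in zip(blob, _keystream(key, len(blob)))).decode()

blob = encrypt("u1", "club-member-42")
print(decrypt("u1", blob))   # → club-member-42
del keys["u1"]               # deletion request: clear the single user key
# decrypt("u1", blob) now raises KeyError; the PII is gone in every dataset
```

This is why the pattern handles transformed PII fields too: derived datasets carry the ciphertext, and the key table is the only thing that ever needs mutation.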

www.scling.com
Shieldformation
50
Input:

@PrivacyShielded
case class Sale(
  @PersonalId customerClubId: Option[String],
  @PersonalData storeId: Option[String],
  item: Option[String],
  timestamp: String
)

ShieldForm generates:

case class SaleShielded(
  shieldId: Option[String],
  customerClubIdEncrypted: Option[String],
  storeIdEncrypted: Option[String],
  item: Option[String],
  timestamp: String
)

case class SaleAnonymous(
  item: Option[String],
  timestamp: String
)

case class Shield(
  shieldId: String,
  personId: Option[String],
  keyStr: Option[String],
  encounterDate: String
)

object SaleShield extends SparkJob { ... }
object SaleExpose extends SparkJob { ... }
object SaleAnonymize extends SparkJob { ... }

www.scling.com
Shieldformation & lost key
51
(diagram: Sale → SaleShield → SaleShielded + Shield; deletion requests clear Shield entries; SaleExpose joins SaleShielded with Shield into exposed egress / customer history, with limited retention; SaleAnonymize → SaleAnonymous → sale stats)

www.scling.com
Schema on write!
52
(diagram: DBs → services → export → business intelligence)
Production stability important here: services and their DBs.
Change agility important here: export and business intelligence.

www.scling.com 55
Agility is essential
●Innovation, cost of operations
●Cultural & technical challenges
●Leave data warehousing behind
●Always improve - never be content
○Weeks → hours → seconds

Josh Baer: Powering Spotify's audio personalization platform
"How long from idea to pipeline in production?" - "6-12 weeks"
"A manager heard that some companies do it in hours. What would you do to get there?" - "I push back on unrealistic management requirements."

Code & continuous improvement