Schema on read is obsolete. Welcome metaprogramming.

lallea 242 views 36 slides Apr 27, 2024

About This Presentation

How fast can you modify your data collection to include a new field, make all the necessary changes in data processing and storage, and then use that field in analytics or product features? For many companies, the answer is a few quarters, whereas others do it in a day. This data agility latency has...


Slide Content

www.scling.com
Schema on read is obsolete.
Welcome metaprogramming.

Data Innovation Summit, 2024-04-24
Lars Albertsson
Scling

1

IT craft to factory
2
●Security → DevSecOps
●Waterfall → Agile
●Application delivery → Containers
●Traditional operations → DevOps
●Traditional QA → CI/CD
●Infrastructure → Infrastructure as code

Data factories
3
●Security → DevSecOps
●Waterfall → Agile
●Application delivery → Containers
●Traditional operations → DevOps
●Traditional QA → CI/CD
●Infrastructure → Infrastructure as code
●DB-oriented architecture → Data factories, data pipelines, DataOps

Craft vs industry
4
Craft:
●Each step steered by human
○Or primitive automation
●Improving artifacts
●Craft is primary competence
●Components made for humans
○Look nice, "easy to use"
○More popular

Industry:
●Autonomous processes
●Improving process that creates artifacts
●Multitude of competences
●Some components unusable by humans
○Hard, greasy
○Made for integration
○Less popular

Data engineering in the future
5
Diagram labels:
●DW
●~10 year capability gap
●"data factory engineering"
●Enterprise big data failures
●"Modern data stack" - traditional workflows, new technology
●4GL / UML phase of data engineering
●Data engineering education

Efficiency gap, data cost & value
6
●Data processing produces datasets
○Each dataset has business value
●Proxy value/cost metric: datasets / day
○S-M traditional: < 10
○Bank, telecom, media: 100-1000

Data points shown on the slide:
●2014: 6500 datasets / day
●2016: 20000 datasets / day
●2018: 100000+ datasets / day, 25% of staff use BigQuery
●2021: 500B events collected / day
●2016: 1 600 000 000 datasets / day

Chart (axes: effort, value):
●Disruptive value of data, machine learning
●Financial, reporting
●Insights, data-fed features

Data-factory-as-a-service
7
●Data factory
○Collected, raw data → processed, valuable data
●Data pipelines customised for client
○Analytics (BI, reports, A/B testing)
○Data-fed features (autocomplete, search)
○Learning systems (recommendations, fraud)
●Compete with data leaders:
○Quick idea-to-production
○Operational efficiency
Diagram label: Data lake

Data agility
8
Latency?
●Siloed: 6+ months
Cultural work
●Autonomous: 1 month
Technical work
●Coordinated: days
Diagram label: Data lake

What is a schema?
9
Id  Name    Age  Phone
1   "Anna"  34   null
2   "Bob"   42   "08-123456"

Fields: Name, Type, Required?
●Lowest common denominator = name, type, required
○Types: string, long, double, binary, array, map, union, record
●Schema specification may support additional constraints, e.g. integer range, other collections
●In RDBMS, relations are explicit
●In lake/stream datasets, relations are implicit
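
A minimal sketch of the "name, type, required" view of a schema, applied to the example table on this slide. The names `SchemaSketch`, `FieldSpec`, and `conforms` are hypothetical and only two of the listed types are modelled; it is an illustration, not how any particular schema library works.

```scala
object SchemaSketch {
  // Hypothetical model: only two of the listed types are covered here.
  sealed trait FieldType
  case object LongType extends FieldType
  case object StringType extends FieldType

  // Lowest common denominator: name, type, required.
  final case class FieldSpec(name: String, fieldType: FieldType, required: Boolean)

  // Schema for the example table; Phone is the only optional field.
  val userSchema: List[FieldSpec] = List(
    FieldSpec("Id", LongType, required = true),
    FieldSpec("Name", StringType, required = true),
    FieldSpec("Age", LongType, required = true),
    FieldSpec("Phone", StringType, required = false)
  )

  // A record is modelled as a map; None stands for null.
  type Record = Map[String, Option[Any]]

  private def typeMatches(fieldType: FieldType, value: Any): Boolean =
    (fieldType, value) match {
      case (LongType, _: Long)     => true
      case (StringType, _: String) => true
      case _                       => false
    }

  // A record conforms if every required field has a value of the right type
  // and it carries no fields the schema does not know about.
  def conforms(schema: List[FieldSpec], record: Record): Boolean =
    schema.forall { field =>
      record.get(field.name).flatten match {
        case Some(value) => typeMatches(field.fieldType, value)
        case None        => !field.required
      }
    } && record.keySet.subsetOf(schema.map(_.name).toSet)
}
```

With this model, the "Anna" row (Phone is null) conforms, while the same row without a Name does not.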

Schema definitions
10
●RDBMS: Table metadata
●Avro format: JSON/DSL definition
○Definition is bundled with avro data files
○Reused by Parquet format

{
  "type" : "record",
  "namespace" : "com.mapflat.example",
  "name" : "User",
  "fields" : [
    { "name" : "id" , "type" : "int" },
    { "name" : "name" , "type" : "string" },
    { "name" : "age" , "type" : "int" },
    { "name" : "phone" , "type" : ["null", "string"],
      "default": null }
  ]
}

●pyschema / dataclass
●Scala case classes

case class User(id: String, name: String, age: Int,
                phone: Option[String] = None)

val users = Seq(User("1", "Alice", 32),
                User("2", "Bob", 43, Some("08-123456")))

●JSON-schema
●JSON: Each record
○One record insufficient to deduce schema

{ "id": 1, "name": "Alice", "age": "34" }
{ "id": 1, "name": "Bob", "age": "42", "phone": "08-123456" }
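
A small sketch of why one JSON record is insufficient to deduce a schema, using the two JSON records on this slide as plain maps. The object `SchemaInference` and its members are hypothetical names, and JSON parsing is left out; this is an assumption-laden illustration rather than a real inference tool.

```scala
object SchemaInference {
  type Record = Map[String, Any]

  // What can be deduced about one field from the records seen so far.
  final case class Inferred(typeNames: Set[String], required: Boolean)

  private def typeName(value: Any): String = value match {
    case _: Int    => "int"
    case _: String => "string"
    case _         => "unknown"
  }

  // A field counts as required only if every record seen contains it.
  def infer(records: Seq[Record]): Map[String, Inferred] = {
    val allFields = records.flatMap(_.keys).toSet
    allFields.map { field =>
      val values = records.flatMap(_.get(field))
      field -> Inferred(values.map(typeName).toSet, required = values.size == records.size)
    }.toMap
  }

  // The two records from the slide.
  val alice: Record = Map("id" -> 1, "name" -> "Alice", "age" -> "34")
  val bob: Record = Map("id" -> 1, "name" -> "Bob", "age" -> "42", "phone" -> "08-123456")
}
```

From `alice` alone, `phone` does not exist at all; only after seeing `bob` does it show up as an optional string. The inference also reports `age` as a string, because that is how these records encode it, which differs from the `int` in the Avro definition.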

Schema on write
11
●Schema defined by writer

●Destination (table / dataset / stream topic) has defined schema
○Technical definition with metadata (e.g. RDBMS, Kafka + registry)
○By convention

●Writes not in compliance are not accepted
○Technically aborted (e.g. RDBMS)
○In violation of intent (e.g. HDFS datasets)

●Can be technically enforced by producer driver
○Through ORM / code generation
○Schema registry lookup

Strict checking philosophy
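
A minimal sketch of the strict philosophy, assuming a hypothetical in-memory `Dataset` as the destination and a hypothetical `parseUser` at the producer boundary. The destination only accepts `User` values, so a non-compliant record is rejected before anything is written.

```scala
object SchemaOnWrite {
  final case class User(id: String, name: String, age: Int, phone: Option[String] = None)

  // Hypothetical destination with a defined schema: it only accepts values of type T.
  final class Dataset[T] {
    private var rows = Vector.empty[T]
    def write(row: T): Unit = { rows = rows :+ row }
    def read: Vector[T] = rows
  }

  // Producer-side check: untyped input either becomes a valid User or is rejected.
  def parseUser(raw: Map[String, String]): Either[String, User] =
    for {
      id      <- raw.get("id").toRight("missing id")
      name    <- raw.get("name").toRight("missing name")
      ageText <- raw.get("age").toRight("missing age")
      age     <- ageText.toIntOption.toRight(s"age is not an int: $ageText")
    } yield User(id, name, age, raw.get("phone"))
}
```

A record with `"age" -> "forty-two"` yields a `Left` and never reaches `write`; passing anything other than a `User` to a `Dataset[User]` does not compile.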

Schema on read
12
●Anything (technically) accepted when writing

●Schema defined by reader, at consumption
○Reader may impose requirements on type & value

●In dynamic languages, fields propagate implicitly
○E-shopping example:
i.Join order + customer.
ii.Add device_type to order schema
iii.device_type becomes available in downstream datasets

●Violations of constraints are detected at read
○Perhaps long after production?
○By team not owning producer?

Loose checking philosophy
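
A sketch of the loose philosophy, with untyped maps standing in for records in a dynamic language. The field names `customer_id` and `amount` and all function names are hypothetical; only `device_type` comes from the e-shopping example.

```scala
object SchemaOnReadSketch {
  // Records are untyped maps, so any field is accepted when writing.
  type Record = Map[String, Any]

  // Join order + customer by merging their fields; no field list is spelled out,
  // so a field added to the order propagates downstream implicitly.
  def joinOrderCustomer(order: Record, customer: Record): Record =
    customer ++ order

  // The reader imposes its own requirement on type and value at consumption.
  def readAmount(record: Record): Either[String, Double] =
    record.get("amount") match {
      case Some(amount: Double) if amount >= 0 => Right(amount)
      case Some(other)                         => Left(s"unexpected amount: $other")
      case None                                => Left("amount is missing")
    }
}
```

Adding `device_type` to an order makes it appear in the joined record without touching the join. A producer that writes `amount` as the string "12.50" is accepted at write time and only fails when `readAmount` runs, possibly much later and in another team's job.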

Dynamic vs static typing
13
Schema on write Schema on read
Static typing Dynamic typing
Strict Loose
Possible
Java:
user.setName("Alice");
user2.getName();

Scala:
user = User(name = "Alice", ...)
user2.name
Java:
user.set("name", "Alice");
user2.get("name");

Python:
user.name = "Alice"
user2.name

Schema on read or write?
14
Diagram: DB (×3), Service (×2), Export, Business intelligence
●Change agility important here
●Production stability important here

Schema definition choice
15
●RDBMS: Table metadata
●Avro: JSON/DSL definition
○Definition is bundled with avro data files
●Parquet
●pyschema / dataclass
●Scala case classes
○Expressive
○Custom types
○IDE support
●JSON-schema
●JSON: Each record
○One record insufficient to deduce schema
●Avro for data lake storage

case class User(id: String, name: String, age: Int,
                phone: Option[String] = None)

val users = Seq(User("1", "Alice", 32),
                User("2", "Bob", 43, Some("08-123456")))

Schema offspring
16
●case classes
○Test record difference render type classes
○test equality type classes
○Avro definitions
○Java Avro codec classes
○Java <-> Scala converters
○Avro type annotations
○MySQL schemas
○CSV codecs
○Privacy by design machinery
○Python
○Logical types

Avro codecs
17
Derived from case classes: Avro definitions, Java Avro codec classes, Java <-> Scala converters

case class User(age: Int,
                phone: Option[String] = None)

{
  "type": "record",
  "name": "JavaUser",
  "fields": [
    { "name": "age", "type": "int" },
    { "name": "phone", "type": [ "null", "string" ] }
  ]
}

public class JavaUser implements SpecificRecord {
  public Integer getAge() { ... }
  public String getPhone() { ... }
}

object UserConverter extends AvroConverter[User] {
  def fromSpecific(u: JavaUser): User
  def toSpecific(u: User): JavaUser
}

Scalameta
18
●Parsing and analysis of Scala source code

val a = b() + 3

lex:
["val", " ", "a", " ", "=", " ", "b", "(", ")", " ", "+", " ", "3"]

parse:
[val, "a", =, Call("b"), +, Int(3)]

semantic analysis:
[val, Int(a), =, Call(com.scling.func.b), +, Int(3)]

Scalameta use cases
19
●Scalafmt
●Scalafix
○Static analysis
○Code transformation
●Online code generation - macros
●Offline code generation

// Example from scio 0.7 -> 0.8 upgrade rules
import scalafix.v1._
import scala.meta._

final class FixTensorflow extends SemanticRule("FixTensorflow") {
  override def fix(implicit doc: SemanticDocument): Patch =
    doc.tree.collect {
      // Rewrite every call to saveAsTfExampleFile into saveAsTfRecordFile
      case t @ Term.Select(s, Term.Name("saveAsTfExampleFile")) =>
        Patch.replaceTree(t, q"$s.saveAsTfRecordFile".syntax)
    }.asPatch
}

Schema & syntax tree
20
@PrivacyShielded
case class SaleTransaction(
  @PersonalId customerClubId: Option[String],
  @PersonalData storeId: Option[String],
  item: Option[String],
  timestamp: String
)

Defn.Class(
  List(Mod.Annot(Init(Type.Name("PrivacyShielded"), , List())), case),
  Type.Name("SaleTransaction"),
  List(),
  Ctor.Primary(
    List(),
    ,
    List(
      List(
        Term.Param(
          List(Mod.Annot(Init(Type.Name("PersonalId"), , List()))),
          Term.Name("customerClubId"),
          Some(Type.Apply(Type.Name("Option"), List(Type.Name("String")))),
          None
        ),
        Term.Param(
          List(Mod.Annot(Init(Type.Name("PersonalData"), , List()))),
          Term.Name("storeId"),
          Some(Type.Apply(Type.Name("Option"), List(Type.Name("String")))),
          None
        ),
        Term.Param(
          List(),
          Term.Name("item"),
          Some(Type.Apply(Type.Name("Option"), List(Type.Name("String")))),
          None
        ),
        Term.Param(List(), Term.Name("timestamp"), Some(Type.Name("String")), None)
      )
    )
  ),
  Template(List(), List(), Self(, None), List()))

Quasiquotes
21
val stat: Stat = "val a = b() + 3".parse[Stat].get
val stat: Stat = q"val a = b() + 3"

Quasiquotes in practice
22
q"""
  object $converterName extends AvroConverter[${srcClass.clazz.name}] {
    import RecordFieldConverters._

    type S = $jClassName

    def schema: Schema = $javaClassTerm.getClassSchema()

    def tag: ClassTag[S] = implicitly[ClassTag[S]]

    def datumReader: SpecificDatumReader[S] = new SpecificDatumReader[$jClassName](classOf[$jClassName])

    def datumWriter: SpecificDatumWriter[S] = new SpecificDatumWriter[$jClassName](classOf[$jClassName])

    def fromSpecific(record: $jClassName): ${srcClass.clazz.name} =
      ${Term.Name(srcClass.clazz.name.value)}(..$fromInits)

    def toSpecific(record: ${srcClass.clazz.name}): $jClassName =
      new $jClassName(..$specificArgs)
  }
"""

Test equality
23
Derived from case classes: test equality type classes, test record difference render type classes

trait REquality[T] { def equal(left: T, right: T): Boolean }

object REquality {
  implicit val double: REquality[Double] = new REquality[Double] {
    def equal(left: Double, right: Double): Boolean = {
      // Use a combination of absolute and relative tolerance
      left === right +- 1e-5.max(left.abs * 1e-5).max(right.abs * 1e-5)
    }
  }

  /** binds the Magnolia macro to the `gen` method */
  implicit def gen[T]: REquality[T] = macro Magnolia.gen[T]
}

object Equalities {
  implicit val equalityUser: REquality[User] =
    REquality.gen[User]
}
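
The tolerance rule in the `Double` instance can be checked in isolation. The sketch below restates it with the standard library only, assuming that `left === right +- t` means `|left - right| <= t`; `ToleranceSketch` and its method names are hypothetical.

```scala
object ToleranceSketch {
  // Absolute floor of 1e-5, widened to a relative 1e-5 of the larger operand.
  def tolerance(left: Double, right: Double): Double =
    1e-5.max(left.abs * 1e-5).max(right.abs * 1e-5)

  // Equal when the difference is within the combined tolerance.
  def approxEqual(left: Double, right: Double): Boolean =
    (left - right).abs <= tolerance(left, right)
}
```

Near zero the absolute floor decides, so 0.0 and 1e-6 compare equal while 0.0 and 1e-4 do not. For large values the relative part decides, so 1e9 and 1e9 + 1000 compare equal.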

Python + RDBMS
24
case classes with Avro type annotations → Avro definitions → MySQL schemas, Python

case class User(
  age: Int,
  @AvroProp("sqlType", "varchar(1012)")
  phone: Option[String] = None)

{
  "type": "record",
  "name": "User",
  "fields": [
    { "name": "age", "type": "int" },
    { "name": "phone",
      "type": [ "null", "string" ],
      "sqlType": "varchar(1012)" }
  ]
}

class UserEgressJob(CopyToTable):
    columns = [
        ("age", "int"),
        ("phone", "varchar(1012)"),
    ]

    ...

Logical types
25
Derived from case classes: logical types

case t"""Instant""" =>
  JObject(List(JField("type", JString("long")),
               JField("logicalType", JString("timestamp-micros"))))

case t"""LocalDate""" =>
  JObject(List(JField("type", JString("int")),
               JField("logicalType", JString("date"))))

case t"""YearMonth""" => JObject(List(JField("type", JString("int"))))

case t"""JObject""" => JString("string")

●Avro logical types
○E.g. date → int, timestamp → long
○Default is timestamp-millis
■Great for year > 294441 (!)
●Custom logical types
○Time
○Collections
○Physical
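
The year 294441 remark can be reproduced with a little arithmetic. The sketch assumes the timestamp is a signed 64-bit count since 1970 and uses 365-day years, which is my reading of where the figure comes from, not something the slide states. `TimestampRange` is a hypothetical name.

```scala
object TimestampRange {
  private val SecondsPerDay = 86400L
  private val DaysPerYear = 365L // simplification: leap days are ignored

  // Last calendar year representable by a signed 64-bit count since 1970.
  def lastYear(unitsPerSecond: Long): Long = {
    val seconds = Long.MaxValue / unitsPerSecond
    1970L + seconds / SecondsPerDay / DaysPerYear
  }

  val lastYearMicros: Long = lastYear(1000000L) // 294441
  val lastYearMillis: Long = lastYear(1000L)    // 292473178
}
```

Under these assumptions microsecond precision lasts until year 294441, so millisecond precision only buys range beyond that point while giving up resolution today.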

Schema on read or write?
26
Diagram: DB (×3), Service (×2), Export, Business intelligence
●Change agility important here
●Production stability important here

Hydration boilerplate
27

Chimney - case class transformer
28
●Commonality in schema classes
○Copy + a few more fields
○Drop fields

●Statically typed
○Forgot a field - error
○Wrong type - error
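
For orientation, this is the kind of hand-written mapping between similar schema classes that a case class transformer is meant to remove. It uses plain Scala only and does not show Chimney's own API; `Order`, `EnrichedOrder`, `PublicOrder`, and the field names are hypothetical.

```scala
object HydrationSketch {
  final case class Order(orderId: String, customerId: String, amount: Double)

  // Copy + one more field.
  final case class EnrichedOrder(orderId: String, customerId: String, amount: Double, country: String)

  // Drop a field.
  final case class PublicOrder(orderId: String, amount: Double)

  // Every target field must be supplied with the right type, otherwise
  // the constructor call does not compile.
  def enrich(order: Order, country: String): EnrichedOrder =
    EnrichedOrder(
      orderId = order.orderId,
      customerId = order.customerId,
      amount = order.amount,
      country = country
    )

  def publish(order: Order): PublicOrder =
    PublicOrder(orderId = order.orderId, amount = order.amount)
}
```

The static checks named above apply here too: leaving out `country` or passing a `String` for `amount` is a compile error. What a transformer adds is that the field-by-field copying no longer has to be written by hand.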

Chimney in real code
29

Stretching the type system
30
●Fail: mixup kW and kWh

●Could be a compile-time error. Should be.

●Physical dimension libraries
○Boost.Units - C++
○Coulomb - Scala
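
A minimal sketch of how a kW / kWh mixup becomes a compile-time error, using plain wrapper types rather than a dimension library. `Kilowatts`, `Hours`, and `KilowattHours` are hypothetical types; libraries such as Coulomb generalise the idea to arbitrary units.

```scala
object UnitsSketch {
  // Power and energy are separate types, so they cannot be mixed by accident.
  final case class Kilowatts(value: Double) {
    def +(other: Kilowatts): Kilowatts = Kilowatts(value + other.value)
    // Power times time yields energy.
    def *(duration: Hours): KilowattHours = KilowattHours(value * duration.value)
  }

  final case class Hours(value: Double)

  final case class KilowattHours(value: Double) {
    def +(other: KilowattHours): KilowattHours = KilowattHours(value + other.value)
  }

  val load: Kilowatts = Kilowatts(2.0)
  val consumed: KilowattHours = load * Hours(3.0)

  // The next line is rejected by the compiler with a type mismatch:
  // val wrong = load + consumed
}
```

Only dimensionally valid operations are defined, so adding power to energy never reaches production.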

Ingest prepared for deletion
31
Diagram labels:
●Data lake
●Private pond
●Cold store
●Mutation
●Landing pond
●Append + delete
●Immutable, limited retention

Lost key pattern
32
●PII fields encrypted
●Per-user decryption key table
●Clear single user key => oblivion

-Extra join + decrypt
-Decryption (user) id needed
+Multi-field oblivion
+Single dataset leak → no PII leak
+Handles transformed PII fields
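
A minimal sketch of the lost key pattern using the JDK's `javax.crypto`. AES-GCM is my choice of cipher and the in-memory map stands in for the per-user decryption key table; the slide specifies neither. `LostKeySketch` and its members are hypothetical names.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.security.SecureRandom
import javax.crypto.{Cipher, KeyGenerator, SecretKey}
import javax.crypto.spec.GCMParameterSpec
import scala.collection.mutable

object LostKeySketch {
  final case class Encrypted(iv: Array[Byte], ciphertext: Array[Byte])

  private val random = new SecureRandom()

  // Stand-in for the per-user decryption key table.
  private val keyTable = mutable.Map.empty[String, SecretKey]

  private def newKey(): SecretKey = {
    val generator = KeyGenerator.getInstance("AES")
    generator.init(128)
    generator.generateKey()
  }

  // Encrypt a PII field with the key belonging to this user.
  def encrypt(userId: String, plain: String): Encrypted = {
    val key = keyTable.getOrElseUpdate(userId, newKey())
    val iv = new Array[Byte](12)
    random.nextBytes(iv)
    val cipher = Cipher.getInstance("AES/GCM/NoPadding")
    cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv))
    Encrypted(iv, cipher.doFinal(plain.getBytes(UTF_8)))
  }

  // Extra lookup + decrypt; yields None once the user's key is gone.
  def decrypt(userId: String, field: Encrypted): Option[String] =
    keyTable.get(userId).map { key =>
      val cipher = Cipher.getInstance("AES/GCM/NoPadding")
      cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(128, field.iv))
      new String(cipher.doFinal(field.ciphertext), UTF_8)
    }

  // Clearing a single user key makes all of that user's encrypted fields unreadable.
  def forget(userId: String): Unit = {
    keyTable.remove(userId)
    ()
  }
}
```

After `forget("u1")`, every field encrypted for that user is unrecoverable in all datasets that hold it, while other users are unaffected. The costs listed above are visible too: decryption needs the user id and an extra lookup.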

Shieldformation
33
@PrivacyShielded
case class Sale(
  @PersonalId customerClubId: Option[String],
  @PersonalData storeId: Option[String],
  item: Option[String],
  timestamp: String
)

ShieldForm

case class SaleShielded(
  shieldId: Option[String],
  customerClubIdEncrypted: Option[String],
  storeIdEncrypted: Option[String],
  item: Option[String],
  timestamp: String
)

case class SaleAnonymous(
  item: Option[String],
  timestamp: String
)

case class Shield(
  shieldId: String,
  personId: Option[String],
  keyStr: Option[String],
  encounterDate: String
)

object SaleShield extends SparkJob {
  ...
}

object SaleExpose extends SparkJob {
  ...
}

object SaleAnonymize extends SparkJob {
  ...
}

Shieldformation & lost key
34
Diagram labels: Shield, SaleShield, Sale, Sale Shielded, Shield, Deletion requests, Customer History, Exposed egress, SaleExpose, Limited retention, SaleAnonymize, Sale Anonymous, Sale Stats

Schema on write!
35
Diagram: DB (×3), Service (×2), Export, Business intelligence
●Change agility important here
●Production stability important here

Data factory track record
36
Industry           Time to first flow  Staff size  1st flow effort, weeks  1st flow cost (w * 50K ?)  Time to innovation  Flows 1y after first
Media              1+ years            10-30       1500?                   100M (0.5-1B)              1+ year             ?
Finance            2 years             10-50       2000?                   100M?                      Years               10?
Media              3 weeks             4.5 - 8     15                      750K                       3 months            30
Retail             7 weeks             1-3         7                       500K *                     6 months            70
Telecom            12 weeks            2-5         30                      1500K                      6 months            50
Consumer products  20+ weeks           1.5         30+                     1200+K                     6+ months           20
Construction       8 weeks             0.5         4                       150K *                     7 months            10
Manufacturing      8 weeks             0.5         4                       200K *                     6 months            ?