Cobrix – a COBOL Data Source for Spark

Hadoop_Summit · 2,935 views · 40 slides · Apr 23, 2019

About This Presentation

The financial industry operates on a variety of data and computing platforms. Integrating these different sources into a centralized data lake is crucial to support reporting and analytics tools.

Apache Spark is becoming the tool of choice for big data integration and analytics due to its sca...


Slide Content

Cobrix – A COBOL data source for Spark
Ruslan Iushchenko (ABSA), Felipe Melo (ABSA)

•Who we are
•Motivation
•Mainframe files and copybooks
•Loading simple files
•Loading hierarchical databases
•Performance and results
•Cobrix in ABSA Big Data space
Outline
2/40

About us
. ABSA is a Pan-African financial services provider
-With Apache Spark at the core of its data engineering
. We fill gaps in the Hadoop ecosystem, when we find them
. Contributions to Apache Spark
. Spark-related open-source projects (https://github.com/AbsaOSS)
-Spline-a data lineage tracking and visualization tool
-ABRiS-Avro SerDefor structured APIs
-Atum-Data quality library for Spark
-Enceladus-A dynamic data conformance engine
-Cobrix-A Cobol library for Spark (focus of this presentation)
3/40

The market for mainframes is strong, with no signs of cooling down.
Mainframes:
•Are used by 71% of Fortune 500 companies
•Are responsible for 87% of all credit card transactions in the world
•Are part of the IT infrastructure of 92 out of the 100 biggest banks in the world
•Handle 68% of the world's production IT workloads, while accounting for only 6% of IT costs
For companies relying on mainframes, becoming data-centric can be prohibitively expensive:
•High cost of hardware
•Expensive business model for data science related activities
Source: http://blog.syncsort.com/2018/06/mainframe/9-mainframe-statistics/
Business Motivation
4/40

Technical Motivation
[Diagram: Mainframes-to-PC pipeline - 1. Extract (proprietary tools), fixed-length text files, 2. Transform, CSV, 3. Transform, 4. Load, HDFS]
•The process above takes 11 days for a 600 GB file
•Legacy data models (hierarchical)
•Need for performance, scalability, flexibility, etc.
•SPOILER alert: we brought it down to 1.1 hours
5/40

•Run analytics / Spark on mainframes
•Message brokers (e.g. MQ)
•Sqoop
•Proprietary solutions
•But ...
  •Pricey
  •Slow
  •Complex (especially for legacy systems)
  •Require human involvement
What can you do?
6/40

How Cobrix can help
•Decreasing human involvement
•Simplifying the manipulation of hierarchical structures
•Providing scalability
•Open source
7/40

[Diagram: a mainframe file (EBCDIC) and its schema (a copybook) are read by Cobrix inside an Apache Spark application; Cobrix produces a DataFrame, transformations are applied, and a writer produces the output (Parquet, JSON, CSV, ...)]
Cobrix – a custom Spark data source
8/40

A copybook is a schema definition
A data file is a collection of binary records
[Illustration: a record layout (Name, Age, Company, Phone #, Zip), a filled-in record (Name: JOHN, Age: 32, Company: FOO.COM, Phone #: +2311-327, Zip: 12000), and the same record as raw EBCDIC bytes]
9/40

Similar to IDLs of Avro, Thrift, Protocol Buffers, etc.

Thrift:
struct Company {
  1: required i64 id,
  2: required string name,
  3: optional list<string> contactPeople
}

message Company {
  required int64 id = 1;
  required string name = 2;
  repeated string contact_people = 3;
}

COBOL:
10 COMPANY.
  15 ID              PIC 9(12) COMP.
  15 NAME            PIC X(40).
  15 CONTACT-PEOPLE  PIC X(20)
                     OCCURS 10.

record Company {
  int64 id;
  string name;
  array<string> contactPeople;
}
10/40

val df = spark
  .read
  .format("cobol")
  .option("copybook", "data/example.cob")
  .load("data/example")

01 RECORD.
  05 COMPANY-ID    PIC 9(10).
  05 COMPANY-NAME  PIC X(40).
  05 ADDRESS       PIC X(60).
  05 REG-NUM       PIC X(8).
  05 ZIP           PIC X(6).

[Illustration: raw EBCDIC bytes of the input records]

COMPANY_ID | COMPANY_NAME | ADDRESS | REG_NUM | ZIP
100 | ABCD Ltd. | 10 Garden st. | 8791237 | 03120
101 | ZjkLPj | 11 Park ave. | 1233971 | 23111
102 | Robotrd Inc. | 12 Forest st. | 0382979 | 12000
103 | Xingzhoug | 8 Mount st. | 2389012 | 31222
104 | Example.co | 123 Tech str. | 3129001 | 19000

Loading Mainframe Data
11/40

val spark = SparkSession.builder().appName("Example").getOrCreate()

val df = spark
  .read
  .format("cobol")
  .option("copybook", "data/example.cob")
  .load("data/example")

// ... Business logic goes here ...

df.write.parquet("data/output")

This app is
●Distributed
●Scalable
●Resilient
EBCDIC to Parquet examples
12/40

[Illustration: raw EBCDIC bytes of two records; the bytes holding FIRST-NAME and LAST-NAME are the same bytes that hold COMPANY-NAME]
•Redefined fields, AKA
  •Unchecked unions
  •Untagged unions
  •Variant type fields
•Several fields occupy the same space
01 RECORD.
  05 IS-COMPANY  PIC 9(1).
  05 COMPANY.
    10 COMPANY-NAME  PIC X(40).
  05 PERSON REDEFINES COMPANY.
    10 FIRST-NAME    PIC X(20).
    10 LAST-NAME     PIC X(20).
  05 ADDRESS     PIC X(50).
  05 ZIP         PIC X(6).
Redefined Fields
13/40

01 RECORD.
  05 IS-COMPANY  PIC 9(1).
  05 COMPANY.
    10 COMPANY-NAME  PIC X(40).
  05 PERSON REDEFINES COMPANY.
    10 FIRST-NAME    PIC X(20).
    10 LAST-NAME     PIC X(20).
  05 ADDRESS     PIC X(50).
  05 ZIP         PIC X(6).
•Cobrix applies all redefines for each record
•Some fields can clash
•It's up to the user to apply business logic to separate correct and wrong data
IS_COMPANY | COMPANY | PERSON | ADDRESS | ZIP
1 | {"COMPANY_NAME": "September Ltd."} | {"FIRST_NAME": "Septem", "LAST_NAME": "ber Ltd."} | 74 Lawn ave., Denver | 39023
0 | {"COMPANY_NAME": "Beatrice Gagliano"} | {"FIRST_NAME": "Beatrice", "LAST_NAME": "Gagliano"} | 10 Garden str. | 33113
1 | {"COMPANY_NAME": "January Inc."} | {"FIRST_NAME": "Januar", "LAST_NAME": "y Inc."} | 122/1 Park ave. | 31234
Redefined Fields
14/40

df.select($"IS_COMPANY",
when($"IS_COMPANY"=== true, "COMPANY_NAME")
.otherwise(null).as("COMPANY_NAME"),
when($"IS_COMPANY"=== false, "CONTACTS").otherwise(null).as("FIRST_NAME")),
...
IS_COMPANYCOMPANY_NAMEFIRST_NAMELAST_NAMEADDRESSZIP
1September Ltd.74 Lawn ave., Denver39023
0 BeatriceGagliano10 Garden str.33113
1January Inc.122/1 Park ave.31234
Clean Up Redefined Fields + flatten structs
15/40
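A fuller version of the clean-up sketch above, assuming the struct and field names from the previous slides and that IS_COMPANY is read as a numeric 0/1 flag (not necessarily the presenters' exact code):

import org.apache.spark.sql.functions.when
import spark.implicits._

// Keep each redefine only for the rows where it is valid and flatten the structs
val dfClean = df.select(
  $"IS_COMPANY",
  when($"IS_COMPANY" === 1, $"COMPANY.COMPANY_NAME").as("COMPANY_NAME"),
  when($"IS_COMPANY" === 0, $"PERSON.FIRST_NAME").as("FIRST_NAME"),
  when($"IS_COMPANY" === 0, $"PERSON.LAST_NAME").as("LAST_NAME"),
  $"ADDRESS",
  $"ZIP")

Rows where the condition does not hold get null in the corresponding column, which matches the table above.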

Hierarchical DBs
•Several record types, AKA segments
•Each segment type has its own schema
•Parent-child relationships between segments
[Illustration: a root COMPANY segment (ID, Name, Address) with several CONTACT-PERSON child segments (Name, Phone #)]
16/40

•The combined copybook has to contain all the segments as redefined fields:
01 COMPANY-DETAILS.
  05 SEGMENT-ID  PIC X(5).
  05 COMPANY-ID  PIC X(10).
  05 COMPANY.
    10 NAME     PIC X(15).
    10 ADDRESS  PIC X(25).
    10 REG-NUM  PIC 9(8) COMP.
  05 CONTACT REDEFINES COMPANY.
    10 PHONE-NUMBER    PIC X(17).
    10 CONTACT-PERSON  PIC X(28).
Common data: SEGMENT-ID and COMPANY-ID; segment 1: COMPANY; segment 2: CONTACT
[Illustration: a COMPANY record (Name, Address, Reg-Num) and a CONTACT record (Phone #, Contact Person)]
Defining a Copybook
17/40

•The code snippet for reading the data:
val df = spark
  .read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")
  .option("is_record_sequence", "true")
  .load("examples/multisegment_data")
Reading all the segments
18/40

•The dataset for the whole copybook:
•Invalid redefines are highlighted
SEGMENT_ID | COMPANY_ID | COMPANY | CONTACT
C | 1005918818 | [ ABCD Ltd. ] | [ invalid ]
P | 1005918818 | [ invalid ] | [ Cliff Wallingford ]
C | 1036146222 | [ DEFG Ltd. ] | [ invalid ]
P | 1036146222 | [ invalid ] | [ Beatrice Gagliano ]
C | 1045855294 | [ Robotrd Inc. ] | [ invalid ]
P | 1045855294 | [ invalid ] | [ Doretha Wallingford ]
P | 1045855294 | [ invalid ] | [ Deshawn Benally ]
P | 1045855294 | [ invalid ] | [ Willis Tumlin ]
C | 1057751949 | [ Xingzhoug ] | [ invalid ]
P | 1057751949 | [ invalid ] | [ Mindy Boettcher ]
Reading all the segments
19/40

[Illustration: the raw byte stream of a data file is split into its records, e.g. a COMPANY record (ID: 280003941, Name: Example.com, Address: 10 Garden ...) and a PERSON record (Name: JANE ROBERTS, Phone #: +935280)]
Id | Name | Address | Reg_Num
100 | Example.com | 10 Garden st. | 8791237
101 | ZjkLPj | 11 Park ave. | 1233971
102 | Robotrd Inc. | 12 Forest st. | 0382979
103 | Xingzhoug | 8 Mount st. | 2389012
104 | ABCD Ltd. | 123 Tech str. | 3129001

Company_Id | Contact_Person | Phone_Number
100 | Jane | +32186331
100 | Colyn | +23769123
102 | Robert | +12389679
102 | Teresa | +32187912
102 | Laura | +42198723
Separate segments by dataframes
20/40

•Filter segment #1 (companies)
val dfCompanies = df
  .filter($"SEGMENT_ID" === "C")
  .select($"COMPANY_ID",
          $"COMPANY.NAME".as("COMPANY_NAME"),
          $"COMPANY.ADDRESS",
          $"COMPANY.REG_NUM")

Company_Id | Company_Name | Address | Reg_Num
100 | ABCD Ltd. | 10 Garden st. | 8791237
101 | ZjkLPj | 11 Park ave. | 1233971
102 | Robotrd Inc. | 12 Forest st. | 0382979
103 | Xingzhoug | 8 Mount st. | 2389012
104 | Example.co | 123 Tech str. | 3129001
Reading root segments
21/40

•Filter segment #2 (people)
val dfContacts = df
  .filter($"SEGMENT_ID" === "P")
  .select($"COMPANY_ID",
          $"CONTACT.CONTACT_PERSON",
          $"CONTACT.PHONE_NUMBER")

Company_Id | Contact_Person | Phone_Number
100 | Marry | +32186331
100 | Colyn | +23769123
102 | Robert | +12389679
102 | Teresa | +32187912
102 | Laura | +42198723
Reading child segments
22/40

Company_Id | Company_Name | Address | Reg_Num
100 | ABCD Ltd. | 10 Garden st. | 8791237
101 | ZjkLPj | 11 Park ave. | 1233971
102 | Robotrd Inc. | 12 Forest st. | 0382979
103 | Xingzhoug | 8 Mount st. | 2389012
104 | Example.co | 123 Tech str. | 3129001

Company_Id | Contact_Person | Phone_Number
100 | Marry | +32186331
100 | Colyn | +23769123
102 | Robert | +12389679
102 | Teresa | +32187912
102 | Laura | +42198723

Company_Id | Company_Name | Address | Reg_Num | Contact_Person | Phone_Number
100 | ABCD Ltd. | 10 Garden st. | 8791237 | Marry | +32186331
100 | ABCD Ltd. | 10 Garden st. | 8791237 | Colyn | +23769123
102 | Robotrd Inc. | 12 Forest st. | 0382979 | Robert | +12389679
102 | Robotrd Inc. | 12 Forest st. | 0382979 | Teresa | +32187912
102 | Robotrd Inc. | 12 Forest st. | 0382979 | Laura | +42198723
The two segments can now be joined by Company_Id
23/40

•Joining segments 1 and 2
val dfJoined =
  dfCompanies.join(dfContacts, Seq("COMPANY_ID"))
Results:
Company_Id | Company_Name | Address | Reg_Num | Contact_Person | Phone_Number
100 | ABCD Ltd. | 10 Garden st. | 8791237 | Marry | +32186331
100 | ABCD Ltd. | 10 Garden st. | 8791237 | Colyn | +23769123
102 | Robotrd Inc. | 12 Forest st. | 0382979 | Robert | +12389679
102 | Robotrd Inc. | 12 Forest st. | 0382979 | Teresa | +32187912
102 | Robotrd Inc. | 12 Forest st. | 0382979 | Laura | +42198723
Joining in Spark is easy
24/40

•The joined table can also be denormalized for document storage
val dfCombined =
  dfJoined
    .groupBy($"COMPANY_ID",
             $"COMPANY_NAME",
             $"ADDRESS",
             $"REG_NUM")
    .agg(
      collect_list(
        struct($"CONTACT_PERSON",
               $"PHONE_NUMBER"))
        .as("CONTACTS"))

{
  "COMPANY_ID": "8216281722",
  "COMPANY_NAME": "ABCD Ltd.",
  "ADDRESS": "74 Lawn ave., New York",
  "REG_NUM": "33718594",
  "CONTACTS": [
    { "CONTACT_PERSON": "Cassey Norgard", "PHONE_NUMBER": "+(595) 641 62 32" },
    { "CONTACT_PERSON": "Verdie Deveau",  "PHONE_NUMBER": "+(721) 636 72 35" },
    { "CONTACT_PERSON": "Otelia Batman",  "PHONE_NUMBER": "+(813) 342 66 28" }
  ]
}
Denormalize data
25/40
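To persist the denormalized records, one JSON document per company, a single write is enough (a minimal sketch; the output path is hypothetical):

dfCombined
  .write
  .mode("overwrite")
  .json("data/companies_json")   // produces one JSON object per line, as in the example above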

Restore parent-child relationships
•In our example we had a COMPANY_ID field that is present in all segments
•In real copybooks this is not the case
•What can we do?
[Illustration: a root COMPANY segment (ID, Name, Address) with several CONTACT-PERSON child segments (Name, Phone #)]
26/40

01 COMPANY-DETAILS.
  05 SEGMENT-ID  PIC X(5).
  05 COMPANY.
    10 NAME     PIC X(15).
    10 ADDRESS  PIC X(25).
    10 REG-NUM  PIC 9(8) COMP.
  05 CONTACT REDEFINES COMPANY.
    10 PHONE-NUMBER    PIC X(17).
    10 CONTACT-PERSON  PIC X(28).
•If COMPANY_ID is not part of all segments, Cobrix can generate it for you
val df = spark
  .read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")
  .option("is_record_sequence", "true")
  .option("segment_field", "SEGMENT-ID")
  .option("segment_id_level0", "C")
  .option("segment_id_prefix", "ID")
  .load("examples/multisegment_data")
No COMPANY-ID: Id Generation
27/40

01 COMPANY-DETAILS.
  05 SEGMENT-ID  PIC X(5).
  05 COMPANY.
    10 NAME     PIC X(15).
    10 ADDRESS  PIC X(25).
    10 REG-NUM  PIC 9(8) COMP.
  05 CONTACT REDEFINES COMPANY.
    10 PHONE-NUMBER    PIC X(17).
    10 CONTACT-PERSON  PIC X(28).
•Seg0_Id can be used to restore the parent-child relationship between segments (see the sketch after this slide)
No COMPANY-ID
SEGMENT_ID | Seg0_Id | COMPANY | CONTACT
C | ID_0_0 | [ ABCD Ltd. ] | [ invalid ]
P | ID_0_0 | [ invalid ] | [ Cliff Wallingford ]
C | ID_0_2 | [ DEFG Ltd. ] | [ invalid ]
P | ID_0_2 | [ invalid ] | [ Beatrice Gagliano ]
C | ID_0_4 | [ Robotrd Inc. ] | [ invalid ]
P | ID_0_4 | [ invalid ] | [ Doretha Wallingford ]
P | ID_0_4 | [ invalid ] | [ Deshawn Benally ]
Id Generation
28/40
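With the generated Seg0_Id, the segments can be split and re-linked using the same filter-and-join pattern shown earlier; a sketch, assuming the column names from the table above:

val dfCompanies = df
  .filter($"SEGMENT_ID" === "C")
  .select($"Seg0_Id",
          $"COMPANY.NAME".as("COMPANY_NAME"),
          $"COMPANY.ADDRESS",
          $"COMPANY.REG_NUM")

val dfContacts = df
  .filter($"SEGMENT_ID" === "P")
  .select($"Seg0_Id",
          $"CONTACT.CONTACT_PERSON",
          $"CONTACT.PHONE_NUMBER")

// Each contact row is linked to its parent company through the generated id
val dfJoined = dfCompanies.join(dfContacts, Seq("Seg0_Id"))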

•When transferred from a mainframe, a hierarchical database becomes a sequence of records
•To read the next record, the previous record has to be read first
•A sequential format by its nature
•How to make it scalable?
[Illustration: a data file as a raw byte stream split into interleaved COMPANY and PERSON records, e.g. a COMPANY record (ID: 280003941, Name: Example.com, Address: 10 Garden ...) and a PERSON record (Name: JANE ROBERTS, Phone #: +935280)]
Variable Length Records (VLR)
29/40

Performance challenge of VLRs
•Naturally sequential files
  •To read the next record, the prior record needs to be read first
•Each record has a length field
  •Acts as a pointer to the next record
•No record delimiter when reading a file from the middle
VLR structure
[Chart: throughput (MB/s) vs. number of Spark cores, variable record length; sequential processing stays at about 10 MB/s]
30/40

Spark on HDFS
[Diagram: the Spark driver and the HDFS namenode coordinate a set of data nodes; each data node hosts HDFS blocks and a Spark executor, and blocks map to partitions]
31/40

Parallelizing Sequential Reads
[Diagram: 1 - Cobrix reads the record headers from HDFS and builds a list of (offset, length) pairs; 2 - Spark parallelizes the offsets and lengths across the cluster; 3 - the records are parsed in parallel from the parallelized offsets and lengths]
32/40
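The sparse-index idea above can be sketched in plain Spark terms: scan the record headers once to build a list of (offset, length) pairs, parallelize that list, and let each task seek to its own offsets and decode only its own records. This is an illustrative sketch, not Cobrix's actual implementation; the file path is hypothetical, a 4-byte length header is assumed (real mainframe files typically carry RDW/BDW headers), and the EBCDIC decoding is reduced to a single-charset string conversion:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer

val spark   = SparkSession.builder().appName("SparseIndexSketch").getOrCreate()
val pathStr = "hdfs:///data/vlr_file"              // hypothetical input file

// 1. Read only the record headers, sequentially, to build a sparse index
val fs      = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val path    = new Path(pathStr)
val fileLen = fs.getFileStatus(path).getLen
val index   = ArrayBuffer[(Long, Int)]()           // (payload offset, payload length)

val in = fs.open(path)
var offset = 0L
while (offset + 4 <= fileLen) {
  in.seek(offset)
  val recordLen = in.readInt()                     // assumption: 4-byte length header
  index += ((offset + 4L, recordLen))
  offset += 4L + recordLen
}
in.close()

// 2. Parallelize the (offset, length) pairs; 3. decode the records in parallel
val records = spark.sparkContext
  .parallelize(index.toSeq, numSlices = 100)
  .mapPartitions { chunk =>
    val fs = FileSystem.get(new Configuration())
    val in = fs.open(new Path(pathStr))
    val decoded = chunk.map { case (off, len) =>
      val buf = new Array[Byte](len)
      in.seek(off)
      in.readFully(buf)
      new String(buf, "Cp1047")                    // naive EBCDIC-to-text decode, illustration only
    }.toVector
    in.close()
    decoded.iterator
  }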

Enabling Data Locality for VLRs
[Diagram: 1 - Cobrix reads a VLR header (offset=3000, length=100); 2 - it asks the HDFS namenode where bytes 3000 to 3100 are stored; 3 - the namenode answers nodes 3, 18 and 41; 4 - Spark loads the VLR with node 3 as the preferred location; 5 - the task is launched on an executor hosted on node 3]
33/40
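The offset-range lookup in steps 2-3 maps to a standard HDFS client call; a minimal sketch of asking the namenode which hosts hold a given byte range (the file path is hypothetical):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs     = FileSystem.get(new Configuration())
val path   = new Path("hdfs:///data/vlr_file")     // hypothetical input file
val status = fs.getFileStatus(path)

// Which datanodes hold the blocks covering bytes 3000..3100?
val locations      = fs.getFileBlockLocations(status, 3000L, 100L)
val preferredHosts = locations.flatMap(_.getHosts).distinct

Such host lists can then be exposed to Spark as the preferred locations of the corresponding partitions, so the tasks decoding those records run on the nodes that already hold the data.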

Throughput when sparse indexes are used
•Experiments were run on our lab cluster
  •4 nodes
  •380 cores
  •10 Gbit network
•Scalable: for bigger files, using more executors yields higher throughput
[Chart: throughput (MB/s) vs. number of Spark cores, variable record length, for 10 GB, 20 GB and 40 GB files, compared to sequential processing]
34/40

Comparison versus fixed-length record performance
●Distribution and locality are handled completely by Spark
●Parallelism is achieved using sparse indexes
[Charts: throughput (MB/s) vs. number of Spark cores for a 40 GB file; fixed-length records reach about 150 MB/s, variable-length records about 145 MB/s, and sequential processing stays at about 10 MB/s]
35/40

Cobrix in ABSA Data Infrastructure - Batch
[Diagram: 180+ mainframe sources are ingested into HDFS; on Spark, Cobrix parses the data (2. Parse), Enceladus conforms it (3. Conform), Spline tracks lineage (4. Track Lineage), the result is re-ingested into HDFS (5. Re-ingest) and consumed (6. Consume)]
36/40

Cobrix in ABSA Data Infrastructure - Stream
[Diagram: 180+ mainframe sources are ingested into Kafka (1. Ingest); the Cobrix parser decodes the records (2. Parse), ABRiS converts them to Avro (3. Avro), Enceladus conforms the data (4. Conform), Spline tracks lineage (5. Track Lineage) and the stream is consumed (6. Consume)]
37/40

Cobrix in ABSA Data Infrastructure
[Diagram: the batch and stream pipelines combined - 180+ mainframe sources are ingested into HDFS (batch: Cobrix on Spark parses, Enceladus conforms, Spline tracks lineage, results are re-ingested and consumed) and into Kafka (stream: the Cobrix parser and ABRiS produce Avro, Enceladus conforms, Spline tracks lineage, the stream is consumed)]
38/40

●Thanks to the following people, who made the project possible and helped all along the way:
  ○Andrew Baker, Francois Cillers, Adam Smyczek, Jan Scherbaum, Peter Moon, Clifford Lategan, Rekha Gorantla, Mohit Suryavanshi, Niel Steyn
●Thanks to the authors of the original COBOL parser:
  ○Ian De Beer, Rikus de Milander (https://github.com/zenaptix-lab/copybookStreams)
Acknowledgment
39/40

•Combining expertise to make access to mainframe data in Hadoop seamless
•Our goal is to support the widest range of use cases possible
•Report a bug!
•Request a new feature!
•Create a pull request!
Our home: https://github.com/AbsaOSS/cobrix
Your Contribution is Welcome
40/40