How The Weather Company Uses Apache Spark to Serve Weather Data Fast at Low Cost

Databricks, Nov 24, 2020

About This Presentation

The Weather Company (TWC) collects weather data across the globe at the rate of 34 million records per hour, and the TWC History on Demand application serves that historical weather data to users via an API, averaging 600,000 requests per day. Users are increasingly consuming large quantities of historical data, and this session describes how TWC serves those workloads faster and at lower cost using Apache Spark via IBM Cloud SQL Query and Cloud Object Storage.


Slide Content

How The Weather Company® Uses Spark to Serve Weather Data Faster and Cheaper
By Leveraging IBM Cloud® SQL Query and Cloud Object Storage
Erik Goepfert and Paula Ta-Shma
IBM TWC and IBM Research

The Weather Company started with a simple mission to:
▪ Map the atmosphere every 15 minutes
▪ Process over 400 terabytes of data daily
▪ Deliver more than 50 billion requests for weather information every day and produce 25 billion forecasts daily
Source: Qlik Sense internal report, April 2017; according to internal forecasting system + number of locations in the world by lat/lon (2 decimal places); 400 terabytes according to internal SUN platform numbers

And has evolved into delivering decisions and solutions.
Source: ForecastWatch, Three Region Accuracy Overview, 2010-2017, https://www.forecastwatch.com/static/Three_Region_Accuracy_Overview_2010-2017.pdf
History on Demand - Conditions (HoD)
Provides access to a worldwide, hourly, high-resolution, gridded dataset of past weather conditions via a web API
▪ Global 4 km grid
▪ 0.044-degree resolution
▪ 34 potential weather properties
▪ 34 million records added every hour
▪ Geospatial and temporal search
  - Point, bounding box, and polygon search over a time range
Usage
▪ Averages 600,000 requests per day
▪ Used by clients primarily for machine learning and data analytics
▪ Supports research in domains such as climate science, energy & utilities, agriculture, transportation, insurance, and retail

Problems with our previous solution
▪ Expensive
  - Our synchronous data access solution is expensive
▪ Limited storage capacity
  - Hard storage limits per cluster with our previous cloud provider and storage solution
▪ We have reduced the data we provide
  - To lower cost and stay below the storage limit, we've reduced our data to land only, and 20 of the available 34 weather properties
▪ Clients are limited to small requests
  - To allow for a synchronous interaction, clients are required to limit the scope of their requests to 2,400 records
▪ Slow at retrieving large amounts of data
  - Because of the small query sizes, it is time consuming to retrieve large amounts of data

Our new asynchronous solution
▪ More cost-effective
  - Our use of IBM Cloud SQL Query and Cloud Object Storage has resulted in an order of magnitude reduction in cost
▪ Unlimited storage
  - With Cloud Object Storage we effectively have unlimited storage capacity
▪ Global weather data coverage with all 34 weather properties
  - With the reduced cost and unlimited storage we no longer have to limit the data we provide
▪ Support for large requests
  - With an asynchronous interaction, clients can now submit a single request for everything they're interested in (see the client sketch below)
▪ Large amounts of data retrieved quickly with a single query
  - Because IBM Cloud SQL Query uses Spark behind the scenes, large queries complete relatively quickly
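In practice the asynchronous interaction is a simple submit/poll/fetch loop. Below is a minimal Python sketch using IBM's ibmcloudsql client package; the credentials are placeholders, and the method names (logon, submit_sql, wait_for_job, get_result) come from the public package rather than from this talk, so treat the details as assumptions.

import ibmcloudsql

# Placeholders: an IBM Cloud API key, the SQL Query instance CRN, and a COS
# location where result objects should land.
client = ibmcloudsql.SQLQuery("<api-key>", "<instance-crn>",
                              target_cos_url="cos://us-south/results")
client.logon()

# One large asynchronous request replaces thousands of small synchronous ones.
job_id = client.submit_sql("""
    SELECT *
    FROM hod_gcod
    WHERE year = 2019
      AND ST_Contains(ST_Boundingbox(-111.711, 41.081, -109.953, 42.840),
                      ST_Point(longitude, latitude))
    INTO cos://us-south/results STORED AS PARQUET
""")

client.wait_for_job(job_id)          # poll until Spark finishes the job
results = client.get_result(job_id)  # fetch the result objects from COS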

Solution Overview
▪ Serverless approach
  - Pay per use -> low cost
▪ IBM Cloud SQL Query
  - Serverless SQL powered by Spark
▪ IBM Cloud Object Storage (COS)
  - S3-compatible API
▪ Apply best practices
  - Parquet
  - Geospatial data layout

IBM Cloud SQL Query
▪ Serverless SQL service built on Apache Spark
  - Supports all Spark native data formats, e.g. Parquet, ORC, CSV, Avro, JSON
  - Intuitive UI, no setup or installation required
  - Integrated with Watson Studio notebooks and Cloud Functions, and supports REST APIs
  - Query and ETL data on COS directly
  - Also supports Db2
▪ High performance
  - Built-in Catalog: multi-tenant Hive Metastore
  - Data skipping indexes
▪ Low cost
  - $5/TB scanned (see the back-of-envelope example below)
  - Skip over irrelevant data using the Catalog and data skipping
  - Zero standing cost
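Because billing is purely per TB scanned, the effect of skipping on cost is easy to estimate. A back-of-envelope sketch: the $5/TB rate is from this slide, the 6.404 TB dataset size and 20.4 MB scan figure come from the demo later in the deck, and 1 TB is taken as 10^12 bytes for simplicity.

# Pay-per-use: cost is proportional to bytes scanned, with zero standing cost.
PRICE_PER_TB = 5.0   # USD per TB scanned, per the slide
TB = 10**12          # decimal terabyte, for a rough estimate

def query_cost(bytes_scanned: float) -> float:
    return PRICE_PER_TB * bytes_scanned / TB

print(query_cost(6.404 * TB))    # naive full scan of the demo dataset: ~$32
print(query_cost(20.4 * 10**6))  # demo query with skipping: ~$0.0001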

SQL Query Catalog
▪ Multi-tenant Hive Metastore (HMS)
▪ Critical for achieving high performance at Big Data scale:
  1. Spark SQL leverages HMS for partition pruning, avoiding reads of irrelevant partitions (sketched after this slide)
     - More flexible than the Hive-style partitioning naming convention
  2. Significantly reduces time spent on object listing
     - HMS enables listing partitions in parallel; sequential listing can be very slow
  3. Stores schema and statistics for cost-based optimization
▪ Useful for maintaining consistency
  - Can consistently replace an entire partition
  - Needed for changing the data layout
  - Future: use the Delta/Iceberg/Hudi format for this

Example object listing, with per-partition metadata (Partition MD) tracked in HMS:
Weather/dt=2020-08-17/part-00085.parquet
Weather/dt=2020-08-17/part-00086.parquet
Weather/dt=2020-08-17/part-00087.parquet
Weather/dt=2020-08-17/part-00088.parquet
Weather/dt=2020-08-18/part-00001.parquet
Weather/dt=2020-08-18/part-00002.parquet
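To make the partition-pruning point concrete, here is a minimal PySpark sketch over the same Hive-style layout; the table name is illustrative and a configured COS/S3 connector is assumed. RECOVER PARTITIONS registers one metastore entry per dt prefix, and a query with a dt predicate then never lists or reads the other partitions.

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Register the layout above as a partitioned table (names illustrative).
spark.sql("""
    CREATE TABLE IF NOT EXISTS weather (temp DOUBLE, lat DOUBLE, long DOUBLE)
    PARTITIONED BY (dt STRING)
    STORED AS PARQUET
    LOCATION 'cos://us-geo/twc/Weather'
""")

# Discover the dt=... prefixes and record them in the metastore.
spark.sql("ALTER TABLE weather RECOVER PARTITIONS")

# Partition pruning: only objects under dt=2020-08-18/ are listed and read.
spark.sql("SELECT * FROM weather WHERE dt = '2020-08-18'").show()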

Geospatial Analytics in SQL Query
▪ Supports geospatial operations and data types
  - According to the SQL/MM standard
  - Aggregation, computation and join via native SQL syntax (see the example below)
▪ Geodetic full-earth support
  - Increased developer productivity
▪ Avoids piece-wise planar projections
  - High-precision calculations anywhere on earth
  - Very large polygons, e.g. countries, polar caps, etc.
▪ Native geohash support
  - Fine granularity
  - Fast spatial aggregation
▪ Geospatial data skipping
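As a concrete example, the query below combines geospatial functions named in this deck (ST_WKTToSQL, ST_Point, ST_Contains) in plain SQL; the table name and the idea of running it through the hedged client sketched earlier are illustrative, not taken from the talk.

# Geospatial predicates are expressed directly in SQL (SQL/MM style).
sql = """
SELECT COUNT(*) AS observations
FROM hod_gcod
WHERE ST_Contains(
        ST_WKTToSQL('POLYGON((-78.93 36.00, -78.67 35.78, -79.04 35.90, -78.93 36.00))'),
        ST_Point(longitude, latitude))
INTO cos://us-south/results STORED AS CSV
"""
counts = client.run_sql(sql)  # submits, waits, and returns the result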

Data Skipping in SQL Query
▪ Avoid reading irrelevant objects using indexes
▪ Complements partition pruning -> object-level pruning
▪ Stores aggregate metadata per object to enable skipping decisions (conceptual sketch below)
▪ Indexes are stored in COS
▪ Supports multiple index types
  - Currently MinMax, ValueList, BloomFilter, Geospatial
  - The underlying data skipping library is extensible; new index types can easily be supported
▪ Enables data skipping for queries with UDFs
  - e.g. ST_Contains, ST_Distance, etc.
  - UDFs are mapped to indexes
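Conceptually, building a MinMax index is one small aggregate per object. The PySpark sketch below illustrates the idea only (it is not the service's internal implementation, and it reuses the spark session from the earlier sketch): compute per-object min/max for the indexed column and store the result as a compact metadata table on COS.

from pyspark.sql import functions as F

# One metadata row per Parquet object: min and max of the indexed column.
weather = spark.read.parquet("cos://us-geo/twc/Weather")
minmax = (weather
          .withColumn("object_name", F.input_file_name())
          .groupBy("object_name")
          .agg(F.min("temp").alias("temp_min"),
               F.max("temp").alias("temp_max")))

# The index itself is just a small table stored back on COS.
minmax.write.mode("overwrite").parquet("cos://us-geo/twc/metadata/temp_minmax")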

How Data Skipping Works
Spark SQL query execution flow, without and with data skipping:

Without skipping:  Query -> Prune partitions -> Read data
With skipping:     Query -> Prune partitions -> Metadata filter -> Optional file filter -> Read data

Implemented using the Catalyst optimizer and the session extensions API.
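The metadata filter step can be pictured as a pre-pass that shrinks the object listing before any data is read. A conceptual sketch using the MinMax table built above (in the real service this happens inside the Catalyst plan, not in user code):

# An object can satisfy "temp > 40" only if its per-object max exceeds 40.
minmax = spark.read.parquet("cos://us-geo/twc/metadata/temp_minmax")
relevant = [row["object_name"]
            for row in minmax.where("temp_max > 40").collect()]

# Read only the surviving objects; everything else is skipped entirely.
if relevant:
    df = spark.read.parquet(*relevant).where("temp > 40")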

Data Skipping Example
Example query:

SELECT *
FROM cos://us-geo/twc/Weather STORED AS parquet
WHERE temp > 40

Object listing:

Weather/dt=2020-08-17/part-00085.parquet
Weather/dt=2020-08-17/part-00086.parquet
Weather/dt=2020-08-17/part-00087.parquet
Weather/dt=2020-08-17/part-00088.parquet
Weather/dt=2020-08-18/part-00001.parquet
Weather/dt=2020-08-18/part-00002.parquet

MinMax metadata:

Object Name                  Temp Min   Temp Max
...
dt=2020-08-17/part-00085     7.97       26.77
dt=2020-08-17/part-00086     2.45       23.71
dt=2020-08-17/part-00087     6.46       18.62
dt=2020-08-17/part-00088     23.67      41.02
...

Only dt=2020-08-17/part-00088 (max 41.02) can contain rows with temp > 40; the other listed objects are not relevant to this query and are skipped without being read. Data layout is important to get good skipping.


HoD Data Layout in Production
gcod/v1/
  hourly/year=2019/month=2/
    20190201T002000Z-part-00000.parquet
    20190201T002000Z-part-00001.parquet
    ...
    20190218T232000Z-part-00007.parquet
    20190218T232000Z-part-00008.parquet
  monthly/year=2019/month=1/
    part-00000.parquet
    part-00001.parquet
    ...
    part-08191.parquet
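A sketch of how such a dual layout might be maintained with Spark. Paths mirror the listing above, `batch` stands for one hour of newly ingested records, and the use of repartitionByRange for geospatial clustering is an assumption (the deck does not say how the monthly layout is clustered); it is shown because tight per-object lat/long ranges are what make skipping effective.

# Hourly: append each ingest batch under its year/month prefix as it arrives.
(batch.write.mode("append")
      .parquet("cos://us-geo/twc/gcod/v1/hourly/year=2019/month=2"))

# Monthly: periodically rewrite a whole month as ~8,192 objects, range-
# partitioned by location so each object covers a tight lat/long range.
month = spark.read.parquet("cos://us-geo/twc/gcod/v1/hourly/year=2019/month=1")
(month.repartitionByRange(8192, "latitude", "longitude")
      .write.mode("overwrite")
      .parquet("cos://us-geo/twc/gcod/v1/monthly/year=2019/month=1"))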

HoD Data Layout in Production
[Figure: map comparing objects' geospatial footprints under the Monthly vs. Hourly layouts; boundaries shown are an approximation, not based on actual data]

Geospatial Data Skipping Example
Example query (Raleigh Research Triangle, US):

SELECT *
FROM Weather STORED AS parquet
WHERE ST_Contains(
    ST_WKTToSQL('POLYGON((-78.93 36.00, -78.67 35.78, -79.04 35.90, -78.93 36.00))'),
    ST_Point(long, lat))
INTO cos://us-south/results STORED AS parquet

MinMax metadata:

Object Name                  Lat Min   Lat Max
...
dt=2020-08-17/part-00085     35.02     36.17
dt=2020-08-17/part-00086     43.59     44.95
dt=2020-08-17/part-00087     34.86     40.62
dt=2020-08-17/part-00088     23.67     25.92
...

The ST_Contains UDF is mapped to necessary conditions on lat and long. The polygon's latitude range is 35.78 to 36.00, so dt=2020-08-17/part-00086 and part-00088 cannot contain matching rows; they are not relevant to this query and are skipped.

Query Rewrite Approach
Example query (Raleigh Research Triangle, US):

SELECT *
FROM Weather STORED AS parquet
WHERE ST_Contains(
    ST_WKTToSQL('POLYGON((-78.93 36.00, -78.67 35.78, -79.04 35.90, -78.93 36.00))'),
    ST_Point(long, lat))
INTO cos://us-south/results STORED AS parquet

Rewritten query:

SELECT *
FROM Weather STORED AS parquet
WHERE ST_Contains(
    ST_WKTToSQL('POLYGON((-78.93 36.00, -78.67 35.78, -79.04 35.90, -78.93 36.00))'),
    ST_Point(long, lat))
  AND long BETWEEN -79.04 AND -78.67
  AND lat BETWEEN 35.78 AND 36.00
INTO cos://us-south/results STORED AS parquet
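The rewrite only needs the polygon's bounding box, which follows directly from its vertex list. A small, self-contained sketch of the transformation (the service derives these conditions from its indexes internally; this just makes the mapping concrete, and it only handles simple polygons that do not cross the antimeridian):

def bbox_predicates(wkt_polygon: str) -> str:
    """Derive necessary long/lat bounds from a simple WKT polygon."""
    coords = wkt_polygon[wkt_polygon.index("((") + 2 : wkt_polygon.index("))")]
    points = [tuple(map(float, p.split())) for p in coords.split(",")]
    longs = [p[0] for p in points]
    lats = [p[1] for p in points]
    return (f"long BETWEEN {min(longs)} AND {max(longs)} "
            f"AND lat BETWEEN {min(lats)} AND {max(lats)}")

poly = "POLYGON((-78.93 36.00, -78.67 35.78, -79.04 35.90, -78.93 36.00))"
print(bbox_predicates(poly))
# -> long BETWEEN -79.04 AND -78.67 AND lat BETWEEN 35.78 AND 36.0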

Benefits of Consolidated Metadata
▪ The query rewrite approach can leverage the MinMax metadata already embedded in Parquet/ORC formats
▪ The consolidated metadata approach performs better: 3.6x faster
  - Avoids reading footers
  - Better resource allocation

10x Acceleration with Data Skipping and Catalog
▪ Assumes the query rewrite approach (the yellow series in the original chart) is the baseline
  - Requires Parquet/ORC
▪ For other formats, e.g. CSV/JSON/Avro, the acceleration is much larger
▪ Experiment uses the Raleigh Research Triangle query
▪ 10x speedup on average

Demo

Demo Stats
TWC demo dataset properties:
▪ 6.404 TB in Parquet format
▪ 172,004 objects
▪ 36 MB per object (on average)
▪ 21 months of weather data
▪ 21 partitions

Performance stats:
▪ Create table: 3.8 s
▪ Recover partitions: 21.9 s
▪ Create indexes: 12 min 17.0 s (data scanned: 5.72 MB)
▪ Geospatial query: 1 min 14.0 s (data scanned: 20.4 MB)
▪ Catalog: skips 20 of 21 partitions
▪ Data skipping: skips 8,186 of 8,190 objects

Example Query used by HoD in Production

SELECT *
FROM hod_gcod
WHERE (
    year = 2016 AND 10 <= month
    OR year BETWEEN 2017 AND 2019
    OR year = 2020 AND month <= 3
)
AND date_time BETWEEN timestamp("2016-10-15 00:00:00Z") AND timestamp("2020-03-11 00:00:00Z")
AND ST_Contains(
    ST_Boundingbox(-111.711, 41.081, -109.953, 42.840),
    ST_Point(longitude, latitude)
)
INTO cos://us-east/my-results-bucket STORED AS CSV

The seemingly redundant year/month predicates let the catalog prune whole partitions, while the date_time predicate applies the precise time range.

Query Runtime for HoD in Production
[Chart: query runtime when querying a 40x40 gridpoint area (25,000 km²) over time]

HoD Sync vs Async
Querying a 40x40 gridpoint bounding box (25,000 km²) to retrieve 1 year of data:

                   Synchronous (previous solution)   Asynchronous (new solution)
Query count        8,000                             1
Total query time   2h 15m                            3m 20s

HoD Sync vs Async

Synchronous (previous solution):
▪ Limited storage
▪ Land only
▪ 20 weather properties
▪ Query result size limit of 2,400 records

Asynchronous (new solution):
▪ Unlimited storage
▪ Global coverage
▪ All 34 weather properties
▪ Unlimited query result size
▪ An order of magnitude reduction in cost

Conclusions
▪ Order of magnitude cost reduction
▪ Order of magnitude performance improvements
▪ Enhanced functionality
▪ Key factors:
  - Serverless approach with IBM Cloud SQL Query + COS
    1. Seamless integration with a powerful geospatial library
    2. Fully integrated Catalog
    3. Geospatial data skipping
▪ Our data skipping work is extensible

Thanks!
Contact info:
Erik Goepfert: [email protected]
Paula Ta-Shma: [email protected]

Thanks to the team:
Ofer Biran, Dat Bui, Linsong Chu, Patrick Dantressangle, Pranita Dewan, Michael Factor, Oshrit Feder, Raghu Ganti, Michael Haide, Holly Hassenzahl, Pete Ihlenfeldt, Guy Khazma, Simon Laws, Gal Lushi, Yosef Moatti, Jeremy Nachman, Daniel Pittner, Mudhakar Srivatsa, Torsten Steinbach

The research leading to these results has received funding from the European Community's Horizon 2020 research and innovation program under grant agreement n°779747.

Feedback
Don't forget to rate and review this session.
Take a look at our latest blog.