Gluent New World #02 - SQL-on-Hadoop : A bit of History, Current State-of-the-Art, and Looking towards the Future

rittmanmead 3,471 views 79 slides Apr 19, 2016
Slide 1
Slide 1 of 93
Slide 1
1
Slide 2
2
Slide 3
3
Slide 4
4
Slide 5
5
Slide 6
6
Slide 7
7
Slide 8
8
Slide 9
9
Slide 10
10
Slide 11
11
Slide 12
12
Slide 13
13
Slide 14
14
Slide 15
15
Slide 16
16
Slide 17
17
Slide 18
18
Slide 19
19
Slide 20
20
Slide 21
21
Slide 22
22
Slide 23
23
Slide 24
24
Slide 25
25
Slide 26
26
Slide 27
27
Slide 28
28
Slide 29
29
Slide 30
30
Slide 31
31
Slide 32
32
Slide 33
33
Slide 34
34
Slide 35
35
Slide 36
36
Slide 37
37
Slide 38
38
Slide 39
39
Slide 40
40
Slide 41
41
Slide 42
42
Slide 43
43
Slide 44
44
Slide 45
45
Slide 46
46
Slide 47
47
Slide 48
48
Slide 49
49
Slide 50
50
Slide 51
51
Slide 52
52
Slide 53
53
Slide 54
54
Slide 55
55
Slide 56
56
Slide 57
57
Slide 58
58
Slide 59
59
Slide 60
60
Slide 61
61
Slide 62
62
Slide 63
63
Slide 64
64
Slide 65
65
Slide 66
66
Slide 67
67
Slide 68
68
Slide 69
69
Slide 70
70
Slide 71
71
Slide 72
72
Slide 73
73
Slide 74
74
Slide 75
75
Slide 76
76
Slide 77
77
Slide 78
78
Slide 79
79
Slide 80
80
Slide 81
81
Slide 82
82
Slide 83
83
Slide 84
84
Slide 85
85
Slide 86
86
Slide 87
87
Slide 88
88
Slide 89
89
Slide 90
90
Slide 91
91
Slide 92
92
Slide 93
93

About This Presentation

Hadoop and NoSQL platforms initially focused on Java developers and slow but massively-scalable MapReduce jobs as an alternative to high-end but limited-scale analytics RDBMS engines. Apache Hive opened-up Hadoop to non-programmers by adding a SQL query engine and relational-style metadata layered o...


Slide Content

[email protected] www.rittmanmead.com @rittmanmead
Gluent New World #02: 

SQL-on-Hadoop with Mark Rittman
Mark Rittman, CTO, Rittman Mead
April 2016

[email protected] www.rittmanmead.com @rittmanmead 2
•Mark Rittman, Co-Founder of Rittman Mead
‣Oracle ACE Director, specialising in Oracle BI&DW
‣14 Years Experience with Oracle Technology
‣Regular columnist for Oracle Magazine
•Author of two Oracle Press Oracle BI books
‣Oracle Business Intelligence Developers Guide
‣Oracle Exalytics Revealed
‣Writer for Rittman Mead Blog :

http://www.rittmanmead.com/blog
•Email : [email protected]
•Twitter : @markrittman
About the Speaker

[email protected] www.rittmanmead.com @rittmanmead 3
•Why Hadoop? And what are the key Hadoop platform features?
•Introducing SQL-on-Hadoop, and Apache Hive
•How Hive works, and how it’s not just about SELECTing data
•Solving Hive’s ad-hoc query performance problem
•So what’s all this about Apache Drill?
•…. and Oracle Big Data SQL, IBM Big SQL?
•Apache Spark, and Spark SQL
•Security, Hadoop and SQL-on-Hadoop
•Selecting a SQL-on-Hadoop query engine
Agenda

[email protected] www.rittmanmead.com @rittmanmead
•Everyone’s talking about Hadoop and “Big Data”
Hadoop is the Big Hot Topic In IT / Analytics

[email protected] www.rittmanmead.com @rittmanmead
Highly Scalable (and Affordable) Cluster Computing
•Enterprise High-End RDBMSs such as Oracle can scale into the petabytes, using clustering
‣Sharded databases (e.g. Netezza) can scale further but with complexity / single workload trade-offs
•Hadoop was designed from outside for massive horizontal scalability - using cheap hardware
•Anticipates hardware failure and makes multiple copies of data as protection
•More nodes you add, more stable it becomes
•And at a fraction of the cost of traditional

RDBMS platforms

[email protected] www.rittmanmead.com @rittmanmead
•Store and analyze huge volumes of structured and unstructured data
•In the past, we had to throw away the detail
•No need to define a data model during ingest
•Supports multiple, flexible schemas
•Separation of storage from compute engine
•Allows multiple query engines and frameworks

to work on the same raw datasets
6
Store Everything Forever - And Process in Many Ways
Hadoop Data Lake
Webserver

Log Files (txt)
Social Media

Logs (JSON)
DB Archives

(CSV)
Sensor Data

(XML)
`Spatial & Graph

(XML, txt)
IoT Logs

(JSON, txt)
Chat Transcripts

(Txt)
DB Transactions

(CSV, XML)
Blogs, Articles

(TXT, HTML)
Raw Data Processed Data
NoSQL Key-Value

Store DB Tabular Data

(Hive Tables)
Aggregates

(Impala Tables)
NoSQL Document 

Store DB

[email protected] www.rittmanmead.com @rittmanmead 7
•Data for customer 360 system typically landed into a Hadoop & NoSQL-based
•Applies aggregation, joining and machine-learning processes to extract insights
Design Pattern : “Data Lake” or “Data Reservoir”
Data Transfer Data Access
Data Factory
Data Reservoir
Business
Intelligence Tools
Hadoop Platform
File Based
Integration
Stream
Based
Integration
Data streams
Discovery & Development Labs
Safe & secure Discovery and Development
environment
Data sets and
samples
Models and
programs
Marketing /
Sales Applications
Models
Machine
Learning
Segments
Operational Data
Transactions
Customer
Master ata
Unstructured Data
Voice + Chat
Transcripts
ETL Based
Integration
Raw
Customer Data
Data stored in
the original
format (usually
files) such as
SS7, ASN.1,
JSON etc.
Mapped
Customer Data
Data sets
produced by
mapping and
transforming
raw data

[email protected] www.rittmanmead.com @rittmanmead 8
•Combine with a traditional data warehouse to add storage, support for new datatypes
•Land raw data in real-time into Hadoop, then process and store
Combine with Traditional Data Warehouse

[email protected] www.rittmanmead.com @rittmanmead 9
•Hadoop is the overall framework for enabling low-cost, scalable cluster computing
‣HDFS cluster filesystem stores the data, in a process/query neutral form (files)
‣YARN resource manager allocates resources to Hadoop jobs
‣MapReduce and other processing frameworks 

then work on that data
•Data is decoupled from the engine that processes it
•Layers can be swapped out (Mesos for YARN etc)
•Hadoop takes care of the overall cluster framework
Key Hadoop Platform Technologies
Hadoop Distributed Filesystem (HDFS)
YARN Resource Manager
Query and Processing Engines
Batch

(MapReduce)
In-Memory

(Spark)
Streaming 

(Spark, Storm)
Graph + Search

(Solr, Giraph)
Unstructured /

Semi-Structured

Log Data
Offloaded

Archive

Data
Social Graphs

& Networks
Smart Meter

& Sensor Data

sounds good

but I’m a DBA

it’s all files

I don’t know MapReduce

or Scala

or

or whatever the latest

made-up Hadoop language is

[email protected] www.rittmanmead.com @rittmanmead
Introducing SQL-on-Hadoop
•Hadoop is not a cheap substitute for enterprise DW
platforms - don’t use it like this
•But adding SQL processing and abstraction can help
in many scenarios:
•Query access to data stored in Hadoop as an
archive
•Aggregating, sorting, filtering data
•Set-based transformation capabilities for other
frameworks (e.g. Spark)
•Ad-hoc analysis and data discovery in-real time
•Providing tabular abstractions over complex
datatypes
19
Hadoop Distributed Filesystem (HDFS)
YARN Resource Manager
Query and Processing Engines
Batch

(MapReduce)
In-Memory

(Spark)
Streaming 

(Spark, Storm)
Graph + Search

(Solr, Giraph)
Unstructured /

Semi-Structured

Log Data
Offloaded

Archive

Data
Social Graphs

& Networks
Smart Meter

& Sensor Data
SQL

Engine
SQL

Engine

[email protected] www.rittmanmead.com @rittmanmead 20
•Modern SQL-on-Hadoop engines often provide connectivity

to data sources outside of the Hadoop cluster
‣Traditional DW platforms
‣No-SQL databases e.g. MongoDB
‣Files, JDBC etc
•Provide a framework for data integration

and data federation, using JDBC drivers
Enables Integration with External (And Internal) Data
Hadoop Distributed Filesystem (HDFS)
YARN Resource Manager
Query and Processing Engines
In-Memory

(Spark)
Unstructured /

Semi-Structured

Log Data
Offloaded

Archive

Data
Social Graphs

& Networks
Smart Meter

& Sensor Data
SQL

Engine
20
NoSQL Key-Value

Store DB

[email protected] www.rittmanmead.com @rittmanmead 21
•Most Traditional data warehousing vendors offer a Hadoop integration option
•Oracle Big Data SQL
•IBM Big SQL etc
•Leverage lower-level SQL-on-Hadoop

metadata but use own server process
•Allows DBAs to write SQL using RDBMS

SQL dialect, run across relational, Hadoop

and NoSQL servers
Hadoop Distributed Filesystem (HDFS)
YARN Resource Manager
Query and Processing Engines
Oracle

Big Data SQL

Server
Unstructured /

Semi-Structured

Log Data
Offloaded

Archive

Data
Social Graphs

& Networks
Smart Meter

& Sensor Data
21
NoSQL Key-Value

Store DB
Platform for Traditional DW Integration with Hadoop
Oracle

RDBMS

So how do they work?

[email protected] www.rittmanmead.com @rittmanmead
•Original SQL-on-Hadoop engine developed at Facebook, now within the Hadoop project
•Allows users to query Hadoop data using SQL-like language
•Tabular metadata layer that overlays files, can interpret semi-structured data (e.g. JSON)
•Generates MapReduce code to return required data
•Extensible through SerDes and Storage Handlers
•JDBC and ODBC drivers for most platforms/tools
•Perfect for set-based access + batch ETL work
23
Apache Hive : SQL Metadata + Engine over Hadoop
YARN Resource Manager
Hadoop Distributed Filesystem (HDFS)
Unstructured /

Semi-Structured

Log Data
Offloaded

Archive

Data
Social Graphs

& Networks
Smart Meter

& Sensor Data
2323
MapReduce Processing Framework
Apache Hive SQL Processing Engine
HiveQL SQL Commands
Java JARs
Submitted Jobs

[email protected] www.rittmanmead.com @rittmanmead
•Queries come in via JDBC/ODBC, the Hive Thrift Server,

from the CLI or via Hue (for example)
•The Hive Metastore (data dictionary) maps files and

other Hadoop data structures onto tables and columns
•The Hive SQL engine parses, plans and then executes

the query, using an execution plan similar to Oracle,

SQL Server and other RBDMS engines
•MapReduce code is then auto-generated, and submitted

to YARN, and then run on the Hadoop cluster
24
Apache Hive Logical Architecture
Hive Thrift Server
JDBC / ODBC
Parser Planner
Execution Engine
Metastore
HueCLI
MapReduce
HDFS
hive> select count(*) from src_customer;

Total MapReduce jobs = 1

Launching Job 1 out of 1

Number of reduce tasks determined at compile time: 1

In order to change the average load for a reducer (in bytes):

set hive.exec.reducers.bytes.per.reducer=

In order to limit the maximum number of reducers:

set hive.exec.reducers.max=

In order to set a constant number of reducers:

set mapred.reduce.tasks=

Starting Job = job_201303171815_0003, Tracking URL = 

http://localhost.localdomain:50030/jobdetails.jsp…

Kill Command = /usr/lib/hadoop-0.20/bin/

hadoop job -Dmapred.job.tracker=localhost.localdomain:8021 

-kill job_201303171815_0003


2013-04-17 04:06:59,867 Stage-1 map = 0%, reduce = 0%

2013-04-17 04:07:03,926 Stage-1 map = 100%, reduce = 0%

2013-04-17 04:07:14,040 Stage-1 map = 100%, reduce = 33%

2013-04-17 04:07:15,049 Stage-1 map = 100%, reduce = 100%

Ended Job = job_201303171815_0003

OK

25

Time taken: 22.21 seconds
HiveQL

Query
MapReduce

Job submitted
Results 

returned

[email protected] www.rittmanmead.com @rittmanmead
•Data integration tools such as Oracle Data Integrator can load and process Hadoop data
•BI tools such as Oracle Business Intelligence 12c can report on Hadoop data
•Generally use MapReduce and Hive to access data
‣ODBC and JDBC access to Hive tabular data
‣Allows Hadoop unstructured/semi-structured

data on HDFS to be accessed like RDBMS
Provides a SQL Interface for BI + ETL Tools
Access direct Hive or extract using ODI12c
for structured OBIEE dashboard analysis
What pages are people visiting?
Who is referring to us on Twitter?
What content has the most reach?

T : +44 (0) 1273 911 268 (UK) or (888) 631-1410 (USA) or 

+61 3 9596 7186 (Australia & New Zealand) or +91 997 256 7970 (India)
E : [email protected]
W : www.rittmanmead.com
Connecting to Hive using Beeline CLI
•From the command-line, either use Hive CLI, or beeline CLI
‣HUE (“Hadoop User Experience”) provides Web interface into Hive (think Oracle Apex)
[iot@cdh-node1 ~]$ beeline -u jdbc:hive2://cdh-node1:10000 -n iot -p welcome1 -d org.apache.hive.jdbc.HiveDriver
Connecting to jdbc:hive2://cdh-node1:10000

Connected to: Apache Hive (version 1.1.0-cdh5.5.1)

Driver: Hive JDBC (version 1.1.0-cdh5.5.1)

Transaction isolation: TRANSACTION_REPEATABLE_READ

Beeline version 1.1.0-cdh5.5.1 by Apache Hive
0: jdbc:hive2://cdh-node1:10000> show tables;

+-----------------------------------+--+

| tab_name |

+-----------------------------------+--+

| flight_delays |

| my_second_table |

| oracle_analytics_tweets |

+-----------------------------------+--+
8 rows selected (0.137 seconds)

0: jdbc:hive2://cdh-node1:10000>
Add SQL*Developer

[email protected] www.rittmanmead.com @rittmanmead
•Hive is extensible in three major ways that help with accessing and integrating new data sets
•SerDes : Serializer-Deserializers that interpret semi-structured sources + make tabular
•UDFs + Hive Streaming : Add user-defined functions and whole-row external processing
•File Formats : make use of compressed and/or optimised file storage
•Storage Handlers : use storage other than HDFS (e.g. MongoDB) as data source
Hive Extensibility - The “Swiss Army Knife” of Hadoop
Client
Client
HDFS Fileformats
JDBC / ODBC
Metastore
MapReduce
UDF/UDAFs
SerDe
Scripts
HBase
MongoDB
Parser
Execution Engine
HiveQL
Planner
Storage Hdlrs
TextFile
Parquet

[email protected] www.rittmanmead.com @rittmanmead
•Extend Hive by adding new computation and aggregation capabilities
•UDFs (row-based), UDAFs (aggregation) and UDTFs (table functions)
Hive Extensibility through UDFs and UDAFs
add jar target/JsonSplit-1.0-SNAPSHOT.jar;
create temporary function json_split 

as 'com.pythian.hive.udf.JsonSplitUDF';
create table json_example (json string);
load data local inpath 'split_example.json' 

into table json_example;
SELECT ex.* FROM json_example 

LATERAL VIEW explode(json_split(json_example.json)) ex;ADD JAR ./ext.jar;
CREATE TEMPORARY FUNCTION process_names as 'com.matthewrathbone.example.NameParserGenericUDTF';
SELECT
adTable.name,
adTable.surname
FROM people
lateral view process_names(name) adTable as name, surname;

[email protected] www.rittmanmead.com @rittmanmead
•Allows data to be stored in optimised storage format
‣Column-store for analytics
‣Self-describing, splittable storage

for general-purpose use
‣Compressed data
‣Semi-structured (e.g. log) data
29
SerDes & Storage Handlers Further Decouple Storage
Hadoop Distributed Filesystem (HDFS)
Query and Processing EnginesMapReduce
Unstructured /

Semi-Structured

Log Data
Offloaded

Archive

Data
Social Graphs

& Networks
Smart Meter

& Sensor Data
SQL

Engine
NoSQL Key-Value

Store DB
RegEx SerdeParquet SerDe
JSON SerDe
NoSQL Key-Value

Store DB
MongoDB

Store Handler
MongoDB

Store Handler

[email protected] www.rittmanmead.com @rittmanmead 30
•Splittability - can the file be split into blocks and processed in parallel
‣CSV files can be split by file line; XML files can’t because of opening and closing tags
•Ability to compress - CSV files can’t be block compressed, impact on space / performance
•Support for schema evolution - does the file contain in-built schema information that self-describes
the data?
File Formats in Hadoop Are Important
2016-01-28T09:30:28Z,2016-01-28T11:56:24Z,145.933

2016-01-29T00:19:35Z,2016-01-29T01:36:49Z,77.233

2016-01-29T02:10:35Z,2016-01-29T02:32:18Z,21.717

2016-01-29T03:08:07Z,2016-01-29T03:16:11Z,8.067

2016-01-29T03:51:24Z,2016-01-29T06:57:44Z,186.333

2016-01-29T07:05:50Z,2016-01-29T07:13:21Z,7.517

2016-01-29T07:25:53Z,2016-01-29T07:30:23Z,4.5

2016-01-29T23:30:00Z,2016-01-30T07:00:30Z,450.5

2016-01-31T23:30:00Z,2016-02-01T07:30:00Z,480

2016-02-02T00:35:54Z,2016-02-02T02:10:54Z,95
CSV Extract from Apple Health
•Human readable, splittable
•No ability to block compress
•No in-built self-describing metadata
•Timestamps will need special processing
•Store final data in parquet format to
address some of these concerns
{"entities": {"user_mentions": [], "media": [],
"hashtags": [], "urls": []}, "text": "Off to visit
our office in Bangalore in 15 mins. It'll be good
to meet up with Venkat again, plus his team of Ram
and Jay.", "created_at": "2010-09-01 00:00:00
+0000", "source": "<a href=\"http://twitter.com\"
rel=\"nofollow\">Twitter Web Client</a>", "id_str":
"22684302309", "geo": {}, "id": 22684302309,
"user": {"verified": false, "name": "Mark Rittman",
"profile_image_url_https": "https://pbs.twimg.com/
profile_images/702537100890087425/
rAlqgrGX_normal.jpg", "protected": false, "id_str":
"14716125", "id": 14716125, "screen_name":
"markrittman"}}
JSON Records from Twitter
•Human readable, splittable
•No ability to block compress (+verbose)
•Built self-describing metadata
•Less mature SerDe support

[email protected] www.rittmanmead.com @rittmanmead 31
•Beginners usually store data in HDFS using text file formats (CSV) but these have limitations
•Apache AVRO often used for general-purpose processing
‣Splitability, schema evolution, in-built metadata, support for block compression
•Parquet now commonly used with Impala due to column-orientated storage
‣Mirrors work in RDBMS world around column-store
‣Only return (project) the columns you require across a wide table
Specialised File Formats - Parquet and AVRO

[email protected] www.rittmanmead.com @rittmanmead 32
Example HiveQL Commands to Create + Populate Table
create table health_sleep_analysis_tmp (

asleep_start_ts timestamp,

asleep_end_ts timestamp,

mins_asleep float)

ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'

WITH SERDEPROPERTIES (

"separatorChar" = “,",

"quoteChar" = "'",

"escapeChar" = "\"

)

STORED AS TEXTFILE;
create table health_sleep_analysis

stored as parquet

as

select from_unixtime(unix_timestamp(asleep_start, "yyyy-MM-dd'T'hh:mm:ss'Z'")) asleep_start_ts,

from_unixtime(unix_timestamp(asleep_end, "yyyy-MM-dd'T'hh:mm:ss'Z'")) end_start_ts,

mins_asleep

from health_sleep_analysis_tmp;
•Define temporary Hive table to store start and end times/dates as strings,

as we can’t do the string>timestamp conversion using the LOAD DATA
command
•Use the OpenCSVSerde file format so that we can specify delimiters, quote
chars and escape chars for file data
•Store as regular uncompressed human-readable text file
LOAD DATA INPATH '/user/iot/Health/apple_health_sleep_analysis_noheader.csv'
OVERWRITE INTO TABLE health_sleep_analysis_tmp;
•Load the data file into that temporary Hive table
•Now re-load that temporary data into more
optimised Parquet format files, suitable for ad-hoc
analytic querying
•Convert the timestamps currently held in generic
string datatype fields into more optimal TIMESTAMP
datatypes using a Hive UDF

[email protected] www.rittmanmead.com @rittmanmead
•One of several third-party SerDes available to download from Github
Use of Third-Party (Community) Serde - JSONSerde
CREATE EXTERNAL TABLE tweets(
id string,
created_at string,
source string,
favorited boolean,
retweeted_status struct<text:string,
user:struct<screen_name:string,name:string>,
retweet_count:int>,
entities struct<urls:array
<struct<expanded_url:string>>, 

user_mentions:array<struct<screen_name:string,name:string>>,
hashtags:array<struct<text:string>>>,
text string,
user struct<screen_name:string,name:string,friends_count:int,followers_count:int,
statuses_count:int,verified:boolean,utc_offset:int,time_zone:string>,
in_reply_to_screen_name string
)
ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
STORED AS TEXTFILE
LOCATION '/user/iot/tweets/';
•Note the use of STRUCT and ARRAY datatypes
•Used to handle arrays of hashtags, URLs etc in tweets
Just select the JSON elements that we want from the
overall schema in JSON records
Created as an external Hive table, so overlays schema on
existing directory of files

[email protected] www.rittmanmead.com @rittmanmead 34
•Hive SELECT statement against nested columns returns data as arrays
•Can parse programatically, or create further views or CTAS tables to split out array
Support for Nested (Array)-Type Structures
hive> select entities, user from tweets
> limit 3;
OK
{"urls":[{"expanded_url":"http://www.rittmanmead.com/
biforum2013"}],"user_mentions":[],"hashtags":[]}
{"screen_name":"markrittman","name":"Mark
Rittman","friends_count":null,"followers_count":null,"statuses_count":null,"ver
ified":false,"utc_offset":null,"time_zone":null}
{"urls":[{"expanded_url":"http://www.bbc.co.uk/news/
technology-22299503"}],"user_mentions":[],"hashtags":[]}
{"screen_name":"markrittman","name":"Mark
Rittman","friends_count":null,"followers_count":null,"statuses_count":null,"ver
ified":false,"utc_offset":null,"time_zone":null}
{"urls":[{"expanded_url":"http://pocket.co/seb2e"}],"user_mentions":
[{"screen_name":"ArtOfBI","name":"Christian Screen"},
{"screen_name":"wiseanalytics","name":"Lyndsay Wise"}],"hashtags":[]}
{"screen_name":"markrittman","name":"Mark
Rittman","friends_count":null,"followers_count":null,"statuses_count":null,"ver
ified":false,"utc_offset":null,"time_zone":null}
How to you work with these values?
CREATE TABLE tweets_expanded
stored as parquet
AS select
tweets.id,
tweets.created_at,
tweets.user.screen_name as user_screen_name,
tweets.user.friends_count as user_friends_count,
tweets.user.followers_count as user_followers_count,
tweets.user.statuses_count as user_tweets_count,
tweets.text,
tweets.in_reply_to_screen_name,
tweets.retweeted_status.user.screen_name as retweet_user_screen_name,
tweets.retweeted_status.retweet_count as retweet_count,
tweets.entities.urls[0].expanded_url as url1,
tweets.entities.urls[1].expanded_url as url2,
tweets.entities.hashtags[0].text as hashtag1,
tweets.entities.hashtags[1].text as hashtag2,
tweets.entities.hashtags[2].text as hashtag3,
tweets.entities.hashtags[3].text as hashtag4
from tweets;
Create a copy of the table in Parquet storage format
“Denormalize” the array by selecting individual elements
CREATE view tweets_expanded_view
AS select
tweets.id,
tweets.created_at,
tweets.user.screen_name as user_screen_name,
tweets.user.friends_count as user_friends_count,
tweets.user.followers_count as user_followers_count,
tweets.user.statuses_count as user_tweets_count,
tweets.text,
tweets.in_reply_to_screen_name,
tweets.retweeted_status.user.screen_name as retweet_user_screen_name,
tweets.retweeted_status.retweet_count as retweet_count,
tweets.entities.urls[0].expanded_url as url1,
tweets.entities.urls[1].expanded_url as url2,
tweets.entities.hashtags[0].text as hashtag1,
tweets.entities.hashtags[1].text as hashtag2,
tweets.entities.hashtags[2].text as hashtag3,
tweets.entities.hashtags[3].text as hashtag4
from tweets;
… or create as a view (not all BI tools support views though)

[email protected] www.rittmanmead.com @rittmanmead
•Use HiveQL to create aggregations, select individual columns (JSON elements) from data
•Use WHERE clause to limit data returned & ORDER BY to sort - as per normal SQL
35
Calculating Aggregations, Filtering Tweet Data
select text, hashtag1, hashtag2 from tweets_expanded 

where hashtag1 = ‘obiee’;
Column selection only = just MAP task
select in_reply_to_screen_name, count(*) as total_replies_to
from tweets_expanded
group by in_reply_to_screen_name
order by total_replies_to desc
limit 10;
Selection and aggregation = MAP() and REDUCE task

[email protected] www.rittmanmead.com @rittmanmead
•Hive MR jobs can have multiple stages
•MapReduce Stages, Metastore operations
•File Move / Rename etc
36
Multi-Stage MapReduce Jobs
SELECT
LOWER(hashtags.text),
COUNT(*) AS total_count
FROM (
SELECT * FROM tweets WHERE regexp_extract(created_at,"(2015)*",1) = "2015"
) tweets
LATERAL VIEW EXPLODE(entities.hashtags) t1 AS hashtags
GROUP BY LOWER(hashtags.text)
ORDER BY total_count DESC
LIMIT 15
1
2

[email protected] www.rittmanmead.com @rittmanmead
Multi-Step HiveQL Transforms - Tweet Sentiment
create external table load_tweets(id string,text STRING) 

ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe' 

LOCATION '/user/iot/tweets';
create table split_words as 

select id as id,split(text,' ') as words 

from load_tweets;
create table tweet_word as 

select id as id,word 

from split_words 

LATERAL VIEW explode(words) w as word;
create table dictionary

(word string,rating int) 

ROW FORMAT DELIMITED 

FIELDS TERMINATED BY ‘ ';
create table word_join as 

select tweet_word.id,tweet_word.word,dictionary.rating 

from tweet_word 

LEFT OUTER JOIN dictionary 

ON(tweet_word.word =dictionary.word);
select t.text, r.rating from tweets_expanded t
join (select id,AVG(rating) as rating 

from word_join 

GROUP BY word_join.id) r on t.id = r.id
order by r.rating;
LOAD DATA INPATH 'afinn.txt' 

into TABLE dictionary;
1
2
3
4
5
6
7
Take all the text within a set of tweets, and explode-out
all the words into a table, one row per word
Load in a dictionary file that we’ll use to determine the
sentiment of words in these tweets
Join the words and the dictionary sentiment scores
together, so every word used with any of the tweets has
a sentiment score we can use
Now average-out the sentiment scores for each word
within a tweet, and return the tweet text and those
averages listed in descending sentiment order

[email protected] www.rittmanmead.com @rittmanmead 38
•Not all join types are available in Hive - joins must be equality joins
•No sequences, no primary keys on tables
•Generally need to stage Oracle or other external data into Hive before joining to it
•Hive latency - not good for small microbatch-type work
‣But other alternatives exist - Spark, Impala etc
•Don’t assume that HiveQL == Oracle SQL
‣Test assumptions before committing to platform
•Hive is INSERT / APPEND only - no updates, deletes etc
‣But HBase may be suitable for CRUD-type loading
SQL Considerations : Using Hive vs. Regular Oracle SQL
vs.

[email protected] www.rittmanmead.com @rittmanmead 39
•Based on BigTable paper from Google, 2006, Dean et al.
‣“Bigtable is a sparse, distributed, persistent multi-dimensional sorted map.”Key Features:
‣Distributed storage across cluster of machines – Random, online read and write data access
‣Schemaless data model (“NoSQL”)
‣Self-managed data partitions
•Why would you use it with Hive?
‣Allows you to do update and delete

activity rather than just Hive append-only
‣Very fast for incremental loading
‣Can define Hive tables over HBase ones,

allowing OBIEE to then access them
What is HBase?

[email protected] www.rittmanmead.com @rittmanmead 40
•HBase Shell CLI allows you to create HBase tables
•GET and PUT commands can then be used to add/update cells, query cells etc
Creating HBase Tables using HBase Shell
hbase shell

create 'carriers','details'
create 'geog_origin','origin'
create 'geog_dest','dest'
create 'flight_delays','dims','measures'
put 'geog_dest','LAX','dest:airport_name','Los Angeles, CA: Los Angeles'
put 'geog_dest','LAX','dest:city','Los Angeles, CA'
put 'geog_dest','LAX','dest:state','California'
put 'geog_dest','LAX','dest:id','12892'
hbase(main):015:0> scan 'geog_dest'
ROW                                    COLUMN+CELL

LAX                                   column=dest:airport_name, timestamp=1432067861347, value=Los Angeles, CA: Los Angeles

LAX                                   column=dest:city, timestamp=1432067861375, value=Los Angeles,CA

LAX                                   column=dest:id, timestamp=1432067862018,value=12892

LAX                                   column=dest:state, timestamp=1432067861404,value=California

1 row(s) in 0.0240 seconds

[email protected] www.rittmanmead.com @rittmanmead 41
•Direct extract from salesforce.com into HBase 

using Python and add-in packages
‣Python packages extend functionality 

by adding APIs, integration etc
‣Happybase, Beatbox and Pyhs2 packages 

installed along with Python
•All free and open-source
Programmatically Loading HBase Tables using Python
import pyhs2
import happybase

connection = happybase.Connection('bigdatalite')
flight_delays_hbase_table = connection.table('test1_flight_delays')
b = flight_delays_hbase_table.batch(batch_size=10000)

with pyhs2.connect(host='bigdatalite',
               port=10000,
               authMechanism="PLAIN",
               user='oracle',
               password='welcome1',
               database='default') as conn:
    with conn.cursor() as cur:
        #Execute query
        cur.execute("select * from flight_delays_initial_load")

        #Fetch table results
        for i in cur.fetch():
            b.put(str(i[0]),{'dims:year': i[1],
                             'dims:carrier': i[2],
                             'dims:orig': i[3],
                             'dims:dest': i[4],
                             'measures:flights': i[5],
                             'measures:late': i[6],
                             'measures:cancelled': i[7],
                             'measures:distance': i[8]})
b.send()

[email protected] www.rittmanmead.com @rittmanmead 42
•Create Hive tables over the HBase ones to provide SQL load/query capabilities
‣Uses HBaseStorageHandler Storage Handler for HBAse
‣HBase columns mapped to Hive columns using SERDEPROPERTIES
Create Hive Table Metadata over HBase Tables
CREATE EXTERNAL TABLE hbase_flight_delays
(key string,
  year string,
  carrier string,
  orig string,
  dest string,
  flights string,
  late   string,
  cancelled string,
  distance string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES
("hbase.columns.mapping" = ":key,dims:year,dims:carrier,dims:orig,dims:dest,

measures:flights,measures:late,measures:cancelled,measures:distance")
TBLPROPERTIES ("hbase.table.name" = "test1_flight_delays");

[email protected] www.rittmanmead.com @rittmanmead 43
•Use HiveQL commands INSERT INTO TABLE … SELECT to load (merge) new data
•Use HiveQL SELECT query to retrieve data from HBase table
Load and Query HBase using HiveQL
insert into table hbase_flight_delays              
select * from flight_delays_initial_load;      
        
Total jobs = 1
...
Total MapReduce CPU Time Spent: 11 seconds 870 msec
OK
Time taken: 40.301 seconds
select count(*), min(cast(key as bigint)) as min_key, max(cast(key as bigint)) as max_key
from hbase_flight_delays;

Total jobs = 1
...
Total MapReduce CPU Time Spent: 14 seconds 660 msec
OK
200000  1  200000
Time taken: 53.076 seconds, Fetched: 1 row(s)

[email protected] www.rittmanmead.com @rittmanmead 44
•But Parquet (and HDFS) have significant limitation for real-time analytics applications
‣Append-only orientation, focus on column-store 

makes streaming ingestion harder
•Cloudera Kudu aims to combine 

best of HDFS + HBase
‣Real-time analytics-optimised
‣Supports updates to data
‣Fast ingestion of data
‣Accessed using SQL-style tables

and get/put/update/delete API
Cloudera Kudu - Combining Best of HBase and Column-Store

[email protected] www.rittmanmead.com @rittmanmead 45
•Kudu storage used with Impala - create tables using Kudu storage handler
•Can now UPDATE, DELETE and INSERT into Hadoop tables, not just SELECT and LOAD DATA
Example Impala DDL + DML Commands with Kudu
CREATE TABLE `my_first_table` (
`id` BIGINT,
`name` STRING
)
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'my_first_table',
'kudu.master_addresses' = 'kudu-master.example.com:7051',
'kudu.key_columns' = 'id'
);
INSERT INTO my_first_table VALUES (99, "sarah");
INSERT IGNORE INTO my_first_table VALUES (99, "sarah");
UPDATE my_first_table SET name="bob" where id = 3;
DELETE FROM my_first_table WHERE id < 3;
DELETE c FROM my_second_table c, stock_symbols s WHERE c.name = s.symbol;

only one problem…

Hive is slow

too slow

for ad-hoc querying

[email protected] www.rittmanmead.com @rittmanmead 50
•MapReduce’s great innovation was to break processing down into distributed jobs
•Jobs that have no functional dependency on each other, only upstream tasks
•Provides a framework that is infinitely scalable and very fault tolerant
•Hadoop handled job scheduling and resource management
‣All MapReduce code had to do was provide the “map” and “reduce” functions
‣Automatic distributed processing
‣Slow but extremely powerful
Hadoop 1.0 and MapReduce

[email protected] www.rittmanmead.com @rittmanmead 51
•A typical Hive or Pig script compiles down into multiple MapReduce jobs
•Each job stages its intermediate results to disk
•Safe, but slow - write to disk, spin-up separate JVMs for each job
MapReduce - Scales By Writing Intermediate Results to Disk
SELECT
LOWER(hashtags.text),
COUNT(*) AS total_count
FROM (
SELECT * FROM tweets WHERE regexp_extract(created_at,"(2015)*",1) = "2015"
) tweets
LATERAL VIEW EXPLODE(entities.hashtags) t1 AS hashtags
GROUP BY LOWER(hashtags.text)
ORDER BY total_count DESC
LIMIT 15
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 5.34 sec HDFS Read: 10952994 HDFS Write: 5239 SUCCESS
Stage-Stage-2: Map: 1 Reduce: 1 Cumulative CPU: 2.1 sec HDFS Read: 9983 HDFS Write: 164 SUCCESS
Total MapReduce CPU Time Spent: 7 seconds 440 msec
OK
1
2

[email protected] www.rittmanmead.com @rittmanmead 52
•MapReduce 2 (MR2) splits the functionality of the JobTracker

by separating resource management and job scheduling/monitoring
•Introduces YARN (Yet Another Resource Manager)
•Permits other processing frameworks to MR
‣For example, Apache Spark
•Maintains backwards compatibility with MR1
•Introduced with CDH5+
MapReduce 2 and YARN
Node

Manager
Node

Manager
Node

Manager
Resource

Manager
Client
Client

[email protected] www.rittmanmead.com @rittmanmead 53
•Runs on top of YARN, provides a faster execution engine than MapReduce for Hive, Pig etc
•Models processing as an entire data flow graph (DAG), rather than separate job steps
‣DAG (Directed Acyclic Graph) is a new programming style for distributed systems
‣Dataflow steps pass data between them as streams, rather than writing/reading from disk
•Supports in-memory computation, enables Hive on Tez (Stinger) and Pig on Tez
•Favoured In-memory / Hive v2 

route by Hortonworks
Apache Tez
Input Data
TEZ DAG
Map()
Map()
Map()
Reduce()
Output Data
Reduce()
Reduce()
Reduce()
Input Data
Map()
Map()
Reduce()
Reduce()

[email protected] www.rittmanmead.com @rittmanmead 54
Tez Advantage - Drop-In Replacement for MR with Hive, Pig
set hive.execution.engine=mr
set hive.execution.engine=tez
4m 17s
2m 25s

[email protected] www.rittmanmead.com @rittmanmead 56
•Cloudera’s answer to Hive query response time issues
•MPP SQL query engine running on Hadoop, bypasses MapReduce for
direct data access
•Mostly in-memory, but spills to disk if required
•Uses Hive metastore to access Hive table metadata
•Similar SQL dialect to Hive - not as rich though and no support for Hive
SerDes, storage handlers etc
Cloudera Impala - Fast, MPP-style Access to Hadoop Data

[email protected] www.rittmanmead.com @rittmanmead 57
How Impala Works
Impala

Daemon
HDFS

DataNode
SQL

App
ODBC /

JDBC
HDFS

DataNode
HDFS

DataNode
HDFS

DataNode
Impala

Daemon
Impala

Daemon
Impala

Daemon
Hive

MetaStore
Impala

StateStore
•Cloudera-based solution for ad-hoc SQL-on-Hadoop
•MPP SQL query engine running on Hadoop, with
daemons running on each Hadoop node
•In contrast to jobs being submitted via YARN
•Mostly in-memory, but spills to disk if required
•Uses Hive metastore to access Hive table metadata
•Similar SQL dialect to Hive - not as rich though and
no support for Hive SerDes, storage handlers etc

[email protected] www.rittmanmead.com @rittmanmead 58
•Log into Impala Shell, run INVALIDATE METADATA command to refresh Impala table list
•Run SHOW TABLES Impala SQL command to view tables available
•Run COUNT(*) on main ACCESS_PER_POST table to see typical response time
Enabling Hive Tables for Impala
[oracle@bigdatalite ~]$ impala-shell
Starting Impala Shell without Kerberos authentication
[bigdatalite.localdomain:21000] > invalidate metadata;
Query: invalidate metadata
Fetched 0 row(s) in 2.18s
[bigdatalite.localdomain:21000] > show tables;
Query: show tables
+-----------------------------------+
| name |
+-----------------------------------+
| access_per_post |
| access_per_post_cat_author |
| … |
| posts |
|——————————————————————————————————- +
Fetched 45 row(s) in 0.15s
[bigdatalite.localdomain:21000] > select count(*) 

from access_per_post;
Query: select count(*) from access_per_post
+----------+
| count(*) |
+----------+
| 343 |
+----------+
Fetched 1 row(s) in 2.76s

[email protected] www.rittmanmead.com @rittmanmead 59
•Significant improvement over Hive response time
•Now makes Hadoop suitable for ad-hoc querying
Significantly-Improved Ad-Hoc Query Response Time vs Hive
|
Logical Query Summary Stats: Elapsed time 2, Response time 1, Compilation time 0 (seconds)
Logical Query Summary Stats: Elapsed time 50, Response time 49, Compilation time 0 (seconds)
Simple Two-Table Join against Hive Data Only
Simple Two-Table Join against Impala Data Only
vs

What about integration

with my Data Warehouse?

[email protected] www.rittmanmead.com @rittmanmead 61
•Most Traditional data warehousing vendors offer a Hadoop integration option
•Oracle Big Data SQL
•IBM Big SQL etc
•Leverage lower-level SQL-on-Hadoop

metadata but use own server process
•Allows DBAs to write SQL using RDBMS

SQL dialect, run across relational, Hadoop

and NoSQL servers
Hadoop Distributed Filesystem (HDFS)
YARN Resource Manager
Query and Processing Engines
Oracle

Big Data SQL

Server
Unstructured /

Semi-Structured

Log Data
Offloaded

Archive

Data
Social Graphs

& Networks
Smart Meter

& Sensor Data
61
NoSQL Key-Value

Store DB
Platform for Traditional DW Integration with Hadoop
Oracle

RDBMS

[email protected] www.rittmanmead.com @rittmanmead 62
•Originally Part of Oracle Big Data 4.0 (BDA-only) but now available for commodity Hadoop installs
‣Also requires Oracle Database 12c (no longer dependent on Exadata from Big Data SQL 4.0)
‣Extends Oracle Data Dictionary to cover Hive
•Extends Oracle SQL and SmartScan to Hadoop
•Extends Oracle Security Model over Hadoop
‣Fine-grained access control
‣Data redaction, data masking
‣Uses fast c-based readers where possible

(vs. Hive MapReduce generation)
Oracle Big Data SQL
Exadata

Storage Servers
Hadoop

Cluster
Exadata Database

Server
Oracle Big

Data SQL
SQL Queries
SmartScan SmartScan

[email protected] www.rittmanmead.com @rittmanmead 63
•Oracle Database 12c 12.1.0.2.0 with Big Data SQL option can view Hive table metadata
‣Linked by Exadata configuration steps to one or more BDA clusters
•DBA_HIVE_TABLES and USER_HIVE_TABLES exposes Hive metadata
•Oracle SQL*Developer 4.0.3, with Cloudera Hive drivers, can connect to Hive metastore
View Hive Table Metadata in the Oracle Data Dictionary
SQL> col database_name for a30
SQL> col table_name for a30
SQL> select database_name, table_name
2 from dba_hive_tables;
DATABASE_NAME TABLE_NAME
------------------------------ ------------------------------
default access_per_post
default access_per_post_categories
default access_per_post_full
default apachelog
default categories
default countries
default cust
default hive_raw_apache_access_log

[email protected] www.rittmanmead.com @rittmanmead 64
•Big Data SQL accesses Hive tables through external table mechanism
‣ORACLE_HIVE external table type imports Hive metastore metadata
‣ORACLE_HDFS requires metadata to be specified
•Access parameters cluster and tablename specify Hive table source and BDA cluster
Hive Access through Oracle External Tables + Hive Driver
CREATE TABLE access_per_post_categories(
hostname varchar2(100),
request_date varchar2(100),
post_id varchar2(10),
title varchar2(200),
author varchar2(100),
category varchar2(100),
ip_integer number)
organization external
(type oracle_hive
default directory default_dir
access parameters(com.oracle.bigdata.tablename=default.access_per_post_categories));

[email protected] www.rittmanmead.com @rittmanmead 65
•Brings query-offloading features similar to Exadata

to Oracle Big Data Appliance
•Query across both Oracle and Hadoop sources
•Intelligent query optimisation applies SmartScan

close to ALL data
•Use same SQL dialect across both sources
•Apply same security rules, policies, 

user access rights across both sources
Extending SmartScan, and Oracle SQL, Across All Data

hold on…

“where we’re going

we don’t need roads”

“where we’re going

we don’t need roads”

[email protected] www.rittmanmead.com @rittmanmead
•Apache Drill is another SQL-on-Hadoop project that focus on schema-free data discovery
•Inspired by Google Dremel, innovation is querying raw data with schema optional
•Automatically infers and detects schema from semi-structured datasets and NoSQL DBs
•Join across different silos of data e.g. JSON records, Hive tables and HBase database
•Aimed at different use-cases than Hive - 

low-latency queries, discovery 

(think Endeca vs OBIEE)
Introducing Apache Drill - “We Don’t Need No Roads”

[email protected] www.rittmanmead.com @rittmanmead
•Most modern datasource formats embed their schema in the data (“schema-on-read”)
•Apache Drill makes these as easy to join to traditional datasets as “point me at the data”
•Cuts out unnecessary work in defining Hive schemas for data that’s self-describing
•Supports joining across files,

databases, NoSQL etc
Self-Describing Data - Parquet, AVRO, JSON etc

[email protected] www.rittmanmead.com @rittmanmead
•Files can exist either on the local filesystem, or on HDFS
•Connection to directory or file defined in storage configuration
•Can work with CSV, TXT, TSV etc
•First row of file can provide schema (column names)
Apache Drill and Text Files
SELECT * FROM dfs.`/tmp/csv_with_header.csv2`;
+-------+------+------+------+
| name | num1 | num2 | num3 |
+-------+------+------+------+
| hello | 1 | 2 | 3 |
| hello | 1 | 2 | 3 |
| hello | 1 | 2 | 3 |
| hello | 1 | 2 | 3 |
| hello | 1 | 2 | 3 |
| hello | 1 | 2 | 3 |
| hello | 1 | 2 | 3 |
+-------+------+------+------+
7 rows selected (0.12 seconds)
SELECT * FROM dfs.`/tmp/csv_no_header.csv`;
+------------------------+
| columns |
+------------------------+
| ["hello","1","2","3"] |
| ["hello","1","2","3"] |
| ["hello","1","2","3"] |
| ["hello","1","2","3"] |
| ["hello","1","2","3"] |
| ["hello","1","2","3"] |
| ["hello","1","2","3"] |
+------------------------+
7 rows selected (0.112 seconds)

[email protected] www.rittmanmead.com @rittmanmead
•JSON (Javascript Object Notation) documents are
often used for data interchange
•Exports from Twitter and other consumer services
•Web service responses and other B2B interfaces
•A more lightweight form of XML that is “self-
describing”
•Handles evolving schemas, and optional attributes
•Drill treats each document as a row, and has features
to
•Flatten nested data (extract elements from arrays)
•Generate key/value pairs for loosely structured data
Apache Drill and JSON Documents
use dfs.iot;
show files;
select in_reply_to_user_id, text from `all_tweets.json`
limit 5;
+---------------------+------+
| in_reply_to_user_id | text |
+---------------------+------+
| null | BI Forum 2013 in Brighton has now sold-out |
| null | "Football has become a numbers game |
| null | Just bought Lyndsay Wise’s Book |
| null | An Oracle BI "Blast from the Past" |
| 14716125 | Dilbert on Agile Programming |
+---------------------+------+
5 rows selected (0.229 seconds)
select name, flatten(fillings) as f 

from dfs.users.`/donuts.json` 

where f.cal < 300;

[email protected] www.rittmanmead.com @rittmanmead
•Drill can connect to Hive to make use of metastore (incl. multiple Hive metastores)
•NoSQL databases (HBase etc)
•Parquet files (native storage format - columnar + self describing)
Apache Drill and Hive, HBase, Parquet Sources etc
USE hbase;
SELECT * FROM students;
+-------------+-----------------------+-----------------------------------------------------+
| row_key | account | address |
+-------------+-----------------------+------------------------------------------------------+
| [B@e6d9eb7 | {"name":"QWxpY2U="} | {"state":"Q0E=","street":"MTIzIEJhbGxtZXIgQXY="} |
| [B@2823a2b4 | {"name":"Qm9i"} | {"state":"Q0E=","street":"MSBJbmZpbml0ZSBMb29w"} |
| [B@3b8eec02 | {"name":"RnJhbms="} | {"state":"Q0E=","street":"NDM1IFdhbGtlciBDdA=="} |
| [B@242895da | {"name":"TWFyeQ=="} | {"state":"Q0E=","street":"NTYgU291dGhlcm4gUGt3eQ=="} |
+-------------+-----------------------+----------------------------------------------------------------------+
SELECT firstname,lastname FROM 

hiveremote.`customers` limit 10;`

+------------+------------+
| firstname | lastname |
+------------+------------+
| Essie | Vaill |
| Cruz | Roudabush |
| Billie | Tinnes |
| Zackary | Mockus |
| Rosemarie | Fifield |
| Bernard | Laboy |
| Marianne | Earman |
+------------+------------+
SELECT * FROM dfs.`iot_demo/geodata/region.parquet`;
+--------------+--------------+-----------------------+
| R_REGIONKEY | R_NAME | R_COMMENT |
+--------------+--------------+-----------------------+
| 0 | AFRICA | lar deposits. blithe |
| 1 | AMERICA | hs use ironic, even |
| 2 | ASIA | ges. thinly even pin |
| 3 | EUROPE | ly final courts cajo |
| 4 | MIDDLE EAST | uickly special accou |
+--------------+--------------+-----------------------+

[email protected] www.rittmanmead.com @rittmanmead
•Drill developed for real-time, ad-hoc data exploration with schema discovery on-the-fly
•Individual analysts exploring new datasets, leveraging corporate metadata/data to help
•Hive is more about large-scale, centrally curated set-based big data access
•Drill models conceptually as JSON, vs. Hive’s tabular approach
•Drill introspects schema from whatever it connects to, vs. formal modeling in Hive
Apache Drill vs. Apache Hive
Interactive Queries

(Data Discovery, Tableau/VA)
Reporting Queries

(Canned Reports, OBIEE)
ETL

(ODI, Scripting, Informatica)
Apache Drill Apache Hive
Interactive Queries
100ms - 3mins
Reporting Queries
3mins - 20mins
ETL & Batch Queries
20mins - hours

but…

what’s all this about “Spark”?

[email protected] www.rittmanmead.com @rittmanmead 78
•Another DAG execution engine running on YARN
•More mature than TEZ, with richer API and more vendor support
•Uses concept of an RDD (Resilient Distributed Dataset)
‣RDDs like tables or Pig relations, but can be cached in-memory
‣Great for in-memory transformations, or iterative/cyclic processes
•Spark jobs comprise of a DAG of tasks operating on RDDs
•Access through Scala, Python or Java APIs
•Related projects include
‣Spark SQL
‣Spark Streaming
Apache Spark

[email protected] www.rittmanmead.com @rittmanmead 79
•Native support for multiple languages 

with identical APIs
‣Python - prototyping, data wrangling
‣Scala - functional programming features
‣Java - lower-level, application integration
•Use of closures, iterations, and other 

common language constructs to minimize code
•Integrated support for distributed +

functional programming
•Unified API for batch and streaming
Rich Developer Support + Wide Developer Ecosystem
scala> val logfile = sc.textFile("logs/access_log")
14/05/12 21:18:59 INFO MemoryStore: ensureFreeSpace(77353) 

called with curMem=234759, maxMem=309225062
14/05/12 21:18:59 INFO MemoryStore: Block broadcast_2 

stored as values to memory (estimated size 75.5 KB, free 294.6 MB)
logfile: org.apache.spark.rdd.RDD[String] = 

MappedRDD[31] at textFile at <console>:15

scala> logfile.count()
14/05/12 21:19:06 INFO FileInputFormat: Total input paths to process : 1
14/05/12 21:19:06 INFO SparkContext: Starting job: count at <console>:1
...
14/05/12 21:19:06 INFO SparkContext: Job finished: 

count at <console>:18, took 0.192536694 s
res7: Long = 154563
scala> val logfile = sc.textFile("logs/access_log").cache
scala> val biapps11g = logfile.filter(line => line.contains("/biapps11g/"))
biapps11g: org.apache.spark.rdd.RDD[String] = FilteredRDD[34] at filter at <console>:17
scala> biapps11g.count()
...
14/05/12 21:28:28 INFO SparkContext: Job finished: count at <console>:20, took 0.387960876 s
res9: Long = 403

[email protected] www.rittmanmead.com @rittmanmead 80
•Spark SQL, and Data Frames, allow RDDs in Spark to be processed using SQL queries
•Bring in and federate additional data from JDBC sources
•Load, read and save data in Hive, Parquet and other structured tabular formats
Spark SQL - Adding SQL Processing to Apache Spark
val accessLogsFilteredDF = accessLogs
.filter( r => ! r.agent.matches(".*(spider|robot|bot|slurp).*"))
.filter( r => ! r.endpoint.matches(".*(wp-content|wp-admin).*")).toDF()
.registerTempTable("accessLogsFiltered")

val topTenPostsLast24Hour = sqlContext.sql("SELECT p.POST_TITLE, p.POST_AUTHOR, COUNT(*) 

as total 

FROM accessLogsFiltered a 

JOIN posts p ON a.endpoint = p.POST_SLUG 

GROUP BY p.POST_TITLE, p.POST_AUTHOR 

ORDER BY total DESC LIMIT 10 ")

// Persist top ten table for this window to HDFS as parquet file

topTenPostsLast24Hour.save("/user/oracle/rm_logs_batch_output/topTenPostsLast24Hour.parquet" 

, "parquet", SaveMode.Overwrite)

Hadoop is insecure
and has fragmented security
…doesn’t it?
but …

[email protected] www.rittmanmead.com @rittmanmead 82
Consistent Security and Audit Now Emerging on Platform

[email protected] www.rittmanmead.com @rittmanmead 83
•Clusters by default are unsecured (vunerable to account spoofing) & need Kerberos enabled
•Data access controlled by POSIX-style permissions on HDFS files
•Hive and Impala can Apache Sentry RBAC
‣Result is data duplication and complexity
‣No consistent API or abstracted security model
Hadoop Security Initially Was a Mess
/user/mrittman/scratchpad
/user/ryeardley/scratchpad
/user/mpatel/scratchpad
/user/mrittman/scratchpad
/user/mrittman/scratchpad
/data/rm_website_analysis/logfiles/incoming
/data/rm_website_analysis/logfiles/archive
/data/rm_website_analysis/tweets/incoming
/data/rm_website_analysis/tweets/archive

[email protected] www.rittmanmead.com @rittmanmead 84
•Use standard Oracle Security over Hadoop & NoSQL
‣Grant & Revoke Privileges
‣Redact Data
‣Apply Virtual Private Database
‣Provides Fine-grain Access Control
•Great solution to extend existing Oracle

security model over Hadoop datasets
Oracle Big Data SQL : Extend Oracle Security to Hadoop
Redacted
data
subset
SQL
JSON
Customer data
in Oracle DB
DBMS_REDACT.ADD_POLICY(
object_schema => 'txadp_hive_01',
object_name => 'customer_address_ext',
column_name => 'ca_street_name',
policy_name => 'customer_address_redaction',
function_type => DBMS_REDACT.RANDOM,
expression => 'SYS_CONTEXT(''SYS_SESSION_ROLES'', 

''REDACTION_TESTER'')=''TRUE'''
);

[email protected] www.rittmanmead.com @rittmanmead 85
•Provides a higher level, logical abstraction for data (ie Tables or Views)
‣Can be used with Spark & Spark SQL, with Predicate pushdown, projection
•Returns schemed objects (instead of paths and bytes) in similar way to HCatalog
•Unified data access path allows platform-wide performance improvements
•Secure service that does not execute arbitrary user code
‣Central location for all authorization checks using Sentry metadata.
Cloudera RecordService

[email protected] www.rittmanmead.com @rittmanmead 87
Choosing a SQL-on-Hadoop Engine
The original SQL-on-Hadoop engine
Maximum compatibility with Hadoop
… but designed for batch processing
Plug-in replacement for MapReduce

Works via YARN and submitting jobs
Speeds-up Hive but long-term future?
Daemon-based MPP engines
Impala is more mature
Drill innovates around data-discovery
Adds SQL access and set-based
processing to Spark
Useful for query federation
Vendor-provided RBDMS-Hadoop
integration bridges

You don’t need to learn

Java, or MapReduce,

or Scala

or Toupee

SQL-on-Hadoop isn’t

just Hive

Check-out these Developer VMs:
http://www.cloudera.com/documentation/enterprise/5-3-x/topics/
cloudera_quickstart_vm.html 


http://hortonworks.com/products/sandbox/
http://www.oracle.com/technetwork/database/bigdata-appliance/oracle-
bigdatalite-2104726.html
https://www.mapr.com/products/mapr-sandbox-hadoop/download-sandbox-drill

http://www.rittmanmead.com

[email protected] www.rittmanmead.com @rittmanmead
Gluent New World #02: 

SQL-on-Hadoop with Mark Rittman
Mark Rittman, CTO, Rittman Mead
April 2016