Cluster computing frameworks such as Hadoop or Spark are tremendously beneficial in processing and deriving insights from data. However, long query latencies make these frameworks sub-optimal choices for powering interactive applications. Organizations frequently rely on dedicated query layers, such as relational databases and key/value stores, for faster query latencies, but these technologies suffer from many drawbacks for analytic use cases. In this session, we discuss using Druid for analytics and why its architecture is well suited to power analytic applications.
User-facing applications are replacing traditional reporting interfaces as the preferred means for organizations to derive value from their datasets. To provide an interactive user experience, user interactions with analytic applications must complete on the order of milliseconds. To meet these needs, organizations often struggle to select a proper serving layer. Many serving layers are selected because of their general popularity, without an understanding of their possible architectural limitations.
Druid is an analytics data store designed for analytic (OLAP) queries on event data. It draws inspiration from Google’s Dremel, Google’s PowerDrill, and search infrastructure. Many enterprises are switching to Druid for analytics, and we will cover why the technology is a good fit for its intended use cases.
An Introduction to Druid
Nishant Bangarwa, Software Developer
Agenda
- History and Motivation
- Introduction
- Data Storage Format
- Druid Architecture: Indexing and Querying Data
- Druid in Production
- Recent Improvements
History
- Druid was open sourced in late 2012
- Initial use case: power an ad-tech analytics product
- Requirements
  - Query any combination of metrics and dimensions
  - Scalability: trillions of events/day
  - Real-time: data freshness
  - Streaming ingestion
  - Interactive: low-latency queries
How big is the initial use case?
Motivation
- Business intelligence queries
  - Arbitrary slicing and dicing of data
  - Interactive real-time visualizations on complex data streams
- Answer BI questions
  - How many unique male visitors visited my website last month?
  - How many products were sold last quarter, broken down by a demographic and product category?
- Not interested in dumping the entire dataset
Introduction
What is Druid?
- Column-oriented distributed datastore
- Sub-second query times
- Realtime streaming ingestion
- Arbitrary slicing and dicing of data
- Automatic data summarization
- Approximate algorithms (HyperLogLog, theta)
- Scalable to petabytes of data
- Highly available
Druid Architecture
(Diagram; labels: Streaming Data, Realtime Index Tasks, Handoff, Batch Data, Historical Nodes, Broker Nodes)
Druid Architecture
(Diagram; labels: Streaming Data, Realtime Index Tasks, Handoff, Batch Data, Historical Nodes, Broker Nodes, Queries, Metadata Store, Coordinator Nodes, Zookeeper)
Storage Format
Druid: Segments
- Data in Druid is stored in segment files.
- Segments are partitioned by time.
- Ideally, each segment file is smaller than 1 GB.
- If files are large, smaller time partitions are needed.
Timeline: Segment 1: Monday | Segment 2: Tuesday | Segment 3: Wednesday | Segment 4: Thursday | Segment 5_1: Friday | Segment 5_2: Friday
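The time partitioning described above can be sketched in a few lines of Python. This is only an illustration: the function name and the `max_rows_per_segment` parameter are hypothetical, and a row cap stands in for the size-based guidance (segments under 1 GB) on the slide. Each day becomes a time chunk, and a chunk that is too large is split into numbered partitions, mirroring "Segment 5_1" and "Segment 5_2".

```python
from collections import defaultdict
from datetime import datetime


def partition_into_segments(events, max_rows_per_segment):
    """Group events by day, then split each day into numbered partitions."""
    by_day = defaultdict(list)
    for event in events:
        ts = datetime.strptime(event["timestamp"], "%Y-%m-%dT%H:%M:%SZ")
        by_day[ts.strftime("%Y-%m-%d")].append(event)

    segments = {}
    for day, rows in sorted(by_day.items()):
        # Hypothetical rule: a row cap stands in for a real size limit.
        starts = range(0, len(rows), max_rows_per_segment)
        for part, start in enumerate(starts, start=1):
            segments[f"{day}_{part}"] = rows[start:start + max_rows_per_segment]
    return segments


events = [
    {"timestamp": "2011-01-01T00:01:35Z", "page": "Justin Bieber"},
    {"timestamp": "2011-01-01T00:05:35Z", "page": "Ke$ha"},
    {"timestamp": "2011-01-02T00:08:35Z", "page": "Selena Gomes"},
]
segments = partition_into_segments(events, max_rows_per_segment=1)
print(sorted(segments))
```

With a cap of one row per segment, the first day is split into two partitions and the second day fits in one.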
Example Wikipedia Edit Dataset
timestamp             page           language  city     country  …  added  deleted
2011-01-01T00:01:35Z  Justin Bieber  en        SF       USA         10     65
2011-01-01T00:03:63Z  Justin Bieber  en        SF       USA         15     62
2011-01-01T00:04:51Z  Justin Bieber  en        SF       USA         32     45
2011-01-01T00:05:35Z  Ke$ha          en        Calgary  CA          17     87
2011-01-01T00:06:41Z  Ke$ha          en        Calgary  CA          43     99
2011-01-02T00:08:35Z  Selena Gomes   en        Calgary  CA          12     53
Column groups: Timestamp (timestamp); Dimensions (page, language, city, country); Metrics (added, deleted)
Data Rollup
timestamp             page           language  city     country  …  added  deleted
2011-01-01T00:01:35Z  Justin Bieber  en        SF       USA         10     65
2011-01-01T00:03:63Z  Justin Bieber  en        SF       USA         15     62
2011-01-01T00:04:51Z  Justin Bieber  en        SF       USA         32     45
2011-01-01T00:05:35Z  Ke$ha          en        Calgary  CA          17     87
2011-01-01T00:06:41Z  Ke$ha          en        Calgary  CA          43     99
2011-01-02T00:08:35Z  Selena Gomes   en        Calgary  CA          12     53
Rollup by hour:
timestamp             page           language  city     country  count  sum_added  sum_deleted  min_added  max_added  …
2011-01-01T00:00:00Z  Justin Bieber  en        SF       USA      3      57         172          10         32
2011-01-01T00:00:00Z  Ke$ha          en        Calgary  CA       2      60         186          17         43
2011-01-02T00:00:00Z  Selena Gomes   en        Calgary  CA       1      12         53           12         12
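The rollup above can be reproduced with a short Python sketch. It is an illustration of the idea rather than Druid's ingestion code: the function name is hypothetical, and the timestamp is truncated to the hour by plain string slicing. Rows that share the same truncated timestamp and dimension values collapse into a single row of aggregates.

```python
def rollup_by_hour(rows):
    """Aggregate rows that share an hour bucket and all dimension values."""
    rolled = {}
    for timestamp, page, language, city, country, added, deleted in rows:
        hour = timestamp[:13] + ":00:00Z"  # e.g. 2011-01-01T00:00:00Z
        key = (hour, page, language, city, country)
        agg = rolled.get(key)
        if agg is None:
            rolled[key] = {
                "count": 1,
                "sum_added": added,
                "sum_deleted": deleted,
                "min_added": added,
                "max_added": added,
            }
        else:
            agg["count"] += 1
            agg["sum_added"] += added
            agg["sum_deleted"] += deleted
            agg["min_added"] = min(agg["min_added"], added)
            agg["max_added"] = max(agg["max_added"], added)
    return rolled


rows = [
    ("2011-01-01T00:01:35Z", "Justin Bieber", "en", "SF", "USA", 10, 65),
    ("2011-01-01T00:03:63Z", "Justin Bieber", "en", "SF", "USA", 15, 62),
    ("2011-01-01T00:04:51Z", "Justin Bieber", "en", "SF", "USA", 32, 45),
    ("2011-01-01T00:05:35Z", "Ke$ha", "en", "Calgary", "CA", 17, 87),
    ("2011-01-01T00:06:41Z", "Ke$ha", "en", "Calgary", "CA", 43, 99),
    ("2011-01-02T00:08:35Z", "Selena Gomes", "en", "Calgary", "CA", 12, 53),
]
rolled = rollup_by_hour(rows)
for key, agg in rolled.items():
    print(key, agg)
```

Six input rows become three stored rows. The individual events can no longer be recovered, which is the trade-off rollup makes for smaller storage.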
Dictionary Encoding
- Create and store ids for each value
- e.g. page column
  - Values: Justin Bieber, Ke$ha, Selena Gomes
  - Encoding: Justin Bieber: 0, Ke$ha: 1, Selena Gomes: 2
  - Column data: [0 0 0 1 1 2]
- city column: [0 0 0 1 1 1]
timestamp             page           language  city     country  …  added  deleted
2011-01-01T00:01:35Z  Justin Bieber  en        SF       USA         10     65
2011-01-01T00:03:63Z  Justin Bieber  en        SF       USA         15     62
2011-01-01T00:04:51Z  Justin Bieber  en        SF       USA         32     45
2011-01-01T00:05:35Z  Ke$ha          en        Calgary  CA          17     87
2011-01-01T00:06:41Z  Ke$ha          en        Calgary  CA          43     99
2011-01-02T00:08:35Z  Selena Gomes   en        Calgary  CA          12     53
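A minimal Python sketch of dictionary encoding follows. It assigns ids in order of first appearance, which matches the ids shown on the slide; this ordering is an assumption of the sketch, and a real implementation may order its dictionary differently (for example, sorted by value).

```python
def dictionary_encode(values):
    """Return (dictionary, encoded column) for a list of string values."""
    dictionary = {}
    encoded = []
    for value in values:
        if value not in dictionary:
            dictionary[value] = len(dictionary)  # next unused id
        encoded.append(dictionary[value])
    return dictionary, encoded


page = ["Justin Bieber"] * 3 + ["Ke$ha"] * 2 + ["Selena Gomes"]
city = ["SF"] * 3 + ["Calgary"] * 3

page_dict, page_ids = dictionary_encode(page)
city_dict, city_ids = dictionary_encode(city)
print(page_dict, page_ids)
print(city_dict, city_ids)
```

Storing small integer ids in place of repeated strings is what makes the column compact and cheap to scan.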
Bitmap Indices
- Store a bitmap index for each value
  - Justin Bieber -> rows [0, 1, 2] -> [1 1 1 0 0 0]
  - Ke$ha -> rows [3, 4] -> [0 0 0 1 1 0]
  - Selena Gomes -> rows [5] -> [0 0 0 0 0 1]
- Queries
  - Justin Bieber or Ke$ha -> [1 1 1 0 0 0] OR [0 0 0 1 1 0] -> [1 1 1 1 1 0]
  - language = en and country = CA -> [1 1 1 1 1 1] AND [0 0 0 1 1 1] -> [0 0 0 1 1 1]
- Indexes are compressed with Concise or Roaring encoding
timestamp             page           language  city     country  …  added  deleted
2011-01-01T00:01:35Z  Justin Bieber  en        SF       USA         10     65
2011-01-01T00:03:63Z  Justin Bieber  en        SF       USA         15     62
2011-01-01T00:04:51Z  Justin Bieber  en        SF       USA         32     45
2011-01-01T00:01:35Z  Ke$ha          en        Calgary  CA          17     87
2011-01-01T00:01:35Z  Ke$ha          en        Calgary  CA          43     99
2011-01-01T00:01:35Z  Selena Gomes   en        Calgary  CA          12     53
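The bitmap operations above can be sketched with plain Python lists of 0s and 1s. This is illustrative only: the helper names are hypothetical, and the bitmaps here are uncompressed, whereas the slide notes that Druid compresses them with Concise or Roaring.

```python
def build_bitmaps(column):
    """Map each distinct value to a bitmap marking the rows that contain it."""
    bitmaps = {}
    for row, value in enumerate(column):
        bitmaps.setdefault(value, [0] * len(column))[row] = 1
    return bitmaps


def bitmap_or(a, b):
    return [x | y for x, y in zip(a, b)]


def bitmap_and(a, b):
    return [x & y for x, y in zip(a, b)]


page = build_bitmaps(["Justin Bieber"] * 3 + ["Ke$ha"] * 2 + ["Selena Gomes"])
language = build_bitmaps(["en"] * 6)
country = build_bitmaps(["USA"] * 3 + ["CA"] * 3)

bieber_or_kesha = bitmap_or(page["Justin Bieber"], page["Ke$ha"])
en_and_ca = bitmap_and(language["en"], country["CA"])
print(bieber_or_kesha)
print(en_and_ca)
```

A filter is answered by combining bitmaps, so only the rows whose bit is set need to be read from the metric columns.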
Approximate Sketch Columns
timestamp             page           userid       language  city     country  …  added  deleted
2011-01-01T00:01:35Z  Justin Bieber  user1111111  en        SF       USA         10     65
2011-01-01T00:03:63Z  Justin Bieber  user1111111  en        SF       USA         15     62
2011-01-01T00:04:51Z  Justin Bieber  user2222222  en        SF       USA         32     45
2011-01-01T00:05:35Z  Ke$ha          user3333333  en        Calgary  CA          17     87
2011-01-01T00:06:41Z  Ke$ha          user4444444  en        Calgary  CA          43     99
2011-01-02T00:08:35Z  Selena Gomes   user1111111  en        Calgary  CA          12     53
Rollup by hour:
timestamp             page           language  city     country  count  sum_added  sum_deleted  min_added  userid_sketch  …
2011-01-01T00:00:00Z  Justin Bieber  en        SF       USA      3      57         172          10         {sketch}
2011-01-01T00:00:00Z  Ke$ha          en        Calgary  CA       2      60         186          17         {sketch}
2011-01-02T00:00:00Z  Selena Gomes   en        Calgary  CA       1      12         53           12         {sketch}
Approximate Sketch Columns
- Better rollup for high-cardinality columns, e.g. userid
- Reduced storage size
- Use cases
  - Fast approximate distinct counts
  - Approximate histograms
  - Funnel/retention analysis
- Limitation
  - Not possible to do exact counts or filter on individual row values
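To show why a sketch column still supports distinct counts after rollup, here is a minimal K-minimum-values (KMV) sketch in Python. It is a simplified stand-in chosen for brevity, not the HyperLogLog or theta sketch implementations Druid uses; the class name and the choice of `k` are assumptions. The key property is that sketches from different rolled-up rows can be merged without double counting users who appear in more than one row.

```python
import hashlib


class KMVSketch:
    """K-minimum-values sketch: keeps the k smallest hash values seen."""

    def __init__(self, k=256):
        self.k = k
        self.hashes = set()

    @staticmethod
    def _hash(value):
        digest = hashlib.sha1(value.encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") / 2**64  # uniform in [0, 1)

    def add(self, value):
        self.hashes.add(self._hash(value))
        if len(self.hashes) > self.k:
            self.hashes.remove(max(self.hashes))

    def merge(self, other):
        merged = KMVSketch(self.k)
        merged.hashes = set(sorted(self.hashes | other.hashes)[: self.k])
        return merged

    def estimate(self):
        if len(self.hashes) < self.k:
            return float(len(self.hashes))  # below capacity the count is exact
        return (self.k - 1) / max(self.hashes)


def sketch_of(users):
    sketch = KMVSketch()
    for user in users:
        sketch.add(user)
    return sketch


# One sketch per rolled-up row of the example table.
bieber = sketch_of(["user1111111", "user1111111", "user2222222"])
kesha = sketch_of(["user3333333", "user4444444"])
gomes = sketch_of(["user1111111"])

total = bieber.merge(kesha).merge(gomes)
print(total.estimate())
```

Adding up per-row distinct counts would give 2 + 2 + 1 = 5, while the merged sketch reports 4 unique users because user1111111 is counted once. The individual user ids are not stored, which is why filtering on row values is not possible.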
Indexing Data
Indexing Service
- Indexing is performed by
  - Overlord
  - Middle Managers
  - Peons
- Middle Managers spawn peons, which run ingestion tasks
- Each peon runs one task
- The task definition specifies which task to run and its properties
Streaming Ingestion: Realtime Index Tasks
- Ability to ingest streams of data
- Stores data in a write-optimized structure
- Periodically converts the write-optimized structure to read-optimized segments
- Events are queryable as soon as they are ingested
- Both push- and pull-based ingestion
Streaming Ingestion: Tranquility
- Helper library for coordinating streaming ingestion
- Simple API to send events to Druid
- Transparently manages
  - Realtime index task creation
  - Partitioning and replication
  - Schema evolution
- Can be used with your favourite ETL framework, e.g. Flink, NiFi, Samza, Spark, Storm
- At-least-once ingestion
Kafka Indexing Service (experimental)
- Supports exactly-once ingestion
- Messages are pulled by Kafka index tasks
- Each Kafka index task consumes from a set of partitions with specific start and end offsets
- Each message is verified to ensure sequence
- Kafka offsets and the corresponding segments are persisted atomically in the same metadata transaction
- Kafka supervisor
  - Embedded inside the overlord
  - Manages Kafka index tasks
  - Retries failed tasks
(Diagram: Task 1, Task 2, Task 3)
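The exactly-once idea, committing the segment and the Kafka offsets in one metadata transaction, can be sketched with an in-memory SQLite database. Everything here is hypothetical (table names, function names, a starting offset of 0) and is not Druid's metadata schema. The sketch only shows why a retried task cannot publish the same offset range twice.

```python
import sqlite3


def open_metadata_store():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE segments (id TEXT PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE offsets (part INTEGER PRIMARY KEY, next_offset INTEGER)"
    )
    conn.commit()
    return conn


def publish(conn, segment_id, part, start_offset, end_offset):
    """Publish a segment and advance the partition offset atomically.

    Returns False and leaves the store unchanged if start_offset is not
    where the previous publish for this partition ended.
    """
    try:
        with conn:  # commits on success, rolls back on any exception
            row = conn.execute(
                "SELECT next_offset FROM offsets WHERE part = ?", (part,)
            ).fetchone()
            committed = row[0] if row else 0  # assumed starting offset
            if committed != start_offset:
                raise ValueError("offset range does not follow the last commit")
            conn.execute("INSERT INTO segments (id) VALUES (?)", (segment_id,))
            conn.execute(
                "INSERT OR REPLACE INTO offsets (part, next_offset) VALUES (?, ?)",
                (part, end_offset),
            )
        return True
    except (ValueError, sqlite3.IntegrityError):
        return False


conn = open_metadata_store()
first = publish(conn, "seg-a", part=0, start_offset=0, end_offset=100)
replay = publish(conn, "seg-b", part=0, start_offset=0, end_offset=100)
second = publish(conn, "seg-c", part=0, start_offset=100, end_offset=200)
print(first, replay, second)
```

The replayed task is rejected because offsets 0 to 100 were already committed together with "seg-a", so those messages appear in exactly one published segment.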
Batch Ingestion
- HadoopIndexTask
  - Peon launches a Hadoop MR job
  - Mappers read data
  - Reducers create Druid segment files
- Index Task
  - Runs in a single JVM, i.e. the peon
  - Suitable for small data sizes (< 1 GB)
- Integrations with Apache Hive and Spark for batch ingestion
Querying Data
Querying Data from Druid
- Druid supports
  - JSON queries over HTTP
  - Built-in SQL (experimental)
- Querying libraries available for
  - Python, R, Ruby, JavaScript, Clojure, PHP
- Multiple open source UI tools
JSON over HTTP
- HTTP REST API
- Queries and results expressed in JSON
- Multiple query types
  - Time Boundary
  - Timeseries
  - TopN
  - GroupBy
  - Select
  - Segment Metadata
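As an illustration of a JSON query over HTTP, the following Python sketch builds a Timeseries query and the POST request that would carry it. The broker address, the `wikipedia` datasource, and the `sum_added` and `sum_deleted` column names are assumptions based on the example dataset. The request is constructed but not sent, so the sketch runs without a cluster.

```python
import json
import urllib.request

BROKER_URL = "http://localhost:8082/druid/v2/"  # assumed broker address


def timeseries_query(datasource, interval, granularity="hour"):
    """Build a native Timeseries query summing the rolled-up metrics."""
    return {
        "queryType": "timeseries",
        "dataSource": datasource,
        "granularity": granularity,
        "intervals": [interval],
        "aggregations": [
            {"type": "longSum", "name": "added", "fieldName": "sum_added"},
            {"type": "longSum", "name": "deleted", "fieldName": "sum_deleted"},
        ],
    }


def build_request(query, url=BROKER_URL):
    return urllib.request.Request(
        url,
        data=json.dumps(query).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


query = timeseries_query("wikipedia", "2011-01-01/2011-01-03")
request = build_request(query)
print(json.dumps(query, indent=2))
# Against a live broker: urllib.request.urlopen(request).read()
```

The response from a broker would be a JSON array with one entry per time bucket.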
Built-in SQL (experimental)
- Apache Calcite based parser and planner
- Ability to connect Druid to any BI tool that supports JDBC
- SQL via JSON over HTTP
- Supports approximate queries
  - APPROX_COUNT_DISTINCT(col)
  - Ability to do fast approximate TopN queries
  - APPROX_QUANTILE(column, probability)
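SQL via JSON over HTTP wraps the SQL text in a small JSON object. The sketch below builds such a payload for an approximate distinct count. The endpoint path in the comment, the `wikipedia` table, and the `userid` column are assumptions taken from the running example.

```python
import json

SQL_ENDPOINT = "http://localhost:8082/druid/v2/sql/"  # assumed broker SQL endpoint

SQL = (
    "SELECT page, APPROX_COUNT_DISTINCT(userid) AS unique_users "
    "FROM wikipedia "
    "GROUP BY page "
    "ORDER BY unique_users DESC "
    "LIMIT 10"
)


def sql_payload(sql):
    """Wrap a SQL string in the JSON body expected by the SQL endpoint."""
    return json.dumps({"query": sql})


payload = sql_payload(SQL)
print(payload)
```

The same query text could be issued through a JDBC connection from a BI tool instead.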
Integrated with multiple open source UI tools
- Superset
  - Developed at Airbnb
  - In Apache incubation since May 2017
- Grafana: Druid plugin
- Metabase
- With built-in SQL, connect with any BI tool supporting JDBC
Druid in Production
Druid in Production
- Is Druid suitable for my use case?
- Will Druid meet my performance requirements at scale?
- How complex is it to operate and manage a Druid cluster?
- How do I monitor a Druid cluster?
- High availability?
- How do I upgrade a Druid cluster without downtime?
- Security?
Suitable Use Cases
- Powering interactive user-facing applications
- Arbitrary slicing and dicing of large datasets
- User behavior analysis
  - Measuring distinct counts
  - Retention analysis
  - Funnel analysis
  - A/B testing
- Exploratory analytics / root cause analysis
- Not interested in dumping the entire dataset
Performance and Scalability: Fast Facts
- Most events per day: 300 billion events/day (Metamarkets)
- Most computed metrics: 1 billion metrics/min (Jolata)
- Largest cluster: 200 nodes (Metamarkets)
- Largest hourly ingestion: 2 TB per hour (Netflix)
Simplified Druid Cluster Management with Ambari
- Install, configure and manage Druid and all external dependencies from Ambari
- Easy to enable HA, security, monitoring …
Monitoring a Druid Cluster
- Each Druid node emits metrics for
  - Query performance
  - Ingestion rate
  - JVM health
  - Query cache performance
  - System health
- Emitted as JSON objects to a runtime log file or over HTTP to other services
- Emitters available for Ambari Metrics Server, Graphite, StatsD, Kafka
- Easy to implement your own metrics emitter
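Because metrics are emitted as JSON objects, a log of them can be summarized with a few lines of Python. The event fields used below (`service`, `metric`, `value`) and the sample lines are assumptions for illustration, so check them against the output of your own emitter.

```python
import json
from collections import defaultdict


def average_metric(lines, metric_name):
    """Average one metric per service from JSON-encoded metric events."""
    totals = defaultdict(lambda: [0.0, 0])  # service -> [sum, count]
    for line in lines:
        event = json.loads(line)
        if event.get("metric") != metric_name:
            continue
        bucket = totals[event["service"]]
        bucket[0] += event["value"]
        bucket[1] += 1
    return {service: total / count for service, (total, count) in totals.items()}


# Hypothetical sample of emitted events, one JSON object per line.
sample_log = [
    '{"service": "druid/broker", "metric": "query/time", "value": 120}',
    '{"service": "druid/broker", "metric": "query/time", "value": 80}',
    '{"service": "druid/historical", "metric": "query/time", "value": 40}',
    '{"service": "druid/historical", "metric": "jvm/mem/used", "value": 1024}',
]
averages = average_metric(sample_log, "query/time")
print(averages)
```

A custom emitter or dashboard would do the same kind of filtering and aggregation on a live stream instead of a list.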
Monitoring using Ambari Metrics Server
- HDP 2.6.1 contains pre-defined Grafana dashboards
  - Health of Druid nodes
  - Ingestion
  - Query performance
- Easy to create new dashboards and set up alerts
- Auto-configured when both Druid and Ambari Metrics Server are installed
High Availability
- Coordinator/Overlord: deploy on multiple instances; leader election in ZooKeeper
- Broker: install multiple brokers; use the Druid Router or any load balancer to route queries to brokers
- Realtime Index Tasks: create redundant tasks
- Historical Nodes: create a load rule with replication factor >= 2 (default = 2)
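A load rule with a replication factor can be written as a small JSON document. The sketch below builds one in Python. The `loadForever` rule type and the `_default_tier` tier name reflect the rule format as I understand it and should be treated as assumptions to verify against the documentation for your Druid version; the helper name is hypothetical.

```python
import json


def replication_rule(replicas, tier="_default_tier"):
    """Build a load rule that keeps `replicas` copies of each segment in a tier."""
    if replicas < 1:
        raise ValueError("at least one replica is required")
    return {"type": "loadForever", "tieredReplicants": {tier: replicas}}


# Rules are submitted to the coordinator as a JSON array.
rules = [replication_rule(2)]
payload = json.dumps(rules)
print(payload)
```

With two replicas, each segment is served by two Historical nodes, so losing one node does not make its data unavailable.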
Rolling Upgrades
- Shared-nothing architecture
- Maintain backwards compatibility
- Data redundancy
- Upgrade one Druid component at a time
- No downtime
Security
- Supports authentication via Kerberos/SPNEGO
- Easy wizard-based Kerberos security enablement via Ambari
(Diagram: User, Browser, KDC server and Druid; steps: 1 kinit user, 2 Token, 3 Negotiate, 4 Valid cookie)
Summary
- Easy installation and management via Ambari
- Real-time
  - Ingestion latency < seconds
  - Query latency < seconds
- Arbitrary slice and dice of big data like a ninja
  - No more pre-canned drill-downs
  - Query with more fine-grained granularity
- High availability and rolling deployment capabilities
- Secure and production ready
- Vibrant and active community
- Available as Tech Preview in HDP 2.6.1
Extending Core Druid
- Plugin-based architecture
  - Leverages Guice to load extensions at runtime
- Possible to add an extension to
  - Add a new deep storage implementation
  - Add a new Firehose for ingestion
  - Add aggregators
  - Add complex metrics
  - Add new query types
  - Add new Jersey resources
- Bundle your extension with all the other Druid extensions
Performance: Approximate Algorithms
- Ability to store approximate data sketches for high-cardinality columns, e.g. userid
- Reduced storage size
- Use cases
  - Fast approximate distinct counts
  - Approximate Top-K queries
  - Approximate histograms
  - Funnel/retention analysis
- Limitation
  - Not possible to do exact counts or filter on individual row values
Superset
- Python backend
  - Flask AppBuilder
  - Authentication
  - Pandas for rich analytics
  - SQLAlchemy for SQL toolkit
- JavaScript frontend
  - React, NVD3
- Deep integration with Druid
Superset Rich Dashboarding Capabilities: Treemaps
Superset Rich Dashboarding Capabilities: Sunburst
Superset UI Provides Powerful Visualizations
- Rich library of dashboard visualizations
  - Basic: bar charts, pie charts, line charts
  - Advanced: Sankey diagrams, treemaps, sunburst, heatmaps
  - And more!