Data Stream Management
Authors: Lukasz Golab & M. Tamer Özsu
Supervised by Dr. Sakti Pramanik
Presented by AKM Tauhidul Islam
Outline
- Introduction
  - Motivation
  - Problem Statement
  - Definitions
- Data Stream Management System (DSMS)
- Streaming Data Warehouse (SDW)
- Discussion
Introduction
Stream data: produced incrementally over time, rather than being available in full before processing begins
Applications:
- Sensor networks, e.g., TinyDB
- Network traffic analysis, e.g., traffic statistics and critical condition detection
- Financial tickers: online analysis of stock prices, discovering correlations, identifying trends
- Transaction log analysis, e.g., Web click streams and telephone calls
Examples (figure): transaction data streams, log streams, credit card purchases, telecommunications, Web accesses, climate data, GPS tracking, sensor networks, IP networks
Motivation
Massive data sets:
- Huge numbers of users, e.g., AT&T long-distance: ~300M calls/day; AT&T IP backbone: ~10B IP flows/day
- Highly detailed measurements, e.g., NOAA satellite-based measurements of earth geodetics
- Huge numbers of measurement points, e.g., sensor networks with very many sensors
Near real-time analysis:
- ISP: controlling service levels
- NOAA: tornado detection using weather radar
- Hospital: patient monitoring
Traditional data feeds:
- Simple queries (e.g., value lookup) needed in real time
- Complex queries (e.g., trend analyses) performed offline
Problem Statement
Property            | DBMS                  | DSMS
Data                | Persistent relations  | Streams, time windows
Data access         | Random                | Sequential, one-pass
Updates             | Arbitrary             | Append-only
Update rates        | Relatively low        | High, bursty
Processing model    | Query-driven          | Data-driven
Queries             | One-time              | Continuous
Query plans         | Fixed                 | Adaptive
Query optimization  | One query             | Multi-query
Query answers       | Exact                 | Exact or approximate
Latency             | Relatively high       | Low

Property            | Data Warehouse        | SDW
Data                | Historical            | Recent and historical
Update frequency    | Low                   | High
Update propagation  | Synchronous           | Asynchronous
ETL process         | Complex               | Fast, lightweight
Fig: Comparison of Data Stream Management Systems and Streaming Data Warehouses with traditional database and warehouse systems
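Definitions
- Non-blocking execution: a query operator Q does not require its entire input before producing results
- Monotonicity: all previously produced results remain valid, i.e., $Q(\tau) \subseteq Q(\tau')$ for any times $\tau \le \tau'$, where $Q(\tau)$ is the answer of Q over the input seen up to time $\tau$
  - Q is monotonic only if it is non-blocking
- Delta: a non-monotonic query does not preserve earlier results, so at time $\tau$ it produces an update to its result as positive deltas (new results) and negative deltas (retracted results)
- Punctuation: a special tuple containing a predicate that is guaranteed to be satisfied by the remainder of the data stream
- Heartbeat: a punctuation that bounds the timestamps of future tuples (no later tuple has a smaller timestamp)
- Average slowdown: the slowdown of a tuple is its response time divided by its shortest possible processing time, averaged over all tuples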
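The monotonicity and delta definitions can be checked mechanically on a finite stream prefix. The sketch below is a minimal illustration; the tuple layout and the names `selection`, `recent_window`, `is_monotonic_on`, and `delta` are hypothetical. A selection is monotonic, whereas a query over the last w time units is not, and its changes are expressed as positive and negative deltas.

```python
from typing import Callable

Tup = tuple[int, int]  # hypothetical tuple layout: (timestamp, value)
Query = Callable[[list[Tup]], set[Tup]]


def selection(prefix: list[Tup]) -> set[Tup]:
    # Stateless filter: the answer only grows as more input arrives.
    return {t for t in prefix if t[1] > 10}


def recent_window(prefix: list[Tup], w: int = 5) -> set[Tup]:
    # Tuples from the last w time units: old results expire, so not monotonic.
    if not prefix:
        return set()
    now = prefix[-1][0]
    return {t for t in prefix if t[0] > now - w}


def is_monotonic_on(query: Query, stream: list[Tup]) -> bool:
    # Q(tau) must be a subset of Q(tau') for every tau <= tau'; checking
    # consecutive prefixes is enough because subset inclusion is transitive.
    answers = [query(stream[:i]) for i in range(len(stream) + 1)]
    return all(a <= b for a, b in zip(answers, answers[1:]))


def delta(old: set[Tup], new: set[Tup]) -> tuple[set[Tup], set[Tup]]:
    # Positive delta (results to add) and negative delta (results to retract).
    return new - old, old - new


STREAM = [(1, 5), (2, 12), (4, 20), (9, 3), (12, 15)]
print(is_monotonic_on(selection, STREAM))      # True
print(is_monotonic_on(recent_window, STREAM))  # False
```

Here `delta(recent_window(STREAM[:3]), recent_window(STREAM[:4]))` yields the new tuple (9, 3) as a positive delta and the three expired tuples as negative deltas.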
Outline
- Introduction
- Data Stream Management System (DSMS)
  - Stream Data Models
  - Query Language & Semantics
  - Query Processing
  - Query Optimization
- Streaming Data Warehouse (SDW)
- Discussion
DSMS
Input buffer/monitor
- Captures streaming inputs
- May collect statistics on streams, e.g., by random sampling
Working storage
- Stores recent stream data used for query processing
Local storage
- Stores metadata, e.g., foreign key mappings and name translations
Query processor
- Converts queries into execution plans
- Changes plans for different workloads/input rates
- Contains buffers and operator queues
- Deploys scheduling methods
Continuous query repository
Results
- May be sent to users or to other applications
- May be stored in an SDW for further analysis
Fig: i) Abstract reference architecture of a DSMS and ii) a traditional DBMS
Stream Data Models
- Base streams: produced by sources; append-only
- Derived streams: produced by continuous queries
- Streams have a fixed schema, e.g., <timestamp, source IP addr, source port, destination IP addr, destination port, size>
Data stream models describe an underlying signal $S: [1 \dots N] \to \mathbb{R}$
- Aggregate model: each element carries the full (range) value of a signal entry
- Cash register model: each element carries a partial, non-negative increment to a range value
- Turnstile model: each element carries a partial increment to a range value (possibly negative)
- Reset model: each element carries a range value that resets the previous value of that signal entry
Stream windows (important from both the user's and the query processor's point of view)
- Fixed window
- Sliding window
- Landmark window
- Jumping window: updated every k ticks or k arrivals
- Tumbling window: updated every k ticks or k arrivals, where k = window size
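The window types above differ only in how their endpoints move. A minimal time-based sketch, assuming tuples are (timestamp, payload) pairs; all function names are hypothetical. The jumping and tumbling variants return the most recently completed window, since they are refreshed only every k ticks.

```python
Item = tuple[int, str]  # hypothetical layout: (timestamp, payload)


def fixed_window(items: list[Item], start: int, end: int) -> list[Item]:
    # Fixed window: both endpoints stay put.
    return [it for it in items if start <= it[0] <= end]


def landmark_window(items: list[Item], landmark: int, now: int) -> list[Item]:
    # Landmark window: fixed start, end moves with the current time.
    return [it for it in items if landmark <= it[0] <= now]


def sliding_window(items: list[Item], now: int, w: int) -> list[Item]:
    # Sliding window: the last w time units, refreshed at every tick.
    return [it for it in items if now - w < it[0] <= now]


def jumping_window(items: list[Item], now: int, w: int, k: int) -> list[Item]:
    # Jumping window: a window of size w refreshed only every k ticks,
    # so the last evaluated window ends at the latest multiple of k.
    end = (now // k) * k
    return sliding_window(items, end, w)


def tumbling_window(items: list[Item], now: int, w: int) -> list[Item]:
    # Tumbling window: a jumping window with k = w (non-overlapping windows).
    return jumping_window(items, now, w, w)


items = [(t, f"e{t}") for t in range(1, 11)]
print([t for t, _ in sliding_window(items, 10, 3)])     # [8, 9, 10]
print([t for t, _ in jumping_window(items, 10, 4, 3)])  # [6, 7, 8, 9]
print([t for t, _ in tumbling_window(items, 10, 4)])    # [5, 6, 7, 8]
```

Count-based variants follow the same pattern with list positions in place of timestamps.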
Query Language & Semantics
Query algebra
- Stream-to-stream
- Mixed algebra
Query operators: similar syntax to DBMS operators, very different semantics
- Relation-like query operators
  - Selection, projection, union: stateless operators
  - Join: window joins
  - Aggregate operators
- DSMS-exclusive operators
  - Buffered sort operator
  - Random sampling operator
  - User-defined aggregate functions (UDAF)
Query languages: GSQL, CQL, ESL
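The buffered sort operator repairs slight disorder in a stream using bounded memory. A minimal sketch, assuming the operator holds at most k tuples in a min-heap (`buffered_sort` is a hypothetical name): the output is fully sorted only when every tuple is at most k positions away from its sorted position; otherwise late tuples leak out of order.

```python
import heapq
from typing import Iterable, Iterator


def buffered_sort(timestamps: Iterable[int], k: int) -> Iterator[int]:
    """Emit timestamps in order using a buffer of k tuples (hypothetical operator)."""
    heap: list[int] = []
    for ts in timestamps:
        heapq.heappush(heap, ts)
        if len(heap) > k:  # buffer over capacity: release the smallest timestamp
            yield heapq.heappop(heap)
    while heap:  # end of stream: flush the buffer
        yield heapq.heappop(heap)


print(list(buffered_sort([2, 1, 4, 3, 6, 5], k=1)))  # [1, 2, 3, 4, 5, 6]
print(list(buffered_sort([3, 4, 1], k=1)))           # [3, 1, 4] (buffer too small)
print(list(buffered_sort([3, 4, 1], k=2)))           # [1, 3, 4]
```

The buffer size trades memory and latency for ordering guarantees: a larger k tolerates more disorder but delays every output tuple.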
Query Operators
- Selections and (duplicate-preserving) projections are straightforward: local, per-element operators
- Duplicate-eliminating projection is like grouping
- Projection needs to include the ordering attribute; no restriction for position-ordered streams
- Aggregate expressions:
  - distributive: sum, count, min, max
  - algebraic: average
  - holistic: count-distinct, median
Fig: Simple continuous query operators: i) selection, ii) count, iii) negation
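The aggregate classes matter because a stream is often processed in pieces. A minimal sketch with hypothetical helper names (`partial`, `combine`, `average`): distributive aggregates merge directly from partial results, average is algebraic because it is computed from two distributive values, and median is holistic, so combining per-piece medians gives the wrong answer.

```python
from statistics import median


def partial(xs: list[float]) -> dict[str, float]:
    # Per-partition state for the distributive aggregates.
    return {"sum": sum(xs), "count": len(xs), "min": min(xs), "max": max(xs)}


def combine(a: dict[str, float], b: dict[str, float]) -> dict[str, float]:
    # Distributive: the aggregate of a union is computed from partial aggregates.
    return {
        "sum": a["sum"] + b["sum"],
        "count": a["count"] + b["count"],
        "min": min(a["min"], b["min"]),
        "max": max(a["max"], b["max"]),
    }


def average(p: dict[str, float]) -> float:
    # Algebraic: computed from a fixed number of distributive values.
    return p["sum"] / p["count"]


left, right = [1, 2, 3], [10, 20]
merged = combine(partial(left), partial(right))
print(average(merged))                        # 7.2
print(median(left + right))                   # 3 (needs all values)
print(median([median(left), median(right)]))  # 8.5 (wrong: median is holistic)
```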
Query Operators (cont.)
- Join operators are problematic on streams: they may need to join stream tuples that arrive arbitrarily far apart
- Joins therefore operate on implicit or explicit windows
SELECT * FROM S1, S2 WHERE S1.attr = S2.attr GROUP BY S1.timestamp/60 AS minute
SELECT * FROM S1, S2 WHERE S1.attr = S2.attr GROUP BY |S1.timestamp - S2.timestamp| <= w
SELECT * FROM S1 [RANGE w], S2 [RANGE w] WHERE S1.attr = S2.attr
Fig: Simple continuous query operators: i) join, ii) sliding window join with state
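The last query above, a join over explicit RANGE w windows, can be sketched as a symmetric hash join that evicts expired tuples before probing. This is a minimal illustration, assuming tuples arrive as (stream, timestamp, attr) triples in timestamp order; `sliding_window_join` is a hypothetical name, not the authors' implementation.

```python
from collections import defaultdict, deque


def sliding_window_join(arrivals, w):
    """Join S1 and S2 on attr, keeping only tuples from the last w time units.

    arrivals: (stream, timestamp, attr) triples in timestamp order, with
    stream in {"S1", "S2"}. Returns (S1.timestamp, S2.timestamp, attr) matches.
    """
    windows = {"S1": deque(), "S2": deque()}                    # arrival order
    index = {"S1": defaultdict(list), "S2": defaultdict(list)}  # attr -> timestamps
    results = []
    for stream, ts, attr in arrivals:
        # Expire tuples that can no longer match (older than ts - w).
        for name in ("S1", "S2"):
            while windows[name] and windows[name][0][0] < ts - w:
                old_ts, old_attr = windows[name].popleft()
                index[name][old_attr].remove(old_ts)
        # Probe the other stream's window, then insert into this stream's window.
        other = "S2" if stream == "S1" else "S1"
        for other_ts in index[other][attr]:
            results.append((ts, other_ts, attr) if stream == "S1" else (other_ts, ts, attr))
        windows[stream].append((ts, attr))
        index[stream][attr].append(ts)
    return results


arrivals = [("S1", 1, "a"), ("S2", 2, "a"), ("S2", 3, "b"), ("S1", 9, "a"), ("S2", 10, "a")]
print(sliding_window_join(arrivals, w=5))  # [(1, 2, 'a'), (9, 10, 'a')]
```

Expiring on every arrival keeps the state minimal; the operator-optimization slide discusses expiring periodically instead, at the cost of probing tuples that are already stale.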
Query Processing
- Declarative queries → logical query plan → physical plan
- Plans are directed acyclic graphs (nodes → operators, edges → data flow)
- Queries sharing memory/streams are combined into a single plan
Fig: a) Query plan for two queries: i) a join of streams S1 and S2 with a selection predicate on S1, and ii) an aggregate on S2. b) A continuous query with selection and tumbling window aggregation
Scheduling
- FIFO, round robin: simple, but not efficient
- Favor operators with higher throughput: lower latency
- Favor operators with low processing cost and low selectivity: smaller queues
Heartbeats & punctuations
- Typically issued by sources
- Reduce the amount of state needed by operators
- Prevent operators from doing unnecessary work
- Query plans can also issue heartbeats to avoid pipeline stalls and delayed results
SELECT minute, SUM(size) FROM s WHERE destination_port <= 80 GROUP BY timestamp/60 AS minute
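The per-minute query above is a selection followed by a tumbling window aggregate. A minimal sketch, assuming events arrive in timestamp order as ("tuple", timestamp, destination_port, size) or ("heartbeat", timestamp); the event layout and `per_minute_sum` are hypothetical. A minute's result can only be emitted once some event proves that minute is over, which is exactly what a heartbeat supplies when no data tuple arrives.

```python
from typing import Iterable, Iterator, Union

DataTuple = tuple[str, int, int, int]  # ("tuple", timestamp, destination_port, size)
Heartbeat = tuple[str, int]            # ("heartbeat", timestamp)


def per_minute_sum(events: Iterable[Union[DataTuple, Heartbeat]]) -> Iterator[tuple[int, int]]:
    """SELECT minute, SUM(size) FROM s WHERE destination_port <= 80
    GROUP BY timestamp/60 AS minute, for events in timestamp order."""
    open_minute = None  # minute currently being aggregated
    total = 0
    for event in events:
        minute = event[1] // 60
        # Any tuple or heartbeat from a later minute proves the open group is complete.
        if open_minute is not None and minute > open_minute:
            yield open_minute, total
            open_minute, total = None, 0
        # Selection applied before the aggregate; heartbeats carry no data.
        if event[0] == "tuple" and event[2] <= 80:
            if open_minute is None:
                open_minute = minute
            total += event[3]


data = [("tuple", 5, 80, 100), ("tuple", 30, 443, 999), ("tuple", 59, 22, 50), ("tuple", 61, 80, 10)]
print(list(per_minute_sum(data)))                       # [(0, 150)]: minute 1 stalls
print(list(per_minute_sum(data + [("heartbeat", 125)])))  # [(0, 150), (1, 10)]
```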
Query Processing (cont.)
Queries as views & negative tuples
- Negative tuples: tuples expiring from explicit windows are re-sent with a negative sign
- Explicit windows can be time-based or count-based
- Generated negative tuples are processed by the downstream (cascading) operators
- Negative tuples on aggregate operators:
  - Count: easy to compute
  - Max/Min: memory intensive
- Twice as many tuples are processed
- Negative tuples can be avoided for monotonic operators by tagging tuples with their expiration time; such operators are known as weak non-monotonic
Fig: a) Maintaining a view over a sliding window join using negative tuples, b) Finding the maximum element in a sliding window
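The contrast between COUNT and MAX under negative tuples can be shown with a small sketch. Assumptions: a count-based window of size n that emits a negative tuple for each expiring element, and hypothetical names `signed_tuples` and `WindowAggregates`. COUNT needs constant work per negative tuple, while MAX must keep the whole window so it can rescan when the current maximum expires.

```python
from collections import Counter, deque
from typing import Iterable, Iterator, Optional


def signed_tuples(values: Iterable[int], n: int) -> Iterator[tuple[int, int]]:
    # Count-based window of size n: each arrival emits a positive tuple and,
    # once the window is full, a negative tuple for the expiring element.
    window: deque = deque()
    for v in values:
        if len(window) == n:
            yield window.popleft(), -1
        window.append(v)
        yield v, +1


class WindowAggregates:
    """COUNT and MAX maintained from positive and negative tuples."""

    def __init__(self) -> None:
        self.count = 0
        self.contents: Counter = Counter()  # MAX must remember the whole window
        self.max: Optional[int] = None

    def apply(self, value: int, sign: int) -> None:
        if sign > 0:
            self.count += 1
            self.contents[value] += 1
            if self.max is None or value > self.max:
                self.max = value
        else:
            self.count -= 1  # COUNT: constant work per negative tuple
            self.contents[value] -= 1
            if self.contents[value] == 0:
                del self.contents[value]
            if value == self.max and value not in self.contents:
                # The maximum expired: rescan the stored window.
                self.max = max(self.contents, default=None)


agg = WindowAggregates()
history = []
for value, sign in signed_tuples([5, 9, 2, 7], n=2):
    agg.apply(value, sign)
    history.append(agg.max)
print(agg.count, history)  # 2 [5, 9, 9, 9, 2, 7]
```

The six entries in `history` for four input values illustrate the "twice as many tuples" cost of the negative tuple approach.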
Query Optimization
- Finds efficient query plans
- DBMSs focus on minimizing I/O, while DSMSs try to reduce cost per unit time
Static analysis and query rewriting
- Ensures a query can be evaluated in a non-blocking fashion with limited memory
- Example with streams S(A,B,C) and T(D,E):
  - $\pi_A(\sigma_{A=D \wedge A>10 \wedge D<20}(S \times T))$: yes
  - $\pi_A(\sigma_{A=D}(S \times T))$: no
  - $\pi_A(\sigma_{B<D \wedge A>10 \wedge D<20}(S \times T))$: yes, if duplicates are eliminated
Common rules
- Evaluate inexpensive predicates before complex ones
- Perform selections before joins
Rules for continuous query operators only
- Selections and explicit time-based windows commute
- Selections and explicit count-based windows don't commute
Rewrites based on input constraints
- Joins of unbounded streams become feasible if matching tuples arrive at most t time units apart
Multi-query optimization
Fig: Separate and shared query plans for Q1 and Q2
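The commutativity rule can be checked on a toy stream. A minimal sketch with hypothetical helpers: pushing a selection below a time-based window gives the same answer, but pushing it below a count-based window changes which tuples fall into the last n positions.

```python
Item = tuple[int, int]  # hypothetical layout: (timestamp, value)


def select_even(items: list[Item]) -> list[Item]:
    return [it for it in items if it[1] % 2 == 0]


def last_n(items: list[Item], n: int) -> list[Item]:
    # Count-based window: the n most recent tuples.
    return items[-n:] if n > 0 else []


def last_t(items: list[Item], now: int, w: int) -> list[Item]:
    # Time-based window: tuples from the last w time units.
    return [it for it in items if now - w < it[0] <= now]


stream = [(1, 2), (2, 3), (3, 4), (4, 5), (5, 6)]
# Time-based window and selection commute.
print(select_even(last_t(stream, 5, 2)) == last_t(select_even(stream), 5, 2))  # True
# Count-based window and selection do not.
print(select_even(last_n(stream, 2)))  # [(5, 6)]
print(last_n(select_even(stream), 2))  # [(3, 4), (5, 6)]
```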
Operator Optimization
Join
- Expired tuples must be removed
- Removing expired tuples at every time tick is costly
- Periodic removal reduces this cost but increases join processing cost
- Probe the stream with fewer matches first
Aggregation
- Synopses allow efficient re-computation over a window of b sub-windows
- Prefix synopses: suitable for subtractable aggregates, e.g., sum, count
- Interval synopses: suitable for distributive aggregates, e.g., min, max; need to access log b intervals
- Basic interval synopses require b accesses
- Holistic aggregates require additional information in the synopses
- Algebraic aggregates are computed from derived information, e.g., Avg = Sum / Count
Fig: i) Prefix synopses, ii) interval synopses, iii) basic interval synopses
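Two of the synopsis types can be sketched in a few lines, assuming each sub-window has already been reduced to one aggregate value; class names are hypothetical. A prefix synopsis answers a windowed SUM with two lookups and a subtraction, which works only because SUM is subtractable. A basic interval synopsis stores one value per sub-window and touches all b of them to answer MAX.

```python
from collections import deque


class PrefixSynopsis:
    """Running prefix sums over sub-window totals (subtractable aggregates)."""

    def __init__(self) -> None:
        self.prefix = [0]  # prefix[i] = total of the first i sub-windows

    def add_subwindow(self, total: int) -> None:
        self.prefix.append(self.prefix[-1] + total)

    def window_sum(self, b: int) -> int:
        # Sum over the last b sub-windows: two lookups and one subtraction.
        n = len(self.prefix) - 1
        return self.prefix[n] - self.prefix[max(0, n - b)]


class BasicIntervalSynopsis:
    """One stored aggregate per sub-window; MAX costs b accesses."""

    def __init__(self, b: int) -> None:
        self.subwindows: deque = deque(maxlen=b)  # oldest sub-window drops out

    def add_subwindow(self, value: int) -> None:
        self.subwindows.append(value)

    def window_max(self) -> int:
        return max(self.subwindows)


sums = PrefixSynopsis()
for s in [3, 1, 4, 1, 5]:
    sums.add_subwindow(s)
print(sums.window_sum(3))  # 10

maxes = BasicIntervalSynopsis(b=3)
for m in [2, 7, 1, 8, 3]:
    maxes.add_subwindow(m)
print(maxes.window_max())  # 8
```

For brevity this prefix list grows without bound; a bounded version would keep only the last b + 1 prefix values.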
Query Optimization (cont.)
Load shedding & approximation
- Random sampling
- Semantic load shedding: drop less important tuples
- Objective: minimize the drop in accuracy
- Challenging for complex query plans with multiple streams and operators
Load balancing
- Write part of the stream if possible
Adaptive query optimization
- Query cost per unit time may change
- The query plan is dynamically re-ordered based on operator speed, selectivity, and queue length
- Trade-off between the resulting adaptivity and the overhead of dynamic routing
Distributed query optimization
- Parallelize and distribute the system itself
- Split the query plan across nodes
- Partition the streams
- Shift partial computation to the sources
- In-network processing reduces the communication overhead
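Random load shedding can be paired with a scaled estimate so that the approximate answer stays unbiased. A minimal sketch, assuming a SUM query and a fixed keep probability (`shed_and_estimate_sum` is a hypothetical name): each value survives with probability keep_prob, so dividing the surviving sum by keep_prob has the exact SUM as its expected value.

```python
import random


def shed_and_estimate_sum(values: list[float], keep_prob: float, rng: random.Random) -> tuple[float, int]:
    """Randomly drop tuples, then scale the SUM of survivors by 1 / keep_prob."""
    kept = [v for v in values if rng.random() < keep_prob]
    return sum(kept) / keep_prob, len(kept)


rng = random.Random(42)
estimate, processed = shed_and_estimate_sum([1.0] * 10_000, keep_prob=0.5, rng=rng)
print(processed, estimate)  # about 5000 tuples processed, estimate close to 10000
```

Lower keep probabilities save more work but widen the spread of the estimate; semantic shedding would instead choose which tuples to drop.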
Outline
- Introduction
- Data Stream Management System (DSMS)
- Streaming Data Warehouse (SDW)
  - Data ETL
  - Update Propagation
  - Data Expiration
  - Update Scheduling
  - Query Processing on SDW
- Discussion
SDW
- Data streams/feeds arrive periodically
- ETL process: data cleaning, standardization, and so on
Table types
- Base tables: sourced directly from raw files
- Derived tables: materialized views over base or other derived tables
- The update scheduler decides the order in which tables are updated, based on dependencies and workloads
Fig: Abstract reference architecture of an SDW
ETL
- Simple tasks: decompression, standardization
- Complex tasks: joining new data with relations of descriptive attributes
Mesh join
- Relations R are disk-based; stream data is buffered in main memory
- Blocks of R are accessed in sequential order
- A stream tuple is removed from the buffer once it has been joined with all blocks of R
Loading data into tables
- Tables are partitioned into timestamp ranges
- New data affects only a small number of recent partitions
Fig: Partitioning a table on a timestamp attribute
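The mesh join steps above can be simulated in memory. A minimal sketch, assuming stream tuples are (key, amount) pairs, R rows are (key, name) pairs stored in blocks, and one stream batch is admitted per step; `mesh_join` and these layouts are hypothetical. Each step reads one block of R sequentially (wrapping around), probes every buffered stream tuple, and retires tuples that have now seen every block.

```python
from collections import deque


def mesh_join(stream_batches, r_blocks):
    """Join buffered stream tuples (key, amount) with blocks of R rows (key, name).

    Each step admits one stream batch and reads the next block of R cyclically.
    A buffered tuple leaves after it has met every block. Assumes r_blocks is non-empty.
    """
    n_blocks = len(r_blocks)
    pending = deque(stream_batches)
    buffer = deque()  # [stream tuple, number of R blocks seen], oldest first
    block_no = 0
    out = []
    while pending or buffer:
        if pending:
            buffer.extend([s, 0] for s in pending.popleft())
        lookup = {}
        for key, name in r_blocks[block_no]:  # one sequential read of R
            lookup.setdefault(key, []).append(name)
        for entry in buffer:
            key, amount = entry[0]
            for name in lookup.get(key, []):
                out.append((key, amount, name))
            entry[1] += 1
        while buffer and buffer[0][1] == n_blocks:  # joined with all of R
            buffer.popleft()
        block_no = (block_no + 1) % n_blocks
    return out


r_blocks = [[(1, "alice"), (2, "bob")], [(3, "carol")]]
stream_batches = [[(3, 10), (1, 5)], [(2, 7)]]
print(mesh_join(stream_batches, r_blocks))
# [(1, 5, 'alice'), (3, 10, 'carol'), (2, 7, 'bob')]
```

Because every stream tuple waits for one full pass over R, the buffer holds roughly one pass worth of stream batches, and R is only ever scanned sequentially.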
Update Propagation
Goals
- Propagate changes across layers of derived tables
- Avoid recomputing an entire derived table
- Efficiently identify partition dependencies
- Partition dependencies may not be obvious from the SQL specification
Fig: Updating a partitioned derived table
Fig: Partition dependency
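A partition dependency can be expressed as a function from source partitions to derived partitions, so that only the affected derived partitions are recomputed. A minimal sketch, assuming a hypothetical base table partitioned by hour and a derived table of daily totals partitioned by day; all names and widths are illustrative only.

```python
BASE_SECONDS = 3600       # hypothetical: base table partitioned by hour
DERIVED_SECONDS = 86_400  # hypothetical: derived table partitioned by day
HOURS_PER_DAY = DERIVED_SECONDS // BASE_SECONDS


def affected_days(updated_hours: set[int]) -> set[int]:
    # Partition dependency: hourly partition h feeds daily partition h // 24.
    return {(h * BASE_SECONDS) // DERIVED_SECONDS for h in updated_hours}


def propagate(base: dict[int, list[int]], derived: dict[int, int], updated_hours: set[int]) -> dict[int, int]:
    # Recompute only the daily totals whose source partitions changed.
    for day in affected_days(updated_hours):
        hours = range(day * HOURS_PER_DAY, (day + 1) * HOURS_PER_DAY)
        derived[day] = sum(sum(base.get(h, [])) for h in hours)
    return derived


base = {0: [1, 2], 25: [10]}
derived = {0: 3, 1: 10}
base[30] = [5]  # new data lands in hour 30
print(propagate(base, derived, {30}))  # {0: 3, 1: 15}
```

Real derived tables (e.g., sliding aggregates spanning partition boundaries) can map one source partition to several derived partitions, which is why such dependencies are not always obvious from the SQL.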
Data Expiration
- Tuples may have variable lifetimes
- Tables can be partitioned on insertion and expiration timestamps
- Partitions may not have equal sizes; one solution is to assign updates in round-robin fashion
Fig: Partitioning a table on two attributes: insertion and expiration timestamp
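Partitioning on both insertion and expiration timestamps lets whole partitions be dropped once every row in them has expired. A minimal sketch, assuming fixed-width ranges on both attributes and treating a row as expired when its expiration timestamp is not after the current time; `TwoWayPartitionedTable` is a hypothetical name.

```python
from collections import defaultdict


class TwoWayPartitionedTable:
    """Rows partitioned by (insertion range, expiration range); hypothetical layout."""

    def __init__(self, ins_width: int, exp_width: int) -> None:
        self.ins_width = ins_width
        self.exp_width = exp_width
        self.partitions = defaultdict(list)  # (ins bucket, exp bucket) -> rows

    def insert(self, ins_ts: int, exp_ts: int, row: str) -> None:
        key = (ins_ts // self.ins_width, exp_ts // self.exp_width)
        self.partitions[key].append((ins_ts, exp_ts, row))

    def drop_expired(self, now: int) -> int:
        # Drop a partition wholesale once its whole expiration range is past.
        dead = [k for k in self.partitions if (k[1] + 1) * self.exp_width <= now]
        for k in dead:
            del self.partitions[k]
        return len(dead)

    def live_rows(self, now: int) -> list[str]:
        # Partially expired partitions are filtered row by row at query time.
        return sorted(row for part in self.partitions.values() for _, exp, row in part if exp > now)


table = TwoWayPartitionedTable(ins_width=10, exp_width=10)
table.insert(1, 5, "a")
table.insert(2, 25, "b")
table.insert(12, 15, "c")
table.insert(13, 18, "d")
print(table.drop_expired(16), table.live_rows(16))  # 1 ['b', 'd']
```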
Update Scheduling
- External sources push new data
- There are many data feeds and derived tables
- A scheduler controls resource usage
- Goal: minimize data staleness
- A priority-weighted staleness metric selects the tables whose update reduces it the most
Fig: Plot of the staleness of an SDW table over time
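The priority-weighted staleness metric can be illustrated with a greedy scheduler. A minimal sketch under stated assumptions: staleness of a table is the current time minus the timestamp of its newest loaded data, and the scheduler picks the table whose pending update removes the most priority-weighted staleness. The `Table` fields and the greedy rule are hypothetical; they ignore update cost and dependencies.

```python
from dataclasses import dataclass


@dataclass
class Table:
    name: str
    priority: float
    freshness: int      # timestamp of the newest data already loaded
    pending_up_to: int  # timestamp of the newest data waiting in the feed


def weighted_staleness(tables: list[Table], now: int) -> float:
    # Staleness of a table = now - freshness, weighted by its priority.
    return sum(t.priority * (now - t.freshness) for t in tables)


def pick_next(tables: list[Table]) -> Table:
    # Greedy choice: the update that removes the most weighted staleness.
    return max(tables, key=lambda t: t.priority * (t.pending_up_to - t.freshness))


tables = [Table("A", 1, 50, 100), Table("B", 3, 90, 100), Table("C", 2, 70, 100)]
print(weighted_staleness(tables, now=100))  # 140
chosen = pick_next(tables)
chosen.freshness = chosen.pending_up_to  # apply the update
print(chosen.name, weighted_staleness(tables, now=100))  # C 80
```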
Query Processing
Overhead of partitioned tables
- Partitions that are too small are difficult to manage
- Partitions that are too big must be recomputed as new data arrives
- Solution: use bigger partitions as data gets older
Data availability and concurrency control
- Tables are updated frequently
- Queries should not be blocked and should see consistent data
- Solution: multi-version concurrency control at the partition level
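Partition-level multi-version concurrency control can be sketched as a copy-on-write catalog of partition versions. Assumptions: a single-threaded illustration with hypothetical names (`PartitionedTable`, `query_sum`). A query captures the catalog when it starts, and an update installs a new version of a partition instead of overwriting it, so running queries keep reading a consistent set of versions.

```python
class PartitionedTable:
    """Partition-level multi-version catalog (hypothetical design)."""

    def __init__(self) -> None:
        # partition id -> (version number, immutable tuple of rows)
        self.catalog: dict = {}

    def snapshot(self) -> dict:
        # A reader copies the catalog once; later updates do not affect it.
        return dict(self.catalog)

    def update_partition(self, pid: str, rows: list[int]) -> None:
        # Writers build a new version instead of modifying the old one in place.
        version = self.catalog.get(pid, (0, ()))[0] + 1
        new_catalog = dict(self.catalog)
        new_catalog[pid] = (version, tuple(rows))
        self.catalog = new_catalog  # install the new version with one reference swap


def query_sum(snapshot: dict) -> int:
    return sum(sum(rows) for _, rows in snapshot.values())


t = PartitionedTable()
t.update_partition("p0", [1, 2])
reader_view = t.snapshot()           # a long-running query starts here
t.update_partition("p0", [1, 2, 3])  # concurrent refresh of p0
t.update_partition("p1", [10])       # and a new partition
print(query_sum(reader_view), query_sum(t.snapshot()))  # 3 16
```

Old versions can be garbage-collected once no snapshot references them; a multi-threaded version would also need a safe way to publish the new catalog.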
Discussion
- End-to-end data stream management
- DSMSs allow relational-like queries as well as pattern-matching and event-processing queries
- Query semantics differ from traditional ones
- SDW research problems have been introduced only recently
- Not covered in this lecture: data mining techniques, fault tolerance, and distributed processing
References
- Lukasz Golab & M. Tamer Özsu, Data Stream Management
- Morton Lindeberg, Data Stream Management System: Introduction, Concepts and Issues, University of Oslo