Hasso-Plattner-Institut25 Jahre HPI
 

Destream: Decentralized Window Aggregation with Ad-hoc Queries

Wang Yue

Chair for Data Engineering Systems

Hasso Plattner Institute

Office: F-1.06
Tel.: +49 (0) 331 5509-280
Email: Wang.Yue (at) hpi.de
Links: Homepage

Supervisor: Prof. Dr. Tilmann Rabl

 

 

We focus on efficient window aggregation for stream processing in distributed networks. In particular, we are interested in approaches that share multiple queries among different nodes and slice the windows of multiple queries effectively and efficiently.

Overview

Over the past decade, we have witnessed an explosion of Internet of Things (IoT) applications in many domains, from environmental condition monitoring to industrial process control and health care. By 2025, the number of IoT devices is expected to exceed 75 billion. These devices form decentralized networks and continuously generate data streams. Current stream processing engines (SPEs) such as Apache Flink, Apache Spark Streaming, and Apache Storm are built to process large-scale, high-velocity data streams. They perform efficient data aggregation, process millions of events per second, and are widely used in industry and research.

One use case for decentralized networks is a smart driving system, which aims to assist with traffic management and traffic estimation. With such a system, drivers can be alerted to dangerous situations or simply avoid traffic congestion. Sensors deployed on vehicles continuously generate data events (e.g., speed, location, and temperature). The events are then collected and sent to data centers. Users also submit queries, such as calculating the average speed of all trucks. The smart driving system executes these queries on the data events and outputs the results. This example has two characteristic features: multiple data streams from IoT devices are distributed across large networks, and multiple ad-hoc queries need to be executed simultaneously.

To process data from decentralized networks, current SPEs are deployed in data centers and centralize the data from distributed IoT devices there. The SPEs then process the data and output results. All data produced by IoT devices must therefore be sent over the network to the data centers. This leads to high network costs, especially when large volumes of data are transferred between devices and SPEs. In addition, because all computation is centralized in data centers, SPEs have to scale up their clusters to handle large-scale data. To reduce network overhead and improve performance, the state-of-the-art approach [1] offloads part of the data processing that would be performed in the data center to devices closer to the data streams. Instead of sending every event, those devices produce and transfer partial results.
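To illustrate what transferring partial results instead of every event can look like, here is a minimal Python sketch for an average. It is not code from the cited system; the `Partial` class, `local_aggregate`, and the sample speed values are assumptions made for this example.

```python
from dataclasses import dataclass


@dataclass
class Partial:
    """Hypothetical partial result for an average: a running sum and a count."""
    total: float = 0.0
    count: int = 0

    def add(self, value: float) -> None:
        # Fold one raw event into the partial result.
        self.total += value
        self.count += 1

    def merge(self, other: "Partial") -> "Partial":
        # Combine two partial results without touching the raw events.
        return Partial(self.total + other.total, self.count + other.count)

    def average(self) -> float:
        return self.total / self.count if self.count else float("nan")


def local_aggregate(events):
    """Runs near the data source and reduces many events to one Partial."""
    partial = Partial()
    for value in events:
        partial.add(value)
    return partial


# Made-up speed readings from two devices.
device_a = [60.0, 80.0, 100.0]
device_b = [40.0]

# Each device ships one Partial; the data center only merges them.
merged = local_aggregate(device_a).merge(local_aggregate(device_b))
print(merged.average())  # 70.0
```

In this sketch the data center receives two small records rather than four raw events, and the saving grows with the number of events per device.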

To process unbounded data streams, SPEs group data into fixed-size windows and process each window separately. Depending on the query, SPEs define windows with different window types, aggregation functions, window measures, and selection predicates. Data analysis often involves many simultaneous queries, which generate many concurrent windows. Because these windows process the same data, they overlap. Reusing the partial results of these overlaps avoids redundant computation and reduces resource consumption. In our previous work, we designed Desis [2], which pushes down multiple queries with decomposable functions and time-based measures to edge nodes. Desis can also share partial results between queries on edge nodes. In addition, we proposed Deco [3] and Dema [4], which process queries with count-based measures and non-decomposable functions on local nodes. For multiple queries with different selection predicates, however, current SPEs collect all data in their data centers and perform the aggregations there. They cannot process such queries locally or share their partial results.
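The reuse of overlaps between concurrent windows can be sketched as follows. This is a simplified illustration, not the Desis implementation: it assumes time-based windows over integer timestamps and a sum function, and the names `make_windows`, `slice_edges`, and `aggregate_shared` are hypothetical.

```python
def make_windows(size, slide, horizon):
    """Return (begin, end) windows of length `size` that advance by `slide`."""
    windows = []
    start = 0
    while start + size <= horizon:
        windows.append((start, start + size))
        start += slide
    return windows


def slice_edges(windows):
    """Cut the timeline at every window begin and end to obtain slices."""
    cuts = sorted({edge for window in windows for edge in window})
    return list(zip(cuts, cuts[1:]))


def aggregate_shared(events, windows):
    """Sum each event into exactly one slice, then build windows from slices."""
    slices = slice_edges(windows)
    partial = {s: 0 for s in slices}
    for timestamp, value in events:
        for begin, end in slices:
            if begin <= timestamp < end:
                partial[(begin, end)] += value
                break
    # A window's result is the sum of the slices it fully covers.
    return {
        (w_begin, w_end): sum(
            v for (b, e), v in partial.items() if w_begin <= b and e <= w_end
        )
        for w_begin, w_end in windows
    }


tumbling = make_windows(size=4, slide=4, horizon=8)  # (0,4), (4,8)
sliding = make_windows(size=4, slide=2, horizon=8)   # (0,4), (2,6), (4,8)
windows = sorted(set(tumbling + sliding))
events = [(0, 1), (1, 2), (3, 3), (5, 4), (7, 5)]    # (timestamp, value)
results = aggregate_shared(events, windows)
print(results)  # {(0, 4): 6, (2, 6): 7, (4, 8): 9}
```

Each event is aggregated once, into one of four slices, even though it may belong to two windows. The window results are then assembled from the slice sums.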

Approach

We now present Destream, a decentralized aggregation technique that can process multiple ad-hoc queries in decentralized networks. There are two kinds of ad-hoc queries: long continuous queries and short continuous queries. Long continuous queries contain many windows and output results periodically. Short continuous queries have only one window, and the system drops them once their results are generated. To avoid redundant computation between concurrent windows, we use window slicing, which reuses the partial result of each overlap instead of processing every window individually. This technique not only improves the performance of our system but also reduces the consumption of computational resources. Before describing our approach, we define the terms query-group and punctuation.

Query-Group: a query-group is a set of queries that can share partial results with each other. For multiple queries, current systems need to create separate windows to process events. Our aggregation engine puts queries that can share results into one query-group and produces slices instead of windows. Queries with different selection predicates can be put into the same query-group, since their windows overlap. Different query-groups are processed separately, and Desis needs to maintain the context of each group, i.e., the list of slices and aggregation operators.
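One way to picture a query-group and its context is the data structure below. It is only a sketch under stated assumptions: the text does not spell out the exact sharing rule, so grouping by window measure is an assumption, and `Query`, `QueryGroup`, `OPERATORS`, `build_groups`, and the query Qd are hypothetical.

```python
from collections import defaultdict
from dataclasses import dataclass, field

# Assumed mapping from an aggregation function to the operators it needs.
OPERATORS = {"sum": {"sum"}, "average": {"sum", "count"}}


@dataclass(frozen=True)
class Query:
    name: str
    measure: str      # "time" or "count"
    function: str     # "sum" or "average"
    threshold: float  # selection predicate: speed > threshold


@dataclass
class QueryGroup:
    queries: list = field(default_factory=list)
    slices: list = field(default_factory=list)   # context: list of slices
    operators: set = field(default_factory=set)  # context: aggregation operators

    def add(self, query: Query) -> None:
        self.queries.append(query)
        self.operators.update(OPERATORS[query.function])


def build_groups(queries):
    """Assumption: queries with the same window measure can share results."""
    groups = defaultdict(QueryGroup)
    for query in queries:
        groups[query.measure].add(query)
    return dict(groups)


queries = [
    Query("Qa", "time", "sum", 50),
    Query("Qb", "time", "average", 25),
    Query("Qc", "time", "average", 75),
    Query("Qd", "count", "sum", 0),  # made-up query that lands in its own group
]
groups = build_groups(queries)
print([q.name for q in groups["time"].queries])  # ['Qa', 'Qb', 'Qc']
print(sorted(groups["time"].operators))          # ['count', 'sum']
```

Note that the three time-based queries end up in one group despite their different selection predicates, and the group keeps a single set of operators that serves all of them.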

In a decentralized setup, the topology consists of local nodes, intermediate nodes, and one root node. Local nodes connect to the root node either directly or via intermediate nodes. In complex networks, more intermediate nodes sit between the local nodes and the root node, so there are more hops from a local node to the root node. In contrast to centralized aggregation, our system can execute aggregations on local and intermediate nodes. This can dramatically reduce the amount of data sent by each node and the number of aggregation steps performed on the root node. In the following, we discuss decentralized aggregation for different selection predicates, window types, window measures, and aggregation functions.
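The following minimal sketch shows how aggregation along such a topology might proceed for a single slice. It is not the system's actual protocol: the `Node` class, the `aggregate` function, the node names, and the event values are assumptions for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    name: str
    children: list = field(default_factory=list)
    events: list = field(default_factory=list)  # only local nodes hold raw events


def aggregate(node, messages):
    """Return (sum, count) for the subtree rooted at `node`.

    `messages` records that each child sends exactly one partial result
    upward for this slice, regardless of how many events it has seen.
    """
    total, count = sum(node.events), len(node.events)
    for child in node.children:
        child_total, child_count = aggregate(child, messages)
        messages[child.name] = 1
        total += child_total
        count += child_count
    return total, count


# Two local nodes behind an intermediate node, one local node attached directly.
l1 = Node("local-1", events=[10, 20, 30])
l2 = Node("local-2", events=[40])
l3 = Node("local-3", events=[50, 50])
i1 = Node("intermediate-1", children=[l1, l2])
root = Node("root", children=[i1, l3])

messages = {}
total, count = aggregate(root, messages)
raw_events = len(l1.events) + len(l2.events) + len(l3.events)
print(total, count)                       # 200 6
print(sum(messages.values()), raw_events) # 4 6
```

With these toy numbers the tree carries four partial results instead of six raw events. The number of partial results depends only on the number of nodes, so the gap widens as each local node sees more events per slice.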

In the figure, we show three example queries, all of which are time-based. Qa has tumbling windows and executes a sum function; its selection predicate is speed > 50. Qb and Qc execute average functions and have sliding and user-defined windows. Their selection predicates are speed > 25 and speed > 75, respectively. On the root node, the system puts Qa, Qb, and Qc into one query-group and outputs the window attributes. All windows produced from that query-group can be shared, as they overlap. The message manager then distributes the window attributes of this query-group to the local and intermediate nodes.

On a local node, the aggregation engine splits windows into local slices according to the window attributes received from the root node. It then aggregates incrementally as each event arrives. Otherwise, the engine could stall when a slice ends, because it would have to iterate over and compute all events of the slice at once. Once a local slice ends, the aggregation engine executes its aggregation functions and transmits the partial results to the intermediate nodes. If a local slice is ended by an end punctuation (ep), the system marks this slice with the ep.

On an intermediate node, our system creates an intermediate slice and collects the partial results for this slice from its child nodes. When the intermediate slice has received all results from the local nodes, it calculates its partial results and sends them to the root node. If partial results from local nodes are marked with an ep, the intermediate partial result keeps this ep. As on local nodes, we also perform incremental aggregation on intermediate nodes. For example, in the figure, the two partial results of s1 are collected into the intermediate slice (s1).

On the root node, partial results from the child nodes are gathered and incrementally aggregated into a root slice. When a slice ends, the aggregation engine executes the aggregation functions and saves the result. If intermediate partial results carry an ep, the aggregation engine assembles all partial results belonging to this window and outputs the final result. In the figure, we terminate the root slice (s1) once both partial results of the intermediate s1 have arrived. The partial result of the intermediate s3 carries an ep, and the root node ends the window according to this ep.
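The per-event incremental aggregation and the propagation of the ep marker can be sketched as below. This is an illustrative model rather than Destream's code: the `Slice` class, its methods, and the speed values are assumptions. Only the three selection predicates are taken from the example above.

```python
# Selection predicates of the example queries: speed > threshold.
PREDICATES = {"Qa": 50, "Qb": 25, "Qc": 75}


class Slice:
    """Hypothetical slice that keeps one [sum, count] pair per query."""

    def __init__(self):
        self.partials = {query: [0.0, 0] for query in PREDICATES}
        self.ep = False  # set when the slice is ended by an end punctuation

    def add_event(self, speed):
        # Incremental aggregation: update every matching query right away,
        # so no event has to be revisited when the slice ends.
        for query, threshold in PREDICATES.items():
            if speed > threshold:
                self.partials[query][0] += speed
                self.partials[query][1] += 1

    def merge(self, other):
        # Used on intermediate and root nodes to fold in a child's result.
        for query, (total, count) in other.partials.items():
            self.partials[query][0] += total
            self.partials[query][1] += count
        self.ep = self.ep or other.ep  # the ep marker is kept on the way up


local_1, local_2 = Slice(), Slice()
for speed in [30, 60, 90]:
    local_1.add_event(speed)
for speed in [20, 80]:
    local_2.add_event(speed)
local_2.ep = True  # assume this local slice was ended by an ep

intermediate = Slice()
intermediate.merge(local_1)
intermediate.merge(local_2)

qa_sum = intermediate.partials["Qa"][0]
qb_avg = intermediate.partials["Qb"][0] / intermediate.partials["Qb"][1]
qc_avg = intermediate.partials["Qc"][0] / intermediate.partials["Qc"][1]
print(qa_sum, qb_avg, qc_avg, intermediate.ep)  # 230.0 65.0 85.0 True
```

Keeping a sum and a count for every query is a simplification: it lets both the sum query and the average queries be answered from the same slice, at the cost of one small record per predicate.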

Background

In this work, we present Desis, an SPE designed for edge and fog computing environments that offloads part of the processing to machines closer to the data streams. Desis is optimized to process multiple queries with decentralized aggregation and to share computation between windows. Our approach has three main features. First, Desis supports decentralized aggregation with tumbling windows, sliding windows, session windows, and user-defined windows. Instead of collecting data in a central location and aggregating windows there, Desis pushes window aggregation down to other machines in the system. Second, we propose an optimizer that first slices concurrent windows (window slicing) and then shares the intermediate results of the overlaps between the original windows (window merging), even when these windows have different window types, window measures, and aggregation functions. Third, Desis processes real-time data based on event time and handles out-of-order data while ensuring accuracy. Desis scales well with respect to the number of queries and throughput.

Related Work

We now present related work on stream processing in distributed networks and on window aggregation across multiple queries. Madden et al. [5] propose Tiny Aggregation (TAG), a tree-structured distributed data aggregation approach that concentrates on how to build the network routes between nodes. In Cougar [6] and LEACH [7], the authors use a cluster-structured network in which a cluster node is selected among the nodes to aggregate windows from the other nodes. PEGASIS [8] introduces a chain-structured approach in which all nodes have a priori knowledge of all other nodes. SlideSide [9] proposes an approach for incremental processing in multi-query scenarios that maintains a running prefix/suffix scan over the input stream. FlatFAT [10] uses a tree structure to store partial results.

In summary, many stream processing techniques focus either on sensor networks or on multiple queries. However, none of them combines the two, although this combination is a common problem in many realistic scenarios. They also lack a solution for count-based windows in stream processing and concentrate only on sliding or tumbling windows.

Evaluation

We compared our solution with the centralized system discussed above, using the same network structure for both. We measure three metrics for both systems: latency is the time the system takes to output results, throughput is the number of events processed per second, and network overhead is the network traffic used by each system. The experiments show that our system outperforms the baseline by a factor of 100 while reducing network overhead by 99%.

References

[1] Benson, Lawrence, et al. "Disco: Efficient Distributed Window Aggregation." EDBT. Vol. 20. 2020.
[2] W. Yue, L. Benson, and T. Rabl. Desis: Efficient Window Aggregation in Decentralized Networks. (EDBT 2023)
[3] W. Yue, R. Moczalla, and T. Rabl. Deco: Efficient Decentralized Aggregation for Count-Based Windows. (submitted, EDBT 2023)
[4] W. Yue, R. Moczalla, and T. Rabl. Dema: Efficient Decentralized Aggregation for Non-Decomposable Functions. (in preparation, 2023)
[5] Samuel Madden, Michael Franklin, Joseph Hellerstein, and Wei Hong. TAG: a Tiny Aggregation Service for Ad-Hoc Sensor Networks. In OSDI, pages 131-146. ACM, 2002.
[6] Yong Yao and Johannes Gehrke. The cougar approach to in-network query processing in sensor networks. ACM SIGMOD Record, 31 (3): 9-18, 2002.
[7] Wendi Heinzelman, Anantha Chandrakasan, and Hari Balakrishnan. An application-specific protocol architecture for wireless microsensor networks. IEEE Transactions on Wireless Communications, 1 (4): 660-670, 2002.
[8] Stephanie Lindsey, Cauligi Raghavendra, and Krishna Sivalingam. Data gathering algorithms in sensor networks using energy metrics. TPDS, 13(9): 924-935, 2002.
[9] Theodorakis, Georgios, Peter Pietzuch, and Holger Pirk. "SlideSide: a fast incremental stream processing algorithm for multiple queries." (2020). 
[10] Tangwongsan, Kanat, et al. "General incremental sliding-window aggregation." Proceedings of the VLDB Endowment 8.7 (2015): 702-713.