

A Framework for Clustering and Classification of Big Data Using Spark

Xristos Mallios, Vasilis Vassalos, Tassos Venetis, and Akrivi Vlachou

Athens University of Economics and Business, Athens, Greece
{mallios,vassalos,avenet,avlachou}@aueb.gr

Abstract. Nowadays, massive data sets are generated in many modern applications ranging from economics to bioinformatics, and from social networks to scientific databases. Typically, such data need to be processed by machine learning algorithms, which entails high processing cost and usually requires the execution of iterative algorithms. Spark has been recently proposed as a framework that supports iterative algorithms over massive data efficiently. In this paper, we design a framework for clustering and classification of big data suitable for Spark. Our framework supports different restrictions on the data exchange model that are applicable in different settings. We integrate the k-means and ID3 algorithms in our framework, leading to interesting variants of our algorithms that apply to the different restrictions on the data exchange model. We implemented our algorithms over the open-source computing framework Spark and evaluated our approach on a cluster of 37 nodes, thus demonstrating the scalability of our techniques. Our experimental results show that we outperform the algorithm provided by Spark for k-means by up to 31%, while the centralized k-means is at least one order of magnitude worse.

1 Introduction

Huge amounts of data are generated every day at tremendous rates. With the rapid expansion of data generation, we are moving from the Terabyte to the Petabyte Age. As the size of the data grows, applications are forced to seek more computing power and storage resources. However, real-time and archival data is growing faster than computing power. As a result, parallel processing is necessary for scaling up data-intensive processing algorithms. Moreover, due to the huge amount of available information, data mining techniques have become important for discovering knowledge in databases and are becoming increasingly vital for Big Data analytics in various fields such as medicine, science and business.

Even though distributed data mining has been studied in various distributed settings, with the advent of cloud-based systems new challenges arise for data-centric applications aiming at reliable, cost-efficient and privacy-preserving data management and processing. In addition, data mining typically relies on iterative algorithms that do not scale well on popular programming models for implementing data-centric distributed computing, such as MapReduce. On the other hand, Spark [18] was proposed as a framework that efficiently supports iterative algorithms over very large data sets. Spark is a powerful open source processing engine whose primary targets are processing speed, scalability, ease of use, and sophisticated analytics. Since its release, Spark has seen rapid adoption by enterprises across a wide range of industries. It has quickly become the largest open source community in Big Data, with over 500 contributors from 100+ organizations.

© Springer International Publishing AG 2016. C. Debruyne et al. (Eds.): OTM 2016 Conferences, LNCS 10033, pp. 344–362, 2016. DOI: 10.1007/978-3-319-48472-3_20

In this paper, we propose a framework for clustering and classification that enables big data analytics using Spark. Our framework supports three different modes of restriction on the data exchange model. In distributed environments, privacy-preserving restrictions may exist depending on the application scenario. For example, in medical data mining over data centers of different hospitals, the raw data stored in one data center may not be allowed to be exchanged with another. According to the HIPAA privacy rule (footnote 1), sharing patient information without the patient's authorization is not permitted. On the other hand, aggregated data is allowed to be exchanged between different data centers. For this reason, it is essential to find a way to implement machine learning algorithms in a distributed way while respecting the data-exchange restrictions.

To summarize, the main contributions of this paper are:

– We design a framework for clustering and classification of big data suitable for Spark that supports different restrictions on the data exchange model.

– We integrate the k-means and ID3 algorithms in our framework for all the different restrictions on the data exchange model and provide a detailed description based on the Spark operations.

– To verify the efficiency of the proposed techniques, we implemented our algorithms over Spark and evaluated our approach on a cluster of 37 nodes. In our experimental evaluation we show the scalability of our framework. Moreover, since Spark provides MLlib, a machine learning library that includes a reference implementation of k-means, we show that our algorithms outperform the MLlib algorithm, while also providing restricted data exchange.

This paper is organized as follows. Section 2 reviews the related work, while Sect. 3 briefly introduces Spark. Our framework is presented in Sect. 4, while Sects. 5 and 6 show how k-means and ID3 can be implemented in our framework. Section 7 presents the experimental evaluation, and we conclude in Sect. 8.

2 Related Work

K-means [12] is a well-studied problem. In [11], a simple and effective implementation of Lloyd's algorithm is presented. In [10], a new algorithm is proposed for k-means clustering that manages to converge in a small number of passes. An optimized version of the k-means algorithm is presented in [13] that exploits the observation that after a certain number of iterations only a small part of the data elements change their cluster. Several variants of k-means have been studied, such as k-medians [2], which computes the median as the cluster center, or ellipse-shaped clustering [3].

1 http://www.hhs.gov/ocr/privacy/.

K-means has also been studied in different distributed settings. In [8], a greedy approximation algorithm is presented that computes k-means over a distributed setting. A distributed version of k-means for a dynamic P2P network is designed in [4], where nodes only require local synchronization. In [1], the communication cost among the distributed nodes is reduced by constructing a global coreset. In [19], a parallel k-means algorithm is presented where the Message Passing Interface (MPI) programming model is used for message exchange across the nodes. [16] presents a method of privacy-preserving k-means clustering that is based on the assumption that different nodes contain different attributes for a common set of entities. Clustering over MapReduce is studied in [5]. The MapReduce architecture differs from Spark in that only two phases are allowed, namely map and reduce. In [15] the authors present an experimental comparison of different algorithms executed on MapReduce and Spark. The authors also evaluated k-means implementations, namely Mahout (footnote 2) for MapReduce and the k-means program from the example package for Spark.

ID3 is a well-known decision tree learning algorithm developed by J. Ross Quinlan [14]. In [6] an empirical comparison between ID3 and back-propagation is presented. In [17] the authors propose efficient secure multi-party computation protocols to construct a privacy-preserving ID3 algorithm over horizontally partitioned data among multiple parties. An approach for building a distributed ID3 classifier was proposed in [9], which takes only metadata from distributed data sets, thus avoiding total access to the original data.

3 Spark Overview

In this section, we briefly describe Apache Spark [18]. Spark was proposed as a framework that efficiently supports iterative algorithms, which were not supported sufficiently by MapReduce. This class of applications includes many iterative machine learning algorithms, such as k-means. Spark supports these applications while retaining the scalability and fault tolerance of MapReduce. The Spark engine runs in a variety of environments, such as Hadoop (footnote 3) or Mesos clusters (footnote 4), and it has been used for query processing (SQL), advanced analytics (e.g. machine learning) and streaming over large data sets in a wide range of data stores. Moreover, Spark can outperform Hadoop by 10x in iterative machine learning workloads [18].

2 https://mahout.apache.org/.
3 http://wiki.apache.org/hadoop.
4 http://mesos.apache.org.

Fig. 1. Spark Architecture: Data is stored in worker nodes, while the Spark driver is the coordination node.

In terms of system architecture (Fig. 1), a Spark master node controls the workflow, and Spark worker nodes are responsible for executing part of the job submitted to the Spark master node. The key concept in Spark is the resilient distributed data set (RDD), which represents a read-only collection of objects partitioned across a set of machines. An RDD may be cached in memory across machines and may be reused in multiple MapReduce-like parallel operations, while fault tolerance is achieved by rebuilding from other RDDs just the partition of data that is lost. Spark comes with a built-in set of over 80 high-level operators. Next, we describe some very basic Spark operations.

– map(): transforms each record of the RDD into a new record based on a user-defined function.

– mapPartitions(): is a Spark transformation that runs on each partition separately. In each partition, the input is an iterator over all the records of the corresponding parent partition, and the output can be whatever the user wants. This transformation is very useful when users want to aggregate data in each partition separately and not globally.

– filter(): is a Spark transformation that filters the records of the parent RDD based on a user-defined function.

– reduce(): is a Spark action that aggregates the data of an RDD based on a user-defined function. This action works in a distributed way and not in a centralized one.

– reduceByKey(): can be applied only to a pair (key-value) RDD and aggregates the values of the records with the same key based on a user-defined function. This transformation first aggregates the records with the same key locally and then aggregates the data globally.

– takeSample(): returns a sample of the RDD to the master node based on a generator seed. Each worker is ordered to return a set of records to the master. After that, the master node unions the retrieved records.

– collect(): returns the RDD records to the master node as an array of objects.
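The semantics of several of the operations listed above can be illustrated without a cluster. The following is a minimal sketch in plain Python, not Spark code: an RDD is modeled as a list of partitions, and the helper names map_rdd, map_partitions, reduce_by_key and collect are hypothetical stand-ins for the corresponding Spark operations.

```python
# An "RDD" is modeled here as a list of partitions; each partition is a list of records.
rdd = [[1, 2, 3], [4, 5], [6]]

def map_rdd(partitions, f):
    """Model of map(): apply f to every record, keeping the partitioning."""
    return [[f(x) for x in part] for part in partitions]

def map_partitions(partitions, f):
    """Model of mapPartitions(): f receives an iterator over one whole partition."""
    return [list(f(iter(part))) for part in partitions]

def reduce_by_key(partitions, f):
    """Model of reduceByKey(): combine per partition first, then across partitions."""
    local = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = f(acc[key], value) if key in acc else value
        local.append(acc)
    merged = {}
    for acc in local:
        for key, value in acc.items():
            merged[key] = f(merged[key], value) if key in merged else value
    return merged

def collect(partitions):
    """Model of collect(): bring all records to the master as one list."""
    return [x for part in partitions for x in part]

squares = collect(map_rdd(rdd, lambda x: x * x))
partial_sums = collect(map_partitions(rdd, lambda it: [sum(it)]))
parity = map_rdd(rdd, lambda x: ("even" if x % 2 == 0 else "odd", x))
sums_by_parity = reduce_by_key(parity, lambda a, b: a + b)
```

Here partial_sums holds one value per partition, which is the kind of per-partition aggregation that mapPartitions() is described as being useful for.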

4 Framework

In this paper we propose a framework for clustering and classification algorithms that relies on the Spark architecture and enables different levels of restrictions on the data-exchange model.


Fig. 2. Framework overview.

In our framework there exists one master node that coordinates the query execution and several workers that execute the clustering and classification algorithms in parallel. One main characteristic of distributed data mining algorithms is that their execution requires multiple communication round-trips between the master node and the workers. Figure 2 depicts an overview of our framework.

In more detail, Fig. 2 shows the execution of each round-trip. First, the master node requests that each worker compute locally some intermediate results based on the locally stored data (phase 1: local pre-processing). In Spark, the map() or mapPartitions() method is applied for phase 1. Then, the workers exchange the computed results to gather some global knowledge and process the exchanged data (phase 2: global distributed processing). In Spark, the reduceByKey() method is applied for phase 2. Next, each worker may also execute locally some additional computation based on the intermediate results from phase 2 (phase 3) before the results are collected by the master node. Finally, either another round-trip is initiated by the master node, or the final result is computed by the master and reported.

Note that the workers may process the data differently in each round-trip. For example, the first round-trip could be used for initialization, while the actual computation takes place in the remaining round-trips. Moreover, depending on the algorithm, some of the phases may not be necessary and are omitted.
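One round-trip with its three phases can be sketched as follows. This is a plain-Python illustration under stated assumptions, not the framework's code: the function round_trip and the helpers sum_and_count, add_pairs and mean are hypothetical, and the example task (a global mean) is chosen only because it is small.

```python
def round_trip(partitions, local_pre, combine, local_post):
    # Phase 1: local pre-processing, each worker emits (key, value) pairs.
    emitted = [local_pre(part) for part in partitions]
    # Phase 2: global distributed processing, values with the same key are combined.
    combined = {}
    for pairs in emitted:
        for key, value in pairs:
            combined[key] = combine(combined[key], value) if key in combined else value
    # Phase 3: additional local computation before the master collects the results.
    return {key: local_post(value) for key, value in combined.items()}

def sum_and_count(part):
    """Phase 1 helper: one aggregated (sum, count) pair per worker."""
    return [("all", (sum(part), len(part)))]

def add_pairs(a, b):
    """Phase 2 helper: add two (sum, count) pairs."""
    return (a[0] + b[0], a[1] + b[1])

def mean(pair):
    """Phase 3 helper: turn a (sum, count) pair into a mean."""
    return pair[0] / pair[1]

workers = [[1.0, 2.0, 3.0], [4.0, 5.0], [9.0]]
result = round_trip(workers, sum_and_count, add_pairs, mean)
```

Only aggregated (sum, count) pairs leave a worker in this sketch; the raw values stay in their partition.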

A crucial issue for our framework is that raw data or aggregated information may need to be exchanged between the master node and the workers, or between different workers, in order to compute the intermediate or final result. In distributed environments, privacy-preserving restrictions may exist depending on the application scenario. For example, in medical data mining over data centers of different hospitals, the raw data stored in one data center are not allowed to be exchanged, while aggregated data is allowed to be exchanged between different data centers. For this reason, our framework supports three different variants of the data exchange model, depending on the restrictions that information has to obey during data exchange:

– No-restriction mode: in this mode the master and the workers can exchange any type of information. Also, workers can exchange any type of information with other workers, either directly or through the master node.

– Partially Restricted Data Exchange Mode: in this mode the master and the workers can exchange only aggregated information, since raw data are not allowed to be shared. Also, the aggregated information of one worker can be exchanged with other workers, either through the master node or directly.

– Strict Restricted Data Exchange Mode: in this mode again raw data is not allowed to be exchanged at all, but in addition aggregated information is allowed to be sent only between a worker and the master. The main idea is that the master is authorized to gather the aggregated information from the workers, but the workers are not allowed to access any information of other workers. In this mode the reduceByKey() method is not allowed, since it shuffles aggregated results among the worker nodes.
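The three modes above amount to a small set of rules about what may be sent and between whom. The sketch below encodes those rules as a predicate; the names Mode and exchange_allowed, and the string labels for payloads and endpoints, are illustrative assumptions and not part of the framework.

```python
from enum import Enum

class Mode(Enum):
    NO_RESTRICTION = 1
    PARTIALLY_RESTRICTED = 2
    STRICT_RESTRICTED = 3

def exchange_allowed(mode, payload, sender, receiver):
    """payload is 'raw' or 'aggregated'; sender and receiver are 'master' or 'worker'."""
    if mode is Mode.NO_RESTRICTION:
        return True                      # any information, any route
    if payload == "raw":
        return False                     # both restricted modes forbid raw data
    if mode is Mode.PARTIALLY_RESTRICTED:
        return True                      # aggregates may travel directly or via the master
    # Strict mode: aggregates may only travel between a worker and the master.
    return "master" in (sender, receiver)
```

Under these rules a worker-to-worker shuffle of aggregates, as performed by reduceByKey(), is rejected only in the strict mode.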

Our framework is designed so that the distributed algorithms provide solutions for the data mining problems that are equivalent to the solutions of the centralized algorithms. Moreover, we examine an alternative, referred to as the approximate algorithm, which reduces the communication cost of computing the solution but may produce a different result compared to the centralized algorithm.

In the following sections we describe how the k-means and ID3 algorithms are implemented in our framework.

5 K-Means Algorithms

K-means [12] is a popular clustering method. Given a set of items and an integer k, k-means partitions the items into k groups based on their individual characteristics. More formally, let $S = \{p_1, p_2, \ldots, p_n\}$ be a set of n points in d-dimensional Euclidean space and let k be a predefined positive integer which specifies the number of clusters. The goal is to compute k cluster centers $C = \{c_j, 1 \le j \le k\}$ such that the value

$$\sum_{j=1}^{k} \sum_{i=1}^{n} \lVert c_j - p_i \rVert$$

is minimized, where $\lVert c_j - p_i \rVert$ denotes a chosen distance measure between a data point and the cluster center.

The k-means algorithm first picks randomly a set of initial points. Then, the method works iteratively and in each step assigns each point to the nearest center. At the end of each iteration the center of each cluster is updated as the mean of the values of the items which have been assigned to it. The algorithm terminates when either the centers converge or the maximum number of iterations has been reached.

In this paper, we focus on distributed processing of the k-means algorithm. More specifically, let M be the master node of the cluster and let $W = \{W_i, 1 \le i \le m\}$ be a set of m worker nodes. On each worker $W_i$ there is a set of local data points $S_i \subseteq \mathbb{R}^d$, while the global data set is $S = \bigcup_{i=1}^{m} S_i$.
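The centralized k-means procedure described earlier in this section (random initial points, assignment to the nearest center, update to the mean, stop on convergence or after a maximum number of iterations) can be sketched in plain Python as follows. The Euclidean distance, the parameter names and the handling of empty clusters are assumptions made for this illustration.

```python
import math
import random

def closest(point, centers):
    """Index of the center nearest to point (Euclidean distance)."""
    return min(range(len(centers)), key=lambda j: math.dist(point, centers[j]))

def kmeans(points, k, max_iter=100, tol=1e-9, seed=0):
    rng = random.Random(seed)
    centers = rng.sample(points, k)              # random initial points
    for _ in range(max_iter):
        groups = [[] for _ in range(k)]
        for p in points:                         # assignment step
            groups[closest(p, centers)].append(p)
        new_centers = []
        for j, group in enumerate(groups):       # update step: mean of assigned points
            if group:
                new_centers.append(tuple(sum(c) / len(group) for c in zip(*group)))
            else:
                new_centers.append(centers[j])   # assumption: an empty cluster keeps its center
        shift = max(math.dist(a, b) for a, b in zip(centers, new_centers))
        centers = new_centers
        if shift <= tol:                         # centers converged
            break
    return centers

points = [(0, 0), (0, 2), (10, 10), (10, 12)]
centers = kmeans(points, 2)
```

On this toy input the two returned centers are the means of the two well-separated pairs of points.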


Algorithm 1. Distributed k-Means Algorithm (DistKM)
 1: Input: source: data set; max: max iterations; k: number of clusters; e: error tolerance.
 2: Output: C: centers of the clusters.
 3: source.map(convertToArray()).cache()
 4: C = takeSample(false, k, seed)
 5: C′ = C
 6: numOfIterations = 0
 7: while (true) do
 8:   broadcast(C)
 9:   map(findCenters())
10:   reduceByKey(aggregatePoints())
11:   map(computeCentroids())
12:   C = collect()
13:   numOfIterations++
14:   if (numOfIterations == max) then
15:     break
16:   else if (convergence(C, C′, e)) then
17:     break
18:   else
19:     C′ = C
20: return C

5.1 Distributed K-Means Algorithm (DistKM)

Our first algorithm starts by randomly selecting, in a distributed way, k data points as the initial set of cluster centers C. The set C is then propagated to all workers. Each worker that receives the set of cluster centers C aggregates its local data by assigning each point to its closest cluster center and summing up the coordinate values of all data points assigned to the same cluster. Each worker also counts the number of points that have been assigned to each cluster center. Then, these aggregated results are shuffled among the worker nodes with the restriction that all the aggregated data which correspond to a cluster cj are gathered on the same physical machine. Each worker computes, for each cluster cj for which it is responsible, the new cluster center and returns the results to the master node. After the master node collects the new cluster centers, it decides whether a new iteration is needed based on the stopping criteria.

Our distributed k-means algorithm using Spark is presented in Algorithm 1. More specifically, in line 3 the algorithm reads the data set as RDD data and converts each point into an array of floating-point numbers, using the basic Spark operation source.map(). The RDD data is stored in cache, if it fits. In the next line, the initial set of cluster centers C is selected as a random sample of k points. This can be done by the operation takeSample() provided by Spark, which collects k data points in a distributed way. The variable C′ stores the set of cluster centers of the previous iteration, while the variable numOfIterations counts the number of iterations the algorithm has executed.


In each iteration, the master node broadcasts the current cluster centers C to the worker nodes (line 8) using the broadcast() operation of Spark. Thereafter, each worker maps its data points to key-value RDDs, aggregates the key-value RDDs in a distributed way and computes the new cluster centers (lines 9–11). These parts of the algorithm are discussed in detail in the next paragraphs. Then, the master node collects (line 12) the new set of cluster centers. In lines 13–19, the master node decides whether a new iteration is needed and updates the local variables.

Fig. 3. Example of distributed k-means

Figure 3 depicts an example of our algorithm. Step A shows the data stored locally at each worker. We assume that k = 3 and C is initialized randomly as C = {(2, 2), (1, 14), (4, 3)}. Step B shows the result of the Spark operation map(findCenters()). findCenters() is a user-defined function that specifies how the data should be processed during the map() operation. Its implementation is omitted due to space limitations, but it is explained in detail in the following. findCenters() is executed on each data point and finds the closest cluster center for it. Then, the data point is mapped to a key-value RDD, where the key is the id of the nearest cluster center, while the value is a 2-element tuple. The first element of the tuple is the point itself and the second element is the value 1. For example, in Fig. 3 (Step B) the data point (2, 2) is mapped to the key-value RDD [1, [(2, 2), 1]], since the closest cluster center is c1.

Subsequently, reduceByKey(aggregatePoints()) is executed on each worker node Wi. aggregatePoints() is a user-defined function that specifies how the value (2-element tuple) of the key-value RDD will be processed during the reduceByKey() operation of Spark. Each worker aggregates its local points with the same key (the ones which have been assigned to the same center) by summing the coordinates of the points and counting the number of the points. The result of this step is one key-value pair for each center, where the value is a 2-element tuple. The first element is the sum of the coordinates and the second element is the number of the assigned points. In Fig. 3, Step C, the key-value data is aggregated based on the key values. For example, [2, [(1, 14), 1]] and [2, [(1, 11), 1]] are aggregated and the key-value pair [2, [(2, 25), 2]] is created.


Algorithm 2. Partially Restricted Data Exchange k-means Algorithm (PRKM)
1: Input: s: data set; max: max iterations; k: number of clusters; e: error tolerance.
2: Output: C: centers of the clusters.
3: map(convertToArray()).cache()
4: mapPartitions(kmeans())
5: C = sampling(collect(), k)
6: ...

Because reduceByKey(aggregatePoints()) is used, aggregatePoints() is also executed globally. This is depicted in Fig. 3, Steps D–E. First, data is shuffled among the worker nodes, with the restriction that the key-value RDDs which correspond to a specific cluster (same key value) are gathered on the same worker. Each worker aggregates its data in the same way as in the previous step. After this step, each cluster is represented by a single 2-tuple record, where the first element is the total sum of the coordinates of all points which have been assigned to the cluster and the second element is the number of the assigned points. In line 11, we compute in a distributed way the new centers of the k clusters by applying the function computeCentroids() to each aggregated record. For each cluster, this function divides the tuple's first element (sum of coordinates in the cluster) by the value of the second element (number of points in the specific cluster). Finally (Step G), the master node collects the results.
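Since the implementations of the user-defined functions are omitted in the text, the following plain-Python sketch shows one way they could look, following the description above. The function bodies, the Euclidean distance and the worker data are assumptions; only the initial centers and the two aggregated records from Fig. 3 are taken from the text.

```python
import math

def find_centers(point, centers):
    """Map a point to (id of nearest center, (point, 1)); ids start at 1 as in Fig. 3."""
    j = min(range(len(centers)), key=lambda i: math.dist(point, centers[i]))
    return (j + 1, (point, 1))

def aggregate_points(a, b):
    """Add two (coordinate sum, count) tuples."""
    (sum_a, n_a), (sum_b, n_b) = a, b
    return (tuple(x + y for x, y in zip(sum_a, sum_b)), n_a + n_b)

def compute_centroid(value):
    """Divide the coordinate sum by the number of assigned points."""
    total, n = value
    return tuple(x / n for x in total)

def distkm_iteration(workers, centers):
    aggregated = {}
    for part in workers:
        local = {}
        for point in part:                   # map + local part of reduceByKey
            key, value = find_centers(point, centers)
            local[key] = aggregate_points(local[key], value) if key in local else value
        for key, value in local.items():     # models the shuffle and global aggregation
            aggregated[key] = (aggregate_points(aggregated[key], value)
                               if key in aggregated else value)
    return {key: compute_centroid(v) for key, v in aggregated.items()}

centers = [(2, 2), (1, 14), (4, 3)]
workers = [[(2, 2), (1, 14), (1, 11)], [(4, 3), (5, 4), (1, 1)]]   # hypothetical data
new_centers = distkm_iteration(workers, centers)
```

With these inputs the points (1, 14) and (1, 11) are both assigned to center 2, so their records are combined into ((2, 25), 2) exactly as in the example above, and the new center 2 is the mean of the two points.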

5.2 Partially Restricted Data Exchange Algorithm (PRKM)

In the following we modify the DistKM algorithm to obey the partially restricted data exchange model, where nodes are allowed to exchange only aggregated data and not raw data. Algorithm 2 shows the first lines of the pseudocode of our Spark implementation, which differ from Algorithm 1. In the DistKM algorithm the master selects a random set of k points as the initial centers of the clusters, and this initialization violates the restricted data exchange model, since raw points are sent to the master. In contrast, the partially restricted data exchange (PRKM) algorithm requests that each worker locally apply a centralized k-means algorithm to its data points in order to create k clusters. To this end, the Spark operation mapPartitions() (line 4) is used, which allows us to loop over all the data of a partition on the corresponding worker node. After that, the master node collects the local cluster centers and randomly chooses k of these k ∗ m points to initialize the centers of the k clusters. In line 5, sampling() is a user-defined function that is executed locally, while collect() is a Spark operation that retrieves the local cluster centers.
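The PRKM initialization can be sketched in plain Python as follows. The names local_kmeans and prkm_init, the fixed number of local iterations and the seeds are assumptions for this illustration; the text only specifies that each worker runs a centralized k-means locally and that the master samples k of the k ∗ m collected centers.

```python
import math
import random

def local_kmeans(points, k, iters=20, seed=0):
    """Compact Lloyd's algorithm run by one worker on its own partition."""
    centers = random.Random(seed).sample(points, k)
    for _ in range(iters):
        groups = [[] for _ in range(k)]
        for p in points:
            j = min(range(k), key=lambda i: math.dist(p, centers[i]))
            groups[j].append(p)
        # Mean of each group; an empty group keeps its previous center.
        centers = [tuple(sum(c) / len(g) for c in zip(*g)) if g else centers[j]
                   for j, g in enumerate(groups)]
    return centers

def prkm_init(workers, k, seed=0):
    # Each worker contributes k local cluster centers rather than raw points.
    candidates = [c for part in workers for c in local_kmeans(part, k, seed=seed)]
    # The master samples k of the k * m collected centers.
    return random.Random(seed).sample(candidates, k), candidates

workers = [[(0, 0), (0, 2), (10, 10), (10, 12)],
           [(1, 1), (1, 3), (20, 20), (20, 22)]]
init, candidates = prkm_init(workers, 2)
```

The master sees only the four local centers in candidates and picks two of them as the initial set C.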

5.3 Strict Restricted Data Exchange Algorithm (SRKM)

Our next algorithm complies with the strict restricted data exchange model, where workers are not allowed to exchange any type of data among themselves; every exchange must have the federation (master) node at one end. The initialization of the cluster centers is done in the same way as in Algorithm 2 (lines 1–6).


Algorithm 3. Strict Restricted Data Exchange k-means Algorithm (SRKM)
...
10: mapPartitions(aggregatePoints())
11: C′′ = collect()
12: C = computeCentroids(C′′)
...

Then, at the beginning of each iteration, the master node broadcasts the current cluster centers to all worker nodes. Each worker node Wi assigns each of its local data points to the nearest cluster and aggregates its local data by summing up the coordinate values of all data points assigned to the same cluster and counting the data points assigned to each cluster. This is similar to the distributed k-means algorithm, Algorithm 1 (lines 8–9). The main difference from Algorithms 1 and 2 is that the global aggregation is not allowed to be done in a distributed way. Therefore, the master node gathers the above aggregate data from each worker separately, aggregates those values in order to create one tuple that represents each cluster and computes the new center of each cluster locally. After the computation of the new centers, the master node decides whether there will be a next assignment step, similar to Algorithm 1 (lines 13–20).

Algorithm 3 shows the lines of the pseudocode of our algorithm (lines 10–12) that differ from Algorithms 1 and 2. In line 10, each partition aggregates its data locally before sending them to the master. The mapPartitions() operation is used with the user-defined function aggregatePoints() as its parameter. In line 11, the master collects the aggregated data from each worker separately, and in line 12 it aggregates the records which correspond to the same cluster and finally computes the new centers with computeCentroids(), which is a user-defined function that is executed locally.
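One SRKM iteration can be sketched in plain Python as follows. The function names aggregate_partition and srkm_iteration, the Euclidean distance, the rule for empty clusters and the worker data are assumptions for this illustration.

```python
import math

def aggregate_partition(points, centers):
    """Run on a worker: per-cluster (coordinate sum, count) for its local points."""
    out = {}
    for p in points:
        j = min(range(len(centers)), key=lambda i: math.dist(p, centers[i]))
        total, n = out.get(j, ((0,) * len(p), 0))
        out[j] = (tuple(a + b for a, b in zip(total, p)), n + 1)
    return out

def srkm_iteration(workers, centers):
    # The master collects one dictionary of aggregates per worker (C'' in Algorithm 3).
    collected = [aggregate_partition(part, centers) for part in workers]
    # The master merges them itself; workers never see each other's aggregates.
    merged = {}
    for agg in collected:
        for j, (total, n) in agg.items():
            old_total, old_n = merged.get(j, ((0,) * len(total), 0))
            merged[j] = (tuple(a + b for a, b in zip(old_total, total)), old_n + n)
    # Assumption: a cluster with no assigned points keeps its previous center.
    return [tuple(x / merged[j][1] for x in merged[j][0]) if j in merged else centers[j]
            for j in range(len(centers))]

centers = [(2, 2), (1, 14), (4, 3)]
workers = [[(2, 2), (1, 14), (1, 11)], [(4, 3), (5, 4), (1, 1)]]   # hypothetical data
new_centers = srkm_iteration(workers, centers)
```

Compared with a shuffle-based aggregation, the merge loop runs entirely on the master, which is what makes the exchange pattern compatible with the strict mode.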

5.4 Approximate Algorithm (AprxKM)

Our last algorithm differs from the previous algorithms because it does not compute the same result as the centralized algorithm applied to the same initial cluster centers. Thus, we call this algorithm the approximate algorithm; it relies on [7]. The approximate algorithm consists of two steps. In the first step, each worker computes l cluster centers by using a centralized k-means algorithm (with input parameter l), where l > k. When all the workers have converged, the master node collects the l ∗ m local centers. In addition, each local cluster center cj has its own weight wj, which is equal to the number of local points assigned to it. In the second step, the master performs locally the weighted k-means algorithm on the l ∗ m local centers which have been obtained from the previous step. The advantage of the approximate algorithm is that the master contacts the workers only once, while the trade-off is the quality of the result, since the retrieved cluster centers differ from the cluster centers returned by the centralized k-means.
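The second step, the weighted k-means run by the master, can be sketched in plain Python as follows. The text does not say how the master chooses its initial centers, so they are passed in explicitly here; the function name weighted_kmeans, the fixed iteration count and the local centers with their weights are assumptions for this illustration.

```python
import math

def weighted_kmeans(weighted_points, init_centers, iters=20):
    """Lloyd's algorithm where each input point p carries a weight w."""
    centers = list(init_centers)
    for _ in range(iters):
        sums = [[0.0] * len(centers[0]) for _ in centers]
        weights = [0.0] * len(centers)
        for p, w in weighted_points:
            j = min(range(len(centers)), key=lambda i: math.dist(p, centers[i]))
            weights[j] += w
            for d, x in enumerate(p):
                sums[j][d] += w * x          # a local center counts w times
        # Weighted mean per cluster; an empty cluster keeps its previous center.
        centers = [tuple(s / weights[j] for s in sums[j]) if weights[j] else centers[j]
                   for j in range(len(centers))]
    return centers

# Hypothetical output of the first step: l = 3 local centers from each of m = 2 workers,
# each paired with the number of local points assigned to it.
local_centers = [((0, 0), 3), ((0, 4), 1), ((10, 10), 2),
                 ((2, 0), 4), ((12, 10), 2), ((10, 14), 4)]
final_centers = weighted_kmeans(local_centers, [(0, 0), (10, 10)])
```

Each final center is the weighted mean of the local centers assigned to it, so a local center that summarizes many points pulls the result more strongly than one that summarizes few.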


6 ID3

Classification algorithms often result to a decision trees, that represent the howthe points are classified into groups. A decision tree is a tree in which each branchnode represents a choice between a number of alternatives, and each leaf noderepresents a decision. Each decision tree has one root node, while each branchrepresents a possible scenario of decision and its outcome.

ID3 [14] is one of the basic algorithms for building decision tree models. The decision tree is constructed in a top-down manner using a greedy approach. In each iteration, ID3 computes the Shannon entropy for each available attribute and selects the attribute with the minimum Shannon entropy as the most promising attribute for the decision tree. The selected attribute is used as the next split of the tree. Then ID3 creates a reduced data set R by removing the selected attribute from each available data point. For each distinct value of the selected attribute (each of which corresponds to a different sub-tree of the decision tree), the appropriate subset of data points is selected, and only those points are considered along that path. The process is repeated until all new nodes are leaf nodes.
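The attribute selection can be sketched as follows. The sketch assumes that the "Shannon entropy of an attribute" is the class entropy averaged over the attribute's distinct values and weighted by their frequency, which is the usual reading of ID3; the record layout (a tuple of attribute values plus a classification value) and the toy records are illustrative assumptions.

```python
from collections import Counter
from math import log2

def entropy(labels):
    """Shannon entropy of a list of classification values."""
    total = len(labels)
    return -sum((n / total) * log2(n / total) for n in Counter(labels).values())

def split_entropy(records, attr):
    """Class entropy after splitting on `attr`, weighted by the size of each group."""
    groups = {}
    for values, label in records:
        groups.setdefault(values[attr], []).append(label)
    return sum(len(g) / len(records) * entropy(g) for g in groups.values())

def best_attribute(records, attrs):
    """Greedy choice: the attribute with the minimum entropy."""
    return min(attrs, key=lambda a: split_entropy(records, a))

records = [
    (("sunny", "hot"), "no"),
    (("sunny", "mild"), "no"),
    (("rainy", "hot"), "yes"),
    (("rainy", "mild"), "yes"),
]
chosen = best_attribute(records, [0, 1])
```

In this toy data set the first attribute separates the classes perfectly, so its entropy is zero and it is selected as the split.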

A node becomes a leaf node when any of the following four conditions holds:

1. All the data points have the same classification value: the node is turned into a leaf and labeled with that classification value.

2. There are no more attributes to be selected: the node is turned into a leaf and labeled with the most common classification value.

3. The size of the data set is smaller than a predefined size: the node is turned into a leaf and labeled with the most common classification value.

4. The maximum depth has been reached: the node is turned into a leaf and labeled with the most common classification value.
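The four conditions translate into a small check. The function name leaf_label and its signature are hypothetical; min_recs and max_depth correspond to the minRecs and maxDepth inputs of the algorithm, and the sketch assumes a non-empty set of records.

```python
from collections import Counter

def leaf_label(records, attrs, depth, min_recs, max_depth):
    """Return the leaf label if the node must become a leaf, otherwise None.
    Assumes `records` is a non-empty list of (attribute values, label) pairs."""
    labels = [label for _, label in records]
    if len(set(labels)) == 1:              # condition 1: a single classification value
        return labels[0]
    if (not attrs                          # condition 2: no attributes left
            or len(records) < min_recs     # condition 3: data set below the threshold
            or depth >= max_depth):        # condition 4: maximum depth reached
        return Counter(labels).most_common(1)[0][0]
    return None

pure = [(("a",), "x"), (("b",), "x")]
mixed = [(("a",), "x"), (("b",), "y"), (("c",), "x")]
```

For example, leaf_label(mixed, [], 1, 1, 5) yields the most common value because no attribute is left, while leaf_label(mixed, [0], 1, 1, 5) returns None and the node is split further.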

In the following, we present our different implementations over Spark. Note that we do not present a different algorithm for the no-restriction mode of the data exchange model, as the simplest algorithm for ID3 already obeys the partial data exchange restriction.

6.1 Partially Restricted Data Exchange Algorithm (PRID3)

Algorithm 4 shows the pseudocode of our first algorithm (PRID3). The RDD data is read and each point is converted into a 2-element tuple, where the first element is an array with the attribute values and the second element is the classification value (line 3). The master node keeps a local queue (lines 4–6), which is used to traverse the nodes of the decision tree level by level, similar to the centralized ID3 algorithm. Initially, a root node that corresponds to the entire data set is inserted in the queue, and in each iteration one node is removed from the queue. In the first iteration, each worker Wi loops over all its local data and computes, for each distinct value of every attribute, the local number of occurrences of each classification value (Spark operation mapPartitions(), line 11).


The result is a key-value RDD tuple, where the key is the attribute and the value is a list. Each element of the list is a tuple that stores an attribute value together with the number of occurrences of each classification value. Then, the Spark operation reduceByKey() (line 12) is invoked and the numbers of occurrences of the classification values are aggregated for every attribute value. Figure 4 shows an example of this phase of the ID3 algorithm.
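The counting phase can be mimicked with plain Python collections. In this sketch each list plays the role of one partition, reduce_categories_locally stands in for the function passed to mapPartitions(), and aggregate_categories for the function passed to reduceByKey(); the nested-dictionary layout is an assumption chosen for readability, not the authors' data structure.

```python
from collections import Counter, defaultdict

def reduce_categories_locally(partition):
    """Per-partition counts: attribute index -> attribute value -> Counter of classes."""
    local = defaultdict(lambda: defaultdict(Counter))
    for values, label in partition:
        for attr, value in enumerate(values):
            local[attr][value][label] += 1
    return local

def aggregate_categories(left, right):
    """Merge two partial results that share the same key (attribute)."""
    merged = {value: Counter(counts) for value, counts in left.items()}
    for value, counts in right.items():
        merged.setdefault(value, Counter()).update(counts)
    return merged

partitions = [
    [(("sunny", "hot"), "no"), (("rainy", "hot"), "yes")],
    [(("sunny", "mild"), "no"), (("rainy", "mild"), "yes")],
]
global_counts = {}
for local in map(reduce_categories_locally, partitions):
    for attr, per_value in local.items():
        # Emulates the shuffle: partial results with the same key are combined pairwise.
        global_counts[attr] = (aggregate_categories(global_counts[attr], per_value)
                               if attr in global_counts else per_value)
```

After the merge, global_counts holds, for every attribute value, how often each classification value occurs in the whole data set, which is all that the entropy computation needs.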

    Algorithm 4. Partially Restricted Data Exchange ID3 Algorithm (PRID3)
     1: Input: source: data set; minRecs: records threshold; maxDepth: maximum depth
     2: Output: rootNode: root of the decision tree.
     3: source.map().cache()
     4: rootNode = newNode()
     5: queue = newQueue()
     6: queue.push(rootNode)
     7: while (!queue.isEmpty()) do
     8:     node = queue.pop()
     9:     if (node.isLeaf()) then
    10:         continue
    11:     mapPartitions(reduceCategoriesLocally())
    12:     reduceByKey(aggregateCategories())
    13:     mapPartitions(computeCategoriesInfo())
    14:     reduce(aggregate())
    15:     for all (category_value ∈ info._4) do
    16:         mapPartitions.filter(filterRecs(category_value))
    17:         childDataset = map(filterAttribute())
    18:         childNode = newNode(childDataset)
    19:         queue.push(childNode)
    20: return rootNode

Afterward, the entropy of each value is computed in a distributed way. Each worker computes locally the attribute for which the entropy has the minimum value, together with some important information (name of attribute, entropy, number of distinct classification values, number of distinct attribute values, most frequent classification value, number of records), as also shown in Fig. 4. The Spark operation mapPartitions() is used for this (line 13). Then, the workers identify globally the attribute with the minimum entropy and the result is returned to the master (line 14).

In lines 15–19, for each distinct value of the selected attribute, a corresponding child node is created, whose data set is a subset of its parent's data set. The local data is first reduced by removing the attributes that have already been used on the same path of the decision tree and by selecting only the tuples that qualify based on the path of the decision tree. The Spark operations map() and filter() are used for reducing the data. The child nodes are added to the queue.
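The creation of the child data sets can be sketched with two small helpers that play the role of filterRecs() and filterAttribute(). Their bodies are assumptions; in particular, attributes are addressed by position, so the positions of the remaining attributes shift after one is removed.

```python
def filter_recs(records, attr, value):
    """Keep only the records whose attribute `attr` has the given value."""
    return [(values, label) for values, label in records if values[attr] == value]

def filter_attribute(records, attr):
    """Drop the attribute that was just used for the split from every record."""
    return [(values[:attr] + values[attr + 1:], label) for values, label in records]

def child_datasets(records, attr):
    """One reduced data set per distinct value of the selected attribute."""
    distinct = sorted({values[attr] for values, _ in records})
    return {v: filter_attribute(filter_recs(records, attr, v), attr) for v in distinct}

records = [
    (("sunny", "hot"), "no"),
    (("rainy", "hot"), "yes"),
    (("sunny", "mild"), "no"),
]
children = child_datasets(records, 0)
```

Each entry of children would back one child node that is pushed to the queue.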

Moreover, the master node decides (line 9) whether the current node will become a leaf node, in the same way as the centralized ID3. In the case of a leaf node, the categorical class is assigned the most frequent classification value. In the next iterations, a node is removed from the queue and the same procedure as in the first iteration is applied. Finally, once all nodes have been processed and the queue is empty, the master node returns the root of the decision tree.

Fig. 4. ID3 Example

6.2 Strict Restricted Data Exchange Algorithm (SRID3)

The PRID3 algorithm complies with the partially restricted data exchange model, since only aggregated data are exchanged, but it violates the strict restricted data exchange model. In PRID3, the number of occurrences of each attribute value is computed in a distributed manner: first the numbers of occurrences are computed locally and then the data are shuffled among the workers in order to compute the overall numbers of occurrences (Spark operation reduceByKey()). However, data shuffling among the workers is not allowed under the strict restricted data exchange model. For this reason, in SRID3 the master node collects the local numbers of occurrences from each worker and computes locally the global numbers of occurrences as well as the entropy of each attribute value. Then, it finds the attribute with the globally lowest entropy. The rest of the algorithm is similar to the PRID3 algorithm.
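The master-side variant can be sketched as follows. Here every worker is assumed to return a flat dictionary keyed by (attribute, attribute value, classification value); this layout, the function names and the frequency-weighted entropy are illustrative assumptions rather than the authors' implementation.

```python
from collections import Counter
from math import log2

def merge_at_master(collected):
    """Sum the per-worker counts; `collected` holds one dictionary per worker."""
    total = Counter()
    for local in collected:
        total.update(local)  # adds the counts key by key
    return total

def split_entropy(total, attr):
    """Frequency-weighted class entropy of `attr`, computed from counts only."""
    per_value = {}
    for (a, value, _label), n in total.items():
        if a == attr:
            per_value.setdefault(value, []).append(n)
    size = sum(sum(ns) for ns in per_value.values())
    return sum(
        -sum(n / sum(ns) * log2(n / sum(ns)) for n in ns) * sum(ns) / size
        for ns in per_value.values()
    )

# What the master would obtain from two workers (illustrative counts).
collected = [
    {(0, "sunny", "no"): 1, (0, "rainy", "yes"): 1, (1, "hot", "no"): 1, (1, "hot", "yes"): 1},
    {(0, "sunny", "no"): 1, (0, "rainy", "yes"): 1, (1, "mild", "no"): 1, (1, "mild", "yes"): 1},
]
total = merge_at_master(collected)
best = min({a for a, _, _ in total}, key=lambda a: split_entropy(total, a))
```

No worker ever receives data from another worker in this sketch; all merging happens on the master, which is feasible because the counts are small compared to the data set.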

Table 1. k-means data sets. Values in parentheses refer to the enlarged version of each data set.

Data set   d    |S|                         Size
HIGGS ᵃ    28   11 million (385 million)    7.48 GB (261.93 GB)
SUSY ᵇ     18   5 million (675 million)     7.48 GB (300.53 GB)

a https://archive.ics.uci.edu/ml/datasets/HIGGS
b https://archive.ics.uci.edu/ml/datasets/SUSY


Table 2. ID3 data sets. Values in parentheses refer to the enlarged version of each data set.

Data set    d   |S|                         Size              Classification values
Nursery ᵃ   8   13 thousand (13 million)    1 MB (10 GB)      5
Car ᵇ       6   1728 (1.7 billion)          51 KB (48.6 GB)   4

a https://archive.ics.uci.edu/ml/datasets/Nursery
b https://archive.ics.uci.edu/ml/datasets/Car+Evaluation

7 Experiments

In this section, we present our experimental evaluation. We first describe our experimental setup and then present the results of the evaluation.

7.1 Experimental Setup

In this section, we present the experimental setup for evaluating the k-means and ID3 algorithms. We experimentally evaluated both centralized and parallel implementations of k-means and ID3. The centralized versions of the algorithms were evaluated on a single workstation with a dual-core 2.5 GHz processor and 8 GB of memory. The parallel algorithms were evaluated on a cluster of 37 nodes, each with a dual-core 2.5 GHz processor and 8 GB of memory. The cluster was built over the infrastructure cloud provider okeanos⁵. Hadoop version 1.0.4 and Spark 1.4.0 were used for all the experiments.

We used different data sets to evaluate the performance of the k-means algorithm. The first data set is the HIGGS data set⁶, which consists of 11 million instances, each of which has 28 numerical attributes; its size is 7.48 GB. The second data set is the SUSY data set⁷, which consists of 5 million instances, each of which has 18 numerical attributes; its size is 7.48 GB. In order to evaluate the performance of our algorithms on larger data sets, we created enlarged versions of the HIGGS and SUSY data sets by multiplying the data of the original HIGGS and SUSY data sets respectively. The enlarged HIGGS data set consists of 385 million instances and its size is 261.93 GB, while the enlarged SUSY data set consists of 675 million instances and its size is 300.53 GB.

Moreover, we used two data sets to evaluate the performance of the ID3 algorithm. The first relies on the Nursery data set⁸, which was derived from a hierarchical decision model originally developed to rank applications for nursery schools. It consists of almost 13 thousand instances, each of which has 8 categorical attributes, and its size is 1 MB. The second, the Car data set⁹, has 1728 instances, each of which has 6 categorical attributes, and its size is almost 51 KB. The number of classification values is 5 for the Nursery data set and 4 for the Car data set. In order to evaluate the performance of our algorithms on data sets that are more representative in terms of size, we created enlarged versions of the Nursery and Car data sets by multiplying the data of the original Nursery and Car data sets respectively. The enlarged Nursery data set consists of 13 million instances and its size is 10 GB, while the enlarged Car data set consists of 1.7 billion instances and its size is 48.6 GB.

5 https://okeanos.grnet.gr/home/
6 https://archive.ics.uci.edu/ml/datasets/HIGGS
7 https://archive.ics.uci.edu/ml/datasets/SUSY
8 https://archive.ics.uci.edu/ml/datasets/Nursery
9 https://archive.ics.uci.edu/ml/datasets/Car+Evaluation

Tables 1 and 2 summarize the different data sets used to evaluate the performance of our algorithms. In addition, we compared the performance of our algorithms with the Weka k-means and ID3 algorithms¹⁰ and the MLlib algorithm¹¹. The default value of k is 2.

7.2 K-Means Evaluation

First, we compare the performance of our algorithms, the MLlib algorithm and the Weka centralized algorithm (Fig. 5). For this comparison we used only the HIGGS data set. The execution time of the centralized algorithm is larger than 35 min, while the distributed algorithms finish in less than 2 min. This experiment shows the great benefit of the distributed execution of the k-means algorithm.

In Fig. 6 we compare only the distributed k-means algorithms. We compare the performance of our algorithms and the MLlib algorithm on the HIGGS (Fig. 6(a)) and the SUSY (Fig. 6(b)) data sets. For each of these data sets we ran the algorithms over 20, 28 and 36 workers. For each setup, we repeated the experiment four times and report the average execution time. We note that, apart from the approximate algorithm, the fastest algorithm is SRKM. Usually, DistKM is faster than PRKM because of the slower initialization step of PRKM. Moreover, SRKM is faster than DistKM because, as the size of the information is very small, the global aggregation of the locally aggregated tuples is executed faster in a centralized way than in a distributed one.

[Figure: execution time in seconds (log scale) for 36 workers; series: DistKM, PRKM, SRKM, ApxKM, Mllib, Weka.]

Fig. 5. Distributed vs centralized

10 http://weka.sourceforge.net/doc.dev/weka/clusterers/SimpleKMeans.html
11 https://github.com/apache/spark


In Fig. 6, the approximate k-means algorithm is faster than the other algorithms. This result was expected, as the retrieved clusters differ from those of the centralized algorithm, i.e., the convergence takes place at each worker locally rather than over the entire data set. In order to examine how different the retrieved clusters are, we define the measure $\text{error} = \sum_{j=1}^{k} \sum_{i=1}^{n} \lVert c_j - p_i \rVert$. Figure 6(c) shows the error values for the HIGGS data set. Smaller error values indicate clusters of better quality, and the values of the approximate algorithm are worse than those of the other algorithms. So, the approximate algorithm may lead to a small execution time, but the retrieved clusters may be of lower quality.
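A small sketch of how such an error value could be computed. It assumes that the inner sum ranges over the points assigned to cluster j, so that each point contributes only the distance to its nearest center; the formula as printed does not state this restriction, and the function name and sample data are hypothetical.

```python
from math import dist

def clustering_error(points, centers):
    """Sum of the Euclidean distances from every point to its nearest center."""
    return sum(min(dist(p, c) for c in centers) for p in points)

points = [(0.0, 0.0), (0.0, 2.0), (10.0, 10.0)]
good = clustering_error(points, [(0.0, 1.0), (10.0, 10.0)])
poor = clustering_error(points, [(0.0, 1.0), (5.0, 5.0)])
```

Centers that sit closer to their points give a smaller value, which is the sense in which smaller error values indicate clusters of better quality.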

[Figure: four panels. (a) HIGGS and (b) SUSY: time (sec) vs. number of workers; (c) Error: error / 10^3 vs. number of workers; (d) Varying k: time (sec) vs. k (2, 5, 10). Series: DistKM, PRKM, SRKM, ApxKM, Mllib.]

Fig. 6. k-means evaluation by varying the number of workers and k.

Finally, we evaluate the performance of our algorithms while varying k for the SUSY data set. Figure 6(d) shows that, as expected, larger k values are more demanding and the execution time of all algorithms increases with k. The approximate algorithm is still the fastest, but the returned clusters are not equivalent to those of the centralized k-means. Among the remaining algorithms, SRKM is always the fastest, and its gain increases with k. Note that the setup k = 10 could not be executed successfully by the MLlib algorithm.


[Figure: two panels. (a) Nursery and (b) Car: time (sec) vs. number of workers; series: PRID3, SRID3.]

Fig. 7. ID3 evaluation by varying the number of nodes.

7.3 ID3 Evaluation

First, we study the performance of our algorithms compared to the centralized algorithm. For the Car data set and 36 workers, the execution time is 1692 s, 1182 s and 1122 s for the centralized algorithm, PRID3 and SRID3 respectively. The centralized algorithm is slower than our algorithms; however, the benefit of the distributed execution is not as large as expected. Spark works really well for algorithms that perform multiple iterations over the same data set, like k-means, while in the case of ID3 each iteration is executed on a different subset of the initial data set. Figure 7 depicts our experimental results for Nursery and Car while varying the number of workers. The results show that for both data sets and all setups SRID3 is faster than PRID3. In SRID3 the master node gathers the aggregated data from each worker separately and aggregates them, while in PRID3 all the workers participate in the aggregation of the locally aggregated data. However, the size of the collected data is small, so the aggregation can be done efficiently in a centralized way.
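To put the reported times in perspective, the speedups over the centralized execution follow directly from the three numbers above; the variable names are only illustrative.

```python
# Execution times in seconds for the Car data set with 36 workers, as reported above.
centralized, prid3, srid3 = 1692, 1182, 1122

speedup_prid3 = centralized / prid3  # about 1.43
speedup_srid3 = centralized / srid3  # about 1.51
```

Both speedups stay below 2 even with 36 workers, which quantifies the observation that the benefit of the distributed execution is limited for ID3.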

8 Conclusion

Data mining algorithms, which entail high processing cost and usually require the execution of iterative algorithms, are very important for big data analytics. Spark has been recently proposed as a framework that supports iterative algorithms over massive data efficiently. In this paper, we design a framework for clustering and classification of big data suitable for Spark. We study two well-known algorithms, namely k-means and ID3. Moreover, we investigate the effect on our algorithms of applying different restrictions on the data exchange model and propose appropriate variants. In order to verify the efficiency of our algorithms, we implemented them over the Spark framework and evaluated our approach on a 37-node cluster. Our experimental results show that we outperform the algorithm provided by Spark for k-means by up to 31 %.

Acknowledgment. The research leading to these results has received funding from the European Union Seventh Framework Programme (FP7/2007-2013) under grant agreement no. 604102 (Human Brain Project). This work was co-funded by the European Union and the General Secretariat of Research and Technology, Ministry of Education, Research and Religious Affairs under the project (AMOR) of the Bilateral R&T Cooperation Program Greece–Israel 2013–2015. This support is gratefully acknowledged.

References

1. Balcan, M.-F.F., Ehrlich, S., Liang, Y.: Distributed k-means and k-median clustering on general topologies. In: Burges, C., Bottou, L., Welling, M., Ghahramani, Z., Weinberger, K. (eds.) Advances in Neural Information Processing Systems 26, pp. 1995–2003. Curran Associates Inc. (2013)

2. Bradley, P.S., Mangasarian, O.L., Street, W.N.: Clustering via concave minimization. In: Advances in Neural Information Processing Systems, pp. 368–374 (1997)

3. Cheung, Y.-M.: k*-means: a new generalized k-means clustering algorithm. Pattern Recogn. Lett. 24(15), 2883–2893 (2003)

4. Datta, S., Giannella, C., Kargupta, H.: K-means clustering over a large, dynamic network. In: SDM, pp. 153–164 (2006)

5. Ferreira Cordeiro, R.L., Traina Jr., C., Machado Traina, A.J., Lopez, J., Kang, U., Faloutsos, C.: Clustering very large multi-dimensional datasets with mapreduce. In: SIGKDD, pp. 690–698 (2011)

6. Fisher, D.H., McKusick, K.B.: An empirical comparison of id3 and back-propagation. In: IJCAI, pp. 788–793 (1989)

7. Guha, S., Meyerson, A., Mishra, N., Motwani, R., O’Callaghan, L.: Clustering data streams: theory and practice. IEEE Trans. Knowl. Data Eng. 15(3), 515–528 (2003)

8. Guha, S., Mishra, N., Motwani, R., O’Callaghan, L.: Clustering data streams. In: Foundations of Computer Science, pp. 359–366 (2000)

9. Jasso-Luna, O., Sosa-Sosa, V., Lopez-Arevalo, I.: An approach to building a distributed id3 classifier. In: DCAI, pp. 385–394 (2009)

10. Jin, R., Goswami, A., Agrawal, G.: Fast and exact out-of-core and distributed k-means clustering. Knowl. Inf. Syst. 10(1), 17–40 (2006)

11. Kanungo, T., Mount, D.M., Netanyahu, N.S., Piatko, C.D., Silverman, R., Wu, A.Y.: An efficient k-means clustering algorithm: analysis and implementation. Pattern Anal. Mach. Intell. 24(7), 881–892 (2002)

12. MacQueen, J.B.: Some methods for classification and analysis of multivariate observations. Berkeley Symposium on Mathematical Statistics and Probability 1, 281–297 (1967)

13. Poteras, C.M., Mihaescu, M.C., Mocanu, M.: An optimized version of the k-means clustering algorithm. In: Computer Science and Information Systems, pp. 695–699 (2014)

14. Quinlan, J.R.: Induction of decision trees. Mach. Learn. 1(1), 81–106 (1986)

15. Shi, J., Qiu, Y., Minhas, U.F., Jiao, L., Wang, C., Reinwald, B., Ozcan, F.: Clash of the titans: Mapreduce vs. spark for large scale data analytics. PVLDB 8(13), 2110–2121 (2015)

16. Vaidya, J., Clifton, C.: Privacy-preserving k-means clustering over vertically partitioned data. In: SIGKDD, pp. 206–215 (2003)

17. Xiao, M.-J., Huang, L.-S., long Luo, Y., Shen, H.: Privacy preserving id3 algorithm over horizontally partitioned data. In: PDCAT, pp. 239–243 (2005)


18. Zaharia, M., Chowdhury, M., Franklin, M.J., Shenker, S., Stoica, I.: Spark: Cluster computing with working sets. In: USENIX Conference on Hot Topics in Cloud Computing, p. 10 (2010)

19. Zhang, J., Wu, G., Hu, X., Li, S., Hao, S.: A parallel k-means clustering algorithm with mpi. In: Parallel Architectures, Algorithms and Programming, pp. 60–64 (2011)