university of trento department of information …d3s.disi.unitn.it/~mega/andrey_thesis.pdf ·...

112
UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION ENGINEERING AND COMPUTER SCIENCE DEGREE COURSE IN TELECOMMUNICATION ENGINEERING FINAL THESIS RefineOnSpark: a simple and scalable ETL based on Apache Spark and OpenRefine SUPERVISOR GRADUANT Prof. Alberto Montresor Andrii Bratus SECOND SUPERVISOR Giuliano Mega ACADEMIC YEAR 2013/2014

Upload: others

Post on 04-Jul-2020

0 views

Category:

Documents


0 download

TRANSCRIPT

Page 1: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

UNIVERSITY OF TRENTO

DEPARTMENT OF INFORMATION ENGINEERING AND

COMPUTER SCIENCE

DEGREE COURSE IN

TELECOMMUNICATION ENGINEERING

FINAL THESIS

RefineOnSpark: a simple and scalable ETL based on Apache

Spark and OpenRefine

SUPERVISOR GRADUANT

Prof. Alberto Montresor Andrii Bratus

SECOND SUPERVISOR

Giuliano Mega

ACADEMIC YEAR 2013/2014

Page 2: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

2

Page 3: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

Abstract

Over the last decade, big data became a catch-all term for anything that handlesnon-trivial sizes of data. It is used to describe the industry challenge posed byhaving data harvesting abilities that far outstrip the ability to process, interpretand act on that data. Despite the relative nature of the term and the absence of anauthoritative definition of big data, it is widely used at the borderline of business-IT alignment1. Organizations need to align technology and business to answer alarger question: how the most value can be extracted from data? The answer tothis question lies in the large-scale data integration, assured by the special ExtractTransform Load (ETL) class of tools.

ETL tools allow to extract data from variety of sources, transform it intoa clean unified structure and load into a storage or application, where meaning-ful analysis can be performed on it. One of the tools offering a broad transformfunctionality of ETL is OpenRefine2, which includes a variety of data cleaning tech-niques in a single user-friendly application. From its inception in 2010, OpenRefinequickly became an inevitable data wrangling application for a large community ofusers from various industries. However, some of its architectural decisions limitits ability to be used in batch mode and keep up with the large-scale data, whichserves as a motivation for this work: to study and to overcome these limitations.

Initial investigation of the problem showed that because of the centralizedarchitecture and the data model of OpenRefine, the maximum amount of datathat can be processed is limited by the maximum available memory on a singlemachine. In our work, we improve large-scale performance of the tool by expressingOpenRefine computations using the MapReduce distributed programming model.However, part of OpenRefine’s functionality can not be generalized under a singlemodel and requires a specific solution for each computational problem, to whichwe refer as non-embarrassingly parallel problems.

The result of this thesis is a new distributed computational model for the

1Business-IT alignment is a dynamic state in which a business organization is able to use in-formation technology (IT) effectively to achieve business objectives – typically improved financialperformance or marketplace competitiveness.

2http://openrefine.org

3

Page 4: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

4

embarrassingly parallel operations for which we also provide an open source im-plementation. Performance evaluation of our implementation showed that we areable to achieve the desired large-scale behaviour and also improve the performanceof the original implementation in terms of speedup. For the non-embarrassinglyparallel problems this work provides a discussion and a sketch to the solution.

Page 5: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

Acknowledgements

This thesis work would not have been possible without the support of many peo-ple. I would like to express my sincere gratitude to my advisors Alberto Montresorand Giuliano Mega for the useful comments and time devoted to this thesis. Fur-thermore, I must say that I have been very lucky to have Giuliano as a teacher,and then as a mentor for this work. I am grateful to him for arousing me with agreat interest in distributed systems during the lectures, for all the time devotedto support me during this thesis work, for sharing essential best practices with meand for all the encouraging and motivating discussions.

Moreover, I would like to acknowledge my friends Ecaterina Bodnari, Fab-rizio Waldner, Luca Gasperetti and Lorenzo Orlandi, with whom we have studiedshoulder to shoulder all this two difficult years. I am thankful to my very bestfriend from Kiev, Timur for all the support I got from him during my stay inTrento. Furthermore, I am grateful to Anya for encouraging me to always keepgoing with a smile on my face. Finally, I would like to thank my family for all thelove and support they award me with during all my life.

Page 6: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

6

Page 7: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

Contents

1 Introduction 111.1 Context and motivation . . . . . . . . . . . . . . . . . . . . . . . . 121.2 Thesis roadmap . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 131.3 Contribution of the thesis . . . . . . . . . . . . . . . . . . . . . . . 15

2 Background 172.1 Big Data . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 18

2.1.1 The economy of big data . . . . . . . . . . . . . . . . . . . . 182.1.2 Characteristics of Big Data . . . . . . . . . . . . . . . . . . 19

2.2 Parallel and distributed computing . . . . . . . . . . . . . . . . . . 212.2.1 Paralelization strategies . . . . . . . . . . . . . . . . . . . . 212.2.2 Parallel and distributed computer hardware architectures . . 232.2.3 Parallel computing for big data. . . . . . . . . . . . . . . . . 24

2.3 Big data processing on large clusters . . . . . . . . . . . . . . . . . 262.3.1 MapReduce - a distributed programming model . . . . . . . 262.3.2 Large-scale distributed filesystem - Google File System . . . 302.3.3 Open source framework - Apache Hadoop . . . . . . . . . . 322.3.4 In-memory cluster computing with working sets - Apache

Spark . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 342.4 Data integration . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 37

2.4.1 Data transformation and cleansing . . . . . . . . . . . . . . 372.5 Available ETL tools . . . . . . . . . . . . . . . . . . . . . . . . . . 39

2.5.1 Pentaho Kettle . . . . . . . . . . . . . . . . . . . . . . . . . 392.5.2 OpenRefine . . . . . . . . . . . . . . . . . . . . . . . . . . . 41

3 Approach 473.1 Problem description . . . . . . . . . . . . . . . . . . . . . . . . . . . 48

3.1.1 Interaction between GC and the OpenRefine engine . . . . . 483.1.2 Microbenchmarking the OpenRefine Engine . . . . . . . . . 503.1.3 Problem statement and summary . . . . . . . . . . . . . . . 57

3.2 Solution . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 60

7

Page 8: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

8 CONTENTS

3.2.1 Distributed approach: an overview . . . . . . . . . . . . . . 613.2.2 RefineOnSpark: solving the embarrassingly parallel transforms 643.2.3 Solving the non-embarrassingly parallel transforms . . . . . 753.2.4 Distributed engine: summary and outlook . . . . . . . . . . 79

4 Evaluation 814.1 Experimental setup . . . . . . . . . . . . . . . . . . . . . . . . . . . 814.2 Evaluation and metrics . . . . . . . . . . . . . . . . . . . . . . . . . 844.3 Results . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 854.4 Summary and outlook . . . . . . . . . . . . . . . . . . . . . . . . . 90

5 Conclusions and future work 93

A Baseline 97

B Batchrefine and OpenRefine performance comparison 101

C Numerical data of the evaluation 105

Page 9: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

List of Figures

2.1 Three V’s characterizing big data . . . . . . . . . . . . . . . . . . . 20

2.2 Linear speedup: from theoretical to real . . . . . . . . . . . . . . . 25

2.3 MapReduce execution flow . . . . . . . . . . . . . . . . . . . . . . . 29

2.4 GFS architecture . . . . . . . . . . . . . . . . . . . . . . . . . . . . 31

2.5 Hadoop cluster architecture . . . . . . . . . . . . . . . . . . . . . . 33

2.6 Pentaho Kettle GUI . . . . . . . . . . . . . . . . . . . . . . . . . . 40

2.7 OpenRefine GUI . . . . . . . . . . . . . . . . . . . . . . . . . . . . 43

2.8 OpenRefine architecture . . . . . . . . . . . . . . . . . . . . . . . . 44

2.9 OpenRefine data model . . . . . . . . . . . . . . . . . . . . . . . . . 45

3.1 OpenRefine workload . . . . . . . . . . . . . . . . . . . . . . . . . . 51

3.2 Time required to apply the transform for various input data sizes . 56

3.3 GC Overhead during the apply operation phase . . . . . . . . . . . 57

3.4 Time required to load various file sizes into the engine . . . . . . . . 58

3.5 Space dimension scalability: minimum and maximum memory re-quirements to load data and apply transform . . . . . . . . . . . . . 59

3.6 Space dimension scalability: GC overhead for the 2 million linesinput file vs. various heap sizes . . . . . . . . . . . . . . . . . . . . 60

3.7 Fill down cells: Case A - correct, Case B - two chunks, incorrect . . 63

3.8 Transition to distributed architecture represented by multiple copiesof OpenRefine . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 65

3.9 Instruct parallel copies to perform transformations independentlyon each split . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 66

3.10 OpenRefine within Spark cluster resources . . . . . . . . . . . . . . 67

3.11 After partitioning header appears only in the first partition . . . . . 70

3.12 Network File System . . . . . . . . . . . . . . . . . . . . . . . . . . 70

3.13 Cluster architecture with HDFS . . . . . . . . . . . . . . . . . . . . 71

3.14 Application workflow . . . . . . . . . . . . . . . . . . . . . . . . . . 72

3.15 Blank down cells operation . . . . . . . . . . . . . . . . . . . . . . . 75

3.16 Fill down cells operation . . . . . . . . . . . . . . . . . . . . . . . . 76

9

Page 10: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

10 LIST OF FIGURES

3.17 Columnize by key/value example: key column - Tipo, value column- Insegna . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 77

3.18 Transpose 3 cells across column B into rows . . . . . . . . . . . . . 783.19 Transpose 3 cells across column B in parallel on two partitions . . . 79

4.1 Blank down cells operation . . . . . . . . . . . . . . . . . . . . . . . 834.2 Total processing time for the Distributed and Centralized engine . . 864.3 Total processing time for the Distributed and Centralized engine . . 874.4 Number of workers utilized for the input file size . . . . . . . . . . . 884.5 Average processing time per node and scheduling delay for various

input file sizes . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 89

B.1 Batchrefine and OpenRefine comparison: total time . . . . . . . . . 102B.2 Batchrefine and OpenRefine comparison: GC overhead . . . . . . . 102

Page 11: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

Chapter 1

Introduction

“You can never have too much data – bigger is always better” – the words ofAmazon’s chief technology officer Werner Vogels [1]. Indeed, the value of datais hard to underestimate, more and more fields of our life are becoming datadriven, meaning that they are either controlled or at least influenced by data.Organizations tend to collect trillions of gigabytes arriving from various sources inorder to allow decisions to be made based on real facts rather than intuition. Apartfrom human resources, data became the most valuable asset of every organization.

Big data has become one of the most widely used terms in information tech-nology nowadays. Its inception has been caused by the digital revolution we havebeen facing for the last 20 years, where the amounts of generated data increaseat incredibly high pace. Social media, web interactions, numerous sensing devices– are only few of the available data sources. It is claimed that 95% of the wholeavailable data to the world today has been generated in the last two years [2]. Theword “data” takes its root from the Latin word “datum”, which means “thingsgiven”. Indeed, data are raw facts that can be seen as a reward for careful observa-tion, which we later use to extract meaningful information from. The traditionalway of managing data within an organization relies on relational databases, wherecaptured data is organized in a tabular form by applying some schema on top ofit. Relational databases allow to easily access and analyze the data by issuingqueries, typically written in a Structured Query Language (SQL). However, “Bigdata” refers to datasets whose size is beyond the ability of typical database soft-ware tools to capture, store, manage and analyze. Big data is too large, arrivestoo fast and is too diverse to be loaded into a database and therefore requiresnovel approaches to be managed. Big data is typically characterized by its variety,velocity and volume, which underline the heterogeneity of sources and generationspeed along with its size. All of the three characteristics complicate the processof inferring useful information from raw data, and therefore require special ap-proaches to deal with it. The process of preparing data for analytical use is called

11

Page 12: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

12 CHAPTER 1. INTRODUCTION

data integration, which is the problem of combining data coming from differentsources, of different structure and semantics, and to provide a user with the uni-fied view on it. Data integration relies on a special class of tools, typically referredto as Extract, Transform, Load (ETL) tools. The name highlights different stagesthe data has to go through from the point it has to be collected and cleaned up tothe point where a semantically structured and clean data is delivered for furtheranalytic purposes.

1.1 Context and motivation

This thesis work focuses on the transform part of the ETL, as it is where the ac-tual processing and changes are performed. Certainly, extract and load phases arealso important, but these steps are only responsible for efficiently relocating data,whereas all the structuring and cleaning operations are performed at the transformstep. Among the typical problems that have to be solved in the transform step are:different table organization (different column ordering), duplicate entries, differentformatting (date, addresses) and many others [3]. The process of resolving suchkind of issues is typically referred to as data wrangling and is enabled by the useof numerous techniques, such as: regular expressions, string distance metrics (e.g.Levenshtein distance), reconciliation with external sources, etc. A typical messydataset requires a combination of these techniques (with various configuration pa-rameters) to be applied on it, which increases the demand for data wrangling toolsthat facilitate the wrangling procedures by providing a general platform for suchoperations. OpenRefine [4] is one of the well-established tools aimed at facilitatingdata wrangling operations by gathering the required techniques in a single softwarepackage and providing a graphical user interface to apply these techniques on data.Moreover, it provides the ability to perform interactive exploratory activities ondata, such that a user can detect a problem, apply the required cleaning techniqueand observe the result immediately. Interactivity is mainly enabled by the factthat users typically operate on the datasets of modest size, which allows all thedata to be loaded into the machine’s main memory and therefore results in fasterdata manipulation.

OpenRefine is a powerful open source tool to work with messy data with alarge community of users in various industries. In particular, for this work, wecollaborate with SpazioDati s.r.l [5], which is a company working in the field ofsemantic web and linked data technologies, and OpenRefine is an important partof their data processing toolkit. From the community mailing list we noticed thatthe number of reports on OpenRefine’s poor performance when working with in-creasingly large datasets has increased in the last two years. SpazioDati confirmedthe issue to be a hot topic in the field of semantic web and expressed their interest

Page 13: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

1.2. THESIS ROADMAP 13

in the ability to improve the performance and scalability of OpenRefine.

Preliminary investigation showed that the fact that data has to be loadedinto memory is a deliberate design choice to provide an interactive desktop toolfor data cleaning of small to mid-size datasets. However, due to the successfulimplementation and a comprehensive collection of data cleaning techniques thereis an increasing demand for using OpenRefine as a batch processing tool withina data processing pipeline. This thesis work objective is to work out and over-come scalability limitations of the OpenRefine processing engine and we considerdistributed computing as a key approach to achieve a scalable batch processingimplementation of the original OpenRefine functionality.

1.2 Thesis roadmap

Chapter 2 introduces the background concepts used throughout this work and pro-vides an overview of the important tools and frameworks. Section 2.1 provides thedefinition of what is considered to be big data, its characteristics and impact onmodern businesses. Further, in Section 2.2 we discuss parallel computing, formsof parallelism and how these techniques are related to the distributed processingof massively large datasets. Section 2.3 is an important part of the required back-ground, which describes the MapReduce distributed programming model and howit is applied to process large datasets on computing clusters. After presenting thedetailed description of the model, we present two widely used frameworks basedon MapReduce, namely Apache Hadoop and Apache Spark. Further, we explainthe process of data integration and the ETL toolkit which are of the prior impor-tance when dealing with big data. From this point we focus our attention on thetransform part of the ETL and explain it by providing a step-by-step description ofdata cleaning in a real world scenario (Section 2.4.1). We conclude the backgroundchapter by presenting two of the available open source projects that provide ETLfunctionality. Section 2.5.2 contains an essential description of the OpenRefine ar-chitecture that is developed from summarizing the available documentation alongwith the close inspection of the source code of the application.

Chapter 3 is the core part of the work, which presents our approach to theproblem, a comprehensive discussion on the solution and a detailed descriptionof its implementation. We begin the chapter, by describing the peculiarities ofthe JVM memory management and its impact on the performance of OpenRefine.Understanding the interaction between the garbage collector and OpenRefine isimportant because of the direct impact on the engine’s performance and there-fore has to be considered in the estimation of the engine scalability. Section 3.1.2presents and explains our microbenchmarking approach to the performance eval-

Page 14: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

14 CHAPTER 1. INTRODUCTION

uation and quantifying the scalability of OpenRefine. Our microbenchmarkingapproach aims at assessing the engine from the perspective of time and space,which provides a broader view on the engine performance and allows to identifythe critical points and bottlenecks for the scalability. The results of performanceassessment with low GC overhead (less than 15%) are used to set the baselinefor this project (Appendix A). Based on the benchmarking results, Section 3.1.3summarizes and formulates the problem and sets the required direction for thesolution.

Our solution (Section 3.2) starts by analysing previous attempts to improvethe scalability of the OpenRefine engine. For example, one of the previous at-tempts approached the problem by externalizing the actual data storage from theOpenRefine engine into an external database, which allows to process files of largersize. However, we consider the cost of communication between the database andsequential processing of data not suitable for our objective to increase the scala-bility. Our objective is to achieve fast and scalable OpenRefine processing enginefor batch tasks. Therefore, from this point we focus our attention on the server-side of the OpenRefine application. In Section 3.2.1 we introduce our approach tosolve the scalability problem of the OpenRefine engine by distributing both dataand computations across a cluster of machines. During the design of the solu-tion we faced that a sizeable fraction of OpenRefine’s computational tasks can begeneralized and approached with a common solution, to which we refer as embar-rassingly parallel. However, there is also another class of computational tasks thatcan not be solved as a part of the general solution and require individual inves-tigation (non-embarrassingly parallel). Section 3.2.2 presents our solution for theembarrassingly parallel class of operations. We adopt the MapReduce distributedprogramming model to express our computations in terms of a map() function. Oursolution can be seen as a distributed platform for the OpenRefine, as it allows tobenefit from distributed data processing, while keeping the integrity of the originalimplementation. Throughout Section 3.2.2 we describe our model for distributedOpenRefine, its workflow and present the RefineOnSpark application, which isthe implementation of the model. We conclude the section with a preliminaryperformance comparison to the original implementation, which shows that we areable to at least double the performance, compared to the original implementation.Section 3.2.3 introduces the problem of non-embarrassingly parallel class of opera-tions. For each of the transform of this class, we provide a discussion on what arethe dependencies during the parallel execution and provide a sketch to the solution.

In Chapter 4 we evaluate the performance of our model by applying it on areal world cluster of commodity hardware. We begin the evaluation by describ-ing the experimental setup from both a hardware and software perspectives and

Page 15: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

1.3. CONTRIBUTION OF THE THESIS 15

follow it by presenting the metrics we use to measure and compare performance.Section 4.3 presents the evaluation results that show that using our model we areable to achieve scalable behaviour and to keep the processing time nearly constantby increasing the number of employed parallel resources. In addition to the per-formance results we were able to estimate the scheduling delay for the distributedprocessing and came to a conclusion that it is an absolutely acceptable price topay in return for the achieved processing speedup. We also use our evaluationresults to compare our implementation with the previously established baseline,which show that on our setup the values of speedup are in the range from 2 to10. We present a summary of the evaluation procedure and discuss the results inSection 4.4.

Chapter 5 concludes this work by reviewing what we have been doing so far,summarizing our achievements and describing what is the future work enabled bythis thesis.

1.3 Contribution of the thesis

The contribution of this thesis can be summarized as:

1. the proposed distributed programming platform for the OpenRefine, whichallows to improve the performance and to achieve scalability of the OpenRe-fine without introducing changes to the original implementation;

2. the implementation of our distributed model for the OpenRefine engine isan open source project, freely available for download;

3. detailed investigation of the OpenRefine engine, its internal data organiza-tion and analysis of data transformation algorithms allowed us to classifythe engine functionality depending on the complexity to be implementedin parallel fashion (embarrassingly and non-embarrassingly parallel). More-over, some of the implementation details were extracted from the source codeanalysis of the tool that are not available in the available documentation;

4. a comprehensive discussion and sketch to the solution for the class of trans-forms classified as non-embarrassingly parallel;

The achieved speedup and the level of scalability, provided by our model, enablesthe direction for a new implementation of OpenRefine. The proposed model canbe adopted as the core for data processing and management, resulting in a fast andmore scalable version of OpenRefine implementation, suitable for batch workloads.

Page 16: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

16 CHAPTER 1. INTRODUCTION

Page 17: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

Chapter 2

Background

We can’t solve the problems byusing the same kind of thinking weused when we created them.

Albert Einstein

This chapter introduces the background concepts and ideas used throughoutthis work. In Section 2.1 we begin with an overview of the notion of Big Data,its characteristics and how it is used for business purposes by extracting valuableinformation out of it. To extract the valuable information out of the large poolsof data it has to be aggregated and processed. Moreover, this procedures have tobe performed in a reasonable time. A common way to speed up computations isby parallelizing them into a number of concurrent tasks. Therefore, in Section 2.2we explain the importance of parallel computing models and describe two typicalforms of parallelism. However, programmers usually try to avoid writing parallelcode, as it is more difficult to design an maintain. For this reason, there is a con-siderable interest in frameworks that simplify this tasks, by letting programmersto express only the required tasks and leave all the parallelization complexity tothe framework. One of the most widely used models for such frameworks is theMapReduce distributed programming model, that we discuss in Section 2.3. Wecontinue our discussion of MapReduce with frameworks that are based on thismodel and architectures used to complement it. From this point we resume ourdiscussion about Big Data with Data Integration, which is the process data typi-cally takes from acquisition to the point where it can be used for analysis purposes(Section 2.4). The process of data integration comprises three stages that data hasto go through: Extract, Transform and Load (ETL), which at the same time is thename for a specialized class of tools. We conclude this chapter with an overviewof the available tools that provide ‘Transform’ capabilities of the ETL, as data

17

Page 18: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

18 CHAPTER 2. BACKGROUND

transformation is the main focus of this work.

2.1 Big Data

The amount of data that is generated at every moment is constantly growing:companies capture trillions of bytes of information about their customers, oper-ations and a variety of internal and external processes. Such large pools of dataare captured, communicated, aggregated, stored, and analyzed to extract valuableinformation out of them. A good example of big data is “clickstream” data, whichis a data trail a user leaves while visiting a website. Clickstreams are capturedin the form of a semi-structured weblogs, which are used to derive the followingmetrics: page views, average time on site, user actions on site. Analysis of thesemetrics over time may be used to understand the popularity of the web page, theaverage number of visits before customers buy something, what they buy [6], etc.

Typically, data analytic tasks are preformed by loading data of requiredstructure into a database and continuously querying it. However, the notion of BigData applies to information that can’t be processed or analyzed using traditionaltools [7], such as databases. Therefore, a common issue faced by most of theorganizations is that they have access to a wealth of information, but they don’tknow how to get value out of it.

In defining big data, it is also important to understand the mix of unstruc-tured and structured data that comprises the volume of information that we dealwith. Unstructured data comes from information that is not organized or easilyinterpreted by traditional databases or data models, and typically, it comes in theform of free text. Examples of unstructured data are social media posts, docu-ments and search queries. Data is considered to be structured when all the recordelements reside in a fixed field, such as data contained in the relational databaseor spreadsheets. Structured data has the advantage of being easily edited, stored,queried and analyzed. Moreover, it is reported that only 20% of the data availableis structured, while 80% happens to be unstructured [8]. By far, unstructureddata is the largest piece of the big data environment, and the use cases for it arerapidly expanding.

2.1.1 The economy of big data

In 2010, The Economist [9] asserted that data has become a factor of production,almost on par with labor and capital. The International Data Corporation [10],research and advisory firm specialized in information technology, predict that thedigital universe will be 44 times bigger in 2020 than it was in 2009, totaling in a

Page 19: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

2.1. BIG DATA 19

staggering 35 zettabytes [11]. Indeed, with the progress of applying informationtechnology within business, each step results in a leap in data volume.

The value of big data to an organization falls into two categories: analyt-ical use and enabling new products [12]. Big data analytics can reveal hiddeninsights such as peer influence among customers, revealed by analyzing shopperstransactions, social and geographical data. In the past decades, successful webstartups became a prime example of big data used as an enabler of new productsand services. For example, by combining a large number of signals coming fromusers actions and those of their friends, Facebook has been able to craft a highlypersonalized user experience and create a new kind of advertising business [13].

Another recent product, requiring big data processing has been started byGoogle, named “Flu Trends” [14]. The main objective of the system is to detectan influenza-like disease activity by analyzing data from a variety of sources. Inattempt to provide faster detection, such an innovative surveillance system moni-tors indirect signals of influenza by analyzing drug sales, telephone advice line callrecords along with search queries on the web.

2.1.2 Characteristics of Big Data

Intuitively one would characterize something called Big Data by its size, howeverthere are other important attributes. It is typically characterized by three features:volume, variety and velocity [15]. The three V’s notation is shown on Figure 2.1and explained further in detail:

• Volume is usually the primary attribute, describing the amount of data,typically in terabytes (TB) but can be expressed in any multiple of the unitbyte for digital information. However, data volume can also be describedin terms of records (lines), transactions, tables or files count. Sometimesvolume can be quantified even in terms of time. For example, it can be twoyears of server logs.

• Variety is an important feature, as it describes the heterogeneity of data.Typically big data analytic tasks require data to be collected from a varietyof sources, where each data source has its own data structure (or even messydata without a particular structure). It could be text from social networks,image data or even raw feeds from sensor devices. Managing, merging andgoverning different varieties of data is something many organizations haveto grapple with.

• Velocity or speed can be seen as the frequency at which data is generatedand needs to be handled. Depending on the application, dealing effectivelywith Big Data may require that analytics are performed in real-time, while

Page 20: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

20 CHAPTER 2. BACKGROUND

Figure 2.1: Three V’s characterizing big data

data is still in motion. Such input is typically referred to as streaming data.There are two main reasons to consider streaming processing. The first iswhen the input data is too large to be stored in its entirety: in order to keepstorage requirements practical some level of analysis must occur as the datastreams in and is, later, discarded. The second reason to consider streamingis where the application has to provide immediate response to the data.For instance, bank fraud detection systems tend to analyze transactions inrealtime to detect anomalous behavior. Such systems are expected to negatetransactions before they are completed, but have to be absolutely transparentfor the truthful customers. On the other velocity extreme, we have data forbatch processing, that is analyzed after it has been completely loaded intostorage.

What actually distinguishes big data, aside from its variety, volume and veloc-ity, is the potential to analyze it to reveal new insights and to optimize decisionmaking, which in turn depends on how much the data can be trusted. Collectinginformation from various sources where the data can contain errors requires a datacuration step. Collected data may be outdated, duplicated, conflicting or inten-tionally wrong. Therefore, a system dealing with lots of data from sources, whichcredibility can not be identified, has to provide functionality to assure the qualityof the data and to determine the subset that can be treated as trustworthy [16].

Page 21: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

2.2. PARALLEL AND DISTRIBUTED COMPUTING 21

2.2 Parallel and distributed computing

Traditionally, computer software has been written for serial computation, wherean algorithm consists of a series of instructions and each instruction is executed ona central processing unit of one computer. Only one instruction may be executedat a time and the next one only after the previous one had completed. Parallelcomputing refers to the form of computation in which several calculations areperformed simultaneously, using multiple processing elements. It operates on theprinciple that larger problems can often be dived into smaller ones so that morethan one processing element can execute part of the algorithm simultaneously withothers.

Parallel computing has been employed for many years in high performancecomputing (HPC), typically in scientific computations, where complex purpose-built machines and compilers were used. Since 2004 the interest in parallel com-puting for consumer and enterprise applications has grown due to the physical con-straints preventing frequency scaling of CPUs [17]. Increasing chip performanceby scaling up the frequency became a difficult task due to the increasing powerconsumption and heat dissipation that causes silicon chips to melt. Therefore, tocontinue doubling the computer performance every 18 month (Moore’s law [18]),CPU manufactures switched to the multicore technology by placing several dieson a single CPU. Therefore, to fully exploit the available computational powerdevelopers had to switch to writing parallel code.

The first step in parallel programming is the design of a parallel algorithm orprogram for a given application problem. The design starts with the decompositionof computations of an application into several parts, called tasks, which can becomputed in parallel on multiple processors/cores of the parallel hardware. Thecontrol over tasks have to be taken into consideration to ensure that the parallelprogram produces the same results as the sequential program for all possible inputvalues. The transformation of a problem into parallel tasks is also referred to asparallelization, which can occur at different levels.

2.2.1 Paralelization strategies

The computations performed by a given program provide opportunities for paral-lel execution at different levels: instruction level, data level, loop level and tasklevel [18]. Further, we present the two most relevant for our context parallelisationstrategies, that are data and task level strategies.

Task Parallelism. Many sequential programs contain processing tasks that areindependent from each other and can be executed in parallel. By identifying suchindependent program parts and transforming them in parallel tasks, this form of

Page 22: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

22 CHAPTER 2. BACKGROUND

parallelism is called task parallelism or sometimes functional parallelism. Segmentsof the program that are not required to be sequential are broken into smaller pieces(tasks) that can run simultaneously on different processing units, thus reducingthe total time it takes to run the program or to allow modelling of an inherentlyparallel problem. Task parallelism is suitable for many application, but in generalit is considered to be a complex task, especially when it has to be defined explic-itly by developers, as coding complexity increases with additional overhead relatedto: coordination of concurrent tasks, shared memory locking and synchronization,parallelization of algorithms. A simple example of task parallelism is faced by usevery day in a web browser, where user interface controls remain responsive, mean-while the requested page is loading. Therefore, page rendering, request handlingand user interface can be seen as separate tasks executed in parallel, but are alsosynchronized between each other. Software can be designed to use task parallelismexplicitly by the programmer or implicitly by the compiler based on program sourceanalysis. In implicit case, to generate the parallel program, the compiler must firstanalyze the dependencies between the computations to be performed. Based onthis analysis, the computation can then be assigned to processors for executionsuch that a good load balancing results. In practice, automatic parallelization isdifficult to perform because dependence analysis is difficult, as the execution timeof function calls or loops with unknown bounds is difficult to predict at compiletime [18]. Moreover, languages with implicit parallelism reduce the control thatthe programmer has over the parallel execution of the program, resulting some-times in less-than-optimal parallel efficiency.

Data Parallelism. In many programs, the same operation must be applied toall of the elements in a larger data structure. If the operations to be applied areindependent of each other, this could be used for parallel execution. The elementsof the data structure are distributed evenly among the processing units and eachprocessing unit performs operations on its assigned elements. Such form of paral-lelism is called data parallelism. The resulting execution scheme for data-parallelapplications is referred to as Single-Program, Multiple Data (SPMD) model. Itmeans that one parallel program is executed by all processors in parallel. Programexecution is performed asynchronously by the participating processors. Using theSPMD model, data parallelism results if each processor gets a part of data struc-ture for which it is responsible. For example, each processor could get a part ofan array identified by a specific partitioning function. In practice, most of parallelprograms nowadays are indeed SPMD programs. Moreover, data parallelism canbe exploited for both shared and distributed address spaces. For a distributedaddress space, the program data must be distributed among the processors suchthat each processor can access the data that it needs for its computations directly

Page 23: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

2.2. PARALLEL AND DISTRIBUTED COMPUTING 23

from its local memory. Therefore, the distribution of data and computation isdone in a way such that each processor performs the computations specified in theprogram on the data that it loads in its local memory.

Dependencies. Concurrently running tasks may depend on the same data orone task may require the result from another task. Dependencies are the primarycomplication factor for the parallel execution and require a substantial effort incoordination and synchronization between dependent tasks. However, most algo-rithms do not consist of just a long chain of dependent calculations, which are theopportunities to execute independent parts in parallel, and synchronize only whenneeded.

Computational problems are often classified according to how often theirtasks need to synchronize or communicate with each other, with respect to thetotal execution time:

• fine-grained parallelism – subtasks have to communicated often;

• coarse-grained parallelism – subtasks have to communicate, but not veryoften;

• embarrassingly parallel – subtask have to communicate rarely or never.

Embarrassingly parallel applications are considered to be the easiest to paral-lelize. However, if substantial portion of a program requires sequential processing,not much of a speedup can be achieved by adding more processors. This is for-malized in the Amdahl’s law [18], which provides an estimate of the maximumachievable speedup depending on the fraction of the sequential part of the algo-rithm.

2.2.2 Parallel and distributed computer hardware archi-tectures

All parallel computer architectures consist of more than one processing unit butdiffer in the type of main memory and the distance between the units. Main mem-ory can be of two types: it is either shared or distributed between all the processingelements of a parallel system. In case of shared memory, processors share the sameaddress space between them and coordinate by reading and writing to the commonmemory location. In case of distributed memory each processing unit manages itsown address space and coordination is performed by exchanging messages, typi-cally referred to as message passing [19].

Page 24: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

24 CHAPTER 2. BACKGROUND

Multicore computing. A multicore processor consists of multiple executionunits (cores) on the same chip, which can execute multiple instructions comingfrom different instruction streams. As these cores are located on a single chip,they share common memory address space. Nowadays, such processors are foundin all the consumer electronics from a smartphone and mid-price laptop to thehigh-end mainframe computer.

Cluster computing. A cluster is a group of loosely coupled computers thatwork together closely, so that in some respect they can be regarded as a singlecomputer. Clusters are composed of multiple standalone machines connected bya communication network. Each node of a computing cluster has its own memoryand there is no common memory address space. Currently, the most popular typesof clusters are the commodity hardware clusters that are composed from the off-the-shelf computers and connected with a common TCP/IP Ethernet local areanetwork. The increase in their popularity is caused by the decreasing cost of theoff-the-shelf hardware, which still provides reasonable computational power. Ofcourse, the cost of constructing and managing such clusters is still high, but it ismuch lower compared to the purpose-built mainframes. Computers within a clustercan be heterogeneous, but this complicates the load balancing between them. Suchthat the task of the same size would take longer to complete on the slower computerand therefore would decrease the overall performance of the cluster. However, animportant feature of commodity clusters is the ease of scalability, as adding moremachines to the commodity cluster is a trivial task. The disadvantage of suchcollections of computers are the lower values of mean time between failure, thatdecreases with larger cluster sizes. Therefore, fault tolerance and failure recoveryis an important feature for the distributed applications running on commodityhardware.

2.2.3 Parallel computing for big data.

The pace of data generation is constantly increasing and organizations tend tofully leverage the potential hidden inside these astonishingly large volumes. Pro-cessing large volumes of data requires high degree of parallelism to complete it in areasonable time. Moreover, performance requirements to the data processing tasksare also increasing, especially in time critical applications. These requirements aretypically expressed in terms of scalability and speedup, which we explain next.

Speedup. Speedup is one of the most important measures in parallel computing,the one that quantifies how much faster a parallel algorithm runs with respect tothe sequential one. Speedup on a problem of size X with N parallel processingelements is equal to the ratio between the time it takes to solve this problem using

Page 25: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

2.2. PARALLEL AND DISTRIBUTED COMPUTING 25

one processing element and the time it takes to solve the same problem using Nprocessing elements.

speedup(N,X) =time(1,X)

time(N,X)(2.1)

If for the same problem size X the speedup increases linearly as a function ofincreasing number of processors N , then we speak about linear speedup. Linearspeedup means that the overhead of a parallel algorithm is always in the sameproportion with its running time, for whatever number of N . In particular, caseof time(N,X) = time(1, X)/N is referred to as ideal speedup or perfect linearspeedup. It is the maximum theoretical value of the speedup a parallel algorithm

Figure 2.2: Linear speedup: from theoretical to real

can achieve for the fixed problem size X. In practice, it is hard to achieve alinear speedup, as coordination overhead increases with the number processingelements N . What is found in practice is that parallel programs are able toachieve sub-linear speedup (Figure 2.2), which is the case for the following equa-tion: time(N,X) > time(1, X)/N . However, there are also debates on whetherachieving super-linear speedup is theoretically possible, which is the case describedby the following equation: time(N,X) 0 time(1, X)/N [20].

Scalability. Scalability is a measure of describing whether a performance im-provement can be achieved proportionally to the number of employed processingunits. Often, for a fixed problem size X and increasing number of processing unitsN at some point we observer a saturation of the speedup. Whereas by increasingthe problem size for the fixed number of processing units, leads to an increase inthe attained speedup. In this sense, scalability captures the property of parallelimplementation that the time required to process a task of a given size can be keptconstant by increasing both the size of the problem and the number of assignedprocessors. Thus, scalability is an important property of parallel programs since itexpresses that larger problems can be solved in the same time as smaller problemsif sufficient number of computational resources is employed.

Page 26: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

26 CHAPTER 2. BACKGROUND

To achieve scalable behaviour of parallel applications, larger amount of re-sources should be employed to them. Methods of adding more resources for aparticular application fall into two broad categories: horizontal and vertical scal-ing. Vertical scaling (scaling up) – means to add resources to a single node in asystem, typically involving the addition of CPUs or memory to a single computer.To scale horizontally (or scale out) means to add more nodes to a system, such asadding a new computer to a distributed software application. There are trade-offsbetween the two models. Larger numbers of computers means increased manage-ment complexity, as well as a more complex programming model and issues such asthroughput and latency between nodes. However, the vertical scalability is usuallythe first to hit the physical limitations.

Horizontal large-scale data processing. Analyzing big data may require hun-dreds of servers running massively parallel software. At this step large amount ofwork has to be done in order to efficiently distribute programs and data to multi-ple processing units. Moreover, with larger number of distributed computers, theprobability of failure increases and has to be taken care of. At this step develop-ers found themselves in a situation where the actual processing code is small andsimple but it is obscured with a heavy amount of boilerplate code related to taskscheduling, synchronization and failure handling. Therefore, the development ofdistributed programming models and frameworks that take care of the coordina-tion and distribution of tasks that can be applied to solve various problems becamea hot topic in the last years.

2.3 Big data processing on large clusters

Today’s sheer volume of data that Internet services work with has led to an in-terest in parallel processing on commodity hardware [21]. This section providesan overview of the most well established MapReduce distributed programmingmodel and open-source framework based on it. Initially we present MapReduce(MR) [22] – a programming model for large-scale data processing on commodityhardware. Further we describe the Google File System (GFS) – a distributedfile system architecture, which is an important used to complement the MapReduceprogramming model. We conclude this section with an overview of the availableopen-source frameworks that provide an implementation of the model for large-scale data processing on commodity hardware.

2.3.1 MapReduce - a distributed programming model

MapReduce emerged as an important programming model for large-scale data-parallel applications [22]. The MapReduce model popularized by Google is attrac-

Page 27: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

2.3. BIG DATA PROCESSING ON LARGE CLUSTERS 27

tive for ad-hoc parallel processing of arbitrary data, and it is today seen as animportant programming model for large-scale data-parallel applications.

MapReduce is a programming model for processing large data sets intendedto be run on large clusters of commodity machines. The model is considered tobe simple as it has mainly two functions map() and reduce(), however a largenumber of analytical computations can be expressed as a set of MapReduce jobs.MapReduce initial’s intent was to present an abstraction that allows expressingsimple map and reduce computations but to hide the complexity of parallelization,fault-tolerance, data distribution and load balancing. Users specify a map functionthat processes key/value pairs to generate a set of intermediate key/value pairs,and a reduce function that merges all intermediate values that are associated withthe same key.

MapReduce inalienable components are the user application, and master andworker nodes of the cluster. User application specifies the input data, map andreduce tasks and where to output the result. This specification of input data andtasks forms a MR job. User application connects to the master node in order tosubmit the job to the cluster, where master node is in charge of scheduling taskson the worker nodes and tracking job’s completion status. Worker node is theactual computational resource of the cluster, which receives and executes tasksand reports to the master about the task status.

Job execution flow

Let’s have a look on the job execution flow using a common example forwhich the aim is to build an inverted index. The inverted index problem is thecase when you have a large amount of documents and you would like to build anindex of these documents in a way that you generate and index of entries (words)each pointing to the documents where this words are present. The purpose of aninverted index is to allow fast full text searches, at a cost of increased processingin advance. We consider an inverted index example with its input and output pre-sented in Table 2.1. As an input to our MR “inverted index” program we have alist of social network posts (tweets) in the form of key/value pairs, where the key isthe tweet id (tweet1, tweet2...) and value is the contents of the tweet (words). Thedesired output is also a set of key/value pairs, where keys are the words that occurin tweets and value is a list of tweet ids that contain this word. The correspondingmap and reduce function pseudocode is presented in the Listing 3.1 and 2.2.The map function, takes as an input a key-value pair, where the key is the tweet idand the value is the text of the tweet. The result of the map function are the inter-mediate key/value pairs for each word in the contents of the tweet: word/tweetId.

The output of various map tasks is aggregated in a way that same wordsappear in the same group, and these groups are used as the input for the reduce

Page 28: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

28 CHAPTER 2. BACKGROUND

Listing 2.1: Inverted index map function

map(tweetId, tweetText) {

for (word in tweetText) {

emitIntermediate(word, tweetId);

}

}

Input Desired outputtweet1, (I love pancakes for breakfast)tweet2, (I dislike pancakes)tweet3, (What should I eat for breakfast?)tweet4, (I love to eat)tweet5, (They eat pizza for dinner.)

pancakes, (tweet1, tweet2)breakfast, (tweet1, tweet3)eat, (tweet3, tweet4,tweet5)love, (tweet1, tweet4)...

Table 2.1: Input and desired output for the inverted index example

tasks. The reduce function takes as an input each word/tweetId pair and groupstweetIds by word. Therefore, a final key/value pair is emitted, where the key is aword and value is the list of tweet ids where this word is present.

Listing 2.2: Inverted index reduce function

reduce(word, values) {

for (tweetId in values) {

AddToOutputList(tweetId);

}

emitFinal(word,List<tweetId>);

}

An example of job execution flow is shown on the Figure 2.3 and has thefollowing steps:

1. The MapReduce library in the user program splits the input files into 5 equalpieces, such that in this particular example each tweet becomes a separatesplit (partition). Then, the user program communicates the job to the masternode, which in turn starts up many copies of the program on a cluster ofmachines.

2. Workers are assigned tasks by the master. The master picks an idle workerand assigns each one a map task or a reduce task.

3. A worker assigned a map task reads the contents of the corresponding input

Page 29: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

2.3. BIG DATA PROCESSING ON LARGE CLUSTERS 29

Figure 2.3: MapReduce execution flow

split, parses tweetId/tweetText pairs from it and passes them to the user-defined map function. Intermediate word/tweetId pairs emitted by the mapfunction are buffered locally in the worker’s memory.

4. Periodically, the buffered intermediate pairs are written to local disk, vir-tually partitioned in R regions by hash-partitioning (or user-specified parti-tioning) function. The partitioner divides the data according to the numberof reducers so that all the data in a single partition gets executed by a singlereducer. But it is important that all the intermediate pairs having the samekey (for this example is a word) will end up in the same partition. Loca-tions of these regions are passed to the master who in turn is responsible forforwarding these locations to the reduce workers.

5. Whenever a reduce worker is notified by the master about the intermediateword/tweetId pairs location it uses a remote procedure call (RPC) to readthis data data from the local disk of map workers. Whenever a reduce workerhas read all the intermediate partition it sorts these pairs by keys in a waythat repeated keys are grouped together.

6. The reduce worker iterates over the sorted intermediate data and for eachunique word in the intermediate word/tweetId pair encountered, it passesthe key (word) and the corresponding set of intermediate values (tweetId) to

Page 30: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

30 CHAPTER 2. BACKGROUND

the reduce function. The output of the reduce stage is a word and a list ofall tweetIds where these word had occurred.

7. As soon all map and reduce tasks have been completed, the master returnsthe MapReduce call to the user program.

The original implementation [22] also describes how faults are tolerated incase of worker failures, where a master pings periodically the worker nodes and canreschedule tasks in case a worker fails. It is claimed that in case the user-suppliedmap and reduce functions are deterministic on their input values, the MapReduceimplementation produces the same output regardless of the presence of faults.

To store data sets for big data processing on a large-scale commodity clusterrequires a method to make the data available throughout the cluster fulfilling thefollowing requirements:

• fault tolerance – data has to remain available even in the presence of nodefailures;

• high throughput for a large number of concurrent reads/writes.

Therefore, another important aspect of the model is how data gets distributed overthe cluster by taking advantage of the local disks of the same machines that makeup a cluster used by MR. To satisfy the aforementioned requirements, MapRe-duce implementation is coupled with a distributed file system named Google FileSystem, which is described next.

2.3.2 Large-scale distributed filesystem - Google File Sys-tem

To complement MapReduce framework a scalable distributed file system is re-quired that provides ability to store large datasets and to deliver high aggregateperformance to a number of clients along with fault tolerance when running on acluster of inexpensive commodity hardware. The original GFS architecture andits main design choices are described in the original paper by Ghemawat Sanjayand Gobioff Howard [23].

Figure 2.4 shows a GFS cluster architecture, which consists of a single mas-ter node and multiple chunkserver nodes. The master node is used only for thecoordination and control, whereas chunkservers is where the actual data is stored.

Page 31: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

2.3. BIG DATA PROCESSING ON LARGE CLUSTERS 31

Figure 2.4: GFS architecture

Files are divided into fixed-size blocks, where each block is identified by animmutable and globally unique 64 bit handle assigned by the master at the mo-ment of creation.Chunkservers store blocks on local disks as Linux files and read/write data toblocks specified by block handle and byte range. Reliability is achieved by repli-cating each block on multiple chunkservers, by default replication factor is set to 3,meaning each block is replicated 3 times on different chunkservers. The replicationfactor has to be configured on the requirements based on the use case scenario intrade-off between the reliability and read and write bandwidth. Low values in-crease a risk of the file to become unavailable, while decrease the write bandwidthpenalty.Master is in charge for keeping all the file’s metadata including mappings fromfiles to blocks, current location of chunks, block lease management and block mi-gration between chunkservers. The choice of having one master simplifies thedesign and allows to exploit the master’s global knowledge for sophisticated chunkplacement and replication decisions.Clients interact with the master and chunkservers for read/write operations. Themaster is contacted for metadata operations but all data-bearing communicationgoes directly to chunkservers. Data is never read through the master, clients onlyask master which chunkserver they need to contact. Otherwise, the master nodewould immediately become a bottleneck.

Locality aware data accessTypically, in a MapReduce cluster each node takes the roles of both computation

and storage, which makes it possible to bring computations to data [24]. Datalocality is a significant advantage of data parallel systems, it allows to reducenetwork traffic and therefore improve the performance of data-intensive compu-

Page 32: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

32 CHAPTER 2. BACKGROUND

tational tasks. To achieve locality-aware computations, the MapReduce master,that is in charge of scheduling tasks across the cluster, exploits the informationfrom the GFS master in order to schedule map tasks on the nodes that actuallystore the required chunks of data.

The Google file system demonstrates qualities essential for supporting large-scale data processing workloads on commodity machine clusters, it allows to treatcomponent failure as normal rather than an exception. Moreover, the presented de-sign delivers high performance, concurrent read/write access to a number of clientsin the cluster, while providing an essential fault tolerance feature. Therefore, GFSis an essential component in a distributed big data processing architecture, whichis used to complement the MR computational model.

2.3.3 Open source framework - Apache Hadoop

Apache Hadoop project [25] develops open-source software for reliable, scalable,distributed computing. Hadoop is an open source implementation of MapReduce,written completely in Java and available as a software for controlling a cluster andrunning applications on top of it. It is a framework that allows for the distributedprocessing of large data sets across a cluster of commodity machines. Hadoopdevelopment has been inspired by Google’s MapReduce [22] and Google’s FileSystem [23] architectures.

Hadoop “common” is the core environment that includes implementations ofMapReduce functionality and Hadoop Distributed File System (HDFS), which isan implementation of GFS. Moreover, Hadoop functionality can also be extendedwith side projects that provide additional possibilites to the core environment(HBase, Hive, Pig, etc.), typically referred to as the Hadoop ecosystem [26]. TheHadoop cluster architecture is similar to the original MapReduce, presented in theSection 2.3.1. However, Hadoop uses a slightly different names for its components,which we map into the original MapReduce architecture. Hadoop common isbest described by dividing it in two layers: MapReduce and HDFS, as shownon Figure 2.5. As it had already been mentioned in the Section 2.3.1, there aretwo types of nodes in the MapReduce architecture that control the job executionprocess:

• Hadoop’s master node is called a jobtracker. The jobtracker coordinatesall the jobs on the system by scheduling tasks to run on the worker nodes.

• worker nodes are called tasktrackers, which are responsible to execute mapand reduce tasks and send progress reports to the jobtracker.

Page 33: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

2.3. BIG DATA PROCESSING ON LARGE CLUSTERS 33

Figure 2.5: Hadoop cluster architecture

The jobtracker is able to detect if the task fails or takes longer than other tasksand can reschedule it on a different tasktracker machines.

Hadoop divides the input to a MapReduce job into fixed-size pieces calledinput splits. It then creates one map task for each split, which runs the user-definedmap function for each record in the split. Having many splits means the time takento process each split is small compared to the time to process the whole input.For parallel computations the processing is better load-balanced if the splits aresmall, since a faster machine will be able to process proportionally more splits overthe course of the job than a slower machine. Even if the machines are identical,failed processes or other jobs running concurrently make load balancing desirable,and the quality of the load balancing increases as the splits become more fine-grained [26]. On the other hand, if splits are too small, then the overhead ofmanaging the splits and of map task creation begins to dominate the total jobexecution time.The Hadoop master node schedules map tasks across the cluster, in a way thatnodes get assigned with the part of input data that they store locally on the disk.This is what we had mentioned in Section 2.3.2 as locality-aware data access inHadoop terminology is called data locality optimization. However, only map taskscan benefit from data locality, as the input to a single reduce task is normally theoutput from all mappers. Therefore, the sorted map outputs, typically, have tobe transferred across the network to the node where the reduce task is scheduled,where these map outputs are merged and then passed to the user-defined reducefunction.

Page 34: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

34 CHAPTER 2. BACKGROUND

Hadoop had proven to be one of the most popular large scale processingframeworks over the last decade, incorporating a large community with regulardevelopments and fixes. Moreover, it is reported to be applied for a great varietyof use cases in the infrastructures of such prominent companies as Facebook, Yahooand many others.

2.3.4 In-memory cluster computing with working sets -Apache Spark

MapReduce is a successful model for large-scale data-intensive applications oncommodity clusters. It is built around an acyclic data flow model, where theuser creates an acyclic data flow graph to pass the input data through a set ofoperators. While this programming model is successfully used for a large classof applications, there are applications that can not be efficiently expressed usingan acyclic data flow model. These applications are typically based on “iterativejobs”, where processing is repeatedly applied to the same input dataset.

Apache Spark [27] is a project to overcome such limitations. In particular, itis going to allow applications to reuse a working set of data across multiple paralleloperations, while retaining the scalability and fault tolerance of MapReduce.To achieve these goals, Spark introduces an abstraction called Resilient DistributedDatasets (RDDs). An RDD is a read-only collection of objects partitioned acrossa set of machines, allowing programmers to perform in-memory computations onlarge clusters in a fault-tolerant manner [28].

In memory computing is achieved by explicitly loading RDDs into the machinesmemory across the cluster and reusing them in a sequence of MapReduce operation,therefore, reducing the cost of reading data from disk.

Fault tolerance is guaranteed by maintaining the lineage [29], meaning to keepthe knowledge of the origin and processing applied to the dataset. In this way, incase of failures, the required partition can be rebuilt from its origin. This approachworks efficiently, as long as the lineage graphs do not become very long, that mayrequire to perform a long chain of recomputations. For this purpose, an RDDprovides the ability for checkpointing, that it gets written to the stable storage(machine local or HDFS) and all the previous lineage records are removed. In caseof failure, required partition will be recomputed from the last checkpoint.

RDD operationsRDDs support two types of operations: transformations, which create a newdataset from an existing one (map), and actions, which process the dataset andreturn the processed result (reduce). All transformations are lazy, in a way thatthey are not computed right away. Instead, transformations are just remembered

Page 35: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

2.3. BIG DATA PROCESSING ON LARGE CLUSTERS 35

Transformation Descriptionmap(func) Return a new distributed dataset formed by passing each

element of the source through a function func.filter(func) Return a new dataset formed by selecting those elements

of the source on which func returns true.flatMap(func) Similar to map, but each input item can be mapped to

0 or more output items (so func should return a groupof items rather than a single item).

mapPartitions(func)

Similar to map, but runs separately on each partition(block) of the RDD. Function receives the whole par-tition as an argument as opposed to map, where thefunction is called for each RDD element separately.

union(otherRDD) Return a new dataset that contains the union of theelements in the source dataset and the argument.

Table 2.2: List of RDD transforms

and chained to be performed on some base dataset whenever an action is called [30].Transformations that can be applied to an RDD are presented in Table 2.2 alongwith the description. Actions supported by Spark are show in the Table 2.3.

Shared variablesNormally, when a function is passed to a Spark operation, these function is exe-cuted on a remote cluster node, working on the separate copies of all the variablesused in the function. These variables are copied to the remote machines and noupdates can be propagated back to the driver program. However, for some specialcases, Spark provides two shared variables abstractions: broadcast variables andaccumulators.

Broadcast variables allow the programmer to keep a read-only variablecached on each worker machine rather than shipping a copy of it within tasks. Itcan created from any serializable type variable at the driver program before thecomputation and distributed to the worker nodes together with the task, wherethis variable is read-only.

Accumulator is a global object of predefined type, created at the driver ap-plication with some initial state and submitted to the worker nodes together withtasks. Each worker node can add the same type of object to this variable usingan associative operation. Such abstraction is typically used to support counters orsums. Accumulator can only be read at the driver program, whereas each workercan only add to it.

Page 36: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

36 CHAPTER 2. BACKGROUND

Action Descriptionreduce(func) Aggregate the elements of the dataset using a function

func (which takes two arguments and returns one). Thefunction should be commutative and associative so thatit can be computed correctly in parallel.

collect() Return all the elements of the dataset as an array at thedriver program. This is usually useful after a filter orother operation that returns a sufficiently small subsetof the data.

count() Return the number of elements in the dataset.first() Return the first element of the dataset (similar to

take(1)).take(n) Return an array with the first n elements of the dataset.

saveAsTextFile(path)

Write the elements of the dataset as a text file (or setof text files) in a given directory in the local filesys-tem, HDFS or any other Hadoop-supported file system.Spark will call toString on each element to convert it toa line of text in the file.

Table 2.3: List of RDD actions

Performance and limitations

The Spark performance evaluation in [28] shows that it outperforms Hadoop fora class of applications that require “iterative jobs”. By taking the advantage ofkeeping data in memory and therefore avoiding the cost of rereading data fromdisk across iterations, it provides a noticeable improvement. However, if a datasetis too large to fit in the memory of a single worker, it has to be spilled to disk andtherefore, performance degrades and becomes similar to Hadoop.

From our experience with Spark, we found it as a good alternative to Hadoopas it is easier to set-up the cluster and has a steeper learning curve, compared toHadoop. The RDD abstraction along with its supported operations reduces theeffort required to write simple MapReduce applications. Whether it is beneficialto use Spark’s RDD abstraction depends on access patterns. RDDs are best suitedfor batch applications that apply the same operation to all elements of the dataset.However, it is reported to be not optimal in terms of performance for the applica-tions that make asynchronous fine-grained read/updates to the dataset, for whichbetter support is provided by databases.

Page 37: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

2.4. DATA INTEGRATION 37

2.4 Data integration

With the rapid development of the Internet and sensing devices and the aforemen-tioned continued growth of information, a great interest has aroused in knowledgediscovery and data analytics to derive business intelligence from the vast quantityof dispersed pieces of information. Business intelligence is the transformation ofraw data into meaningful and useful information for business analysis purposes. Asdata arrives from a variety of sources, each having its own structure and semantics,often you will find yourself in a situation that it is not immediately possible totake advantage from it. Before any analytics can be performed on the data, it hasto be accurately prepared. Such process is usually referred to as data integration.

Data integration is the problem of combining data residing at different sources,having different structure and semantics, and to provide the user with a unifiedview on this data, upon which to perform meaningful analysis. Organizationsrequire a tool that helps them perform all the required data acquisition and pro-cessing tasks. Such class of tools is called Extract, Transform, Load (ETL),because of the life cycle data has to go through:

(a) Extract - collect data from a variety of sources, such as crawling the web,loading from a variety of databases, acquire streaming data, etc.;

(b) Transform - perform required transformations on the raw input data to giveit a unified structure, while performing cleaning and quality assurance proce-dures;

(c) Load - store data in the final target destination, which is usually a large,dimensionally modeled database – a data warehouse.

This work focuses on the transform part of the ETL, which by-and-largeis the most important step in data integration, where the system actually addsvalue to the data. The other activities, extracting and delivering data, are obvi-ously necessary, but they simply move and load data, while the transform partenhances data by applying required structural transformations and data cleansingoperations.

2.4.1 Data transformation and cleansing

Data transformation and cleansing is the process of making data credible. We saydata is credible if it delivers a suitable representation of a phenomenon to enableproductive analysis. In this section, we describe how the need of data transforma-tion and cleansing arises based on a representative usage scenario.

Page 38: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

38 CHAPTER 2. BACKGROUND

Cleansing and transformation scenario. Suppose, we are being asked toanalyze 30 years of car accidents data collected by three different authorities.Our goal is to merge data from all of the sources into a single table, so that itcan be loaded into a specialized application for analysis. Unfortunately, the dataarrives in three different formats: one source is a relational database, another isa comma-separated value (CSV) file, and the third source is in the form of tablesin a portable document format (PDF). We assume that the “extract” part of theETL does its job well and provides us with 3 tables, one for each of the sources.First, we take an insight into the different data sets to identify potential mergingproblems.

We notice that in the table extracted from the relational database, locationsof accidents are encoded in a single column (as ‘City, Province’), whereas theyare in two columns (one ‘City’ column and one ‘Province’ column) in two othertables. Therefore, in order to align this data into a common structure, we haveto perform a “column split” operation on the first table. Such that a new columngets created based on the existing one, and the value of each cell of the existingcolumn is split between two columns. After applying the column split operation,we have consistent columns across all the tree tables and therefore they can becombined in a single table.

Next, we realize that the ‘Date’ column is formatted as ‘dd/mm/yy’ in somecells and as ‘mm/dd/yyyy’ in others. Therefore we write a script to iterate overall the cells in the Date column and apply pattern matching (regular expressions)to identify and change the date to a single consistent format. In addition to thedata format issue, we notice that in the ‘City’ column the city name ‘Trento’ issometimes written as ‘Trient’. Therefore we apply an operation to change all thecells with value ‘Trient’ into ‘Trento’. Finally, the data is ready to be loaded intothe specialized application for the analysis.

We have seen a very basic use case scenario, for the data transformation andcleaning, where we required to transform the table structure and fix misspelledcity names and inconsistent date formats. It is worth mentioning, that it is arelatively simple scenario, typically required data transforms may become muchmore complicated. Such usage scenario requires exploring the raw data to revealthe problems, design and apply various types of transformation functions. Hence,an interactive specialized tools that simplify the process of creating and applyingtransformation rules on a data are of great interest for this type of tasks. In thenext section, we present two open source tools that are designed to facilitate thedata transformation tasks.

Page 39: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

2.5. AVAILABLE ETL TOOLS 39

2.5 Available ETL tools

In this section we present Pentaho Kettle [31] and OpenRefine [4], which are themost widely used open-source tools for data cleansing and transformation. Weaim to provide a reader with an overview of their features, basic architecture andto underline their strong points and limitations.

2.5.1 Pentaho Kettle

Pentaho presents a comprehensive modular platform for data integration and busi-ness analytics. Kettle is the data integration module of a larger Pentaho BusinessAnalytics platform, presenting its data integration and ETL capabilities. It con-sists of a core integration engine and GUI application that allows the user to designdata integration jobs and transformations.

Kettle is built with the Java programming language and consists of fourdistinct applications, namely spoon, pan, chef and kitchen. Functionality of eachis presented in Table 2.4.

Name DescriptionSpoon Graphically oriented end-user tool to model the flow of

data from input through transformation to output, suchflow is named transformation in the Kettle context.

Pan Command line tool that executes transformations mod-elled with a Spoon.

Chef Graphically oriented end user tool used to model jobs,which in turn consist of transformation entries to forma flow control.

Kitchen Command line tool used to execute jobs created withChef.

Carte Web server which allows remote monitoring of the run-ning Pentaho Data Integration ETL processes througha web browser.

Table 2.4: Pentaho Kettle components

Features. Kettle’s important feature is that it is model-driven, that providesgood visualization of the workflow and enables it to be designed in an interactive“drag and drop” fashion. Typical Kettle workspace is presented in Figure 2.6. Onthe left panel user selects a block with required operation and pastes it to theworksheet, where it has to be configured and interconnected with other blocks to

Page 40: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

40 CHAPTER 2. BACKGROUND

Figure 2.6: Pentaho Kettle GUI

form a transformation. Models built with a GUI are saved in XML format andcan be supplied to the command line tools to apply the transforms.

Kettle supports a large variety of input sources, such as databases, localfile system and HDFS. Moreover it provides a vast number of options to performdata transformations and an easy way to chain these transform activities into aworkflow, to be executed on a single machine or a Hadoop cluster. The model-driven approach is a very intuitive way to work with data integration tasks, howeverthis focuses the operator on the workflow design, but not the actual data. Auser can not see the actual data while designing the transform and, as it wasshow in Section 2.4.1, data cleansing tasks exploratory activities to be performedon data. Where a user has a view of the actual data (or at leas part of it),while applying transform and cleaning operations. In addition to that, building acomplex transformation requires numerous declarative tasks, such as configuringeach block in the workflow.

In conclusion, Pentaho Kettle is a powerful ETL tool offering broad function-ality, an intuitive graphical user interface for workflow design and can be integratedinto a broader business analytic Pentaho platform. However, it’s cumbersometransform configuration steps and lack of interactivity make it not suitable for fastand simple data transformations and requires a highly qualified staff to operate it.

Page 41: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

2.5. AVAILABLE ETL TOOLS 41

2.5.2 OpenRefine

OpenRefine is a powerful open-source data cleaning and transformation tool, im-plemented in the form of a web application, but intended to be used as a standalonedesktop application. The user-friendly interface of the tool permits non technicalusers to take control of data transformation and cleaning activities by performingvarious built-in and extensive operations on data, while observing the result assoon the operation is complete. In this section we give a comprehensive descrip-tion of the OpenRefine architecture, as it is the core tool this work is based on.Its architecture and data model will be often referenced throughout the rest of thechapters.

History. OpenRefine take its roots from the open source Freebase Gridworksproject, developed by Metaweb Technologies in 2010, which original objective wasto facilitate data cleaning, reconciliation and upload to Freebase database. Later in2010 Google acquired Metaweb and the Gridworks project was renamed to GoogleRefine.

Numerous enhancement had been done with the support of Google engineersuntil they decided to abandon its development and to completely relay the projecton the community. Since early versions, Google Refine had a tremendous inter-est from various communities, including journalist and data analysts. However,Google did not officially explain the reason why did they decide to stop the de-velopment of OpenRefine. The community suspects that it did not witness anyspecific integration with Google services [32], as it was developed as a desktopbased application and couldn’t fit the Googles cloud environment. Finally, it wasrebranded into OpenRefine and a large community had been established around it.

Features. OpenRefine is not directly an ETL tool, although it borrows numer-ous features from such type of tools. It offers interactive data transformation andcleanup capabilities, sometimes referred to as data wrangling. Among the numer-ous features of OpenRefine, it is noteworthy to underline the following ones, asthey are unlikely to be found in other ETL tools:

• faceting allows a user to easily filter rows of data on some specific criteriato invoke operation only on the interested ones. For instance a table hasa ‘City’ column, and you might want to focus on a specific city. Facetingallows you to easily filter only the rows associated to a specified city, applytransforms on this part of data and than return to the complete table view.Faceting is a convenient feature when performing real-time exploratory datatasks;

• expressions feature of OpenRefine allow users to write custom scripts to be

Page 42: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

42 CHAPTER 2. BACKGROUND

Example Descriptionvalue + " add this string!" concatenates two strings; the value in a cell

gets converted to a string and concatenatedto the specified string.

value.trim().length() trimming leading and trailing whitespace of acell value and than return the length a string.

value.substring(13) drop the characters of a string before the in-dex 13 and return the resulting string

Table 2.5: GREL expression examples

applied on the cell values to transform existing data or create new databased on existing entries. It can be seen much like how spreadsheet soft-ware supports the use of “formulas”. Whereas each spreadsheet softwarehas its own “formula” language, OpenRefine is capable of supporting severallanguages for writing expressions. OpenRefine has its own native languagecalled Google Refine Expression Language (GREL), but you could also useJython [33], or other languages made available by extensions. Several GRELexamples are presented in the Table 2.5;

• reconciliation is a semi-automated process of matching text names to databaseIDs (keys). This is semi-automated because in some cases, machine alone isnot sufficient and human judgment is essential. For example, given ”Ocean’sEleven” as the name of a film, it can be matched to the original 1960 movieor a 2001 remake. OpenRefine allows to perform reconciliation of namesin the dataset against any database that exposes a web service followingthe Reconciliation Service API [34]. In this way, a reconciliation service forany specific needs of an organization can be set up and easily used withinOpenRefine.

• extensions provide functional extensibility of OpenRefine’s core engine bytaking advantage of the Butterfly [35] modular web application framework.An example of such modules is the OpenRefine’s RDF extension [36], whichprovides the functionality of building an RDF schema and exporting the datain RDF format.

The aforementioned features make OpenRefine very popular both among thetechnical and non-technical users. It combines vast functionality, ease of use andexceptional interactivity, that can’t be provided by any other open-source toolavailable at the moment.

Architecture. OpenRefine is a web application, however intended to be run andused on a single machine by single user, still it consists of the server and client side.

Page 43: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

2.5. AVAILABLE ETL TOOLS 43

Figure 2.7: OpenRefine GUI

The server-side maintains states of the data and client-side maintains states of theuser interface and makes HTTP calls to change the data on the server-side. Suchan architectural decision provides separation of concerns in the sense of separatingdata from the user controls. Client-side delivers a user interface through modernweb technologies, such as HTML, CSS, Javascript and AJAX, which makes theGUI highly interactive, functional and concise at the same time. A screenshot ofthe standard OpenRefine working screen is show on Figure 2.7

The Server side is implemented in the Java programming language, whichassures portability across operating systems. OpenRefine’s core module is a singleJava Servlet (Java application inside a web server) that itself extends a Butter-fly [35] modular web-app framework, executed by the integrated Jetty web server.Refine Servlet is the main entry point for all modules, responsible for loading,configuring and wiring together with the core module, extensions and managingthe dispatching of requests to them. A graphical sketch of the OpenRefine archi-tecture is presented in Figure 2.8. Extensions are built in the form of Butterflymodules and can contain both server-side Java code to add functionality to theserver along with client-side Javascript and HTML in order to add controls andvisual elements to the GUI. Refine Servlet manages the interaction between theclient side of the application and the server side, therefore each action on the client

Page 44: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

44 CHAPTER 2. BACKGROUND

Figure 2.8: OpenRefine architecture

side has a specific URL which is mapped into the commands on the server side.OpenRefine has a concept of a workspace-project, where a project is the core

working unit in OpenRefine. When data is loaded into the engine, a separateproject is created for it and a user can work at one project at a time. The ac-tual data is stored inside the project instance along with the history of appliedtransformations to the data. Projects are controlled by the “Project Manager”,which holds references to all projects and maintains metadata for each of them.Metadata includes projects name and last modified date. To work on a project,the whole dataset is loaded into memory and is disposed from memory either after60 minutes of inactivity, estimated from last modified time in metadata, or whena user begins to load another project. Projects actual data is stored in a tabularform and therefore consists of the following elements: column models, rows andcells, which we describe next.

Data model. Actual data, as shown on Figure 2.9 is stored inside a projectinstance and consists of:

• raw data: as a collection of ‘Rows’, where each ‘Row’ consists of a collectionof ‘Cells’;

• column model on top of raw data that gives high level presentation or inter-pretation of the raw data.

Such design allows the column model to be edited without causing costly processingon raw data transformations. Cells inside the row objects are not named and canonly be addressed by their position indices, therefore column model maintains themappings of names and position in rows. Columns in the column model can be

Page 45: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

2.5. AVAILABLE ETL TOOLS 45

Figure 2.9: OpenRefine data model

removed and reordered without changing the raw data, therefore it makes columnmanipulation operations quick. At the moment a change is introduced to theproject’s data, a “Changes” object is created. Since keeping them in memory wouldlead to large memory cost, “Changes” objects are immediately written to disk intothe project working directory, while a “History Entry” object containing a handleto the “Changes” file is created inside the project instance. Changes file containsonly the difference information of the data before and after the transformation hadbeen applied, but it may become comparable to the size of the whole data. Suchan architectural decision provides several benefits to the user:

• applied transformations can be easily undone, which enhances the user ex-perience when performing interactive exploratory activities on the data;

• it allows the data to be reverted back, regardless of the reversibility of thetransformation, meaning that any operation can be undone.

On the other hand, the fact that it is not possible to switch off the history trackingmay degrade the performance of the tool. In particular in the case, when OpenRe-fine is used as a processing engine without the GUI (integrated in a larger pipeline),typically a series of operations are applied to the data and there is no need to re-vert them. Therefore excessive memory usage and unnecessary I/O degrades theperformance.

To output the data from an existing project, various exporters can be used.OpenRefine has a number of built-in exporters for CSV, TSV and HTML tableformats. However, extensions can be used to add custom exporters to the enginefunctionality. For instance, OpenRefine RDF extension [36] adds the possibility

Page 46: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

46 CHAPTER 2. BACKGROUND

to output data in RDF format.

Usage and Limitations. OpenRefine is a powerful data wrangling and recon-ciliation tool offering exceptional interactivity and simplicity to the user. It iswidely used for a variety of exploratory data transformation and cleaning activi-ties. Moreover, numerous extensions are shared freely by the community to extendits core functionality.

From our experience with organizations and from the mailing list of theproject, we observe that it is common practice to integrate OpenRefine inside theETL pipeline, regardless of its initial intent to be run as a desktop application.However, some of the core architectural decisions limit its performance for largescale data processing and are not easy to be fixed.

In Section 3.1.2 of Chapter 3 we conduct a series of assessments on theOpenRefine performance and clarify what are the bottlenecks in its design.

Page 47: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

Chapter 3

Approach

The irrationality of a thing is noargument against its existence,rather a condition of it.

Friedrich Wilhelm Nietzsche

In Section 2.5.2 we introduced a tool for data wrangling, namely OpenRefine.It is one of the core business tools for SpazioDati s.r.l [5], a company with whichwe collaborate on this project. Recently, the company started facing performanceissues when trying to process large datasets with OpenRefine. This issue servesas a motivation for this work: to study and to overcome scalability limitations ofOpenRefine.

In this chapter we set the direction for the remainder of this work. In the firstpart (Section 3.1) we evaluate the current OpenRefine implementation, formalizethe problem that has to be solved and set the baseline for this project. In thesecond part (Section 3.2), we present our solution to the problem, along with thedescription of difficulties faced during its implementation.

OpenRefine is implemented in Java and therefore executed inside the JavaVirtual Machine (JVM), which provides automatic memory management to theapplication. Automatic memory management is a costly procedure, which has animpact on the overall performance of the application. In Section 3.1.1 we dis-cuss the interaction between JVM memory management system and OpenRefine.Taking into account the peculiarities of data organization in OpenRefine and itsinteraction with JVM memory management, in Section 3.1.2 we explain the mi-crobenchmarking procedure that we adopt to assess the performance of the tool.Our performance assessment results are used to further formulate the scalabilityproblem of OpenRefine in Section 3.1.3 and to set the baseline reference for thisproject.

47

Page 48: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

48 CHAPTER 3. APPROACH

As part of the solution in Section 3.2 we review several approaches whichattempt to overcome the OpenRefine engine scalability limitations, while preserv-ing it’s centralized architecture. We explain why these approaches had not beensuccessful to solve the scalability problem and present our distributed approachin Section 3.2.1. Part of OpenRefine functionality has potential parallelism andcan be implemented on a distributed platform without a complete rewrite of theoriginal engine. We present the solution for this class of engine functionality in3.2.2. However, there is another class of OpenRefine features that require substan-tial effort to be implemented in parallel fashion. We provide a sketch on how thisclass of problems can be solved in Section 3.2.3.

3.1 Problem description

As mentioned in Section 2.5.2, OpenRefine is implemented in the form of a web-application, consisting of a client and a server part. The client part is responsiblefor the GUI controls, whereas the server part is where the actual data is storedand processed. Since the server part is the one doing all the heavy lifting work, wefocus our attention on the server side, and refer to it as the OpenRefine “engine”for convenience.

In response to the recent report from SpazioDati on OpenRefine performanceissues, we start by investigating the scalability and performance of the OpenRefineengine. The OpenRefine engine is implemented in the Java programming languageand therefore is executed inside a Java Virtual Machine (JVM), which providesautomatic memory management by means of a Garbage Collector (GC) [37]. Sincethe GC shares resources with the application, it has a direct impact on the ap-plication’s performance running inside the JVM. Hence, an important step inperformance assessment is to grasp the impact of JVM memory management onthe overall performance of the OpenRefine engine.

3.1.1 Interaction between GC and the OpenRefine engine

Memory management is the process of recognizing when allocated objects areno longer needed, deallocating (freeing) the memory used by such objects, andmaking it available for subsequent allocations. In Java, memory management isautomatically performed by the GC, which liberates the programmer from memorymanagement complexity. The Garbage Collector is responsible for:

• ensuring that any referenced objects remain in memory;

• recovering memory used by objects that are no longer reachable from theexecuting code.

Page 49: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

3.1. PROBLEM DESCRIPTION 49

The JVM has a heap, that is the runtime data area from which memory for all theapplication objects are allocated. The heap is created at start-up and is managedautomatically by the GC, whereas the programmer can control only the initial andthe maximum heap size. If a computation requires more heap than can be madeavailable by the automatic storage management system, the Java Virtual Machinethrows an out of memory error: Java heap space, which typically results in a JVMcrash.

Though Garbage Collection solves many memory management tasks, it is alsoa complex task taking time and resources that are shared with the application.Typically GC is triggered when the occupancy of the heap reaches a threshold [37],but JVM can not be explicitly instructed when to perform memory deallocation. Acommon metric to evaluate garbage collector performance is the garbage collectionoverhead, which equals to the ratio of time spent in garbage collection to the totalapplication run time, as shown in Equation (3.1).

GC Overhead =Time spent in GC

Total application run time(3.1)

In case the JVM is spending more than 98% of the application runtime in GC,trying to free the memory, JVM will throw an out of memory error: GC overheadlimit exceeded. This means that your application can hardly make any progressbecause of the lack of heap space.

OpenRefine data model. We discussed how the actual data is stored insidethe OpenRefine engine in Section 2.5.2 and also provide a graphical representationin Figure 2.9. In particular, all the raw data is stored inside a project instance inthe form of a collection of rows, where each row object embodies a collection ofcells. When data is imported into the engine, new objects are created on the heapspace and are required to persist in memory to apply transformations on them. Inthe course of applying transformations on data, in addition to the space requiredto keep the raw data, extra heap space is required for intermediate objects createdby the transformation algorithm. These intermediate objects will be removed fromthe heap during the next garbage collection cycle. Consequently, performance ofthe engine is affected by the interaction of OpenRefine with the GC. It is importantto accentuate on the interaction, because the application and the garbage collectormay be seen as separate events that interact with each other, while sharing thesame resources. This interaction not only affects the performance of the engine butalso complicates the measurement procedure. When measuring the performanceof a Java application, it is important to also measure the time of GC stages. Oth-erwise, the measurements contain both the time of the actual application progressand time of the garbage collection stages, which makes it difficult to assess thereal performance of the application. In particular, when the OpenRefine engine

Page 50: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

50 CHAPTER 3. APPROACH

operates close to the limit of the maximum available heap space, we observe asignificant performance degradation of the engine that we can not reason withoutthe information about GC stages.

Taking into account the aforementioned peculiarities of the memory manage-ment inside the JVM and the data model of OpenRefine, the goal of Section 3.1.2is to provide an estimate of the engine performance and scalability that is expectedto prove our initial assumptions and help us quantify the problem.

3.1.2 Microbenchmarking the OpenRefine Engine

To analyze performance of the OpenRefine engine, a decision has been made to-wards microbenchmarking . Microbenchmarking attempts to measure perfor-mance of small components in a larger system. Therefore, we use this approach toget an estimate of OpenRefine’s engine components performance independently,to identify bottlenecks of the system. Moreover, another important reason thatleads us to use microbenchmarking approach is the absence of a real workload. Aworkload for the OpenRefine engine consists of the input data and operations tobe applied on the data (depicted in Figure 3.1). Hence, the amount of workloaddepends on the following factors:

1. the size of the input dataset (gigabytes or millions of lines);

2. number of transformations to be applied on the input data;

3. complexity and the amount of changes introduced by each of the transfor-mations;

OpenRefine offers a broad set of transformations of various complexity, whichtogether form the core functionality of the engine. Typical data cleaning tasksrequire a chain of different transformations to be applied on the input data, whichOpenRefine applies sequentially. Therefore, there are plenty of possible combi-nations of transformations that make up a workload together with the input file.Due to the unavailability of a standardized workload, we choose to construct asynthetic one, based on the following assumptions:

• performance of a group of transformations can be obtained by summing upthe performance’s of each transform in a group, given that they are appliedon the same input data;

• to have an estimate of the engine scalability, we require to assess the changein the performance with increasing input data sizes.

Page 51: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

3.1. PROBLEM DESCRIPTION 51

Figure 3.1: OpenRefine workload

Following this assumption, we generate 143 different combinations of the workload(11 data sizes and 13 transforms), that we describe next.

Generated synthetic workloadTo assess the engine’s performance, we take 13 transforms that are provided by thecore engine functionality, shown in Table 3.1. Both operations that are network-bound or dependent on 3rd party services were not included into the list of trans-forms as they don’t reflect the actual performance of the engine, but are dominatedby external factors.As input data, a publicly available CSV file was downloaded from the “OPENdataTrentino” project website [38]. The original file contains 70 rows and 7 columns.By taking this file as a base, we created our synthetic large datasets. Eleven filesof different sizes ranging from 1 to 4 million rows were generated by looping (re-peatedly appending) the base file to itself.

Measurement procedureTo perform our measurements we time and analyze the behavior of the engineduring a transformation cycle of a single operation, which comprises the followingstages:

• create project and load data into the engine (load data);

• apply transform;

• export transformed data from the engine (export result).

The types of data to be measured for each transformation cycle and the corre-sponding units of measurement are shown in Table 3.2.

To drive the measurement procedure and control the engine without theclient-side GUI, an auxiliary tool has been developed to access OpenRefine viaits HTTP API. We refer to this tool as Refine HTTP Client. For the benchmarkprocedures we followed the recommended best practices for benchmarking Javaapplications [39,40]:

Page 52: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

52 CHAPTER 3. APPROACH

Transformation DescriptionBlank down cells Blanks consecutively repeating cells in a column by visiting

each cell from top to bottom, such as, only the first occurrenceof repetition cells remains, consecutive is blanked. Operationis invoked on per column basis.

Fill down cells If a cell in a column is followed by a blank cell, it will beassigned a value of the previous cell. Operation is invoked onper column basis.

Column addition Create a new column based on the existing column, usingsome expression on existing values.

Column move Allows to move the column to the left, to the right, to thebeginning, or to the end of the table.

Column remove Remove specified column.Column reorder Similar to column move, but allows to reorder columns in any

desired ordering. Can be seen as a sequence of “column move”operations.

Column split Create new columns by splitting the existing one based ona separator, regular expression, or can be split by specifyinghow many symbols will go to the each of the new columns.Provides an option to keep the base column or to get it re-moved after the transformation.

Composite transform A sequence of 5 simple transforms: column split,GREL:value.split, unescape html, GREL:value.replace

Columnize by Key-Value

Is a transpose transform, where you choose the key columnand the value column, this would create columns with namesfrom the key column and insert values from the value columnin the corresponding position.

Mass edit cells Apply the same change to all identical cells in a column.Text transform Apply a custom GREL (Jython or Closure) function to each

cell in the column, moreover one can choose repeat count,that is to loop the same function on the cell N times. Forexample: value.toLowerCase() + "hello world!" wouldconvert all the cell value to lowercase and concatenate it with”hello world!” string. In case repeat count option is set, stringwould be added N times.

Transpose cells acrosscolumns into rows

This transpose transform can be used with a variety of op-tions. One way to use it is to transform a table with NColumns into a table with only 2 columns, with correspondingkey/value pairs.

Transpose cells inrows into columns

Transpose every N cells in a specified column into separatecolumns.

Table 3.1: List of chosen transforms

Page 53: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

3.1. PROBLEM DESCRIPTION 53

Measurement UnitLoad data time millisecondsApply transform time millisecondsExport results time millisecondsTime spent in GC millisecondsGC overhead percentageMaximum heap size Megabytes

Table 3.2: Measurement units

1. choosing an appropriate environment for testing: we use the most powerfulcomputer that we have at our disposal (4 Core 3.4 Ghz CPU, 16 GB RAM),which allows us to run the baseline tests, while keeping the GC overheadlow;

2. use a quiescent machine: the computer had not been loaded by any otherwork except the OpenRefine engine under test;

3. repeat the experiments: for each experiment type, 5 trials are performed andperformance analysis is performed on the averaged values;

4. control the GC impact on the performance: GC overhead was constantlychecked to be on the lowest possible level, such that it does not produceperformance artifacts in out measurements.

In our opinion, the best methodology to assess the scalability of OpenRefineis by observing the engine from both ability and usability perspectives, as we areinterested not only in the ability to process large files but also in time it takesto do so. Such methodology requires the measurements to be performed in twodimensions, each one with different objectives and approach:

1. a time dimension. For which the goal is to provide an objective measure ofhow the engine’s processing time changes to an increasing input size. It isaccomplished by providing the engine with the maximum available memoryand measuring the performance for an increasing input data size;

2. a space dimension. For which the objective is to derive the relationshipbetween the required memory and the input file size, which can also be seenas memory footprint estimation. It is accomplished by deriving the criticalmemory operation points (out of memory error occurs below this point) foran increasing input data size.

Page 54: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

54 CHAPTER 3. APPROACH

Time dimension scalabilityProcedure for the time dimension scalability approach is based on timing loaddata, apply transform and export result events for an increasing input data size(1 to 4 million lines), while retaining the same amount of maximum availablememory given to the JVM. The maximum heap size for this experiment has beenfixed to the value of 11 GB, which is large enough to keep the GC overheadreasonably low and constant (in relative terms) for all of the considered workloadlevels. Measurements were conducted for each of the 13 transforms, presented inTable 3.1, independently. The result of the experiments is presented in Figure 3.2.From the perspective of the time-dimension scalability assessment, the followingbehavior of the OpenRefine engine can be observed:

1. with an increasing input file size, all the 13 core operations exhibit eitherlinear growth or constant behavior in terms of time required to apply thetransformation as function of the file size;

2. in particular, both of the column reordering operations (Column move andColumn reorder) are totally independent from the input file size, and the timerequired to apply these transforms is nearly constant. It is explained by thefact that in OpenRefine such operations are performed on a column modeland do not introduce changes to the actual raw data. All the remaining 11transforms exhibit linear growth behavior with different rate of increase;

3. the slope of the linear increase depends on the complexity of the operationand the amount of changes it introduces to the data. We can state that key-value columnize, column split and both of the transpose operations can beclassified as the most complex ones. This classification can be inferred fromthe Figure 3.2. Where, with respect to simple transforms, these operationsrequire almost 10 times more time;

4. GC overhead that occurs during the “apply operation” phase, depicted onFigure 3.3, proves that our measurement has not been significantly influ-enced by the GC. We observe that for all of the operations GC overheadremains under 20%, therefore the time spent doing actual processing domi-nates the time spent performing GC. However, we observe that some opera-tions demonstrate higher garbage collection overhead values with respect tothe other core transforms. As this higher GC values are independent fromthe input data size, we rely this difference on the inherent characteristics ofthe transform;

5. an important observation from the GC overhead graph (Figure 3.3) is thatthe lowest values are observed not only for the most lightweight column

Page 55: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

3.1. PROBLEM DESCRIPTION 55

ordering operations, but also for the operations previously identified as themost complex ones. For such operations, the time required to perform actualprocessing tasks on the data severely dominates the time performing GC;

6. time required to load the input data into the engine, shown on Figure 3.4,is growing linearly with larger input files. It increased from two seconds for1 million lines up to eight seconds for the 4 million lines input file. Usingthe slope of a linear equation and assuming to have large enough heap toaccommodate an 8 million lines file, we can infer that an 8 million file wouldalready take 16 second to import it into the engine. Hence, this should alsobe considered in the engine scalability assessment.

We summarize that OpenRefine engine mostly shows decent performance onour synthetic workload and given a sufficiently large amount of memory it is able toprocess small to mid-size datasets. Nevertheless, the observed increase in the timerequired both for loading data into the engine and applying transforms is not adesired behavior of a system working in a fast-growing big data environment. Evenby making a false assumption that the memory of a computer can be infinitelylarge, the time required to load data and apply transforms on it would anywaycompromise the ability of the engine to process the data in a reasonable time.

Space dimension scalabilitySpace dimension scalability approach is based on the observation of the Open-Refine engine behavior in a memory constrained environment, which can be seenas a memory footprint estimation. Procedure for the space dimension scalabilityapproach is based on measuring the GC overhead of the “Load data” and “Applytransform” phases for an increasing file input size (1 to 4 million) and variousmaximum available memory values. This procedure allows to identify the criticalmemory points, below which, either the GC overhead is larger than 80% or theengine fails by throwing out of memory error. As a workload for this approachwe chose to use the “Transpose cells across columns into rows” operation, thatshowed to be the most computationally complex operation in the time dimensionapproach. From this approach we expect to extract the minimum memory re-quirements to load and process various input file sizes. We assume that the mostresource demanding transform reflects the worst-case scenario, such that for otheroperations the critical amount of memory would be less or equal to the obtainedvalue. The results are presented in Figure 3.5, from which we can observe thefollowing engine behavior:

1. the graph of the minimum required heap memory to load the files of varioussizes has similar linearly increasing behavior to the graph showing the min-imal memory requirements to apply the transform on the files of the samesizes. The minimum required heap size to load the 1 million lines file (79

Page 56: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

56 CHAPTER 3. APPROACH

Figure 3.2: Time required to apply the transform for various input data sizes

Mb) is equal to 1,5 Gb, whereas to load the 4 million lines file (315 Mb) theengine requires at least 5.5 Gb of available heap space;

2. to apply the “Transpose cells across columns into rows” operation on a 4million lines file, the engine requires at least 8 Gb of the available heapspace, which has grown from the value of 2.5 Gb for the 1 million linesinput. For lower heap values, the engine either spends more than 85% of thetime in GC or becomes completely inoperable and throws the out of memoryerror, which causes the system to crash;

3. from Figure 3.6 we can observe that operating near the boundary points ofthe minimum required heap space, GC overhead reaches very high values(more than 75%). This means that only small fraction of time, the engineis doing useful progress, whereas most of the time it is performing memorymanagement operations.

The space dimension approach describes the behavior of the OpenRefine enginein a memory constrained environment. It proves that the ability to process a

Page 57: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

3.1. PROBLEM DESCRIPTION 57

Figure 3.3: GC Overhead during the apply operation phase

file of a particular size directly depends on the available heap size. Moreover, aconjunction of input data size and the complexity of transforms to be applied onit, imply a critical memory point, such that operating below it causes OpenRefineengine to crash. Moreover, performance reduces drastically when working close tothe critical point of the required memory.

3.1.3 Problem statement and summary

We have adopted a microbenchmarking approach to assess the performance andpotential scalability of the OpenRefine engine. The decision to analyze the enginefrom both time and space perspective provides us with a broader view on the enginebehavior. Summarizing benchmarking results, we focus on the following scalabilityand performance issues of the current OpenRefine engine implementation:

• the ability to apply transformations of certain complexity on certain datasize is limited by the maximum available memory on a single machine;

• time required to process data increases linearly both with respect to theincreasing complexity of the transforms and data size. This increase in timecomprises: time required to import the data into the engine and time requiredto perform the actual transformation task.

Page 58: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

58 CHAPTER 3. APPROACH

Figure 3.4: Time required to load various file sizes into the engine

In this work, we are going to tackle the aforementioned limitations of the OpenRe-fine engine that limit its performance and scalability. Our objective is to providea scalable version of the engine, while retaining its functionality and support offuture development versions.

Scalability is the ability of a system to handle a growing amount of work ina capable manner or its ability to accommodate that growth. To accommodatelarger workloads, OpenRefine requires more resources, such as memory and CPUtime. Methods of adding more resources for a particular application fall into twobroad categories: vertical and horizontal scaling. Vertical scaling (scale up) meansto add resources to a single node in a system, involving the addition of CPUsand memory to a single computer. For instance, a machine with more physicalresources can accommodate more web-server threads to handle larger number ofconnections. Whereas horizontal scaling (scale out) means to add more nodes to asystem, such as adding a new computer to a distributed architecture. An examplemight involve scaling out from one web server system to three. Both of the scalingmethods have their advantages and disadvantages, which are considered on a case-by-case basis.

For our scenario we favor the horizontal scaling approach for the followingreasons:

1. scale up method requires sophisticated hardware solutions, such as supercom-puter machines. These are typically very expensive and purpose-build, there-fore require large investments in the company’s infrastructure. Whereas,scale out method allows to use the inexpensive commodity hardware. More-

Page 59: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

3.1. PROBLEM DESCRIPTION 59

Figure 3.5: Space dimension scalability: minimum and maximum memoryrequirements to load data and apply transform

over, it is not required to invest in a private infrastructure, as modern cloudbase services, such as Amazon EC2 [41], provide the resizable, distributedcompute capacity in the cloud;

2. keeping up with the fast growing big data sizes with a centralized architec-ture is considered more difficult, as the scalability is still limited by a singlemachine resources. Even with a very powerful machine, two of these wouldbe twice as powerful.

Therefore, we consider distributed parallel processing as the key to success-ful implementation to provide OpenRefine engine the ability to keep up with theconstantly growing data sizes.

Performance baseline for the projectIn order to proceed to our solution, we have to set a baseline for performancecomparison. Therefore, we adopt the results obtained from the time dimension mi-crobenchmarking approach (Section 3.1.2) as a baseline reference for this project.

Page 60: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

60 CHAPTER 3. APPROACH

Figure 3.6: Space dimension scalability: GC overhead for the 2 million linesinput file vs. various heap sizes

We intend to use the baseline in order to evaluate how much improvement wecan obtain, moving the OpenRefine engine from centralized to a distributed ar-chitecture. A more detailed account representation of the baseline is presented inAppendix A.

3.2 Solution

In the last section, we have presented the performance evaluation of the currentOpenRefine implementation, assessed its scalability and concluded with a descrip-tion of the problem. This section presents a solution to the aforementioned prob-lem, describes the issues faced during its implementation and explains choices thathave been made.

To overcome the hardware limitations of a single machine, we have to dis-tribute our data and processing operations over a cluster of machines and makethe result available to the user. To use cluster systems, the computations to beperformed must be partitioned into several parts of reasonable size and assignedto the available resources for execution. The design of our solution starts withthe decomposition of computations into several parts, called tasks, which can be

Page 61: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

3.2. SOLUTION 61

computed in parallel on the cluster resources.As described earlier, in OpenRefine, data is kept in the form of a collection

of rows, therefore our solution aims to perform raw data split based on rows intoa number of pieces. By analyzing the core transform’s algorithms, we becomeaware that most of the transforms have potential data parallelism, therefore canbe split into parallel tasks to operate on parts of the dataset. However there isalso a second class of transforms that can’t operate on parts of the dataset, as theyare dependent on the information contained in the other data parts. Such classof transforms require special effort in synchronization and coordination betweenparallel tasks. Section 3.2.1 presents an overview of our distributed approach andpresents a roadmap for its implementation.

3.2.1 Distributed approach: an overview

The crucial limiting factors for the scalability of OpenRefine are: the amount ofoperational memory and the processing time. Consequently, we have to come upwith another method of storing and processing data that would be scalable andwouldn’t require severe modifications to the original implementation. In the mail-ing list of OpenRefine community [42] we have seen an approach to overcome thememory limitations by backing OpenRefine with a database to store the data.However, these attempts didn’t get any support from the community and wereabandoned. The reason for that is that this approach aims to overcome memorylimitations by externalizing the data storage, however the performance and scal-ability are still limited by the time required to access data in a database and bya single processing unit. Load data into a parallel database and apply transfor-mations within it by using a query language facilities is another approach that weconsidered, but was abandoned for the following reasons:

• Structured Query Language (SQL) used for accessing and managing data inparallel database systems is too limiting to express the transformations thatare already available in OpenRefine;

• databases are designed for long time-storage under the same schema that isdefined when data is loaded. Data transformations used in OpenRefine mayrequire frequent schema changes, that would compromise the performanceof a relational database. For instance, Columnize by Key/Value operation,mentioned in Table 3.1 has to add as many new columns to the table as thereare rows in the key column. Therefore, it is very unlikely such operation canbe executed efficiently within a database;

• general-purpose database management systems are more suitable for finegrained data access/updates (get one record), but have limited performance

Page 62: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

62 CHAPTER 3. APPROACH

for coarse grained ones (iterate through all the records).

Therefore, databases can not serve as a platform for scaling OpenRefine and wehave to find another method for storing and processing large data sizes, while re-taining OpenRefine core functionality.

In this section we present our own distributed approach to solve the scala-bility problem of the OpenRefine engine. We consider that keeping data of largesizes in the memory of a single machine is seen as a primary constraint fro theengine scalability. Our approach aims to move towards a distributed architectureand exploit data parallelism by distributing both data and computations over acluster of machines. Distributing data for parallel computation requires it to besplit. From Section 2.5.2 we know that OpenRefine stores all data in the formof a table which in turn is a collection of rows. Consequently, we decide to splitOpenRefine’s tabular data based on rows into a number of pieces (chunks), suchthat each chunk can be processed by an independent copy of OpenRefine engine.This approach requires to fully grasp how transformations are applied to data andwhat are the data access patterns for various transforms.

Transform classificationWe start by dividing all the core engine transforms (Table 3.1) in two groups ofembarrassingly parallel and non-embarrassingly parallel transforms.

Embarrassingly parallel transforms are those for which little or no effort isrequired to separate them into a number of parallel tasks [18]. In our case, werefer to operations where each single split contains all the data required to applya transform. For example a transform that operates independently on each row,such as “Text transform”, where the same function is applied to each cell value ina column independently and therefore does not depend on values from other rows(chunks).Non-embarrassingly parallel transforms are those that can not be easily splitup into parallel tasks, as they may require the results from a preceding transfor-mation step or information from another chunk. For instance, “Fill down cells”operation iterates from top to bottom of a particular column while keeping track ofthe last non-blank cell value to fill a subsequent empty cell with it. On Figure 3.7we show a situation where “Fill down cells” is applied on the ‘Insegna’ column.“Case A” shows how this transform is applied correctly on a single table, whereas“Case B” applies the same transform independently on two chunks. In particular,we observe that the transform is applied correctly on the first chunk, however thefirst row of the second split has a blank cell in ‘Insegna’ column and therefore hasto be filled with the previous non-blank cell value, which is not available within

Page 63: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

3.2. SOLUTION 63

this split. We say that such transforms have dependencies between chunks, andtherefore require individual investigation in order to understand the nature of thedependency. This understanding allows to analyze and solve the dependency withmaximum efficiency in order to provide a high degree of parallelism.

Figure 3.7: Fill down cells: Case A - correct, Case B - two chunks, incorrect

Table 3.3 shows which of the transforms from Table 3.1 belong to embarrass-ingly parallel or to non-embarrassingly parallel transforms.

Embarrassingly parallel Non-embarrassingly parallelColumn addition Blank down cellsColumn move Fill down cellsColumn remove Columnize by Key/ValueColumn split Transpose cells in rows into

columnsMass edit cellsText transformTranspose cells across columns

into rows

Table 3.3: Transform classification

Page 64: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

64 CHAPTER 3. APPROACH

Solution roadmap. In this work our main focus is devoted to the solution andits implementation for the embarrassingly parallel operations, that we present inSection 3.2.2. Further, in Section 3.2.3 we review all of the four non-embarrassinglyparallel transforms and provide a sketch for their solution. We summarize thesolution and discuss the work that will be done next in Section 3.2.4.

3.2.2 RefineOnSpark: solving the embarrassingly paralleltransforms

To migrate OpenRefine to a distributed platform, some architectural decisions haveto be made. As mentioned in the previous section, our approach aims to distributeboth data and computations to be performed in parallel on a cluster of commod-ity hardware. By analyzing state-of-the-art approaches aimed at processing largedatasets in a distributed manner, we found that the most well-established modelapplied to such problems is the MapReduce computational model [22]. From itsdescription in Section 2.3.1, we know that it requires the computations to be ex-pressed as a set of map() and reduce() functions. Therefore, we have to find away to express our problem as a set of such functions.

Expressing OpenRefine data processing in terms of MapReduce tasksTo express our data processing in terms of MapReduce tasks, we decide to have alook at the problem from a very high-level perspective. By this we mean that we donot want to immediately introduce substantial changes into the engine but rathertry to express computations by looking at the engine as an integral component.One of the main reasons to favor this view is that it will allow to retain the supportof current and future engine developments and numerous available extensions.Another important reason is the reduced initial implementation complexity fromwhich we can benefit until the model will prove to be successful. Therefore, ourtransition from a centralized to a distributed architecture has to rely on multiplecopies of the OpenRefine engine, that we show schematically on Figure 3.8.

From a high-level perspective OpenRefine engine can be expressed as a singlefunction, that takes input data and transform description as arguments and returnsthe processed result:

performTransform(inputData,transformDescription)

Adopting this view to the aforementioned distributed model, consisting of multiplecopies of OpenRefine (Figure 3.8), we have to assign each engine copy with a partof the dataset and instruct the engines to perform the required transformationsindependently on each input data split. A graphical sketch of such procedure isshown on Figure 3.9.

Page 65: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

3.2. SOLUTION 65

Figure 3.8: Transition to distributed architecture represented by multiple copiesof OpenRefine

Referring back to Section 2.3.1, all the computations within a MapReduceprogramming model are performed on key/value pairs, with the help of map andreduce functions. Map function is used to perform arbitrary transformations onthe key/value pairs and to emit intermediate key/value pairs. Whereas the reducefunction merges the intermediate key/value pairs associated to the same key. Forour application, we suggest to split the input file into pieces and to represent eachpiece as a separate key/value pair, where the key is the position number of thepiece and value is the actual contents of it. Our computational problem can bedescribed as a single map() function, as shown in Listing 3.1.

Listing 3.1: Data transformation as a single map function

map(inputSplit, transformDescription) {

performTransform(inputSplit, transformDescription)

}

In the MapReduce environment, the master node schedules map tasks in a waythat a worker node gets assigned part of the input data to apply the map functionon it. Such implementation of a map(), aims to delegate the processing of theactual data to the local copy of OpenRefine engine and return the result back tothe map function call. As the worker node finishes to process the assigned maptask, it stores the result locally and informs the master about the completion.When all the worker nodes have finished processing their tasks, the result of eachof the parallel “map” can be collected and used to assemble the result as a singlefile.

Regardless it is a very high-level description and we omit lots of importantdetails, it provides a simple description of our approach to express an inherently

Page 66: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

66 CHAPTER 3. APPROACH

Figure 3.9: Instruct parallel copies to perform transformations independently oneach split

sequential OpenRefine data processing model in terms of the MapReduce pro-gramming model. In the next section we refine our model by applying it on aMapReduce-like open source framework.

A parallel OpenRefine on Apache Spark implementationTo move from a high-level description of the solution closer to implementation,we have to decide which of the available MapReduce frameworks to adopt as aplatform for our application. In Section 2.3.1 we present two large-scale dataprocessing frameworks, namely Apache Hadoop and Apache Spark. Hadoop is theclosest implementation of the Google’s proprietary MapReduce framework [22],whereas Apache Spark is a more recent framework providing several additionalabstractions for large scale data processing.

We favor Apache Spark framework as a platform for our application for sev-eral reasons. First of all, we find Resilient Distributed Dataset (RDD) a moresuitable abstraction to represent our tabular data as opposed to key/value pairs inHadoop. Instead of key/value pairs, RDD allows us to represent our input datasetas a collection of objects, which we use to create a collection of ‘Rows’. Moreover,along with RDD abstraction, Spark provides a convenient API to operate on data(e.g. count, collect, mapPartitions), whereas Hadoop is strictly limited by mapand reduce functions. Another important feature provided by Spark is the abilityto keep data in memory between computational stages, the use of which will beexplained later in this chapter, within the discussion of the non-embarrassinglyparallel transforms. The least important but convenient aspect is that from our

Page 67: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

3.2. SOLUTION 67

personal experience we found that setting up and configuring Spark cluster hap-pens to be easier than the one of Hadoop.

A Spark cluster consists of a single master and a number of worker nodes.A sketch for how we access cluster resources is shown on Figure 3.10. We refer toa transformation request from the user, as a job, which gets split into a number ofparallel tasks in the cluster. To use cluster resources we have to develop a DriverApplication that connects to the master and submits the required information tocreate a job on the cluster. The information to create a job is the following:

• the location of the input data;

• integer N, specifying on how many pieces to split the input dataset;

• description of the transforms to be applied on the input dataset;

• destination, where to save the result.

Figure 3.10: OpenRefine within Spark cluster resources

The master node is in charge of scheduling tasks to be performed on each split(chunk) of the RDD on worker nodes. As mentioned previously, we consider Open-Refine as an integral component and in terms of our distributed model we see itas a single component with a performTransform() interface. However, originalOpenRefine implementation doesn’t provide such an interface, it is designed in theform of a web service and can be accessed only using its HTTP API. At this pointwe consider two options to provide such interface to OpenRefine:

• embedded engine: develop a modified version of OpenRefine engine bystripping off the web server part and wrapping the engine’s subsystem into acontainer with the required interface. This approach can be seen as a Facadedesign pattern [43] and we refer to its implementation as BatchRefine.

Page 68: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

68 CHAPTER 3. APPROACH

• remote engine: implement a client with the required performTransform()

interface and keep the original OpenRefine engine unmodified. The clientprovides access to the OpenRefine engine via its HTTP API, but hides all theinteraction between them. We refer to the implementation of this approachas Refine HTTP Client.

Initially we thought of the embedded engine as a more suitable approach becauseof the following reasons:

1. getting rid of the web-server burden would improve the performance andresult in a version of the OpenRefine engine with reduced memory require-ments;

2. embedded engine can be integrated as a library, therefore reduce the overheadof handling HTTP connections.

To follow the embedded engine approach, we developed a modified version of theoriginal OpenRefine engine, namely BatchRefine. However, during the implemen-tation we realized that by getting rid of the web-server we also loose the automaticsupport of OpenRefine extensions. Hence, each extension has to be manually con-figured to work with BatchRefine. At this point we have a trade-off between theout-of-box extensions support for the remote engine approach and the performancegain we expect to get from BatchRefine. Therefore, we decide to perform a seriesof performance comparison tests between the original OpenRefine version, usingRefine HTTP Client, and BatchRefine. Our objective is to assess how much ofperformance gain we can get from BatchRefine. For the detailed description andresults of the performance comparison we ask the reader to refer to the Appendix B,whereas in this section we present only the conclusions:

1. performance comparison showed that removing the web-server and callingthe underlying OpenRefine methods directly doesn’t improve the overall per-formance of the engine. Therefore, the overhead introduced by the web-serverpart is negligible with respect to the load of the actual transformation task.

2. the difference between using HTTP protocol to interact with OpenRefine orby directly calling methods within the same JVM is not noticeable from thecomparison results.

We think that integrating OpenRefine engine into our distributed platform is nota necessary task at this point. As it will increase the complexity of the imple-mentation, eliminate automatic support of the OpenRefine extensions and doesn’tprovide noticeable performance gain in return. Hence, we decide to use the Refine

Page 69: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

3.2. SOLUTION 69

HTTP Client that provides our application with the required interface to OpenRe-fine engine, which we assume to have running locally on each of the worker nodes.From the sketch of our distributed architecture, shown in Figure 3.10, we can seethat each worker node has its own OpenRefine engine, which is accessed using theRefine HTTP Client.

So far, we presented all the components of our distributed application andthe interactions between them, but an important aspect of data distribution andsplitting has not been described yet, which we explain next.

Data partitioning and distribution. To process data on a Spark cluster, inputdata has to be made available across the cluster, where it is split and loaded intoworker nodes for processing. We begin with a description of our input data splittingmethod and then describe our experience with distributing data across the clusternodes.

Splitting of the input dataset is based on rows, such that each piece con-tains only complete lines and the resulting chunks are of nearly equal size. Equalsize splits are important so that each independent resource takes approximatelythe same workload and consequently the same time to perform a transform onits part. Partitioner is a logic responsible to compute correct splits of the inputdata using a specific user-defined partitioning function. In this way, partitionsare the equivalent of chunks, but partitioned (split) with some logic. In our im-plementation partitioning is performed wit a fixed block size, which assures thateach partition is less or equal to a specified amount of megabytes (any multiple ofthe unit byte). Therefore, number of partitions (N) can be computed in the wayshown in Equation 3.2.

N =

⌈Total file size

Block size

⌉(3.2)

We don’t go into the detail of how partitioner deals with lines that appear brokenacross two successive blocks and refer the reader to consult ‘TextInputFormat’ ofHadoop input formats [26]. However we focus on another important partitioningproblem that we faced during our implementation.

When partitioning a Comma Separated Value (CSV) file, an important partof it, is the header, typically on the first line of the file. After partitioning, theheader appears only in the first partition (Figure 3.11). In this case, only the firstpartition can be correctly processed by OpenRefine, whereas partitions without aheader would fail to be processed. To solve this issue, we need a way to make aheader available at all the worker nodes before submitting the assigned partitionto the OpenRefine engine. To accomplish this goal, we adopt Spark’s broadcastvariable abstraction, that can be used to distribute a header to all the workernodes before each processing task. Each worker node before submitting the as-

Page 70: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

70 CHAPTER 3. APPROACH

Figure 3.11: After partitioning header appears only in the first partition

signed partition to OpenRefine checks if the assigned split contains the header,by comparing the chunk beginning with the broadcast variable value. If it doesnot contain the header, than the header is added to the beginning of the partitionbefore submitting to OpenRefine, but removed again after the result is returned.This simple approach solves the aforementioned problem with very low overhead.

Distributing data throughout the cluster is another problem to be consid-ered. Initially, we relied on the fact that data would be available to all workermachines by means of a Network File System (NFS), shown on Figure 3.12. Inthis case there is a single dedicated machine within the cluster that stores thedata (NFS Server), which is attached over the network as a virtual local storageon the worker machines. In this way, all the read/write operations are performedover the network, but seen as a local operation by the clients. Whenever a worker

Figure 3.12: Network File System

is assigned a partition to process, it reads the assigned part of the file from theNFS server, over the network. To output the results after processing we reliedon the convenience provided by Spark, collect() operation on the RDD, whereeach worker node sends the result directly to the driver application. Using NFSand collect() method is a convenient and simple approach in terms of imple-mentation, but not can’t provide the required scalability feature. It showed to bea bottleneck of our system for several reasons:

Page 71: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

3.2. SOLUTION 71

• concurrent read over the network from a single server wouldn’t scale to thelarge number of worker nodes;

• using the collect() function on the RDD, requires all the data to fit intothe memory of a driver program, which is not a suitable implementation,when dealing with large data sizes.

With an increasing number of worker nodes, the performance of the NFS becomesvery poor and we have to think of a different approach to make the data availableon all the cluster nodes. Therefore, we require a convenient method to performconcurrent read/write operations with high throughput on a cluster of commoditymachines.

As described in Section 2.3.2, HDFS precisely fulfills the aforementioned re-quirements and is designed to serve large datasets to the Hadoop’s MapReducetasks.

Complementing architecture with HDFS. To provide a distributed storagesystem to the Spark cluster, we configure a Hadoop Distributed Filesystem to workon the same pool of machines. The resulting distributed architecture, shown onFigure 3.13, is complemented by the HDFS nameneode on the Spark master node,and by HDFS datanodes on each of the cluster worker nodes.

Figure 3.13: Cluster architecture with HDFS

Complementing our architecture with HDFS enables it to have a reliablestorage that provides a filesystem with high throughput on a cluster of commodityhardware and good support for MapReduce I/O patterns. In particular, HDFShandles data by splitting it on equal size blocks, and replicating them on severaldatanodes over the cluster for redundancy. Therefore, blocks of the file are located

Page 72: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

72 CHAPTER 3. APPROACH

on the same machines, where the actual data processing happens. By aligningthe size of a partition on a Spark job with the size of the HDFS block, Sparkmaster can exploit namenode’s global knowledge to assign tasks in a way thateach worker already has the required data locally. We consider locality-awarescheduling, a crucial feature of our system, as it allows to reduce the amount ofdata to be transmitted over the network.

Application workflow and preliminary result. To summarize the descriptionof our distributed application, we provide a step by step overview of the workflowon a real-world transformation example. We also provide the preliminary per-formance estimates for a particular workload and compare it to the centralizedversion of the engine. As a workload for this example we use a 2 million lines file(158 Mb) and apply a “Column addition” operation on it. We set up a Sparkcluster with HDFS consisting of a master and 14 worker nodes, and configure theHDFS blocksize to a the of 16 Mb.

Figure 3.14: Application workflow

The whole transformation workflow, shown in Figure 3.14, has the followingsteps:

1. write the input file to HDFS. The 158 Mb file gets split, according to Equa-tion (3.2) into 10 blocks, where each of the blocks is written to the workernodes local disk;

2. a request is submitted to the diver program, containing the location of theinput and the required transform description;

3. driver program submits the job to the Spark master, specifying the inputlocation and a map function (Listing 3.1) to be performed on each partition;

Page 73: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

3.2. SOLUTION 73

4. Spark master contacts the HDFS namenode to get the information aboutthe locations of the input file blocks. HDFS namenode, exploiting its globalknowledge of the distributed filesystem, provides the location (ip-address orhostname of the node) for each of the 10 blocks to the Spark master;

5. Spark master schedules map tasks on the worker nodes to be performed oneach of the 10 partitions of the file. Map tasks are assigned in a way that aworker nodes has to process the partition, which is stored on its local disk.

6. on each worker node, an instance of the Refine HTTP Client reads the as-signed partition from the local disk and, if required, adds the header to thebeginning of the partition;

7. Refine HTTP Client streams the prepared partition and the transform de-scription to the OpenRefine engine running locally on the same machine viaHTTP connection. OpenRefine performs the required data processing andstreams the result back to the Refine HTTP Client;

8. each Refine HTTP Client, removes the header from the partition if it wasadded before sending data to OpenRefine and writes the resulting partitionto HDFS. Writing results to HDFS is performed asynchronously, such thateach worker node writes its own result independently from other workernodes;

9. the result is stored in HDFS as 10 partial text files, one for each of the maptasks. The result is made available to the user by reading and reassemblingthe partial files.

We processed the same workload both on the distributed and centralized en-gines to get the preliminary comparison of the engines performance on exactly thesame workload. For evaluating parallel systems, one of the common performancemeasures is the speedup [44]. Speedup on a problem of size X with N machines isequal to the ratio between the time it takes to solve this problem on one machineand the time it takes to solve the same problem on N machines (Equation 3.3).

speedup(N,X) =time(1,X)

time(N,X)(3.3)

Distibuted and centralized engines have different events that can be measured,therefore in Table 3.4 we show the events that we measured for each of the enginetypes. In particular, for the centralized version of the engine we can separatelymeasure the time to import the data into the engine, time to process and to exportthe data. Consequently, for the centralized engine, the total time to apply trans-formation on data is equal to the sum of this three events. For the distributed

Page 74: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

74 CHAPTER 3. APPROACH

architecture, the measurements are more coarse grained. In particular, we canmeasure the time required to load the data into HDFS, the time it takes to per-form a map task on each of the workers, total job execution time, time required toretrieve and reassemble the result from HDFS. Total job execution time is mea-sured from the point a user submits the job description to the driver application(number 2 in Figure 3.14) up to the time each worker finished writing the resultto HDFS (number 8 in Figure 3.14). However, communication and schedulingdelays are not easy to be measured. For the distributed engine we compute thetotal time to perform a transform as a sum of the following events: load data intoHDFS, total job execution time and the time required to retrieve and reassemblethe result from HDFS. For the comparison between the two engines we consideras a fair evaluation method, the comparison of the described total time values.

Centralized Time [s] Distributed Time [s]Load data into the

OpenRefine3.878 Load data in HDFS 2.554

Apply transforma-tions on the data

4.508 Average map task exe-cution time (load datato OpenRefine, pro-cess data in OpenRe-fine, output data fromOpenRefine, write re-sult to HDFS) on allthe worker nodes

1.599

The longest map tasktime

2.174

Total job executiontime (all the maptasks + schedulingand communicationdelays)

2.614

Export result fromthe engine

3.493 Read and reassemblethe result from HDFS

1.897

Total time3.878 + 4.508 + 3.493 = 11.879 s 2.554 + 2.614 + 1.897 = 7.064 s

Speedup = 1.68

Table 3.4: Measured events in centralized and distributed engine versions

The numbers presented in Table 3.4 were obtained from processing the afore-mentioned workload on 10 machines of a Spark cluster, using our distributed en-gine. To compare to the centralized engine, we adopt the measurements from ourbaseline tests (Appendix A). An estimate of the speedup for this particular work-load and setup is equal to the value of 1.68, which means that we can outperform

Page 75: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

3.2. SOLUTION 75

the centralized engine’s performance even for such small datasets (158 Mb). Weunderline that this are only the preliminary results for this particular workload andcluster setup. Performance of the distributed engine largely depends on the con-figuration and cluster setup. The detailed performance evaluation, configurationand performance tuning options are presented in Chapter 4.

3.2.3 Solving the non-embarrassingly parallel transforms

In contrast to the embarrassingly parallel operations, non-embarrassingly parallelare more difficult to be split on a number of parallel tasks. The constraint forthis are the dependencies that arise, when an inherently sequential task is splitinto a number of parallel tasks. Therefore, additional effort is required for thedependencies resolution and synchronization between the parallel tasks. In thissection, we discuss the problem for four of the non-embarrassingly parallel trans-forms, reported in Table 3.3. Our objective is to identify the dependencies and toprovide a sketch to the solution for each of the transforms.

Blank down cells operation is used to eliminate consequently repeating cellvalues in a column. From example in Figure 3.15, we can observe it being appliedon the ‘Year’ column, where cell values on 2nd and 3rd rows are set to blank as theyhave the same value of the cell int first row. This operation reads all the columnfrom top to bottom, one cell at a time, while keeping track of the previous cellvalue by storing it in a variable. For each subsequent cell, its value is comparedto the stored value of the previous non-blank cell. The current cell value is setto blank in case it is equal to the stored previous cell value. In this way, on theboundaries of two partitions, there is a dependency on the previous partition’s lastcell value (after it has been processed). However, our solution shows that it is notrequired to wait for the previous partition to complete the processing.

Figure 3.15: Blank down cells operation

To solve this dependency, we assume that each of the parallel tasks knowsthe location of the preceding partition and knows how to read it. In this way,

Page 76: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

76 CHAPTER 3. APPROACH

all the worker nodes that receive a task with this operation, start by reading therequired column in the previous partition. Reading is performed in the reverseorder, until the first non-blank cell is found in this column, and then can be usedto check how to proceed with the blank down operation on this partition.

Fill down cells operation has a similar algorithm to the previous “Blank down”one. It is used to fill blank cells in a column with the last non-blank precedingcell value. From example in Figure 3.16, this operation is applied on the ‘Year’column, where the cells of this column at rows numbered 2,3 and 4 are blankand therefore are filled with the cell value from the first row. As opposed, to the“Blank down” operation, for this operation dependencies between parallel taskson partitions occur only in the case when the partition’s first row has a blank cellin the required column.

Figure 3.16: Fill down cells operation

To solve this operation, we initially thought of introducing additional intel-ligence into the Partitioner logic, such that a partitioner prevents a partition tohave blank cells in the first row. However, this approach can work only when acolumn has a small number of blank cells, otherwise it may result in very largepartitions of unequal size. Therefore, we decided to leave the partitioning as itis and came up with another solution. For this solution we assume that each ofthe parallel map tasks knows the location of the preceding partition and can readfrom it. In case a cell of the partition starts with a blank value, it is requiredto access the contents of the previous partition. Reading of the specified columnin the previous partition is performed in the reverse order, until a non-blank cellvalue is found. This value is used to fill an empty cell in the current partition.

Columnize by Key-Value is a transpose type of operation, for which a userspecifies two columns in the source file, a key and a value column. Cell valuesfrom the specified key column are used to create as many new columns (cell valuebecomes the title of a new column) as there are unique, non-blank cells in the keycolumn. Cells from the value column remain at the same row, but are moved to

Page 77: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

3.2. SOLUTION 77

the corresponding key column (which were on the same row in the original table).Columns, not being selected as neither key or value, are kept unchanged and areall shifted to the left.

This type of transform can be better explained using a graphical examplein Figure 3.17. The original table contains a list of 8 leisure places with theirlocations and type of place it belongs to. The key/value columnize operation canbe used to group all the places names that belong to the same type in a singlecolumn. To achieve this, we select the column ‘Tipo’ as a key and ‘Insegna’ as avalue column. After the operation is applied on the original table, we can observethat columns ‘Comune’ and ‘Indirizo’ are shifted to the left and 3 new columnsare created from the unique values of the key column. Cells from the value columnare shifted into the corresponding newly created key column. From the exam-ple you can see that in the original table “AL FAGGIO” from ‘Insegna’ columncorresponds to “Restaurant” of the ‘Tipo’ column. Therefore, in the resultingtable “AL FAGGIO” is palced into the ‘Tipo’ column. An important peculiarity

Figure 3.17: Columnize by key/value example: key column - Tipo, value column- Insegna

of this operation, is the case when a key column has empty cells, in this case anew key column won’t be created and the cell value in the value column would beskipped. Therefore, the way the preparation algorithm works, doesn’t allow it tobe performed on parts of the file. A global knowledge of the whole key column isrequired, to create the new columns and to correctly place the cells from the valuecolumn.

To solve this problem, we consider dividing the transformation process intwo stages:

Page 78: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

78 CHAPTER 3. APPROACH

1. at the first stage, the whole key column is processed to eliminate blank cellsand to extract only the unique entries out of it (can be processed locallyor distributed, depending on the size of the table). The obtained values,being exactly the names of the new columns that have to be created in theresulting table, are distributed to all the worker nodes.

2. at the second stage, parallel workers are assigned with a piece of the inputtable and perform the following steps:

(a) removes the key and value column out of the table, but keep their valuesand the row number structured as triples. Such that each triple can beaccessed later;

(b) create new column for each of the values obtained from the first stage;

(c) Go through each triple of this partition and place the value on the samerow, but into the column corresponding to the key of the triple.

The results from the second parallel stage can be reassembled into a singletable by appending them in the correct order. Regardless, the description of thesolution is very high-level, we consider it relatively easy to be implemented.

Transpose cells across columns into rows operation is best described usinga visual example. Let’s have a look on the example presented in Figure 3.18. As

Figure 3.18: Transpose 3 cells across column B into rows

you can observe from the example, every 3 cells from column B are rotated andadded into 3 new columns horizontally. As this operations is simple sequentialrotation of a subset of cells in a column it can be solved by correctly partitioningthe file, in a way that transposition wouldn’t be broken into different partitions.To achieve independent partitions, an input file has to be partitioned in a waythat number of rows in a chunk equals to the multiple of N cells to be transposed.Moreover, if the total number of rows at the input is not multiple of N, only thelast partition may have arbitrary rows count. Let’s describe a parallel solution

Page 79: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

3.2. SOLUTION 79

for the example in Figure 3.19, where we transpose 3 cells across column B. Wepartition an input table of 9 rows into two chunks of 6 and 3 rows. Further the

Figure 3.19: Transpose 3 cells across column B in parallel on two partitions

transpose operation is applied independently on each chunk and the results areconcatenated by appending the chunks in the right order.However, the limitation of such approach is that the number of cells to transposecan be chosen by the user close or equal to the total number of rows. In thiscase the partitioning of the input would become inefficient and would require largepartitions.

Conclusions. In this section we have reviewed four of the non-embarrassinglyparallel operations and reasoned about their dependencies. For each transformwe provide a sketch of the solution, but not the implementation. However, whendiscussing the solution, our objective was efficiency and ease of implementation.As the implementation for non-embarrassingly parallel transforms is the part ofthe work that we are going to implement in the nearest future.

3.2.4 Distributed engine: summary and outlook

This chapter has discussed the performance and scalability limitations of OpenRe-fine engine. A series of microbenchmarking tests were performed to better identifythe shortcomings of its architecture. This approach led us to a conclusion that aprior limiting factor for the ability to handle large input files is the lack of avail-able hardware resources, mainly memory. We also observe that because of theinherently sequential data transformation algorithms, time required to completea transform increases linearly with the input file size, regardless of the sufficientamount of available memory. This led us to a conclusion that the key to the scal-abale version of OpenRefine is the distributed parallel processing. Therefore, we

Page 80: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

80 CHAPTER 3. APPROACH

have implemented a transition of the OpenRefine engine to a distributed archi-tecture, based on a MapReduce-like framework, namely Apache Spark. Currentdistributed engine implementation supports most of the operations of the Open-Refine core engine functionality. However, there is a class of operations, that arecurrently not supported. Next chapter presents the performance evaluation ofthe newly developed distributed engine and compared to the original OpenRefineversion.

Page 81: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

Chapter 4

Evaluation

In Chapter 3, we have discussed the OpenRefine scalability problem and pre-sented a solution to it. Our solution aims to overcome these limitations by movingOpenRefine to a MapReduce-based distributed platform that allows to distributedata and processing tasks over a cluster of machines. Embarrassingly paralleltransforms can be treated uniformly and are eligible to be solved with a generalsolution, whereas non-embarrassingly parallel ones require a custom solution foreach of the transforms. For each of the non-embarrassingly parallel transforms,we provide a sketch to their solution in Section 3.2.3, whereas for the embarrass-ingly parallel operations we provide a fully-functional implementation, which wedescribed in Section 3.2.2. Our implementation is a software application based onthe Apache Spark [27] cluster computing framework.

In this chapter we evaluate the performance of our application by applying iton a real-world cluster of commodity machines. We begin with the description ofthe experimental setup, which includes: the hardware characteristics of the cluster,the utilized frameworks and the relevant configuration parameters. Further, weprovide metrics for the performance evaluation and the procedure of comparingour application performance with the baseline. Section 4.3 presents the results ofthe evaluation that we discuss and summarize in Section 4.4.

4.1 Experimental setup

To test our application we require a set of computers connected through a LocalArea Network (LAN), which are used to set up an Apache Spark [27] cluster. Ourapplication’s architecture also relies on HDFS to store input data and to write theresults on it. Therefore, the same pool of machines must be used to configure theHadoop [25] distributed file system, such that each worker node of the cluster isalso the datanode of the HDFS. Both of the frameworks require a single dedicated

81

Page 82: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

82 CHAPTER 4. EVALUATION

node in the cluster, used as the ‘master’ for Spark and the ‘nameserver’ for HDFS.Moreover, as described in Section 3.2.2, our solution relies on independent copiesof the OpenRefine engine running locally on each of the worker nodes. Hence, todescribe the exact test bed, we provide a description of the hardware and softwareconfiguration for both master and worker machines.

Hardware description. The University of Trento provides an opportunity touse the computer classroom’s machines for scientific experiments and computa-tions during the night time period. These machines are located in three differentcomputer classrooms in the same building and are interconnected through a 1Gbit Fast Ethernet link, but the topology of the network is not known. More-over, the computers are not identical in terms of hardware characteristics, shownin Table 4.1, there are two different possible types of CPU on these machines. Theallocation of computers to be used as a cluster is transparent to us, and thereforethe cluster may consist of machines with different CPUs and relatively distantlocations within the LAN, that is typical for commodity clusters.

Worker node characteristicsCPU Intel R© CoreTM i5, 3.06 GHz or

Intel R© CoreTM 2 Quad, 2.66 GHzRAM 4 Gb RAMOperating system Ubuntu Linux 12.04 (3.2.0-64)

Table 4.1: Worker node characteristic

For the master node of the cluster, we use a different machine (Table 4.2),located in the same building and connected to the workers through a 1 Gbit FastEthernet, but wit better hardware characteristics. The decision to use a different

Master node characteristicsCPU Intel R© CoreTM i7, 3.4 GhzRAM 16 Gb RAMOperating system Ubuntu Linux 12.04 (3.8.0-42)

Table 4.2: Master node characteristics

machine for the master node was mostly taken for convenience, as we had fulladministrative permissions on it, whereas only limited permissions on the workernodes.

Software and configuration. Both master and worker nodes have the sameversion of the Ubuntu Linux (12.04) operating system and the same version of theJava Runtime Environment (OpenJDK 7). Figure 4.1 shows our master-worker

Page 83: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

4.1. EXPERIMENTAL SETUP 83

cluster architecture and what are the different software components that have to beinitialized on the master and worker nodes. To setup the cluster, we used the mostrecent versions of the software packages available at the moment of the experiment:Apache Spark [27] (1.0.2), Apache Hadoop [25] (1.2.1) and OpenRefine [4] (2.6).On each worker node, OpenRefine is configured to utilize 2 Gb of memory as itsmaximum heap size. This is the maximum memory we could allocate to the actualdata processing, as the remaining 2 Gb have to be shared between the operatingsystem, Spark worker and the HDFS datanode. HDFS block size is configured to16 Megabytes and the replication factor set to 3, which is the default value for theHDFS. Such HDFS configuration enables each file in the distributed filesystem tobe split on 16 megabyte chunks and to store 3 copies of each chunk across thecluster datanodes.

Figure 4.1: Blank down cells operation

For this experiment, the driver application for the Spark cluster (RefineOn-Spark), is run on the same machine as the Spark master node, to which it connectsto. Setting up a cluster is performed in the following order:

1. start the Apache Spark master and the HDFS namenode and bind them tothe machine’s local IP address but different local ports (use default ports);

Page 84: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

84 CHAPTER 4. EVALUATION

2. establish a remote terminal connection to each of the worker nodes andperform the following tasks:

• start the Spark worker instance specifying the master’s ‘IP address:port’;

• start the HDFS datanode instance specifying the namenode’s ‘IP ad-dress:port’;

• start the OpenRefine engine and bind its HTTP server to the ‘localhost’address;

3. on the master node, using the Spark’s administration UI tool, check that allthe workers are registered to the Spark master. Perform the same check forthe datanodes using the namenode’s UI.

4. start the RefineOnSpark driver program that connects to the Spark masterand is ready to be assigned work from the user;

The aforementioned steps are described for clarification and are available as asingle script, configurable for different clusters.

The RefineOnSpark driver program is designed in a way that it stays con-nected to the cluster and prompts the user to submit the workload (input filelocation, transform description). The workload and metrics used for the evalua-tion are described in the next section.

4.2 Evaluation and metrics

Workload. Operations for the evaluation procedure are all of the non-embarrassinglyparallel transforms (Table 3.3), which namely are: Column addition, Columnmove, Column remove, Column split, Mass edit cells, Text transform and Trans-pose cells across columns into rows. As input data we use the same tabular datafile from the microbenchmarking approach with 15 different sizes (from 1 millionto 10 million lines). Therefore, we have a total of 105 different workloads (15 filessizes and 9 transforms).

Metrics. The main metric used for the evaluation is the total processing time ,which is the time it takes to process the workload, from the moment it is submittedto the driver application, up to the moment the result is available on HDFS. Thesemeasurements are performed at the driver application and precisely represent theperformance perceived by the user of the application.

However, during our experiments we perform additional measurements: oneach of the worker nodes we measure the time it takes to process the assignedpartition. We refer to this metric as the worker time and measure it on each ofthe worker nodes independently. This measure includes the time a worker takes

Page 85: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

4.3. RESULTS 85

to read the assigned partition, submit it to the OpenRefine, wait for it to processthe data and receive the result. For our evaluation, worker time serves to estimatethe scheduling delay S, which is computed as the difference between the totalprocessing time Ttot and the largest of the worker’s processing time max(Wn),as shown in Equation (4.1).

S = Ttot −max(Wn) (4.1)

Scheduling delay is a varying, difficult-to-measure and imprecise metric, as it maybe affected by slower machines or congested networks, but still we expect it toprovide a view on the cluster’s management overhead that is added to the actualdata processing.

Comparison with the baseline. Part of the workload with the input files from1 to 4 million lines can be compared to the baseline (Appendix A), established atthe time-dimension scalability tests in Section 3.1.2. The performance baseline isrepresented only for the workloads with the input data of 1 to 4 million lines, as forlarger input sizes we couldn’t provide the measurements unaffected by increasingGC overhead. We compare the total processing time of the centralized engine andthe total processing time for the same workload using our distributed solution.The metric used to compare with the baseline is the speedup, which we havedescribed previously, using the Equation (3.3).

Workloads with larger input file sizes (from 4 to 10 million lines) are usedto analyse the ability to accommodate larger workloads, therefore to assess thescalability of our implementation.

4.3 Results

This section presents the results of our distributed engine evaluation and alsoprovides the comparison to the baseline. The total processing time for variousworkloads is shown in Figure 4.2 and 4.3 . To provide comparison to the baseline,for the input sizes from 1 to 4 million lines we also plot the baseline total processingtime on the same graph.

The number of partitions that the input is divided depends on the HDFSblock size configuration and is computed using the Equation (3.2). Partitions arescheduled to be processed concurrently, each on a separate worker node, providedthat the cluster is large enough to accommodate the node requirements. In Fig-ure 4.4 we can observe the utilization of the worker nodes depending on the inputfile size, such that we use 5 nodes to process 1 million file and 45 nodes to processthe 9 million one. In our experiments, the maximum number of utilized nodesis equal to 50 workers, for the 10 million lines file. Our cluster setup consists of

Page 86: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

86 CHAPTER 4. EVALUATION

Figure 4.2: Total processing time for the Distributed and Centralized engine

60 worker nodes and therefore all the partitions are scheduled to be performed inparallel.

Comparison with the baseline. The results in Figure 4.2 and 4.3 show thatwe have reached the desired performance goal to overcome the performance ofthe OpenRefine engine. This is true even for workloads that are not computa-tionally intensive. In particular, as it was discovered, using microbenchmarkingapproach in Section 3.1.2, the column reordering transforms (‘Column move’, ‘Col-umn reorder’) are independent from the input file size and are processed by theOpenRefine engine in a few seconds for all data sizes. However, the time it takesto load the input data and to export the result for the centralized engine, is largerthan to assign the same workload to be performed using our distributed engineimplementation.

In Table 4.3 and 4.4 we provide the numerical representation for these two lightweightoperations. We show the time it takes to apply these transforms for both of theengines, the average processing time for a single cluster worker, the schedulingdelay, the number of active workers and the achieved speedup. We can observethat even for the smallest considered input file (1 million lines) we can reach theresult of 190% speedup, whereas for larger input size files the distributed engineexhibits the speedup of almost 600%. This means that it takes 6 times less time

Page 87: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

4.3. RESULTS 87

Figure 4.3: Total processing time for the Distributed and Centralized engine

to perform the same task, compared to the original implementation.

Moreover, for larger workloads, we achieve even higher performance improve-ment compared to the centralized engine. For instance, the ‘Column split’ oper-ation (Table 4.5) showed that the distributed engine outperforms the centralizedimplementation by the factor of 10 on the 4 million input workload. Moreover,an important observation from the data in Tables 4.3, 4.4, 4.5 is that with anincreasing input file size, the average worker time does not increase, which is thedesired behaviour, when we add more resources to accommodate larger workloads.

Scalability evaluation. Presented results show that larger workloads can beaccommodated by splitting the total job on a larger number of tasks and there-fore increasing the utilization of cluster resources. From the Figure 4.2 we canobserve that this allows to keep the total processing time nearly constant for allthe workloads. However, we observe a small increase in the total processing timewhen reaching the larger sizes of the file. In Figure 4.2 it is noticeable for thefastest operations (less than 5 seconds), as the scale of the graph allows to noticethis small increase. However, from numerical values in the Table 4.5 we can seethat the total processing time for the distributed engine starts to increase withthe input values larger than 6 million lines. This slight increase in the total pro-cessing time is caused by the cluster managing overhead, as the cluster’s master

Page 88: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

88 CHAPTER 4. EVALUATION

Figure 4.4: Number of workers utilized for the input file size

Input size [m lines] Centralized [s] Distributed [s] Avg. per Node [s] Scheduling Delay [s] N. workers Speedup1 3.96 2.03 1.29 0.74 5 1.95

1.5 5.64 2.17 1.24 0.93 8 2.602 7.41 2.55 1.36 1.19 10 2.90

2.5 9.10 2.44 1.28 1.16 13 3.733 10.80 2.35 1.30 1.06 15 4.59

3.5 12.38 2.56 1.25 1.31 18 4.834 14.64 2.51 1.33 1.19 20 5.82

4.5 - 2.52 1.36 1.17 22 -5 - 2.65 1.32 1.33 25 -6 - 2.89 1.32 1.57 30 -7 - 3.64 1.35 2.29 35 -8 - 5.11 1.34 3.77 40 -9 - 5.53 1.34 4.19 45 -10 - 6.14 1.37 4.77 50 -

Table 4.3: Numerical representation: column move transform

node has to schedule task to the larger amount of machines. Scheduling overheadis very difficult to be measured precisely, as it is influenced by the network delay,machines hardware and the scheduling algorithm of the master node. However, wepresent an estimate of the scheduling delay for the ‘Column split’ operation in thecorresponding column of the Table 4.5 and in Figure 4.5 we present the graphicalview on it. We observe that the worker time stays nearly at the constant value forvarious input sizes and the scheduling delay increases with the number of the nodesutilized for the job. The time required to schedule tasks on 5 nodes is around 1second, but this value increases to 6 seconds as it is required to schedule 50 workernodes. However, the larger the workload, the less noticeable the scheduling delaycontribution is in the total processing time. For the numerical results of all of the

Page 89: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

4.3. RESULTS 89

Input size [m lines] Centralized Distributed Avg. per Node [s] Scheduling Delay [s] N. workers Speedup1 4.03 2.07 1.35 0.72 5 1.95

1.5 5.67 2.20 1.34 0.85 8 2.582 7.51 2.34 1.38 0.97 10 3.20

2.5 9.18 2.45 1.25 1.20 13 3.743 10.93 2.51 1.32 1.19 15 4.36

3.5 12.80 2.42 1.30 1.12 18 5.294 15.15 2.50 1.33 1.17 20 6.05

4.5 - 2.76 1.38 1.38 22 -5 - 2.66 1.33 1.33 25 -6 - 3.06 1.36 1.70 30 -7 - 3.47 1.36 2.11 35 -8 - 3.42 1.37 2.05 40 -9 - 5.07 1.35 3.72 45 -10 - 5.99 1.35 4.64 50 -

Table 4.4: Numerical representation: column reorder transform

Figure 4.5: Average processing time per node and scheduling delay for variousinput file sizes

evaluated operations we invite the reader to refer to Appendix C.

Page 90: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

90 CHAPTER 4. EVALUATION

Input size [m lines] Centralized [s] Distributed [s] Avg. per Node [s] Scheduling Delay [s] N. workers Speedup1 29.03 9.87 8.19 1.67 5 2.95

1.5 42.33 10.23 7.81 2.42 8 4.142 56.01 10.34 8.50 1.84 10 5.42

2.5 69.38 10.56 8.10 2.47 13 6.573 84.43 10.51 8.45 2.07 15 8.03

3.5 100.01 11.50 8.14 3.36 18 8.704 113.35 11.11 8.51 2.60 20 10.20

4.5 - 11.62 8.73 2.89 22 -5 - 10.93 8.47 2.46 25 -6 - 12.23 8.47 3.76 30 -7 - 13.22 8.49 4.73 35 -8 - 12.96 8.45 4.52 40 -9 - 13.85 8.45 5.40 45 -10 - 15.41 8.85 6.55 50 -

Table 4.5: Numerical representation: column split transform

4.4 Summary and outlook

This chapter has presented the evaluation of our work, in particular the perfor-mance of our distributed OpenRefine engine implementation. The performanceresults comparison with the baseline shows that we have achieved our objectiveand our implementation overcomes the scalability limitations of the original Open-Refine engine. We are able to achieve a speedup of 200%, even for the workloadsthat don’t require high computational resources, but are still limited with the slowimporting and exporting phase of the centralized engine implementation. There-fore, such results allow to consider the overhead required to schedule the tasks ona cluster of machines as relatively low, even for the “lightweight” operations. Itis important to notice that these particular results are obtained for the specificcluster setup and workload, however the scalability behaviour has to be the sameon any properly configured infrastructure.

The distributed engine implementation provides the ability to tune perfor-mance by choosing the appropriate partition size. Smaller partitions require lessworker time to be processed, however increase the number of tasks to be sched-uled on the cluster, whereas larger partition sizes increase the time required byeach worker, but decrease the total number of tasks. Therefore, the partition sizeshould be configured in trade-off between the worker time and scheduling over-head. These configurations also depend on the available cluster resources: sizeand worker hardware characteristics. It is important to notice, that if the numberof scheduled tasks is larger than the number of available workers, tasks would bescheduled sequentially. Sequential utilization of resources allows to process heavyworkloads even using a small number of worker nodes, but at the cost of reducedperformance.

Our implementation of the engine is a fully functional software applicationthat can be used to process all of the embarrassingly parallel transforms on a dis-

Page 91: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

4.4. SUMMARY AND OUTLOOK 91

tributed cluster of machines. The non-embarrassingly parallel transforms are notsupported in the current implementation of the engine, but we aim to continuewith the development of the proposed solutions for them. Moreover, our imple-mentation is an open source project freely available for download from the publicgithub repository [45].

Page 92: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

92 CHAPTER 4. EVALUATION

Page 93: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

Chapter 5

Conclusions and future work

Nowadays, the importance of big data is hard to underestimate. There is a hugedemand in tools able to efficiently handle big data, which is typically not a trivialtask and requires substantial effort in the design of scalable algorithms and archi-tectures. Big data is either too big, too fast, too sparse or too dirty. This workhas been motivated by more than one of the big data problems, in particular whendirty data becomes too big.

we constantly hear people talking about big data, its usage, meaning andvalue. But somehow it is always spoken in the context of a problem. This happensfor a reason that the problem lies in the core of big data, it is that everyoneunderstands its endless potential but does not know how to benefit from it.

Numerous techniques are available to deal with dirty data, such as patternmatching, regular expressions, validation with external sources, etc., but imple-menting them for every new piece of data is difficult and time-consuming. There-fore, there is a great interest in the universal instruments and tools that facilitatethe work with messy data. OpenRefine belongs to the class of tool that can fulfilthe typical data-processing needs of messy data. However, its original implemen-tation was considered in the scope of relatively small datasets and therefore someof OpenRefine’s architectural decisions limit its ability to accommodate today’sworkloads. These limitations served as a motivation for this work: to study andto overcome these limitations.

In reviewing the state-of-the-art and later formalizing the scalability problemwe, came across two ways to solve it: vertical and horizontal scaling. Vertical scal-ing aims to solve the problem by providing more resources to the application usinga more powerful machine, while retaining the use of the same software with small orabsolutely no changes. It relies on a purpose-built high capacity mainframe com-puters with multiple CPUs and large amounts of memory, which obviously causeslarge investments. In this thesis we favour the horizontal scalability approach,which is more complex from the software point of view, but allows the use of dis-

93

Page 94: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

94 CHAPTER 5. CONCLUSIONS AND FUTURE WORK

tributed architectures of inexpensive machines. Moreover, to our knowledge, smallto mid-size companies rarely invest money in their computing infrastructures, butrather favour the on-demand computational resources provided by cloud comput-ing services. Last, but not least, there is the concept that no matter how powerfula single machine could be, two of those would always be twice as powerful, andbeing able to leverage these resources is always an advantage.

To provide the scale-out scalability to the OpenRefine we chose to adopt theMapReduce programming model, and built the new distributed data processingmodel for OpenRefine. During the implementation, we came across an importantproblem of data storage and distribution, when processing large datasets. How-ever, after some research and practical experiments we complemented our modelwith the GFS architecture, which perfectly fits our distributed model and allows toreduce the dependency of our system on the network. In our distributed model forOpenRefine, processing is performed at the location of the data, what is usuallyreferred to as “code brought to data”. Moreover, our model is built by viewingOpenRefine engine as a black box, and that allows us to retain the support of nu-merous OpenRefine extensions and future development versions. Our distributedmodel for OpenRefine supports most of the original engine functionality, howeverthere is a class of operations that are not currently supported in our implemen-tation. Each of these operations require an individual approach to resolve thedependencies that arise when trying to parallelize the current sequential imple-mentation, to which we refer as non-embarrassingly parallel. Our discussion onthese problems showed that all of them can be split into parallel tasks and stillmaintain a high degree of parallelism, but for some of operations there is also apessimistic scenario, where the dependencies would cause the tasks to be executedsequentially.

To provide the implementation of our model, instead of Hadoop we chose touse Apache Spark, which is a more recent MapReduce-like framework providingadditional convenient abstractions for distributed data processing. In particular,the abstraction that lies at the core of the framework is the Resilient DistributedDataset, which provides a convenient API that we used to design our data pro-cessing tasks. During the work conducted on this thesis we witnessed one majorand two minor updates of the framework, which means that it is well supportedand lots of developers have interest in it. This particular interest in Apache Sparkis explained with the fact that people are searching for an alternative to the goodold Hadoop, that would support the MapReduce model but make it easier to use.

The main contribution of this work is the distributed model for OpenRe-fine’s functionality and the implementation of this model. Performance evaluationof the implementation showed that it outperforms the original OpenRefine engineeven for the tasks for which OpenRefine was considered to have a decent per-

Page 95: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

95

formance. In particular, on a 1 million line dataset OpenRefine can perform acolumn reordering operation in 4 seconds, which in our opinion is fast enough.However, our implementation doubled the performance of this task, and is able tocomplete the same operation in 2 seconds by distributing it over 5 machines. Ourmodel is able to achieve scalability by almost eliminating the use of the networkduring transformation stages and the ability to relay the work on larger amountof processing nodes. For more computationally heavy operations we are able toachieve much higher values of the speedup, compared to the original OpenRefineimplementation. In addition to the speedup, our implementation exhibits scalablebehaviour, where larger input datasets can be accommodated by adding moreresources into the system. During the scalability evaluation we noticed an increas-ing behaviour of the scheduling delay when the number of utilized nodes increases.This is the expected behaviour in the MapReduce model, as more machines have tobe communicated by a single master node. However, we consider scheduling delaynegligible price to pay in return to the ability to process increasingly large datasets.

Future work. This work provides the implementation that supports the em-barrassingly parallel transforms of the OpenRefine core functionality, whereas thesupport for the non-embarrassingly parallel ones is still to be implemented. An-other important aspect for the future work is the interaction between the Open-Refine and our distributed platform for it. We witness two possible views on thisinteraction:

1. completely rewrite the OpenRefine engine to embed our distributed modeland use it as a core of the new data processing engine;

2. continue using our distributed model as a platform for the unmodified Open-Refine engine, but integrate OpenRefine into a single JVM on each workernode, to eliminate unnecessary HTTP communication overhead.

In our opinion the first approach provides great opportunities to bring OpenRefineto the next level, the goal of which is the ultimate exploratory data cleansingexperience on very large datasets. However, this approach has to be discussed andaccepted by the OpenRefine developer community and requires substantial effort,which will probably cause OpenRefine to be completely rewritten.

The second approach can be seen as the development of our data processingmodel as standalone project, serving as a distributed platform for the OpenRefineengine. However, there is always room for higher abstractions and our model canbe generalized to support not only OpenRefine, but also other data processingtools.

Page 96: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

96 CHAPTER 5. CONCLUSIONS AND FUTURE WORK

Page 97: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

Appendix A

Baseline

In this chapter, we present the numerical results obtained during the time-dimensionscalability tests of the original OpenRefine engine in Section 3.1.2, which we alsoadopt as the baseline for this project. The results are reported in 13 tables: onefor each of the transforms described in Table 3.1).

Input file [m lines] Load data [ms] Apply operation [ms] Apply GC [ms] Export [ms] Total [ms] GC [ms] GC overhead %1 2215.80 757.80 0 1806.80 4816.60 478.40 9.93

1.25 2550.40 1304.00 364 1759.80 5650.40 569.40 10.081.5 3274.40 1018.80 0 2259.00 6586.80 728.40 11.061.75 3615.40 1204.00 0 2728.40 7582.20 901.40 11.89

2 3925.20 1734.00 379 2833.40 8527.40 997.40 11.702.25 4288.80 1645.60 0 3321.00 9559.00 1230.00 12.872.5 4988.00 1665.60 0 3766.20 10455.20 1334.00 12.762.75 5395.40 2172.00 388 3931.80 11666.00 1556.00 13.34

3 5686.80 2410.20 469 4313.00 12443.40 1646.00 13.233.5 6783.60 2628.80 398 5180.80 14629.80 2128.00 14.554 7814.40 2813.20 306 6158.20 16833.00 2698.60 16.03

Table A.1: Baseline values: Blank down cells

Input file [m lines] Load data [ms] Apply operation [ms] Apply GC [ms] Export [ms] Total [ms] GC [ms] GC overhead %1 2234.20 2518.40 292 1286.80 6075.40 499.60 8.22

1.25 2554.40 3126.00 390 1707.00 7422.60 726.40 9.791.5 3291.60 3439.60 226 1903.40 8672.00 842.60 9.721.75 3593.60 4016.40 326 2348.80 9994.60 1077.60 10.782.00 3960.60 4754.20 558 2636.80 11387.00 1306.20 11.472.25 4296.80 5127.20 245 3043.60 12770.40 1436.40 11.252.5 4978.00 5678.40 494 3275.80 14043.80 1661.60 11.832.75 5451.40 6495.40 595 3657.20 15638.80 1889.40 12.08

3 5701.20 7014.40 852 4113.80 16865.00 2275.60 13.493.5 6758.20 8555.40 1118 4407.40 19782.60 2603.60 13.164 7844.20 9349.80 943 5142.20 22631.20 2887.40 12.76

Table A.2: Blank out cells

97

Page 98: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

98 APPENDIX A. BASELINE

Input file [m lines] Load data [ms] Apply operation [ms] Apply GC [ms] Export [ms] Total [ms] GC GC [ms] overhead %1 2228.00 2190.20 320 1654.00 6107.60 527.80 8.64

1.25 2562.00 2661.20 427 2172.40 7432.20 768.20 10.341.5 3264.40 2969.00 286 2479.40 8748.40 896.80 10.251.75 3586.60 3400.80 387 3023.80 10046.80 1183.00 11.772.00 3967.40 3946.40 490 3402.80 11352.60 1288.20 11.352.25 4293.00 4533.40 333 3880.60 12834.00 1621.80 12.642.5 4984.80 4801.20 461 4345.40 14168.80 1836.00 12.962.75 5386.60 5620.80 800 4995.00 16038.20 2309.60 14.40

3 5692.40 6007.60 837 5805.80 17560.40 2717.20 15.473.5 6787.00 7304.80 1191 6354.80 21544.00 3287.00 15.264 7809.80 8468.60 1521 6662.40 23290.40 3531.60 15.16

Table A.3: Column addition

Input file [m lines] Load data [ms] Apply operation [ms] Apply GC [ms] Export [ms] Total [ms] GC [ms] GC overhead %1 2207.20 32798.40 1186 140.20 35188.80 1405.60 3.99

1.25 2557.00 48342.80 1438 164.20 51109.40 1668.80 3.271.5 3267.20 67062.20 1376 193.20 70575.40 2108.60 2.991.75 3608.20 89332.00 1826 218.40 93209.80 2584.20 2.772.00 4191.40 114923.80 2329 248.00 119417.20 3196.60 2.682.25 4272.80 143872.60 2943 276.60 148477.80 4149.60 2.792.5 5012.20 174919.20 3113 300.40 180289.80 4479.60 2.482.75 5323.20 210124.60 3908 329.00 215837.20 5359.80 2.48

3 5725.00 249059.80 4851 356.20 255201.20 7021.00 2.753.5 6763.60 338080.60 7159 422.40 345335.80 10737.20 3.114 7890.80 435990.00 9413 473.60 444574.60 12430.40 2.80

Table A.4: Columnize by Key-Value

Input file [m lines] Load data [ms] Apply operation [ms] Apply GC [ms] Export [ms] Total [ms] GC [ms] GC overhead %1 2191.40 135.20 0 1599.80 3960.80 204.40 5.16

1.25 2537.60 172.60 0 2193.60 4938.40 560.00 11.341.5 3274.00 167.20 0 2164.80 5638.60 605.60 10.741.75 3610.60 201.40 0 2790.00 6638.00 871.40 13.132.00 3903.20 233.80 0 3234.60 7405.20 965.60 13.042.25 4382.40 426.40 0 3187.20 8206.80 1048.80 12.782.5 5049.60 257.40 0 3757.40 9101.40 1285.60 14.132.75 5484.00 295.40 0 4263.40 10077.60 1380.40 13.70

3 5670.60 328.00 0 4764.20 10796.80 1594.40 14.773.5 6745.80 356.60 0 5234.80 12381.20 1822.20 14.724 7793.80 398.80 0 6016.20 14635.40 2246.80 15.35

Table A.5: Column move

Input file [m lines] Load data [ms] Apply operation [ms] Apply GC [ms] Export [ms] Total [ms] GC [ms] GC overhead %1 2209.20 1834.40 288 1229.80 5307.60 496.20 9.35

1.25 2560.60 2217.80 392 1488.80 6303.40 597.20 9.471.5 3257.80 2169.00 0 2010.20 7470.60 835.80 11.191.75 3599.60 2796.00 321 2145.40 8577.80 990.80 11.552.00 3966.00 3333.80 423 2519.00 9852.40 1171.00 11.892.25 4311.20 3665.20 242 2768.60 10867.80 1397.60 12.862.5 4971.60 3834.20 354 3149.80 11992.60 1516.60 12.652.75 5346.80 4389.20 583 3355.60 13129.60 1748.60 13.32

3 5678.80 4852.60 686 3722.00 14288.80 1976.60 13.833.5 6788.40 5590.60 753 4501.00 16969.60 2570.60 15.154 7844.60 6501.20 900 4757.60 20131.40 2840.80 14.11

Table A.6: Column remove

Page 99: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

99

Input file [m lines] Load data [ms] Apply operation [ms] Apply GC [ms] Export [ms] Total [ms] GC [ms] GC overhead %1 2210.00 139.80 0 1645.40 4030.20 205.80 5.11

1.25 2563.60 182.20 0 2239.60 5019.40 555.80 11.071.5 3251.20 166.20 0 2218.20 5669.00 607.20 10.711.75 3575.60 205.20 0 2820.00 6634.20 872.60 13.152.00 3957.00 240.80 0 3277.60 7509.40 973.00 12.962.25 4418.60 507.60 0 3275.40 8324.20 1049.00 12.602.5 5052.00 266.20 0 3830.20 9182.80 1280.60 13.952.75 5356.20 302.80 0 4309.40 10003.00 1378.60 13.78

3 5760.60 336.60 0 4799.60 10931.60 1601.80 14.653.5 6715.20 361.60 0 5688.60 12799.60 1829.20 14.294 7823.20 384.20 0 6304.00 15150.80 2254.20 14.88

Table A.7: Column reorder

Input file [m lines] Load data [ms] Apply operation [ms] Apply GC [ms] Export [ms] Total [ms] GC [ms] GC overhead %1 2227.80 25144.20 1295 1627.20 29034.80 1531.40 5.27

1.25 2554.40 31324.60 1480 1931.00 35847.60 1721.40 4.801.5 3283.20 36754.20 1261 2259.40 42332.60 1894.80 4.481.75 3629.40 43960.60 1563 2692.00 50319.80 2189.20 4.352.00 3941.00 48955.60 1704 3077.60 56010.20 2336.00 4.172.25 4339.40 55022.00 2325 3412.80 62813.20 2976.80 4.742.5 4970.40 60668.80 2223 3705.80 69383.00 3375.00 4.862.75 5371.20 67357.20 2569 4089.40 76856.00 3752.40 4.88

3 5675.00 74053.40 3041 4657.80 84425.80 4196.40 4.973.5 6761.60 87716.40 3355 5423.60 100007.20 4915.20 4.914 7827.20 99442.20 3806 6040.60 113348.40 5795.60 5.11

Table A.8: Column split

Input file [m lines] Load data [ms] Apply operation [ms] Apply GC [ms] Export [ms] Total [ms] GC [ms] GC overhead %1 2217.40 27672.20 1541 1844.40 31775.40 1790.00 5.63

1.25 2546.40 33737.80 1721 2302.40 38627.40 2000.60 5.181.5 3293.00 40366.00 1482 2738.60 46435.40 2139.60 4.611.75 3615.60 47806.80 1853 3187.80 54650.00 2493.20 4.562.00 3969.80 53279.20 2083 3619.40 60911.60 2783.20 4.572.25 4284.40 60171.40 1858 4085.20 68671.80 2970.80 4.332.5 5073.00 66628.20 2638 4530.40 76271.00 3781.60 4.962.75 5385.20 73619.00 2885 5191.00 84238.00 4093.40 4.86

3 5695.60 80204.20 3196 5499.60 91443.20 4571.60 5.003.5 6720.40 93168.40 3664 6445.20 106559.60 5329.20 5.004 7796.80 106784.00 4325 7972.40 122596.60 6545.40 5.34

Table A.9: Composite transform

Input file [m lines] Load data [ms] Apply operation [ms] Apply GC [ms] Export [ms] Total [ms] GC [ms] GC overhead %1 2209.20 944.60 0 1770.60 4958.80 478.00 9.64

1.25 2562.00 1538.40 367 1791.80 5926.40 573.20 9.671.5 3253.20 1248.40 0 2338.60 6877.60 795.40 11.571.75 3585.80 1485.60 0 2752.40 7858.00 892.60 11.362.00 3921.00 2061.00 376 3024.80 9042.40 1140.60 12.612.25 4365.20 1977.80 0 3324.40 9881.60 1221.40 12.362.5 4987.20 2043.80 0 3847.40 11037.60 1388.60 12.582.75 5330.80 2613.20 385 3993.60 11998.20 1579.20 13.16

3 5715.60 2781.60 436 4456.60 12988.20 1640.00 12.633.5 6825.00 3175.40 388 5211.60 15247.60 2229.40 14.624 7875.20 3389.80 293 6484.60 17784.80 2804.40 15.77

Table A.10: Mass edit cells

Page 100: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

100 APPENDIX A. BASELINE

Input file [m lines] Load data [ms] Apply operation [ms] Apply GC [ms] Export [ms] Total [ms] GC [ms] GC overhead %1 2212.20 3418.40 328 1588.40 7253.60 670.60 9.25

1.25 2569.60 4405.20 572 1794.40 8804.80 778.20 8.841.5 3264.20 4951.20 472 2126.80 10378.40 1088.20 10.491.75 3610.80 5843.20 584 2669.60 12160.20 1376.00 11.322.00 3948.80 6842.40 864 3018.60 13899.60 1652.40 11.892.25 4274.20 7422.80 583 3603.60 15606.60 2104.40 13.482.5 4988.40 8579.20 1171 3856.60 17461.20 2496.20 14.302.75 5356.40 9727.60 1545 3924.00 19043.80 2632.80 13.82

3 5693.80 10872.40 1930 4368.40 21138.00 3053.60 14.453.5 6766.00 12062.40 1798 5194.80 24822.60 3346.20 13.484 7847.60 14235.20 1683 5766.20 27885.00 3630.40 13.02

Table A.11: Text transform

Input file [m lines] Load data [ms] Apply operation [ms] Apply GC [ms] Export [ms] Total [ms] GC [ms] GC overhead %1 2231.00 16595.80 1356 2067.60 20929.00 1576.80 7.53

1.25 2556.00 20527.80 1600 2557.80 25678.80 1839.20 7.161.5 3282.60 23751.00 1354 3026.60 30096.80 1999.20 6.641.75 3612.40 27924.00 1639 3531.80 35227.00 2273.60 6.452.00 3936.80 31886.00 1876 3997.00 40107.00 2506.60 6.252.25 4295.80 36036.40 1957 4557.40 45036.80 2610.00 5.802.5 5005.60 39433.20 1777 4994.00 49471.20 2840.80 5.742.75 5347.20 43832.00 2208 5568.00 54784.80 3404.00 6.21

3 5622.00 47121.00 2322 6020.40 58804.60 3443.60 5.863.5 6730.40 55229.20 2201 6991.80 70118.60 3727.60 5.324 7796.20 62563.60 2124 8300.80 78700.80 4169.20 5.30

Table A.12: Transpose cells across columns into rows

Input file [m lines] Load data [ms] Apply operation [ms] Apply GC [ms] Export [ms] Total [ms] GC [ms] GC overhead %1 2212.80 20930.80 999 4786.40 27968.40 1216.60 4.35

1.25 2548.40 26331.00 1151 6043.60 34961.20 1366.40 3.911.5 3275.20 31066.80 913 7381.20 41763.40 1539.60 3.691.75 3594.60 36166.40 1126 8362.60 48163.00 1762.40 3.662.00 3961.80 41691.20 1268 9294.40 54986.20 1910.60 3.472.25 4268.40 46620.60 956 10686.80 61795.20 2023.20 3.272.5 5005.20 51863.60 1130 11652.00 68950.20 2193.20 3.182.75 5365.20 56905.20 1397 12775.20 75107.00 2498.80 3.33

3 5691.60 62241.60 1459 13831.00 81895.20 2554.00 3.123.5 6730.40 72480.20 1422 16334.60 95587.60 2926.80 3.064 7802.40 81820.20 1305 19156.40 108958.60 3307.80 3.04

Table A.13: Transpose cells in rows into columns

Page 101: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

Appendix B

Batchrefine and OpenRefineperformance comparison

As mentioned in Section 2.5.2, OpenRefine’s architecture is the one of a web appli-cation, which is implemented as a servlet and coupled with a web-server. However,for the implementation of a distributed platform for OpenRefine in Section 3.2.2,it is required to provide a version of the engine that can be integrated and ac-cessed using a single-function performTransform() interface. On this purpose webuilt a special version of the engine, namely BatchRefine, which uses OpenRefineas a library by calling underlying methods without instantiating the web-serverand exposes the required interface. We expect that due to the absence of theweb-server, BatchRefine has lower resource requirements compared to the originalOpenRefine, and therefore results in better performance.

To prove our assumptions and quantify the expected performance gain weconduct a series of performance measurements on both BatchRefine and standardOpenRefine engine. As a workload for the experiment, we apply the transformsspecified in Table 3.1 on a 1 million lines input file. To also check the difference ofthe memory footprint between the engines, we process each workload combinationusing various heap sizes (from 1 to 8 Gb). The obtained results are presented intwo ways:

1. comparison between the engines in terms of total time required to apply thetransform (load data, apply transform, export result) on various heap sizes(Figure B.1).

2. GC overhead that occurs on different heap sizes for both of the engines(Figure B.2).

Comparison results. From the obtained results we conclude that our initialassumption of reduced Batchrefine’s memory requirements didn’t hold. In partic-

101

Page 102: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

102APPENDIX B. BATCHREFINE ANDOPENREFINE PERFORMANCE COMPARISON

Figure B.1: Batchrefine and OpenRefine comparison: total time

Figure B.2: Batchrefine and OpenRefine comparison: GC overhead

Page 103: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

103

ular from Figure B.1 we observe that the minimum required heap size and theheap size at which the time to perform a transform stabilizes to a constant valueare the same for both of the implementations. We observe almost identical GCoverhead behaviour for BatchRefine and original OpenRefine implementations onFigure B.2, which means that both engine implementations have the same mem-ory footprint on this particular workload. However, for some of the transformswe observe that BathcRefine takes less time to process the data, showing con-stant difference with OpenRefine for all the heap sizes. This is explained by thefact that OpenRefine has to write the “changes” introduced to data on local disk,while BatchRefine omits this step. Therefore we can observe a constant differ-ence in processing time that depends on the amount of changes introduced to databy a transform, which in turn is the amount of data to be written on the local disk.

Summarizing the obtained results we can state that web-server and servletcontainer doesn’t have a noticeable impact on the OpenRefine’s performance andomitting them doesn’t reduce engine’s resource requirements. Nevertheless, thisapproach allows us to discover an important observation about the I/O overheadintroduced by writing “changes” information to disk. For some usage scenarios, itis important to provide the ability to switch off the history recording feature of theengine, in particular when it is used as a part of larger data processing pipeline.

Page 104: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

104APPENDIX B. BATCHREFINE ANDOPENREFINE PERFORMANCE COMPARISON

Page 105: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

Appendix C

Numerical data of the evaluation

This chapter presents numerical data obtained from performance evaluation of ourdistributed implementation of the OpenRefine engine. It is reported in 9 tables:one for each of the non-embarrassingly parallel operations.

Input size [m lines] Centralized [s] Distributed [s] Avg. per Node [s] Scheduling Delay [s] N. workers Speedup1 29.03 9.87 8.19 1.67 5 2.95

1.5 42.33 10.23 7.81 2.42 8 4.142 56.01 10.34 8.50 1.84 10 5.42

2.5 69.38 10.56 8.10 2.47 13 6.573 84.43 10.51 8.45 2.07 15 8.03

3.5 100.01 11.50 8.14 3.36 18 8.704 113.35 11.11 8.51 2.60 20 10.20

4.5 - 11.62 8.73 2.89 22 -5 - 10.93 8.47 2.46 25 -6 - 12.23 8.47 3.76 30 -7 - 13.22 8.49 4.73 35 -8 - 12.96 8.45 4.52 40 -9 - 13.85 8.45 5.40 45 -10 - 15.41 8.85 6.55 50 -

Table C.1: Performance evaluation: Column split transform

105

Page 106: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

106 APPENDIX C. NUMERICAL DATA OF THE EVALUATION

Input size [m lines] Centralized Distributed Avg. per Node [s] Scheduling Delay [s] N. workers Speedup1 4.03 2.07 1.35 0.72 5 1.95

1.5 5.67 2.20 1.34 0.85 8 2.582 7.51 2.34 1.38 0.97 10 3.20

2.5 9.18 2.45 1.25 1.20 13 3.743 10.93 2.51 1.32 1.19 15 4.36

3.5 12.80 2.42 1.30 1.12 18 5.294 15.15 2.50 1.33 1.17 20 6.05

4.5 - 2.76 1.38 1.38 22 -5 - 2.66 1.33 1.33 25 -6 - 3.06 1.36 1.70 30 -7 - 3.47 1.36 2.11 35 -8 - 3.42 1.37 2.05 40 -9 - 5.07 1.35 3.72 45 -10 - 5.99 1.35 4.64 50 -

Table C.2: Performance evaluation: Column reorder transform

Input size [m lines] Centralized [s] Distributed [s] Avg. per Node [s] Scheduling Delay [s] N. workers Speedup1 3.96 2.03 1.29 0.74 5 1.95

1.5 5.64 2.17 1.24 0.93 8 2.602 7.41 2.55 1.36 1.19 10 2.90

2.5 9.10 2.44 1.28 1.16 13 3.733 10.80 2.35 1.30 1.06 15 4.59

3.5 12.38 2.56 1.25 1.31 18 4.834 14.64 2.51 1.33 1.19 20 5.82

4.5 - 2.52 1.36 1.17 22 -5 - 2.65 1.32 1.33 25 -6 - 2.89 1.32 1.57 30 -7 - 3.64 1.35 2.29 35 -8 - 5.11 1.34 3.77 40 -9 - 5.53 1.34 4.19 45 -10 - 6.14 1.37 4.77 50 -

Table C.3: Performance evaluation: Column move transform

Input size [m lines] Centralized [s] Distributed [s] Avg. per Node [s] Scheduling Delay [s] N. workers Speedup100 20.93 7.33 5.84 0.57 5 2.86150 30.10 7.66 5.72 0.56 8 3.93200 40.11 8.02 6.17 0.57 10 5.00250 49.47 8.21 5.79 0.75 13 6.03300 58.80 8.16 6.06 0.58 15 7.21350 70.12 9.15 5.95 1.08 18 7.66400 78.70 8.71 6.20 1.03 20 9.04450 - 9.45 6.27 1.35 22 -500 - 8.78 6.14 1.08 25 -600 - 9.11 6.06 1.01 30 -700 - 10.51 6.10 2.16 35 -800 - 10.68 6.09 2.25 40 -900 - 12.41 6.13 3.58 45 -1000 - 12.72 6.24 4.83 50 -

Table C.4: Performance evaluation: Transpose cells across columns into rows

Page 107: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

107

Input size [m lines] Centralized [s] Distributed [s] Avg. per Node [s] Scheduling Delay [s] N. workers Speedup100 5.31 2.55 1.77 0.44 5 2.08150 7.47 2.57 1.60 0.46 8 2.91200 9.85 2.50 1.67 0.46 10 3.94250 11.99 2.77 1.67 0.44 13 4.33300 14.29 2.64 1.69 0.48 15 5.41350 16.97 2.85 1.64 0.55 18 5.95400 20.13 2.82 1.69 0.54 20 7.14450 - 2.77 1.69 0.61 22 -500 - 3.17 1.67 0.88 25 -600 - 3.44 1.67 1.15 30 -700 - 3.90 1.67 1.45 35 -800 - 4.16 1.67 1.78 40 -900 - 6.20 1.67 3.62 45 -1000 - 5.42 1.82 2.03 50 -

Table C.5: Performance evaluation: Column remove

Input size [m lines] Centralized [s] Distributed [s] Avg. per Node [s] Scheduling Delay [s] N. workers Speedup100 4.96 2.31 1.54 0.47 5 2.15150 6.88 2.41 1.38 0.61 8 2.85200 9.04 2.55 1.54 0.70 10 3.55250 11.04 3.40 1.49 1.26 13 3.25300 12.99 2.93 1.59 0.59 15 4.43350 15.25 3.10 1.54 0.69 18 4.92400 17.78 3.27 1.58 0.78 20 5.44450 - 3.35 1.62 1.04 22 -500 - 3.05 1.56 0.79 25 -600 - 3.69 1.57 1.29 30 -700 - 4.28 1.55 1.97 35 -800 - 4.20 1.56 1.81 40 -900 - 5.46 1.52 3.19 45 -1000 - 6.47 1.57 3.82 50 -

Table C.6: Mass edit cells

Input size [m lines] Centralized [s] Distributed [s] Avg. per Node [s] Scheduling Delay [s] N. workers Speedup100 6.11 3.09 2.08 0.54 5 1.98150 8.75 2.92 1.82 0.58 8 3.00200 11.35 3.76 2.14 0.58 10 3.02250 14.17 3.30 1.89 0.66 13 4.29300 17.56 3.37 1.95 0.70 15 5.21350 21.54 3.57 1.94 0.90 18 6.03400 23.29 3.57 1.98 0.78 20 6.52450 - 3.75 2.05 0.59 22 -500 - 4.03 2.00 0.74 25 -600 - 4.28 1.96 1.45 30 -700 - 4.49 1.99 1.24 35 -800 - 6.20 1.99 2.91 40 -900 - 7.19 1.99 3.64 45 -1000 - 7.44 2.03 4.41 50 -

Table C.7: Performance evaluation: Column addition

Page 108: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

108 APPENDIX C. NUMERICAL DATA OF THE EVALUATION

Input size [m lines] Centralized [s] Distributed [s] Avg. per Node [s] Scheduling Delay [s] N. workers Speedup100 31.78 11.22 9.20 0.52 5 2.83150 46.44 11.22 8.65 0.53 8 4.14200 60.91 11.28 9.28 0.55 10 5.40250 76.27 11.66 8.73 0.64 13 6.54300 91.44 11.60 9.08 0.79 15 7.88350 106.56 12.36 9.02 1.36 18 8.62400 122.60 11.80 9.45 0.81 20 10.39450 - 12.00 9.42 0.86 22 -500 - 12.10 9.32 1.10 25 -600 - 13.71 9.37 2.15 30 -700 - 14.48 9.36 1.65 35 -800 - 14.33 9.23 3.17 40 -900 - 16.89 9.32 4.17 45 -1000 - 15.01 9.54 3.63 50 -

Table C.8: Performance evaluation: Composite transform

Input size [m lines] Centralized [s] Distributed [s] Avg. per Node [s] Scheduling Delay [s] N. workers Speedup100 7.25 3.31 2.35 0.47 5 2.19150 10.38 3.10 2.14 0.49 8 3.35200 13.90 3.40 2.30 0.50 10 4.09250 17.46 4.03 2.18 1.07 13 4.33300 21.14 3.57 2.26 0.60 15 5.92350 24.82 3.52 2.16 0.59 18 7.05400 27.89 4.09 2.32 0.82 20 6.82450 - 3.89 2.35 0.81 22 -500 - 3.74 2.27 0.64 25 -600 - 4.74 2.30 1.51 30 -700 - 4.95 2.25 1.81 35 -800 - 5.59 2.26 2.49 40 -900 - 6.89 2.25 3.71 45 -1000 - 7.95 2.37 4.58 50 -

Table C.9: Performance evaluation: Text transform

Page 109: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

Bibliography

[1] Big data is changing every industry, even yours!http://smartdatacollective.com/bernardmarr/207086/

big-data-changing-every-industry-even-yours, 2014.

[2] James Manyika, Michael Chui, Brad Brown, Jacques Bughin, Richard Dobbs,Charles Roxburgh, and Angela Hung Byers. Big data: The next frontier forinnovation, competition, and productivity. Technical report, McKinsey GlobalInstitute, 2011.

[3] Erhard Rahm and Hong Hai Do. Data cleaning: Problems and current ap-proaches. IEEE Data Engineering Bulletin, 23:2000, 2000.

[4] OpenRefine. http://openrefine.org.

[5] Spaziodati S.r.l. http://www.spaziodati.eu/.

[6] A. Kaushik. Web Analytics 2.0: The Art of Online Accountability and Scienceof Customer Centricity. Wiley, 2009.

[7] Dirk deRoos, Chris Eaton, George Lapis, Paul Zikopoulos, and Tom Deutsch.Understanding Big Data: Analytics for Enterprise Class Hadoop and Stream-ing Data. McGraw Hill Professional, 2011.

[8] Byung-Kwon Park and Il-Yeol Song. Toward total business intelligence in-corporating structured and unstructured data. In BEWEB ’11 Proceedings ofthe 2nd International Workshop on Business intelligence and the WEB, 2011.

[9] A special report on managing information. Data, data everywhere. TheEconomist, 2010.

[10] International Data Corporation. http://www.idc.com/.

[11] John Gantz and David Reinsel. The digital universe decade areyou ready? http://www.emc.com/collateral/demos/microsites/

idc-digital-universe/iview.htm, 2010.

109

Page 110: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

110 BIBLIOGRAPHY

[12] O’Reilly Radar Team. Big Data Now. O’Reilly, 2012.

[13] Facebook’s social advertising system and the rise of the fansumer.http://www.theguardian.com/technology/blog/2007/nov/07/

facebookssocialadvertisings, 2007.

[14] Jeremy Ginsberg, Matthew H. Mohebbi, Rajan S. Patel, Lynnette Brammer,Mark S. Smolinski, and Larry Brilliant. Detecting influenza epidemics usingsearch engine query data. Nature, 2009.

[15] Bill Gerhardt, Kate Griffin, and Roland Klemann. Unlocking value in thefragmented world of big data analytics. Cisco Internet Business SolutionsGroup (IBSG).

[16] Christian Bizer, Peter Boncz Michael L. Brodie, and Orri Erling. The mean-ingful use of big data: Four perspectives four challenges. ACM SIGMOD,2011.

[17] S.W. Keckler, K. Olukotun, and H.P. Hofstee. Multicore Processors and Sys-tems. Integrated Circuits and Systems. Springer, 2009.

[18] Thomas Rauber and Gudula Rnger. Parallel Programming For Multicore andCluster Systems. Spinger, 2010.

[19] Ajay D. Kshemkalyani and Mukesh Singhal. Distributed Computing: Princi-ples, Algorithms, and Systems. Cambridge University Press, New York, NY,USA, 1 edition, 2008.

[20] J.L. Gustafson. Fixed time, tiered memory, and superlinear speedup. InDistributed Memory Computing Conference, 1990., Proceedings of the Fifth,volume 2, pages 1255–1260, Apr 1990.

[21] Ciprian Dobre and Fatos Xhafa. Parallel programming paradigms and frame-works in big data era, spinger. International Journal of Parallel Programming,2014.

[22] Jeffrey Dean and Sanjay Ghemawat. Mapreduce: simplified data processingon large clusters. Communications of the ACM - 50th anniversary issue: 1958- 2008, 2008.

[23] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The google filesystem. SIGOPS Oper. Syst. Rev., 37(5):29–43, October 2003.

Page 111: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

BIBLIOGRAPHY 111

[24] Zhenhua Guo, G. Fox, and Mo Zhou. Investigation of data locality inmapreduce. In Cluster, Cloud and Grid Computing (CCGrid), 2012 12thIEEE/ACM International Symposium on, pages 419–426, 2012.

[25] Apache Hadoop. http://hadoop.apache.org/.

[26] Tom White. Hadoop: The Definitive Guide. O’Reilly Media, Inc., 1st edition,2009.

[27] Apache Spark. http://spark.apache.org/.

[28] Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma,Murphy McCauley, Michael J. Franklin, Scott Shenker, and Ion Stoica. Re-silient distributed datasets: A fault-tolerant abstraction for in-memory clustercomputing. In Proceedings of the 9th USENIX Conference on Networked Sys-tems Design and Implementation, NSDI’12, pages 2–2, Berkeley, CA, USA,2012. USENIX Association.

[29] Rajendra Bose and James Frew. Lineage Retrieval for Scientific Data Pro-cessing: A Survey. ACM Comput. Surv., 37(1):1–28, March 2005.

[30] Apache Spark RDD operations. http://spark.apache.org/docs/latest/

programming-guide.html#rdd-operations.

[31] Pentaho Kettle. http://community.pentaho.com/projects/

data-integration/.

[32] OpenRefine history. http://openrefine.org/2013/10/12/

openrefine-history.html.

[33] Jython: Python for the Java platform. http://www.jython.org/.

[34] OpenRefine Reconciliation Service API. https://github.com/OpenRefine/OpenRefine/wiki/Reconciliation-Service-API.

[35] Butterfly modular web application framework. https://code.google.com/

p/simile-butterfly/.

[36] Openrefine RDF extension. http://refine.deri.ie/.

[37] Sun Microsystems. Memory Management in the Java HotSpot Virtual Ma-chine.

[38] Servizio commercio e cooperazione di Provincia di Trento. Osterie tipiche tren-tine. http://dati.trentino.it//dataset/osterie-tipiche-trentine,2013. Dataset in CSV format.

Page 112: UNIVERSITY OF TRENTO DEPARTMENT OF INFORMATION …d3s.disi.unitn.it/~mega/andrey_thesis.pdf · university of trento department of information engineering and computer science degree

112 BIBLIOGRAPHY

[39] Inc. Sun Microsystems. Java tuning white paper. http://www.oracle.com/technetwork/java/tuning-139912.html, 2005.

[40] Java microbenchmark review criteria. http://www.oracle.com/

technetwork/java/tuning-139912.html, 2005.

[41] Amazon Inc. Elastic Compute Cloud, November 2008.

[42] OpenRefine overcome RAM limitation (Google group discussion). https://

groups.google.com/forum/#!topic/openrefine-dev/2UsiJf-J4rU, 2005.

[43] Erich Gamma, Richard Helm, Ralph Johnson, and John Vlissides. DesignPatterns: Elements of Reusable Object-Oriented Software. Pearson Education,1994.

[44] D.L. Eager, J. Zahorjan, and E.D. Lazowska. Speedup versus efficiency in par-allel systems. Computers, IEEE Transactions on, 38(3):408–423, Mar 1989.

[45] RefineOnSpark github repository. http://github.com/andreybratus/

RefineOnSpark, 2014.