Resource-Aware Decomposition of
Geoprocessing Services Based on Declarative
Request Languages
by
Michael Owonibi
A thesis submitted in partial fulfillment
of the requirements for the degree of
Doctor of Philosophy
in Computer Science
Approved, Thesis Committee
Prof. Dr. Peter Baumann
Jacobs University, Bremen
Prof. Dr. Alexander Zipf
University of Heidelberg, Heidelberg
Dr. Angela Schäfer
Alfred-Wegener-Institut, Bremerhaven
28TH FEBRUARY, 2012
SCHOOL OF ENGINEERING AND SCIENCE
Acknowledgements
I want to express my heartfelt gratitude to all who have supported me during my PhD studies.
Most especially, I want to thank my supervisors, colleagues and family.
Abstract
Distributed, service-oriented systems are often used today for geospatial data access and
processing. However, it is difficult to find methods for easy, flexible, and automatic
composition and orchestration of geo-service workflows. A promising approach to
addressing this problem is provided by the Open Geospatial Consortium (OGC) Web
Coverage Processing Service (WCPS). This service offers a multidimensional raster
processing query language with formal semantics, and we believe that this language contains
sufficient information for automatic orchestration. Therefore, this thesis focuses on
investigating the means to dynamically and efficiently distribute WCPS-based web service
requests across several heterogeneous nodes as sub-requests, and on managing the execution of,
and the aggregation of the results of those sub-requests. Task distribution is based, among
others, on the individual node capabilities and availability, network capabilities, and source
data locations. A key goal is to dynamically optimize a global service quality function while
also considering a global service cost function. This thesis, therefore, involves a highly
interdisciplinary approach which combines results from geo-processing, distributed
computing, workflow scheduling, service oriented computing, and distributed query
optimization and execution.
We propose D-WCPS (Distributed WCPS), a framework in which coverage processing queries
can be dynamically distributed among several WCPS servers. Servers can join the network
by registering with any server in the network, and every server publishes its processing
capacities and locally available data to the WCPS registry which is mirrored across all
servers. Likewise, each of the servers in the framework can decompose a query into a
distributed query, and also coordinate the execution of a distributed query using information
from its WCPS registry. Other contributions of this thesis include query optimization and
decomposition algorithms; inter-operator, intra-operator, and inter-tuple parallelism methods
for coverage processing; a cost model and server calibration for distributed coverage
processing; a P2P-based orchestration model; and mirrored registry synchronization techniques.
D-WCPS has been implemented and tested in clusters and clouds. Evaluation of our
scheduling and distributed execution model shows remarkable speedups in the execution of
different distributed WCPS queries.
Several servers can, therefore, efficiently share data and computation with respect to
dynamic, resource-aware coverage processing.
CONTENTS
Acknowledgements
Abstract
INTRODUCTION
1.1 MOTIVATION
1.1.1 Interoperability and Sharing
1.1.2 Efficiency
1.1.3 Dynamic service decomposition for distributed computation
1.2 APPLICATION (USE CASE) SCENARIOS
1.2.1 Simple Coverage Integration (Integration and Overlay)
1.2.2 Statistics
1.2.3 Site Suitability Studies
1.3 WEB COVERAGE PROCESSING SERVICE (WCPS)
1.4 CONTRIBUTIONS
1.5 THESIS OUTLINE
LITERATURE REVIEW
2.1 WEB SERVICE COMPOSITION
2.1.1 Automatic Service Composition
2.1.2 Web Service Orchestration
2.1.3 Geoprocessing Services Composition
2.2 SCHEDULING
2.3 QUERY PROCESSING OVER DISTRIBUTED DBMS
2.4 GRID-BASED DISTRIBUTED QUERY PROCESSING
2.5 RASDAMAN AND MULTIDIMENSIONAL ARRAY QUERY PROCESSING
2.6 SUMMARY: DQP VS. DISTRIBUTED WCPS PROCESSING
DISTRIBUTED WCPS FRAMEWORK
3.1 RELATED WORKS
3.2 OVERVIEW OF D-WCPS FRAMEWORK
3.2.1 Overview of D-WCPS Query Processing
3.2.2 Inter-Tuple Parallelism
3.2.3 Query Re-writing
3.2.3.1 Single Node Optimizations
3.2.3.2 Multi-Node Optimizations - Left-Deep Tree Transformation to Bushy Tree
3.2.4 Distributed Query Modelling and Its Orchestration
3.3 THE FRAMEWORK’S REGISTRY
3.4 CALIBRATION AND PROFILING
3.5 SERVICE ADAPTIVITY
3.6 SUMMARY
COST MODEL FOR DISTRIBUTED QUERY PROCESSING
4.1 LITERATURE REVIEW
4.2 DISTRIBUTED WCPS PROCESSING MODEL
4.3 DISTRIBUTED WCPS COST MODEL
4.3.1 Communication Cost
4.3.2 Intermediate Data Storage Cost
4.3.3 Data Read Cost
4.3.4 CPU Cost
4.4 EVALUATION
4.5 SUMMARY AND OUTLOOK
COST BASED QUERY TREE DECOMPOSITION
5.1 RELATED WORKS
5.1.1 List Scheduling Algorithms
5.1.2 Clustered Scheduling Algorithms
5.1.3 Duplication Based Algorithms
5.1.4 Arbitrary Processor Network (APN) Algorithms
5.1.5 Evolutionary Algorithms
5.1.6 Scheduling Algorithm For D-WCPS
5.2 DECOMPOSITION ALGORITHMS
5.2.1 Reducing the Number of Servers Used in Scheduling
5.2.2 Pre-Scheduling Unary Node Clustering
5.2.3 Multi-Processing Capabilities of Servers
5.2.4 HEFT Scheduling Criteria Modification
5.2.5 Memory Constraints Consideration
5.2.6 Intra-Operator Parallelism
5.2.7 Clustering and Cluster Scheduling
5.3 TAKING A STEP BACK – THE BIG PICTURE
PERFORMANCE EVALUATION
6.1 IMPLEMENTATION AND TEST FRAMEWORK
6.2 EVALUATION OF SCHEDULING ALGORITHMS
6.2.1 Tree Properties
6.2.1.1 Tree Width
6.2.1.2 Sequential Factor
6.2.1.3 Cost Skew
6.2.1.4 Computation to Communication Ratio (CCR)
6.2.2 D-WCPS Framework Properties
6.2.2.1 Average Coverage Processing Speed To Network Speed Ratio (PS/NS-R)
6.2.2.2 Servers Available
6.2.2.3 Servers Heterogeneity
6.2.2.4 Initial Data Distribution
6.3 PERFORMANCE EVALUATION OF HEFT MODIFICATIONS
6.3.1 Node Clustering And Reduction of Number of Servers
6.3.2 Multi-Processing Server Capabilities
6.4 OPTIMIZATION AND EXECUTION MODEL GAIN
6.5 CONCLUSION
CONCLUSION AND OUTLOOK
7.1 CONCLUSION
7.1.1 The WCPS Registry
7.1.2 Query Optimization
7.1.3 Query Scheduling
7.1.4 Adaptive Query Orchestration
7.1.5 Parallelism in WCPS
7.2 BENEFITS OF D-WCPS
7.3 WHEN D-WCPS INFRASTRUCTURE IS BENEFICIAL
7.4 OUTLOOK
Appendix 1 – Coverage Operators Weights
REFERENCES
Chapter 1
INTRODUCTION
Geographical Information Systems (GIS) have witnessed a progression from being
mainframe systems [125], through being desktop systems, to becoming server-based systems.
Today, distributed, service-oriented computing is often used for geospatial data access and processing.
This has been motivated by several reasons, some of which include:
- Availability of high-speed networks.
- Exponential increase in the sizes of spatial data, which are distributed across several data
centres. Hence, a server does not usually have all the data needed for some geo-processing
task.
- High CPU computation cost of geo-processing tasks such that distributed processing
pays off.
- Increase in GIS application requirements. Such requirements vary
from simple 2-D and 3-D map visualization and data download, to complex
computation such as statistical analysis, data mining, data modeling (of oceans, the
atmosphere, and climate), and image classification.
- Server limitations: some specialized operations may not be available on a given
server, and some servers’ processing capacity is higher than others’.
Service-oriented systems provide the possibility of solving geo-related tasks by using
distributed and multi-owner resources. They have also spawned the creation of Spatial Data
Infrastructures (SDI), defined as “a coordinated series of agreements on technology standards,
institutional arrangements, and policies that enable the discovery and use of geospatial
information by users and for purposes other than those it was created for” [119]. Examples of
such SDIs include INSPIRE [58] and NSDI [55]. The development of SDIs promises a
platform for efficient server-based access to and processing of geo-spatial data through the
use of web services. Usually, the interfaces of geospatial web services are standardized by the
Open Geospatial Consortium (OGC). Examples of these services include the OGC Web
Coverage Service (WCS) [85] for accessing coverage data (see Section 1.3 for more details
about coverages) which, according to [81], encompasses any spatio-temporally extended
phenomenon; the OGC Web Coverage Service
- Transactional (WCS-T), used for adding, updating, and deleting coverage data on a server
[15]; and the OGC Web Processing Service (WPS), which defines a generic service that offers
any sort of geo-processing functionality to clients across a network [93]. In addition, by
supporting the orchestration of geo-processing workflows, service-oriented systems can
provide the platform for distributed computing. It is envisioned that such an orchestrated process
will be embedded into some overall service which hides the complex data evaluation
workflow behind a simple, easy-to-use service. Such an easy-to-use service will, in turn,
determine how to efficiently aggregate several other services to fulfill the client request.
In order to fully exploit the potential of distributed computing using geo-processing service-
oriented systems, efficient task scheduling algorithms, as well as dynamic service
composition and orchestration techniques, are fundamentally important. Usually, geo-service
orchestrations today are performed manually, such as in the case of cascading OGC Web
Map Service (WMS) [59] and WPS requests, and process-based compositions, e.g. BPEL [1],
which effectively hardwire the processing workflow configuration. In the case of heavy-
weight services, such as those defined by the OGC WPS, it has turned out to be very difficult to find
methods for flexible, automatic orchestration. We claim that this is due to the lack of an
explicit, machine-understandable semantics of WPS. The Web Coverage Processing Service
(WCPS) Standard [86], however, accomplishes interoperability by defining a declarative
query language for server-side processing of multi-dimensional spatio-temporal data. This
language has a formal semantics definition; hence, it is machine readable and semantic-web
ready. Because the meaning and effects of service query requests expressed in a
language with formalized semantics, e.g. WCPS, are known both to the client and the servers, [89]
opined that automatic service dispatching, chaining, and optimization are possible. Hence, a
WCPS server is able to automatically extract the portions of a request that are best resolved locally, distribute
other requested parts to other suitable servers, re-collect results, package them, and return
them without any human interference.
The use of a query-language approach (based on WCPS) for automatic geo-service composition
is motivated by SQL processing in distributed databases [29]. Typically, queries written in
SQL, a form of declarative language with a formalized semantics for expressing computation,
are automatically partitioned into several sub-queries which are scheduled on different
servers and executed. However, we focus on service-based geospatial data processing using
declarative request languages, rather than on the tabular data typically managed by
distributed databases.
Therefore, this thesis aims at efficiently answering queries on large, complex spatio-temporal
data sets distributed across a number of heterogeneous computing nodes. The aim is that
incoming query requests, expressed in WCPS, are automatically split into sub-requests.
Subsequently, these sub-requests are processed by suitable nodes in the network to
collectively produce the final result for the client. Task distribution is based, among others,
on the individual node capabilities and availability, network capabilities, and source data
locations. A key goal is to dynamically optimize a global service quality function while also
considering a global service cost function.
1.1 MOTIVATION
The internet and distributed computing have made access to distributed data and services
relatively easy. However, ever-increasing application requirements and developments in
technologies have introduced other issues that need to be resolved with respect to distributed
computing. In this thesis, we investigate distributed service-oriented computing in the geo-
computing domain, with an emphasis on coverage processing. The term “coverage”, in the
general definition of OGC [81] and ISO [82], encompasses any spatio-temporally extended
phenomenon. Similarly, several types of coverage internal structure (the geometrical
arrangement for mapping a location to a value) exist, such as hexagonal,
triangular, and rectangular structures. However, as overarching query languages in
this generality (with respect to the internal structure of coverages) are currently not sufficiently
understood, WCPS focuses on raster data. Simply put, a raster coverage in WCPS consists of a
gridded multi-dimensional array of some dimensionality and some extent (its spatio-temporal
domain), where each grid cell (pixel, voxel) contains values representing some
information. Examples of such data include 1-D sensor time series; 2-D satellite imagery,
thematic maps, digital elevation models; 3-D x/y/t image time series, 3-D x/y/z exploration
data, and 4-D atmospheric simulation data and ocean model data.
Based on this, some of the motivations for this thesis are outlined below.
1.1.1 Interoperability and Sharing
The well-defined semantics of the WCPS query language has the potential of enabling machine-to-
machine communication. Hence, one of our primary aims is to create a system where several
servers at different locations can collaboratively execute geo-processing tasks, specified as
WCPS queries, transparently and without human interference. This would enable:
1) Data Sharing: Users and processes get and use coverages irrespective of the location
in the system.
2) Load Sharing: Tasks can be more quickly and reliably done by using as many servers
as possible within the system irrespective of the process and initial coverages’
location.
3) Applications Sharing: Similar and different coverage processing applications are
shared by all the servers in the system.
1.1.2 Efficiency
Sharing computational resources is usually not enough; they must be shared efficiently such that a
particular objective is satisfied. Such objectives include minimizing response time, total time,
network bandwidth usage, or CPU usage for a service request; balancing the load on each
individual system; etc. In this thesis, however, the focus is on minimizing response
time. Some of the issues addressed in this regard include query optimization, efficient
scheduling of query operators onto heterogeneous sets of servers, and efficient
parallelization of query tree operator execution.
1.1.3 Dynamic service decomposition for distributed computation
Although dynamic service composition and orchestration are being researched extensively [14]
[17][18][44], several issues remain, among them the efficiency of many of the
models for specifying and executing distributed coverage processing. This thesis, therefore,
further explores the means of composing a query processing service dynamically by
decomposing a declarative query request received by the service. We also investigate the
means of orchestrating the composed service efficiently such that it is resilient to failure.
1.2 APPLICATION (USE CASE) SCENARIOS
Based on our motivation, we present some sample scenarios which highlight the importance
and use of distributed geo-processing. The focus is on coverage processing applications
which can be expressed as a WCPS query. These use cases are chosen to be as close to real-
life applications as possible. Their representations in the figures below are over-
simplified forms of the true query tree representations, and the operation names are chosen for
simplicity and understanding rather than for how they would be written in the query.
1.2.1 Simple Coverage Integration (Integration and Overlay)
In this scenario, the SRTM Digital Elevation Model (DEM) from, say, a NASA server, and
bathymetry data from, say, GEBCO (http://www.gebco.net/), are integrated and overlaid with
a global political map that is available on, say, Server X (see Figure 1.1).
1.2.2 Statistics
Given the rainfall map of an area, say location A, available on, say, a UK Meteorology
Office server, and the NDVI map of the same area available on, say, Server X, this scenario
predicts the NDVI of another area, say location B, using the correlation between the rainfall
map and the NDVI at location A, and the rainfall map at location B (which is also available
from, say, the UK Meteorology Office). A sample query tree representation of this scenario
is shown in Figure 1.2.
1.2.3 Site Suitability Studies
In this use case (as shown in Figure 1.3), the suitability of an area for siting a farmland is
assessed using a weighted index overlay analysis. For this proposed scenario, the data inputs
for the analysis include the rainfall intensity map, soil maps, land use maps, NDVI maps,
a Digital Elevation Model (DEM), and road-buffer maps. It is also assumed that these
coverages may be available on several servers.
[Figure 1.1: Simple Data Integration.]
[Figure 1.2: Statistical Computation.]
[Figure 1.3: Site Suitability Studies.]
The formula for computing the suitability of an area, as given by [41], is

S = \frac{\sum_{i} \sum_{k} s_{ijk} \, w_i}{\sum_{i} w_i}

where S is the weighted score for a mapping unit (location), i.e. a measure of how suitable the
location is for a particular activity; s_{ijk} is the score for the kth parameter of the jth class of
the ith input map, i.e. the pixel value at a particular point in the ith input map; and w_i is the
weight of the ith input map, which depends on how much effect the input map has on the
overall suitability of the area.
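For intuition, here is a minimal NumPy sketch of this weighted index overlay computed cell-wise over raster grids; the input arrays and weights are invented placeholders, not data from the use case.

import numpy as np

# Hypothetical classified input maps (per-pixel scores on a common scale),
# e.g. reclassified rainfall, soil, and land-use rasters.
rainfall = np.array([[3.0, 2.0], [1.0, 4.0]])
soil     = np.array([[2.0, 2.0], [3.0, 1.0]])
land_use = np.array([[4.0, 1.0], [2.0, 3.0]])

maps    = [rainfall, soil, land_use]
weights = [0.5, 0.3, 0.2]   # w_i: assumed influence of each input map

# S = (sum_i s_i * w_i) / (sum_i w_i), evaluated for every grid cell.
suitability = sum(w * m for w, m in zip(weights, maps)) / sum(weights)
print(suitability)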
These use cases demonstrate the practical applications of distributed WCPS processing.
Several servers/institutions would, therefore, be able to share their data, applications, and
load dynamically and efficiently based on the proposal in this thesis.
1.3 WEB COVERAGE PROCESSING SERVICE (WCPS)
WCPS is the language used in this thesis for declarative-request-based service
decomposition, although the query decomposition and execution model (introduced in
subsequent chapters) is not restricted to it. WCPS standardizes the interface of a
coverage processing service: it specifies the syntax and
semantics of a query language (service request) which allows for server-side retrieval and
processing of multi-dimensional geospatial coverages representing sensor, image, or statistics
data [86]. WCPS queries are given as expressions composed from coverage-
centric primitives. These primitives are largely grouped as shown below:
- Geometric operations extract some subset of cells which together again form a
coverage. Trimming retrieves a sub-coverage whose dimensionality remains
unchanged. Slicing delivers a cut-out with reduced dimensionality. Extending
enlarges the dimensions of the coverage.
- Induced operations apply cell-type operations to all cells in a coverage simultaneously.
These include arithmetic, trigonometric, and logical operations, and type casting.
- Coverage summarization includes aggregation operations like count, average, min,
max, and the some and all quantifiers.
- Coverage constructors and aggregation operations, whose combination presents a
generalized way of writing coverage processing expressions.
- Scaling and reprojection (warping an image into another coordinate reference system)
operations, which constitute non-atomic functions.
- Data format encoding specifies how coverage-valued results are to be shipped back to
the client. The list of such encodings includes formats like TIFF, NetCDF, or SEG-Y.
As such, the WCPS language can be understood as a declarative notation for a coverage
processing workflow. Shaped in the style of XQuery and the tradition of SQL, WCPS defines
a declarative, set-oriented query language on such raster coverages. A WCPS query contains
an iterator definition, a processing expression, and optionally a filter expression. The overall
query structure is as follows:
for c1 in (C1,1, C1,2, …, C1,m),
c2 in (C2,1, C2,2, …, C2,m),
…,
cn in (Cn,1, Cn,2, …, Cn,m)
where booleanExpr(c1, …, cn)
return processingExpr(c1, …, cn)
This can be seen as a nested loop where each ci is bound to the Ci,j coverages in turn. For
each variable binding, the “where” predicate booleanExpr() is evaluated first. Only if
the boolean expression evaluates to true is processingExpr() evaluated on
the current variable assignment, and its result element added to the result list. The
result element of a processing expression is either a coverage, scalar summary data, or
coverage metadata. We, therefore, introduce WCPS by way of example.
Assume a WCPS server offers 3-D satellite image time series stacks, S1 and S2, plus a 2-D
bitmask M with the same spatial extent as the time series cubes. Then, the following query
returns those cubes where, in time slice T, threshold value V is exceeded in the red band
somewhere within the mask area:
for s in ( S1, S2),
m in ( M )
where some( s.red[ t(T) ]> V and m>0)
return encode( s/max(x), “netcdf” )
The subsetting operation in square brackets specifies a cut along axis t. The expression
enclosed in the some() operator evaluates to a Boolean data cube. The aggregator conflates
this to a single Boolean value which is true iff there is at least one cube cell with a value of
true. Those result cubes which pass this filter are delivered to the client in the NetCDF
format.
The WCPS query processing model is based on an adapted rasdaman query processing model
[99] enhanced with geo-semantics. It consists of a list processing tree derived from a modified
relational model, and several coverage processing trees. The list processing tree specifies the
assignment of coverages to coverage iterators; as shown in Figure 1.4, its leaves are the
coverage lists Li = (Ci,1, Ci,2, …, Ci,m).
The list processing tree contains three relational operations: the cross product (x) of the
different coverage lists, the selection (σ) of tuples from the resulting cross product
table based on the predicate defined in the “where” clause, and the application (α) in which the
coverage processing expression is evaluated on the current iterators' coverage assignment.
Coverage processing trees, on the other hand, are based on the coverage processing expressions
booleanExpr() and processingExpr(); each tree is made up of coverage
processing operators OPx, as shown in the sample tree in Figure 1.5.
[Figure 1.4: WCPS Query Tree.]
[Figure 1.5: Coverages Processing Tree.]
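To make this pipeline concrete, the following Python sketch mimics the cross product (x), selection (σ), and application (α) steps of the list processing tree. It is a minimal illustration only; the coverage lists and the boolean_expr() and processing_expr() stand-ins are hypothetical, not part of the WCPS specification or the rasdaman engine.

from itertools import product

# Hypothetical coverage lists L1, L2 bound to the iterators c1, c2.
coverage_lists = {
    "c1": ["S1", "S2"],
    "c2": ["M"],
}

def boolean_expr(binding):
    # Stand-in for the "where" clause; a real engine would evaluate the
    # boolean coverage expression against the bound coverages.
    return True

def processing_expr(binding):
    # Stand-in for the "return" clause of the query.
    return {"processed": dict(binding)}

names = list(coverage_lists)
results = []
# Cross product (x): every combination of iterator bindings in turn.
for combo in product(*(coverage_lists[n] for n in names)):
    binding = dict(zip(names, combo))
    if boolean_expr(binding):                      # selection (σ)
        results.append(processing_expr(binding))  # application (α)
print(results)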
1.4 CONTRIBUTIONS
The overall aim of this thesis is to investigate the means to dynamically and efficiently
distribute a WCPS-based web service request (which is, in fact, a declarative geospatial data
processing query) across several nodes as sub-requests, managing the execution of, and the
aggregation of the results of, those sub-requests. Thus, this project centres around two research
questions, namely:
1. Dynamic service composition: aggregation of services on the fly based on a client's
geo-raster data processing request.
2. Efficient service composition: aggregation of services based on an optimal workflow
execution plan (derived from factors such as individual node capabilities and availability,
network capabilities, and source data locations) for the processing of coverages.
To this end, an infrastructure for distributed execution of WCPS-based geoprocessing tasks is
proposed in this thesis. Similarly, algorithms for decomposing WCPS queries, a cost model for
determining query execution costs, techniques for decentralized orchestration of services,
mechanisms for dynamic load balancing and error recovery, and a mirrored resource-aware registry
for storing and retrieving WCPS server information were also developed.
The main contributions of this project are:
- A framework for Distributed WCPS (D-WCPS) in which several servers can
collaboratively share computation, load and data. Servers can join and leave the
network with minimum disruption to the network.
- An infrastructure for mirrored resource-aware WCPS registry. The registry contains
information about servers’ capacity with respect to D-WCPS and changes to it are
synchronized on all servers.
- A D-WCPS cost model which can estimate the time that it takes a distributed query to
finish execution. This is used for scheduling query operators in the decomposition
algorithms.
- Server calibration methodology by which each server can estimate its capacities with
respect to distributed coverage processing.
- Query optimization algorithms for optimizing a query tree for distributed execution.
- Query decomposition algorithms by which query trees are partitioned into sub queries
which can be executed in parallel on several heterogeneous servers. This involves the
scheduling of a query tree (workflow) operators with the aim of minimizing the
distributed query execution duration.
- P2P-based orchestration model for execution of a D-WCPS query using several
WCPS services.
Thus, this thesis involves a highly interdisciplinary approach which combines results from
distributed computing, workflow scheduling, service-oriented computing, distributed
databases, query optimization, and grid/cloud computing.
1.5 THESIS OUTLINE
The outline of this thesis is as follows: Chapter 2 presents the state of the art with
respect to distributed geo-processing, dynamic service-oriented computing, service registries,
distributed databases, and grid computing. In Chapter 3, the framework for D-WCPS, the WCPS
optimization algorithms, the D-WCPS orchestration model, parallelism techniques in D-WCPS,
and the resource-aware registry are proposed. The distributed coverage processing cost
model and server calibration techniques are advanced in Chapter 4. The WCPS query
scheduling algorithms are presented in Chapter 5. The performance of the D-WCPS system
is evaluated in Chapter 6, and Chapter 7 summarizes the thesis and presents an
outlook.
Chapter 2
LITERATURE REVIEW
The project focuses on the means to efficiently and dynamically distribute the execution of a
geoprocessing query on several geo-services. As such, it integrates different research areas of
computing science. These include web service composition, service oriented geoprocessing,
distributed query processing, array query processing, tasks scheduling, and grid computing.
Therefore, the state of the art is reviewed under these broad headings.
2.1 WEB SERVICE COMPOSITION
Building large complex scientific applications by recursively aggregating web services
together to get a new composite service has been an active area of research and different
approaches have been taken to accomplish this. According to [95], the challenges of service
composition are associated with the automated discovery, composition, enactment, and
monitoring of collections of web services to achieve a specified objective, i.e., a requirement
which could not be fulfilled by a single service. Other challenges with respect to service
composition include service workflow optimization, dynamic service reconfiguration,
security, process modeling, etc. With regard to geoprocessing, the general approaches used
for, and the issues connected with, service composition are presented below.
2.1.1 Automatic Service Composition
With respect to service composition, several models exist, most of which are based on either
workflow, Artificial Intelligence (AI) planning or a combination of both. Some of these
include BPEL [121], pi-Calculus [98], state chart [76], the Mealy model [110], the Roman
model [27], OWL-S [30], BPML [120], etc.
BPEL (Business Process Execution Language) is the de facto standard for specifying
interactions between services [121]. However, because of BPEL's hardwired configuration of the
process workflow, studies [79] show that BPEL cannot fully address concerns like
automatic service composition, dynamic process optimization, and decentralized orchestration.
Hence the use of the semantic web, which facilitates the automated discovery, composition,
and execution of web services [30]. In semantic web service composition, the semantic
annotation (what the service does) and functional annotation (how the service behaves) of the
component services are stored in a registry. Starting with a goal description, the registry
can be queried recursively for services defined through inputs, outputs, pre-conditions, and
effects as ontological concepts in order to determine the services to be invoked. Semantic
service composition, therefore, draws on the goal-oriented inferencing from AI planning and
theorem proving [102]. However, as shown by [109], it turns out to be difficult to establish not just
the syntax (i.e., function signatures), but also the semantics (i.e., effects), of web services. Furthermore,
according to [109], OWL-S (the semantic markup language for web services) scores very low
in its handling of complex processes. Also, [23] stated that scalable algorithms to synthesize
the required control structures are not easily provided using OWL-S models. Furthermore,
given the overhead incurred in semantic web service composition (which includes the
reasoning and service instantiation costs) for each service request received, distributed query
processing using semantic web service composition techniques is not efficient.
However, this thesis focuses on service-based computing by aggregating services selected
from a group of services offering similar functionality (coverage query processing) such that
the response time is minimized. The task to be done is declaratively specified in a query
written in a language whose syntax and semantics are formalized and standardized. Because
the services in our framework are similar and the semantics is built into the request model,
reasoning is done directly on the code to be executed, as opposed to reasoning on the services’
semantic descriptions and the service request.
Furthermore, service composition using a group of services offering similar functionality has
generally been based on the Quality of Service (QoS) of the web services
[26][43][124][49][34]. The overall quality of a service is modeled as the weighted sum
of the QoS factors, as given by the Service Level Agreement (SLA) with the client. Some of
the QoS factors include performance metrics, security, transactional integrity, scalability,
execution price, reputation, reliability, availability, and throughput. The SLA dictates the
weight of each quality factor. [49][124] also provide a model for determining the quality of a
composite web service given the quality of its component web services. Typically, services
with the highest QoS are selected from a group of services to perform a task. Some drawbacks
of these techniques include:
- Vagueness in the definition of the QoS factors, e.g., how objectively can security,
reputation, or availability be measured?
- Service selection criteria do not consider the effect the selection of a service for a task
will have on other tasks in a workflow. For instance, because a service will always
have the same QoS for similar tasks, the same service will be selected for the
execution of two different but similar tasks that could otherwise run in parallel on two different
services.
2.1.2 Web Service Orchestration
The orchestration of a composite web service can be centralized (mediator-based),
semi-centralized, or decentralized (P2P-based) [24]. In centralized orchestration, one web
service, often referred to as the mediator, controls the sequences of operations, data transfer, and
interactions of the other web services. The bottleneck created by the mediator, inefficient overall
data transfer, and the inability to scale constitute some of the criticisms of this approach [24].
In semi-centralized orchestration, the mediator still controls the sequence of operations;
however, service-to-service data transfer no longer passes through the mediator. Instead, the
URL of the to-be-transferred data, rather than the actual data, is passed from one server to the next
through the central server. However, scalability and the bottleneck created by the server
coordinating the orchestration remain issues [24]. In decentralized orchestration, the
coordination of the execution of the composite service is shared by all services (peers)
involved. The orchestration information required by each web service is embedded in the
service request; hence, orchestration information is recursively passed from one server to
another, as sketched below. However, decentralized orchestration may not be efficient if service composition is
dynamic, i.e., in scenarios where each of the services has to recursively compose its service
before orchestrating it [107][19]. Therefore, decentralized orchestration infrastructures such as
[19] separate service composition from orchestration.
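A minimal Python sketch of the decentralized idea follows: the orchestration plan travels with the request, and each peer executes its own step before forwarding the rest of the plan to the next peer. The plan format, server names, and stub functions are invented for illustration; a real peer would execute actual sub-requests and forward them over the network.

def execute_locally(server, operation, data):
    # Stub: a real peer would run the sub-request against its local engine.
    return data + [f"{operation}@{server}"]

def forward(next_server, plan, data):
    # Stub for the network hop; here we simply call the handler directly.
    return handle_request(plan, data)

def handle_request(plan, data):
    server, operation = plan[0]
    data = execute_locally(server, operation, data)
    rest = plan[1:]
    # No central mediator: the remaining plan is passed on with the data.
    return forward(rest[0][0], rest, data) if rest else data

# Each entry: (server, operation); the whole plan is embedded in the request.
plan = [("serverA", "subset"), ("serverB", "ndvi"), ("serverC", "encode")]
print(handle_request(plan, []))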
2.1.3 Geoprocessing Services Composition
Geo services are modular components of geospatial computing applications which are self-
contained, self-describing and can be published, located and invoked across a network to
access and process geospatial data from a variety of sources [1]. They form the basic
components of distributed geoprocessing frameworks over the internet. Being a type of web
service, geo services share the strengths and criticisms of web services; the
properties and weaknesses particular to them are discussed in this section. Usually, geo
services are standardized by the OGC, and some of the OGC-standardized services include WMS,
WCS, WCS-T, WPS, WFS, CS-W, and WCPS. Geo services sometimes differ from
conventional web services with respect to service discovery, description, messaging, and
composition; however, the basic concept underlying both is the same. Several geo services
offering similar or different data and functionality can be combined in a complex workflow to
form a new composite service which can then be orchestrated. Typically, such orchestrations
today are performed manually using hardwired processing configurations, such as in the case
of cascading Open Geospatial Consortium (OGC) Web Map Service (WMS) and WPS
requests, and BPEL-based OGC services orchestration [105][4][14][111]. Also, [21] proposed
a transactional interface for the WPS (WPS-T) which is effectively a “geoprocessing
workflow” execution service while [44] presented a framework for enactment of geospatial
workflows in the SAWGEO project. Due to their lack of semantic description of data, services,
and service requests, OGC services do not have automatic service composition capabilities,
and this has been an active field of research. In [97][92][72], different approaches to
automatic composition of geoprocessing services based on OWL-S were presented; however,
their composition techniques share the shortcomings of semantic web service composition.
Similarly, in order to improve the performance of composite geo-services, [70] investigated
geo-service composition, whereby services are selected from a group of similar services to
participate in a business process based on QoS functions. Also, [68][17] and [18]
demonstrated the composition and orchestration of geo-services in grid and cloud computing
frameworks. However, to a large extent, these are based on manually pre-configured composite
services, and the efficiency of the composite service with respect to execution duration was
not considered.
2.2 SCHEDULING
The decomposition of a query request involves the scheduling of the component tasks or
operators in the query. Task scheduling, which can be defined as the allocation of tasks to
computing servers, is an NP-complete problem [52]; hence, research into this problem has
produced several heuristic-based algorithms. The algorithms can be classified as local or
global; static or dynamic; application-centric or resource-centric; distributed or centralized;
cooperative or non-cooperative; approximate or heuristic; adaptive (based on either resource,
application, or dynamic performance adaptation) or non-adaptive; homogenous-systems-based
or heterogeneous-systems-based; etc. [38]. A few of these classification schemes
relevant to this thesis are discussed below.
Based on the static vs. dynamic scheduling scheme, a schedule is static if all the to-be-
scheduled tasks are scheduled at compile time, i.e., before being executed [28]; in dynamic
scheduling, by contrast, each task is scheduled as it arrives (at runtime).
Generally, dynamic scheduling is used when a task's runtime behavior cannot be determined in
advance. In this sort of scenario, scheduling with the objective of minimizing response time is
almost impossible [54]; therefore, tasks are typically scheduled with the aim of balancing the
load on all available servers, i.e., keeping all servers busy [54]. Hence, a dynamic
scheduling system normally has runtime system-state estimation and decision-making
components.
On the other hand, static scheduling is usually used when the cost of processing of a task can
be estimated before the execution, and it can be classified based on dependencies of the tasks
on each other. In independent task scheduling, all the tasks to be executed are independent
from and not related in any way to any other task to be scheduled. Examples of algorithms
used in this scenario include Opportunistic Load Balancing (OLB), Minimum Execution
Time (MET), Minimum Completion Time (MCT), Min-min, Max-min, Duplex, Genetic
Simulated Annealing (GSA), Genetic algorithms, Simulated Annealing, XSuffrage etc [32].
Conversely, in dependent task scheduling, the task to be scheduled is usually a workflow of
tasks which is normally represented as a Directed Acyclic Graph (DAG).
DAG scheduling algorithms can be broadly classified into list scheduling algorithms,
clustering algorithms, Arbitrary Processor Network (APN) algorithms, duplication-based
algorithms, and guided random search algorithms [112]. In list scheduling, the nodes of a DAG
are placed in a list arranged in descending order of some assigned priority computed for
the nodes; nodes with a higher priority are scheduled before nodes with lower priority (see the
sketch below). Clustering algorithms typically aim at minimizing data transfer costs in a DAG by grouping
heavily communicating nodes together into a cluster. In duplication-based algorithms, tasks
are redundantly duplicated on different resources in order to reduce communication cost.
APN algorithms take the architecture and topology of the network and its routing strategy
into consideration while scheduling. Guided random search algorithms use the principles of
evolution and natural genetics, such as inheritance, selection, recombination, mutation, and
crossover, to find an optimal configuration for a system, given some specific constraints.
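As a hedged illustration of the list-scheduling idea over heterogeneous servers, the Python sketch below ranks tasks by a simplified upward rank (a task's own cost plus the largest successor rank) and greedily places each task on the server giving the earliest finish time. The DAG, costs, and server speeds are invented, and communication costs are ignored for brevity; this is not the thesis's scheduling algorithm.

from functools import lru_cache

# Invented example: task -> successors, abstract work units, server speedups.
dag   = {"A": ["B", "C"], "B": ["D"], "C": ["D"], "D": []}
work  = {"A": 4.0, "B": 2.0, "C": 3.0, "D": 1.0}
speed = {"s1": 1.0, "s2": 2.0}

@lru_cache(maxsize=None)
def rank(task):
    # Simplified upward rank: own cost plus the largest successor rank.
    return work[task] + max((rank(s) for s in dag[task]), default=0.0)

order = sorted(dag, key=rank, reverse=True)   # higher priority scheduled first
ready_at = {s: 0.0 for s in speed}            # when each server becomes free
finish = {}
for task in order:
    # A task may start only after all of its predecessors have finished.
    dep_done = max((finish[p] for p in dag if task in dag[p]), default=0.0)
    # Pick the server that minimizes this task's finish time.
    best = min(speed, key=lambda s: max(ready_at[s], dep_done) + work[task] / speed[s])
    finish[task] = max(ready_at[best], dep_done) + work[task] / speed[best]
    ready_at[best] = finish[task]
print(finish)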
Furthermore, scheduling algorithms can also be classified by their objective functions.
According to [123][118][101][56][54], some of these objective functions include response time
(makespan) minimization, overall total time minimization, data transfer size minimization,
CPU usage minimization, load balancing, minimizing energy usage (green computing),
maximizing fault tolerance, maximizing availability, satisfying a client's processing server
requirement, and satisfying an SLA.
However, because a coverage processing request can be represented as a tree, which is a form of
DAG, this project focuses on statically scheduling a DAG (workflow) over a heterogeneous set
of computing resources such that the response time is minimized.
2.3 QUERY PROCESSING OVER DISTRIBUTED DBMS
One of the strengths of database systems is their ability to let users specify queries
declaratively (sufficiently describing the result without specifying the evaluation process),
together with their capacity to find an optimal execution plan for such queries. A DBMS
optimizer estimates the costs, using a cost model, of a range of alternative execution plans.
However, the emergence and growth of distributed databases, defined as a single logical
database which is spread physically across several systems connected via a
communications network, introduce new query processing challenges. Distributed DBMSs
can be classified by their architectural models, the heterogeneity of the DBMS software, the level of
control of each server, the data distribution strategy, etc. [29]. A distributed DBMS is homogenous
if the DBMSs of all participating servers are the same; otherwise, it is regarded as
heterogeneous. In federated or multi-database systems [75], the individual database systems
are administered autonomously, and the data are shared by importing and exporting
parts of the different peer databases' schemas. With regard to architectural models, the main
classifications of distributed databases include client-server, peer-to-peer [8], and wrapper-
mediator [53]. Similarly, several schemes exist for distributing database relations, including
vertical fragmentation, horizontal fragmentation, data replication, and several
combinations of these [29][91][94].
Distributed Query Processing (DQP) over databases has been intensively researched since the
1970s [29]. One of the aims of DQP is the scheduling of query operators on resources such that
an objective function is achieved while ensuring the optimal parallelization of the execution
of the query operators. Usually, the objective function is minimizing the response time
(makespan) of the query. The cost factors [94] considered in the cost model for determining
response time may include the operators’ computation cost, the communication cost, the
memory storage cost, and the I/O cost (for both the initial data read, and temporary data read
and write).
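For intuition only, a toy additive estimate combining these cost factors might look as follows. The formula and parameters are illustrative placeholders, not the cost model developed later in this thesis (Chapter 4), which, among other things, also has to account for overlap between computation and communication across servers.

def response_time(cpu_ops, bytes_shipped, bytes_read, cpu_speed, bandwidth, io_rate):
    # Toy model: each cost divided by the matching capacity, then summed.
    # (Memory storage cost omitted for brevity.)
    return (cpu_ops / cpu_speed          # operator computation cost
            + bytes_shipped / bandwidth  # communication cost
            + bytes_read / io_rate)      # I/O cost (initial + temporary reads)

# Example: 1e9 operations, 2e8 bytes shipped, 5e8 bytes read.
print(response_time(1e9, 2e8, 5e8, cpu_speed=2e9, bandwidth=1e8, io_rate=2.5e8))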
In DQP [29], after the global queries are received, they are parsed and translated to a query
tree which is then optimized logically and physically using both single-node and multi-node
algorithms. Afterwards, the query tree operators are scheduled, and finally, executable code is
generated and executed for the scheduled tree. Query tree optimization and scheduling in
DQP still remain largely challenging [75]. To this end, several execution techniques, and
optimization and scheduling algorithms have been proposed. According to [29], optimization
techniques include row blocking (shipping tuples in block/batch-wise fashion), smooth
burstiness, caching and replication, parallelism (inter operator, intra-operator, inter-tuple, and
pipelined), double-pipelined hash join, semi-joins, joins with data partitioning etc. The query
scheduling algorithm adopted is dependent on the execution techniques. However, two main
search strategies are generally used for scheduling operators in distributed DBMS and these
are the deterministic and the randomized strategies [29][91][94]. Starting from the base
relations of the global, left-deep query tree, deterministic strategies build more complex (sub-)
plans from simpler ones by generating all the likely optimal combinations of simple sub-
plans and dropping the inferior ones. If the plan is built by doing a depth-first search, the approach is
referred to as a greedy deterministic strategy; in dynamic programming, the plan is
built using a breadth-first search. The deterministic strategy has high time complexity;
therefore, randomized techniques for scheduling query operators were proposed. In the
randomized strategy, principles of evolution and natural genetics, such as inheritance,
selection, recombination, mutation, and crossover, are used to find an optimal scheduling plan
(see the sketch below). Here, an initial plan is constructed, and it is recursively and randomly transformed into a
new plan whenever its fitness (given by a fitness function) improves. Iterative improvement and
simulated annealing [56][84][96] are two of the algorithms used in randomized strategies;
they differ in the conditions for stopping the recursive transformation.
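A hedged sketch of the iterative-improvement idea applied to operator placement follows; the cost function and the random move are deliberately simplistic placeholders for a real fitness function over a scheduling plan.

import random

operators = ["scan1", "scan2", "join", "aggregate"]
servers   = ["s1", "s2", "s3"]

def cost(plan):
    # Placeholder fitness: penalize co-locating all operators on few servers.
    # A real optimizer would evaluate the full response-time cost model.
    return len(operators) - len(set(plan.values()))

def neighbour(plan):
    # Random transformation: move one operator to a randomly chosen server.
    new = dict(plan)
    new[random.choice(operators)] = random.choice(servers)
    return new

plan = {op: random.choice(servers) for op in operators}  # initial plan
for _ in range(1000):
    candidate = neighbour(plan)
    if cost(candidate) < cost(plan):   # keep only strictly improving moves
        plan = candidate
print(plan, cost(plan))

Simulated annealing differs in that it occasionally accepts worsening moves with a probability that decreases over time, which is one aspect of the differing stopping conditions mentioned above.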
Furthermore, in order to reduce the cost of scheduling, left-deep query trees are typically
considered for scheduling in DBMS-based DQP [29][91][94]. This is due to the fact that the
left-deep tree’s search space is much smaller than that of a bushy-tree (whose search space is
not restricted); hence, a near-optimal solution can be more easily found. However, the through
time (execution duration) of a left-deep tree is usually high, and bushy trees offer the best tree
structure for inter-operator parallelism.
Some of the DBMS-based middleware using this approach include DISCO [13], Garlic [73],
Hermes [100], TSIMMIS [53], Pegasus [77], etc. Also, several levels of support for a
mediator-based data integration are provided by major database vendors such as IBM (using
DB2 Propagator), Sybase (Replication Server), Microsoft (SQL Server), and Oracle (using
Data Access Gateways) [5][29]. Classical systems like R* [40] integrate data from several
databases, while SDD-1 [90] presents mechanisms for distributed join processing algorithms
on a homogenous set of servers.
However, this thesis focuses on distributed execution of queries specifying the processing of
coverages available on different coverages databases on several servers. Because of the
fundamental differences between coverage and relational database query processing (Section 1.3),
many of the scheduling algorithms, parallelization, optimization, and execution models for
DQP in relational databases would not suffice for coverage query processing. For instance, a
typical DBMS-based DQP will focus on scheduling relational join operators on servers based
on different join algorithms. In D-WCPS, however, scheduling the costly coverage
processing operators takes precedence over the comparatively less expensive join operators.
Therefore, this thesis investigates which DQP algorithms can be adopted and/or modified in
D-WCPS.
2.4 GRID-BASED DISTRIBUTED QUERY PROCESSING
Grid computing provides the platform for efficient sharing of resources in a distributed
computing environment which consists of several heterogeneous systems. The grid, which is
typically used for long-running applications whose input data may be distributed, is usually
composed of many service-based components. For instance, the Globus grid [57] consists of
Globus Monitoring and Discovery Service (MDS), Globus Replica Location Service (RLS),
Transformation Catalog (TC), GridFtp, etc. While the Globus MDS collects real-time
information on the status of available resources and their properties (e.g., scheduler
queue length, load, and available disk space), the GridFtp service is used for efficient and
reliable data transfer. For efficient distribution of tasks among various resources, the grid also
includes a scheduler service which allocates tasks to servers. Several middlewares exist for
scheduling and/or executing tasks in the grid. Condor [31] and Legion [64] dynamically
schedule independent tasks based on a client's server specification for the task. Condor
matches a task to a resource whose runtime status and properties fit the resource
requirements description (as given by the client) of the task. GridAnt [67] and DAGMan [25]
act as meta-schedulers for Condor and the Globus grid by managing the scheduling order
(dependencies) of tasks in a DAG; they dynamically determine the next task
to be submitted to a Condor/Globus system (which does the actual scheduling). Similarly,
the Pegasus system [77] maps tasks in an abstract workflow to servers in different computing
environments such as desktops, campus clusters, grids, and now clouds. It comprises the
Pegasus mapper, DAGMan, and Condor/Globus. The abstract workflow can be composed
26
manually or in systems such as Kepler, Triana [65] etc. One major criticism of the above
described grid middle-wares is that their scheduling strategies do not typically consider
communication and data I/O costs. GrADS [51] uses application-dependent schedulers to
map data and tasks to resources, however, it is criticised for filtering of an extended resource
set to a few manageable ones in an application independent way. Similarly, AppLeS
(Application Level Scheduler) [103] comprises of several application-based agent. Each
agent selects resources and determines an adaptive, efficient schedule for the application the
agent is serving. The resource selection is based on resource capacities, access rights, client
directives etc while the scheduling can be based on any scheduling algorithm or heuristic
rules. However, as far as we know, there has not been any activity in creating an agent that
can be used for distributed query processing.
Grid data management has been challenging due to the size and complexity of the data; hence the use of grid-enabled databases for managing it. A distributed query over such databases can, thus, achieve less expensive data integration. Although some of the above-mentioned grid middleware support DAGs, which can be used to represent the query tree, many grid middlewares are not suitable for query processing. Hence, specific schedulers and executors for grid query processing have been proposed. Some of the well-known generic grid-enabled query processors include Polar [63], OGSA-DQP [78], SkyQuery [114], CoDIMS-G [117], and GridDB-Lite [104]. The main task of GridDB-Lite is the selection and transfer of distributed scientific data from storage clusters to compute clusters. Polar is a distributed object-oriented query processor which accesses remote data using remote operation calls. OGSA-DQP and CoDIMS-G are based on service-oriented grids. OGSA-DQP extends the concepts in Polar* by automatically composing a static, per-client DQP service instance from a set of manually selected resources. It uses two grid services for its query processing, namely the Grid Distributed Query Service (GDQS), which decomposes a query tree into several scheduled sub-query trees, and the Grid Query Evaluation Service (GQES), which executes and returns the result of each sub-query tree. CoDIMS-G, on the other hand, profiles services in order to select the resources to use, and adaptively reschedules query operators based on runtime conditions. Based on the assumption that data is re-used across queries, [46] developed the Active Proxy-G Service (APG) middleware, which schedules queries to fulfil a load-balancing objective function and caches query results in order to optimize query evaluation. SkyQuery provides an implementation of mediator-based DQP for querying distributed astronomical data archives in an SOA. Its mediator dynamically tests the performance of the servers before scheduling and executing a query.
2.5 RASDAMAN AND MULTIDIMENSIONAL ARRAY QUERY PROCESSING
WCPS query processing is based on the concepts of rasdaman, a domain-independent array database management system, overlaid with geo-semantics. Rasdaman extends standard relational database systems with the ability to store and retrieve multi-dimensional array data of unlimited size through an SQL-style query language with highly effective server-side optimization [87][88]. The logical optimization strategy is based on a rich set (about 150) of heuristic query re-writing rules, and physical optimization is driven by its indexes and tiling information [99]. Furthermore, rasdaman stores coverages by partitioning the MDD into arbitrary tiles and accesses only the tiles affected by a query via its tile index. It also processes a query by streaming coverage tiles between query operators. Therefore, while basing our implementation on the rasdaman server, a comprehensive multi-dimensional array database system, we extend the principles of WCPS to incorporate distributed processing in this project.
2.6 SUMMARY: DQP VS. DISTRIBUTED WCPS PROCESSING
One of the main contributions of this thesis is the static scheduling of a DAG (workflow) which represents a coverage processing query over a heterogeneous set of computing resources such that the response time is minimized. Therefore, our work centres on the use of static scheduling algorithms for scheduling coverage query processing workflows. With respect to the suitability of the frameworks that we have presented (in the SOA, grid, and database research) for distributed coverage processing, we note the following:
- In DBMS-, grid-, and SOA-based DQP, many of the scheduling algorithms, parallelization and optimization techniques, and execution models deal with relational and XML databases as opposed to coverages. For instance, a typical DBMS-based DQP will focus on scheduling relational join operators on servers based on different join algorithms. However, in D-WCPS, scheduling the costly coverage processing operators takes precedence over the comparatively less expensive join operators.
- Because the grid deals with heavyweight, long-running scientific applications, the cost of grid-based DQP scheduling and execution overhead is relatively small compared to the query execution cost. Hence, grid systems typically schedule tasks dynamically (during runtime) based on the system's state, or they may use expensive static scheduling algorithms (whose cost is still very small compared to the task execution costs). However, in this thesis, we focus on medium-weight applications (e.g., WCPS) whose running times vary from milliseconds to minutes. Hence, the overhead of grid-based methods in D-WCPS is significantly large and inefficient.
- Similarly, as far as we know, only intra-operator parallelism is used in grid computing frameworks; however, a DQP framework should be able to handle other forms of parallelism, such as inter-operator and inter-tuple parallelism, as well.
- Current trends favour the use of SOA for enacting distributed systems. However, the concepts and frameworks of DBMS-based DQP are typically not based on SOA, in contrast to D-WCPS, which is.
- Furthermore, the centralized mediator used for maintaining the registry and/or executing the distributed query in most of the mentioned systems constitutes a performance bottleneck and a single point of failure. Hence, there is a need to distribute both the registry and the distributed query orchestration.
Thus, this thesis involves the adaptation and combination of approaches used in different computing frameworks for distributing the execution of a query. These frameworks include grid computing, service-oriented computing, and distributed databases. For instance, we integrate workflow scheduling (one of the strengths of grid computing) with the query optimization techniques of distributed databases for dynamic, distributed geoprocessing in a semantic web.
Chapter 3
DISTRIBUTED WCPS FRAMEWORK
Having introduced the potential of distributed processing of coverages using WCPS, this chapter presents Distributed WCPS (D-WCPS), the framework in which WCPS servers can efficiently share data, loads, and geo-processing tasks. It comprises several WCPS servers and their resource-aware registries. The processing capacity and coverage metadata of each server are published by the server and stored in the service registry. A WCPS server, having received a query request, queries the registry for coverage locations and server capabilities. Using this retrieved information, the server automatically extracts the portions of the query that are best resolved locally, distributes the other parts to other suitable servers, re-collects the results, and packages them. Coverage processing servers can, therefore, dynamically speed up and scale up their capacity in response to the computational cost and complexity of a processing request. In addition, query optimization techniques, an adaptive D-WCPS orchestration model, and a mirrored resource-aware registry infrastructure for query decomposition are proposed in this chapter. Query scheduling algorithms are, however, presented in subsequent chapters.
3.1 RELATED WORKS
Distributed Query Processing (DQP) has been extensively studied in distributed database systems. Traditionally, DQP focuses on loosely coupled relational databases with little or no shared physical components. These can be classified as either homogeneous or heterogeneous (depending on the number of database management systems (DBMS) used), centralized or decentralized (with respect to the level of control of the central server), and federated or unfederated (based on whether the whole or part of a local database is shared) [75][29]. Similarly, several schemes exist for distributing database relations, including vertical fragmentation, horizontal fragmentation, data replication, and several combinations of these. For DQP, a mediator-based method is typically used in classical DBMS. In this approach [29], a mediator's registry stores and integrates the data sources' schemas, statistics, and properties. It also contains the server properties of all the data servers. Global queries (queries
which address more than one server) are directed to the mediator, which parses and translates them into a query tree. Using information from the registry, the query tree is optimized logically and physically using both single- and multi-node algorithms. The query tree operators are then scheduled, and execution code is generated and run for the scheduled tree. Usually, the orchestration and integration of partial results from other servers are done in the mediator. Some of the DBMS-based middleware using this approach include DISCO [13], Garlic [73], Hermes [100], TSIMMIS [53], and Pegasus [77]. Also, several levels of support for mediator-based data integration are provided by major database vendors such as IBM (using DB2 Propagator), Sybase (using Replication Server), Microsoft (SQL Server), and Oracle (using Data Access Gateways) [5][29]. However, in the context of distributed coverage processing, a DBMS-based DQP is not totally suitable for the following reasons:
- DBMS-based DQP's scheduling algorithms, parallelisation and optimization techniques, and execution models deal with relational and XML databases as opposed to coverage databases. For instance, a typical DBMS-based DQP will focus on scheduling relational join operators on servers based on different join algorithms. However, in D-WCPS, scheduling the costly coverage processing operators takes precedence over the comparatively less expensive join operators.
- The mediator used for maintaining the registry and executing the distributed query constitutes a performance bottleneck and a single point of failure.
- The concepts and framework of D-WCPS are based on a Service-Oriented Architecture, while those of DBMS-based DQP are not.
Similarly, numerous studies have addressed DQP on grid computing resources. These primarily focus on the volatility and unpredictability of the computing environment. The grid system handles the underlying jobs of data transfer, security, instance creation, and resource discovery and management, while the database system handles the job of query processing. Similar to DBMS-based DQP, these systems consist of a mediator which stores and integrates the server capabilities (static and dynamic) and schemas of several heterogeneous data sources in its catalog. The mediator also compiles, optimises, partitions, schedules, and executes a query. A query evaluation service, typically wrapped around a query execution engine, executes a partial query and ships the result to the mediator, which integrates the results of several partial queries. Some of the well-known generic grid-enabled query processors include Polar [63], OGSA-DQP [78], SkyQuery [114], CoDIMS-G [117], and GridDB-Lite [104]. Compared with our D-WCPS, we note the following about grid-based DQP:
- Because the grid deals with heavyweight, long-running scientific applications, the cost of grid-based DQP scheduling and execution overhead is relatively small compared to the query cost. However, WCPS is typically a medium-weight application whose running times vary from milliseconds to minutes. Hence, the overhead of grid-based methods in D-WCPS is significantly large and inefficient.
- Similar to DBMS-based DQP, it is mediator-based and focuses on relational and XML-based query operators.
Furthermore, a component common to all distributed systems is the registry. Registries go by different names, such as Universal Description Discovery and Integration in SOA, the OGC Catalog Service for the Web in OGC web services, the federated database catalog in distributed databases, the Monitoring & Discovery System in the Globus grid [73], etc. To ease management overhead, registries are usually centrally located. However, besides the performance bottleneck and single point of failure constituted by a centralized registry, this architecture is not efficient in processing queries because of the communication delays between the registry service and its clients. Another proposal uses a decentralized peer-to-peer registry based on a distributed hash table [22]. Although this system scales up and is resilient to failure, the response times for a query are usually large. Also, its registry management is complex, and it does not efficiently support range queries. Similarly, some other registries are based on a meta-directory architecture whereby a node stores meta-information about the distributed registries [6]. This has the advantage of scalability with respect to data sizes; however, performance is still an issue, and it is prone to a single point of failure. Hence, we propose the use of a mirrored registry architecture for D-WCPS whereby the registry is duplicated on all servers. This has the advantage of being highly efficient in its query processing because the registry is available locally on every server. However, synchronization of all the servers for transaction-based queries (updates, inserts, deletes) can be costly. As the rate of transactions in D-WCPS is not expected to be high, this cost will not be significant. Besides, since the database is mirrored on all servers, the registry is more resilient to failure, and the central server's performance bottleneck is removed.
Overall, our proposal is unique in the following respects:
- We focus on processing coverages, and more emphasis is laid on optimizing and scheduling coverage processing operators rather than relational operators. We also present an orchestration model which uses pipelined, intra-operator, and inter-operator parallelism at various stages of the query execution.
- We use an architecture where every participating server can serve as a mediator. This implies that every server can schedule queries (using information from a synchronized, mirrored, local registry) and can execute either full or partial queries.
3.2 OVERVIEW OF D-WCPS FRAMEWORK
On a high level, we present D-WCPS, a framework where several WCPS servers can collaboratively share the computation of a geoprocessing service request. Every server in this framework is a mediator, i.e., every server has a local copy of the global registry of data and services, and can decompose and orchestrate distributed queries. Information from the registry is used to decompose a global query request into a distributed query request, which is then executed. As shown in Figure 3.1, every D-WCPS server runs a WCPS service and a registry service, and these, in turn, are made up of several components. The modus operandi of these services and their components is discussed in the sections below.
3.2.1 Overview of D-WCPS Query Processing
The procedure for composing and executing a distributed WCPS query is adapted from distributed DBMS query processing. After the global query (a query with no distribution information which addresses more than one server) is received by any of the servers, the following actions take place:
- The parser transforms the query into a query tree.
- The locations and metadata of the coverages to process, and the servers to be used for the distributed processing together with their capacities (e.g., CPU speed) and restrictions (e.g., maximum memory available), are then determined by querying the local WCPS registry.
Figure 3.1: Components of a D-WCPS Server (Query Parser, Query Tree Optimizer, Cost-Based Query Tree Scheduler, Distributed Query Orchestrator, External Service Invocator, Query Processor (rasdaman), Coverages Database, Server Calibrator, Registry Coordinator, Data and Service Registry).
- Based on the query structure, data sizes, and data locations, the coverage processing query tree is optimized for distributed execution by rearranging its operators using different sets of query tree equivalence rules.
- Using the server capabilities and initial data locations, the query is decomposed into a distributed query such that it fulfils an objective function. Several objective functions exist, including minimizing the execution time, the total data transferred, or the total time spent on all servers; maximizing throughput; and load balancing on the different servers. Decomposition of the query tree involves the scheduling of the query tree operators on different servers. This is an NP-complete problem, hence the use of heuristic-based algorithms [52].
- After the scheduling, the global query is re-written into a distributed query (a query with scheduling information). The distributed query is a recursively nested query, and it is executed using a P2P-based technique. Also, depending on the structure of the query, different types of parallelism can be used during its execution. In addition, this infrastructure ensures that the distributed execution is resilient by dynamically adapting the queries during execution to the conditions in the network.
3.2.2 Inter-Tuple Parallelism
The processing model of WCPS, as shown in Chapter 1, consists of a list processing tree and coverage processing trees. The cross-product operator of the list processing tree creates a cartesian table of all the coverage lists bound to the coverage iterators in the query. For each tuple in the product table, the predicate expression and/or processing expression is executed. Compared to the coverage processing operations, the cartesian product is a cheap operation, so in D-WCPS the operator is executed on the server which receives the query. However, the execution of the coverage processing expressions is distributed over several servers. Since each tuple can be processed independently of the other tuples, we parallelise the processing of the tuples. Therefore, our processing methodology is such that the optimization, scheduling, and execution of a D-WCPS query are parallelised on a tuple-by-tuple basis for all the tuples in the cartesian product table. So, given the example query

for a in ( X, Y ),
    b in ( Z )
where max(a) < avg(b)
return encode( cos(a) + min(b), "tiff" )
Table 3.1: Cartesian Product Table of WCPS Query

Tuple | Iterator a | Iterator b | Query Materialization
1     | X          | Z          | Predicate expression: max(X) < avg(Z)
      |            |            | Processing expression: encode(cos(X) + min(Z), "tiff")
2     | Y          | Z          | Predicate expression: max(Y) < avg(Z)
      |            |            | Processing expression: encode(cos(Y) + min(Z), "tiff")
the cartesian product shown in Table 3.1 is created, and its processing is then parallelized tuple-wise, as the sketch below illustrates.
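A minimal sketch (in Python) of this tuple-wise parallelization; the materialize helper and the thread pool are illustrative assumptions, and the per-tuple optimize/schedule/execute pipeline is reduced to returning the materialized expressions:

import re
from itertools import product
from concurrent.futures import ThreadPoolExecutor

def materialize(template, binding):
    # Substitute concrete coverage names for iterator names, e.g. the binding
    # {"a": "X", "b": "Z"} turns "max(a) < avg(b)" into "max(X) < avg(Z)".
    for iterator, coverage in binding.items():
        template = re.sub(rf"\b{iterator}\b", coverage, template)
    return template

def process_tuple(binding, predicate_tpl, processing_tpl):
    # Stand-in for the per-tuple pipeline: in D-WCPS each materialized tuple
    # would be optimized, scheduled, and executed independently.
    return (materialize(predicate_tpl, binding),
            materialize(processing_tpl, binding))

iterators = {"a": ["X", "Y"], "b": ["Z"]}
bindings = [dict(zip(iterators, combo))
            for combo in product(*iterators.values())]

with ThreadPoolExecutor() as pool:
    rows = list(pool.map(
        lambda b: process_tuple(b, "max(a) < avg(b)",
                                'encode(cos(a) + min(b), "tiff")'),
        bindings))
for row in rows:
    print(row)   # the two materializations of Table 3.1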
3.2.3 Query Re-writing
In order to increase the efficiency of D-WCPS execution, the global query is optimized by re-writing it based on a set of equivalence rules. This is done using a two-stage approach: applying single-node optimization before multi-node optimization. Generally, optimization re-arranges the ordering of the operators of a query tree for efficient execution. The optimizations in this thesis are applied to the coverage processing trees, and not to the list processing tree. Hence, any reference to an operator with regard to query optimization henceforth implies a coverage processing operator. The optimizations applied to the coverage processing tree are presented below.
3.2.3.1 Single Node Optimizations
The single-node optimization increases the efficiency of query execution on a single server and is based on the rasdaman query re-writing rules [99]. The rationale and proofs of the optimizations can be obtained from [99]. Overall, the idea of this type of optimization is to minimize the size of the data processed by an operator, because smaller input data for an operator implies
- less processing work done by the operator, and
- reduced data transfer time between an operator and its operand.
To describe the main optimization rules adapted from [99], we represent a coverage expression as C, a scalar expression as s, and a unary induced operation (an induced operation with only one input) as I_u(C). A binary induced operation I_b (an induced operation with two inputs) is represented by
- C1 I_b C2 if both of its operands are coverages,
- C I_b s if one of its operands is scalar and it is on the right side of the expression,
- s I_b C if one of its operands is scalar and it is on the left side of the expression.
i) Domain-Reducing Operation-Based Optimization
To reduce the sizes of the coverages transferred to, and processed at, the operators in the tree, every coverage domain-reducing operator is pushed down the query tree along induced operators, i.e., a coverage domain-reducing operator is recursively swapped with its child operator if the child operator is a type of induced operator. Coverage-size reducing operations include slice, trim, and (in some instances) scale. Therefore, if O_G represents a trim, slice, or scale operator, the transformations on the tree are given as:

O_G( I_u(C) ) ≡ I_u( O_G(C) )
O_G( C1 I_b C2 ) ≡ O_G(C1) I_b O_G(C2)
O_G( C I_b s ) ≡ O_G(C) I_b s
O_G( s I_b C ) ≡ s I_b O_G(C)

For instance, assume O_G represents a coverage subsetting operation and O_C some unary induced coverage processing operation. By pushing down the geometric operation as shown in Figure 3.2, the cost of executing O_C, and of transferring data between O_C and O_G, becomes smaller (a sketch of this rewrite follows the figure).
Figure 3.2: Pushing Down Coverage Reducing Operations.
ii) Operators' Associative-Based Optimization (for operators +, *, /, and, or, ==, ≠)
In this optimization (shown in Figure 3.3), a scalar-valued operand c of an operator a is swapped with a coverage-valued operand e of c's sibling2 operator b if both a, b ∈ {+, *, /, and, or, ==, ≠}. As Figure 3.3 shows, the data transferred to operator b is reduced and, consequently, the cost of executing b is decreased, while the cost of processing at, and the data transferred to, a remain constant. Mathematically, with d an arbitrary operand:

( d b e ) a c ≡ ( d b c ) a e, where c is scalar and e is a coverage
c a ( d b e ) ≡ e a ( d b c ), for the symmetric case
iii) Operators' Distributive-Based Optimization
The aim of this optimization is to reduce the total number of operators using their distributive properties. As a result, less data is transferred and less processing is done in the query tree execution. With the operands p1, p2, p3 being either scalar or coverage expressions, and the operator pair (O1, O2) ∈ { (+, *), (-, *), (or, and), (and, or) }, the transformation (illustrated in Figure 3.4) is given as:

( p1 O2 p3 ) O1 ( p2 O2 p3 ) ≡ ( p1 O1 p2 ) O2 p3
2 Sibling operators are operators which have the same parent operator.
Figure 3.3: Operators' Associative-Based Optimization.
Figure 3.4: Operators' Distributive-Based Optimization.
iv) Aggregation Operations-Based Optimization
Similar to the other optimizations, pushing down an aggregation operation ensures that extremely small data sizes are transferred to, and processed by, the subsequent parent operator of the aggregation operation. This rule, therefore, states the condition under which the aggregation operation can be pushed down. With A_i an aggregation operation and O_j an induced operation such that the pair (A_i, O_j) is compatible, e.g., (add, +) or (max, max), where i and j are the operation names, the following rule applies:

A_i( C1 O_j C2 ) ≡ A_i(C1) O_j A_i(C2)
3.2.3.2 Multi-Node Optimization: Left-Deep Tree Transformation to Bushy Tree
The execution duration of a left-deep tree is high, irrespective of the scheduling algorithm used. Therefore, to prepare the query tree for distributed execution (the aim of multi-node optimization), we transform a left-deep tree into a bushy tree, as Figure 3.5 illustrates and the sketch below makes concrete. This enables more operators to be executed in parallel. Along this line, and based on the associative properties of connected/adjacent operators, we propose the following:

(( p1 O1 p2 ) O1 p3 ) O1 p4 ≡ ( p1 O1 p2 ) O1 ( p3 O1 p4 ), for operators O1 ∈ {+, *, or}
(( p1 O1 p2 ) O1 p3 ) O1 p4 ≡ ( p1 O1 p2 ) O1 ( p3 O2 p4 ), for operators O1 ∈ {-, /}, where O2 is + and * respectively

Figure 3.5: Left-Deep Tree Transformation to Bushy Tree.
3.2.4 Distributed Query Modelling and Its Orchestration
A distributed orchestration model, whereby WCPS servers recursively invoke other servers with distributed query requests, is adopted for executing a distributed WCPS query. After a server receives a distributed query request, it invokes other servers with partial query requests as specified in the query, executes its local query requests, and integrates the results of the
external service invocations and the local query execution. The process of integrating external and local query responses may involve writing a temporary copy of very large data which may be too big to fit into the system's memory. The distributed query request is composed by the server which receives the global query, using its scheduler. Therefore, the server peers used in the execution of the distributed query need not run the scheduler on the query again unless there is a significant change in the conditions in the network, e.g., excessive load on an execution server. In such a scenario, the distributed query execution can be adapted at runtime by rescheduling some of the partial queries to other servers (see Section 3.5).
Similarly, the WCPS query syntax is modified to support such distributed execution. With the introduced modifications, a coverage iterator can bind not only to coverages but also to partial queries with a specified execution host. The processing expression in the query specifies the local query to be executed and how to integrate the responses of the external web service invocations. We use an EBNF grammar where
- underlined tokens represent literals which appear "as is" in a valid WCPS expression;
- brackets ("[…]") denote optional elements which may occur or be left out;
- an asterisk ("*") denotes that an arbitrary number of repetitions of the following element can be chosen, including none at all;
- a vertical bar ("|") denotes alternatives from which exactly one must be chosen.
The D-WCPS query syntax is shown below:
wcpsQuery:
  for variable in ( coverageList | ( wcpsQuery on host ) )
    *( , variable in ( coverageList | ( wcpsQuery on host ) ) )
  [ where booleanProcessCoverages() ]
  return processCoverages()
where
- "wcpsQuery" is the WCPS query expression; note that it can be nested;
- "variable" represents a coverage iterator for the respective coverage list;
- processCoverages() is a coverage processing expression for processing the coverages for which the predicate function booleanProcessCoverages() holds;
- "host" represents the execution server for an inner WCPS query expression.
By way of example, the query below shows different nesting levels of sub-queries (SubQ1 being the whole query, SubQ2 the part executed on server_A, and SubQ3 the part executed on server_B):
for p in (
    for r in (
        for t in ( T )
        return encode( cos(t), "raw" )
        on server_B                          [SubQ3]
    ),
    s in ( S )
    return encode( (r + s), "hdf" )
    on server_A                              [SubQ2]
),
q in ( Q )
return encode( p + max(q), "tiff" )          [SubQ1]
The local server that receives or composes the above distributed query (SubQ1) invokes server_A with SubQ2 and starts executing its local query. The local server then integrates the local result with the SubQ2 result and further processes the data as specified in the query. Server_A, in turn, invokes server_B with SubQ3 and integrates its response with its local query result before finally shipping the result.
3.3 THE FRAMEWORK’S REGISTRY
As noted in the related works section of this chapter, several web and grid registry service architectures exist; three main ones are the centralized, decentralized, and meta-directory architectures. The architecture adopted for a framework depends on the trade-offs between requirements such as efficiency, scalability, simplicity, availability, fault tolerance, ease of management, flexibility, allowance for redundancy, rate of update, support for different kinds of queries (range or singleton queries), the ability to easily classify the registry entries, etc. Hence, there is no one-size-fits-all registry.
In the D-WCPS framework, emphasis is placed on efficiency, fault tolerance, and ease of configuration. In addition, we envisage a system whose rate of registry update is relatively slow and which scales to several thousand nodes in terms of services registered; each service, on the other hand, can also locally hold and serve several thousands of coverages. Hence, the registry should be able to hold potentially millions of records with respect to coverage metadata. In this respect, we propose a mirrored registry architecture whereby the D-WCPS registry is duplicated on every server. Because each server has a local copy of the registry,
querying it to obtain the services to use in the query decomposition and execution is highly efficient compared to an external registry, because the communication delays between an external registry service and the D-WCPS server can offset the gains of decomposition for light- to moderate-weight queries. Although this has the setback of taking up extra disk space, the space used is negligible compared to the typical sizes of coverages. Besides, since the database is mirrored on all servers, the registry is more resilient to failure, and the central server's performance bottleneck is removed. Each local registry in the network is kept in sync with the others, and updates are made only when a new server joins the network, data is added to or deleted from a server, a server's static properties change, or the properties of some stored data change. Therefore, the challenge in the network is keeping the servers in sync with minimum effort, without overburdening any server, and without broadcasting synchronization information to the whole network.
Based on the above requirements, we propose a registry architecture based on a hierarchical topology for arranging the servers in the D-WCPS overlay network (see Figure 3.6). To join the network, each server registers with a server already in the network. Furthermore, each server can only join the network through one server, i.e., the server has only one gateway, though it can have several backup gateways. The backup gateways are servers which are higher than the gateway server in the hierarchical network topology. Similarly, several servers can join the network through one (gateway) server.
Figure 3.6: Mirrored Registry Synchronization.
We refer to all the servers that share a gateway as sibling servers and/or children servers of their gateway server. A server can only receive
message from either its gateway or any of its child servers. When a server receives or
generates a message, it propagates it to all the servers connected to it (child servers and
gateway) except the source of the message. In this way, messages get sent to all the servers
without looping round the network endlessly. Furthermore, in order not to overload a server,
a server can have the maximum number of children servers it can have. If any intending
server wants to register with a server and its maximum number of child servers has been
reached, it forwards the registration information to its child servers in a round-robin fashion.
A server and its gateway will periodically send a keep-alive message to each other to monitor
when they are on or offline. If its gateway becomes unavailable, the server can re-register
with any other gateway server, most probably, its backup gateway servers.
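A minimal sketch of this loop-free propagation rule over the tree overlay; the Server class and apply_update are hypothetical illustrations:

class Server:
    def __init__(self, name):
        self.name = name
        self.gateway = None        # the one server this server registered through
        self.children = []         # servers that registered through this server

    def neighbours(self):
        return ([self.gateway] if self.gateway else []) + self.children

    def propagate(self, message, source=None):
        # Apply the registry update locally, then forward to every connected
        # server except the message's source; the tree-shaped overlay
        # guarantees each server sees the message exactly once.
        apply_update(self, message)
        for peer in self.neighbours():
            if peer is not source:
                peer.propagate(message, source=self)

def apply_update(server, message):
    # Hypothetical local registry write (insert/update/delete of metadata).
    print(f"{server.name}: applied {message}")

gateway = Server("gateway")
child = Server("child_1")
child.gateway = gateway
gateway.children.append(child)
child.propagate({"op": "insert", "coverage": "NDVI_2011"})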
Three interfaces are exposed by the registry service, namely the register, update, and info interfaces. The register interface is used for registering and de-registering servers. The update interface is used to insert, delete, or update information about servers or data, and the info interface is used for management, e.g., receiving and sending keep-alive messages and sharing network information between sibling3 servers.
3.4 CALIBRATION AND PROFILING
The information stored in the registry for each coverage includes the data name, type, domain, location, number of bands, tiling structure, etc. For each server, the information stored in the registry includes the overheads and speeds of the server with respect to coverage processing, data writes, and data reads; the average network speed to all servers in the network and its overhead; the set of preferred servers together with the average network speed to them; the memory available; the number of simultaneous processes it can run without degrading performance (which depends on the number of virtual/physical processors); the service endpoint; etc. Every server has a calibration engine which is used to measure its capacities for coverage processing. The gathered information is published into the service registry and includes:
i) Data I/O Speed and Overhead
The read and write speeds and overheads are obtained by writing data to, and reading data from, the database. They depend on the type and speed of the disk systems used (e.g., RAID systems, virtual disk systems, network disks), the file system or database system used and its configuration, etc.
3 Sibling servers are servers that have the same gateway server.
ii) Coverage Processing Speed
[66][33] opine that the quoted speed of a system in terms of MIPS, FLOPS, QUIPS, or clock rate cannot be used to determine its performance due to factors such as inter-processor communication, I/O performance, cache coherence and speed, memory hierarchy, etc. Therefore, we define the processing speed of a system with regard to coverage processing as the speed at which the system copies a char-typed pixel from one memory location to another. In order to calibrate the system with reference to this defined coverage processing speed, a set of standardized queries is run on the system, and their run times are used to determine the speed. Furthermore, because systems nowadays typically have several physical and/or virtual processors, they can run several queries simultaneously without any significant degradation of their performance. Hence, we determine the maximum number of queries a system can run without performance loss and store this information in the registry.
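A minimal sketch of the pixel-copy calibration measurement in Python; a production calibrator would run the standardized WCPS queries mentioned above through the server itself rather than a bulk in-memory copy:

import time

def pixel_copy_speed(n_pixels=20_000_000):
    # Copy n char-typed pixels between two buffers and report pixels per
    # second; this is the benchmark unit against which operator costs are
    # later expressed (see the operator weights in Chapter 4).
    src = bytearray(n_pixels)
    dst = bytearray(n_pixels)
    start = time.perf_counter()
    dst[:] = src                       # bulk char-pixel copy
    elapsed = time.perf_counter() - start
    return n_pixels / elapsed

print(f"coverageProcessingSpeed ~ {pixel_copy_speed():.3e} pixels/s")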
iii) Network Speed and Overhead
Assuming we have n servers in the network, storing the average network speed of each server to each of the other servers in the network would mean having (n-1)^2 records of network speed in the registry, which is overly large. Besides, network speed cannot be determined absolutely, as it changes very rapidly. Therefore, instead of storing the network speed of a server to each of the other servers, we store the average network speed of a server to all servers in the network. This is obtained by profiling the network speeds based on the distributed queries executed by the server over time. This speed is assumed to be the network speed of the server to every server in the network.
iv) Preferred Servers and Their Network Speed
The majority of scheduling algorithms, especially the list scheduling algorithms (see Section 5.1.1), iterate over a list of servers before deciding on the execution host of a task/operator. However, this is not efficient in a framework with a very large number of servers. Therefore, in the D-WCPS framework, we assign to each server a set of preferred servers. These are servers with high processing speed which have a relatively high network speed to the server. A server will prefer to use this set of preferred servers for distributed geo-processing. We, therefore, restrict the servers iterated during the scheduling to the set consisting of the preferred servers of any server hosting at least one coverage used in the query processing. In case the query is so expensive that more servers are needed for its execution, the preferred servers of the set of preferred servers are recursively requested and used in the scheduling process (more details on the use of preferred servers in Section 5.2.1). Each server obtains its set of preferred servers by profiling the response times of the queries it sends out, and selecting the servers with relatively high network and processing speeds, as sketched below. The network speeds and overheads from a server to its preferred servers are stored in the registry.
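A minimal sketch of the preferred-server selection; the profile layout and the combined score (product of processing and network speed) are illustrative assumptions:

def preferred_servers(profile, k=5):
    # profile: {server: {"proc_speed": pixels/s, "net_speed": bytes/s}},
    # gathered by profiling the responses of previously issued queries.
    def score(server):
        stats = profile[server]
        return stats["proc_speed"] * stats["net_speed"]   # assumed score
    return sorted(profile, key=score, reverse=True)[:k]

profile = {
    "server_A": {"proc_speed": 2.0e8, "net_speed": 1.0e8},
    "server_B": {"proc_speed": 1.5e8, "net_speed": 5.0e7},
    "server_C": {"proc_speed": 3.0e8, "net_speed": 2.0e7},
}
print(preferred_servers(profile, k=2))    # ['server_A', 'server_B']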
3.5 SERVICE ADAPTIVITY
Conditions in the network can change during the execution of the decomposition, and this affects the quality of the schedule produced. Typically, adaptive scheduling systems reschedule tasks on an operator-by-operator basis [7][39]. However, this would not be efficient for most of the operators in WCPS because the overhead of rescheduling can be high compared with the cost of the operator. Hence, we reschedule at the level of sub-queries (groups of operators). In this project, we address the following runtime disruptions to the network.
1) Server overloading
A server maintains a queue of every query it receives and executes the queries in the queue in FIFO order. Using the estimated costs of the queries in the queue, a server can approximately determine the start time of the next query it receives. Similarly, a server also keeps a time duration threshold within which a query request it receives must be executed. Any query received whose start time exceeds the threshold is rescheduled. The rescheduling is, however, based on the condition that the execution finish time of the rescheduled query is less than the execution finish time of the query had it not been rescheduled (i.e., if it were to remain in the queue), as the sketch below shows. The rescheduling process is based on the same scheduling concepts as D-WCPS.
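A minimal sketch of this two-part rescheduling decision; the costs are the estimated query costs described above, and the idle-target assumption is a simplification:

def estimated_start(queue_costs):
    # FIFO queue: a newly received query starts after all queued queries finish.
    return sum(queue_costs)

def should_reschedule(queue_costs, threshold, cost_here,
                      cost_elsewhere, transfer_cost):
    # Reschedule only if the estimated start time exceeds the server's
    # threshold AND the query would finish earlier on the alternative server.
    start = estimated_start(queue_costs)
    finish_if_queued = start + cost_here
    finish_if_moved = transfer_cost + cost_elsewhere  # assumes an idle target
    return start > threshold and finish_if_moved < finish_if_queued

print(should_reschedule([4.0, 6.0], threshold=5.0, cost_here=3.0,
                        cost_elsewhere=3.5, transfer_cost=1.0))  # True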
2) Server unavailability
If a server is unavailable and the server is critical to the execution of the query, e.g., it provides some operations that no other server can perform or it holds the data to be processed, then the query execution is terminated, and this information is cascaded to all the execution servers. However, if the server is not critical to the execution of the query request, the partial query is rescheduled to another server.
3.6 SUMMARY
The OGC WCPS offers a multidimensional raster processing query language with a formal semantics which contains sufficient information for automatic orchestration. Based on this, we present the D-WCPS infrastructure for the distributed execution of WCPS queries. It offers a platform for dynamically composing WCPS services based on the query requests. Every server in the D-WCPS network has a local copy of the WCPS registry, and the servers synchronize registry updates with each other. Using the information from the registry and tuple-wise parallelization, servers can optimize, decompose, and execute a received query. Furthermore, we present the model for an adaptive and decentralized orchestration of the distributed WCPS query. Several servers can, therefore, efficiently collaborate in sharing data, computation, loads, and other tasks with respect to dynamic, resource-aware coverage processing.
Chapter 4
COST MODEL FOR DISTRIBUTED
QUERY PROCESSING
Chapter 3 presented the infrastructure for distributed WCPS and the components of a D-WCPS server together with their functions. However, the aim of this thesis is the decomposition of a query using cost-based scheduling of the query tree operators onto a heterogeneous set of computing servers. Hence, to enable automatic and efficient distribution of coverage processing, this chapter presents a cost model for distributed processing of coverages based on WCPS. This includes the algorithms for determining the cost of executing different coverage data processing operations on different servers, together with the costs of data transfer, data I/O, and data decoding and encoding.
To this end, we model a WCPS query as a query tree QT = (V, E) where
- V = {v1, v2, …, vm | m ∈ ℕ} is a set of nodes vi representing coverage processing operators;
- E = {e1, e2, …, em | m ∈ ℕ, ei = (vi, vj): vi, vj ∈ V} is a set of edges such that each edge ei = (vi, vj) represents the dependence of operator vi ∈ V on vj ∈ V. This dependence information represents the operators' execution order.
The parent of an operator node v, given as parent(v), is the operator to which the operator v ships its execution result. Similarly, the children of the operator v, given as children(v), are the set of operators which send their execution results to v. A sample query tree is presented in Figure 4.1.
Figure 4.1: Sample Query Tree
4.1 LITERATURE REVIEW
Determining the distribution of geo-processing web services such as the OGC services has usually been manual, based on the functionality of the web service and rule-of-thumb metrics [17][44][111]. However, to facilitate cost-effective, resource-aware distribution of geo-processing services, algorithms for computing the costs of queries need to be developed. In [59][116], profiling information is used to select the web services used for processing a client's request; however, this system is based more on Quality of Service (QoS) criteria than on any rigorously computed cost metric. Besides, it assumes a centralized orchestration technique, which is not cost-effective given the typically massive data transfers in geoprocessing. A decentralized orchestration technique is, however, used in D-WCPS, comparable to the P2P orchestration techniques described in [12]. Furthermore, for accounting for the costs of processing in the clouds, [16] described an abstract architecture using a pay-per-use revenue model for geoprocessing services. However, that model computes monetary costs during and/or after runtime, as opposed to a priori execution cost computation.
According to [36], the scheduling of computing tasks to different computing hosts is usually based on predictive policies which optimize a cost model, where a cost model assigns a value (cost) to the execution of a particular execution schedule. In a priori scheduling, a schedule is optimized using a particular cost model before runtime. Grid systems using a priori scheduling, such as the AppLeS [37] and GrADS [35] projects, typically consider only the cost of data transfer and task execution in their cost models. Similarly, the cost of a distributed query in a distributed database system is modelled as the cost of data transfer and task execution [61][29]. However, data I/O costs (intermediate data writes, and initial and intermediate data reads) are often neglected. This is based on the assumption that data sizes are relatively small and, as such, their I/O is insignificant compared to the task durations. However, this cost can be significant in medium-weight applications (applications whose execution times are in the magnitude of seconds to hours) which process large-sized datasets, as is the case with WCPS.
In predicting the performance of a system for a particular processing operation, [66][33] opine that the quoted speed of a system in terms of MIPS, FLOPS, QUIPS, or clock rate cannot be used to determine its performance due to factors such as inter-processor communication, I/O performance, cache coherence and speed, memory hierarchy, etc. Hence the use of application-based performance metrics for benchmarking systems [74][83] with respect to the processing speed of an application.
Furthermore, as far as we know, relatively little attention has been paid to estimating the cost of executing coverage processing tasks on a system before scheduling the tasks. Systems generally estimate the cost of tasks in real time using the runtime processing speed (based on the duration and size of the tasks already completed) and the size of the tasks that remain to be done [80][45]. Consequently, their cost models cannot be used for a priori scheduling of tasks. Therefore, using the semantics of the tasks to be done and the system capabilities, we estimate the cost of the distributed execution of coverage processing tasks on several servers.
4.2 DISTRIBUTED WCPS PROCESSING MODEL
As shown in Chapter 3, distributed WCPS processing is specified as a recursive query, where the execution host (WCPS service) of the higher-level sub-query invokes the lower-level sub-query on another specified WCPS service. For example, consider the sample WCPS query shown below.
for a in ( coverage_a ),
    b in ( for x in ( coverage_x )
           return encode( cos(x), "tiff" )
           on wcps_service_b ),              [sub-query X]
    c in ( for y in ( coverage_y )
           return max(y)
           on wcps_service_c )               [sub-query Y]
return encode( a + b + c, "tiff" )           [sub-query Z]
It specifies that a WCPS server should invoke wcps_service_b with sub-query X, and invoke wcps_service_c with sub-query Y. It should then locally execute sub-query Z, which integrates the results of sub-query X and sub-query Y with its local data. The tree representation of the above query is shown in Figure 4.2. Note that the "receive" and "send" nodes in the tree carry the orchestration information.
Figure 4.2: Sample Distributed Query Tree
The execution of a D-WCPS query is such that each coverage processing server recursively performs the sequence below:
- Receive the distributed query request.
- Invoke all sub-requests at the appropriate WCPS servers.
- Write to disk (file system or database) the coverage-valued responses from the invoked requests. This is due to the usually large-sized coverage responses.
- Substitute in the WCPS query the appropriate value for each scalar-valued service response. Scalar-valued data include strings and integers.
- Invoke the local WCPS request and return the response.
Formally, we represent a distributed WCPS query as a tree DQT = (C, D) (see Figure 4.3), where C = {c1, …, cn} is a set of sub-query nodes and D is composed of edges which indicate the order of execution of the sub-queries. In turn, we model a sub-query c ∈ C as a group of connected or adjacent coverage processing operators, v, executed as a single unit on a server h ∈ H, where H is a set of computing hosts. In other words, c is a (sub-)query tree (Vc, Ec) where Vc ⊆ V is the set of nodes which represent the coverage processing operators in cluster c, and Ec ⊆ E ∩ (Vc × Vc) is the set of edges in sub-query c.
4.3 DISTRIBUTED WCPS COST MODEL
To facilitate the efficient distribution of geo-processing tasks, we present a rigorous cost model which can be used to assess the suitability of distributed processing plans based on D-WCPS.
Figure 4.3: Distributed Query Tree Modeling.
Before going into the cost model details, we define the following functions and
abbreviations:
- pixelOutputSize(x): The number of pixels output by an operator or a (sub-)query tree. For a (sub-)query tree, pixelOutputSize() is the pixelOutputSize() of the root operator of the (sub-)query tree.
- outputSize(x): The output size (in bytes) of an operator or a (sub-)query tree. This varies with the data type and number of bands of the coverage. For a (sub-)query tree y, outputSize(y) is the outputSize(r) of the root operator r of the (sub-)query tree.
- cost(x): The time duration for finishing a task x (which could be data I/O, data transfer, CPU processing, etc.).
- cpuCost(x): The CPU processing time for executing x on its allocated server.
- readCost(x): The time it takes to read all the data processed by a (sub-)query or operator x on its allocated server.
- writeCost(x): The duration it takes for the output of (sub-)query x to be written on the server from which x was invoked. Because the DQT is a tree, the server which invokes x is the server of its parent (sub-)query.
- commCost(ci, cj): The cost of transferring the output of (sub-)query ci to the server of (sub-)query cj.
- operatorType(v): The coverage processing operator type (e.g., tan, max) of an operator node v.
Depending on the objective function to be satisfied by the distributed query, several cost computation criteria and methodologies can be defined. In this thesis, we model the cost of executing a distributed WCPS query as either the total cost (time) or the parallel cost (time).
The total cost of a DQT is the sum of the processing costs of each sub-query on its execution server, all data transfer (edge) costs, and all data write and read costs of all sub-queries in the distributed query tree. Because the DQT is a tree-based structure, we compute the total cost by recursively computing the cumulative cost of each sub-query in the DQT, starting from the root sub-query, i.e., the total cost of the DQT is given by

totalCost(DQT) = cumulativeCost(r), where r is the root sub-query of the DQT.
The cumulative cost of a sub-query c comprises the CPU cost of executing the sub-query, the cost of reading all the data used in the sub-query, and the cost of transferring and writing the data from the children sub-queries, together with the cumulative cost of each of the children sub-queries:

cumulativeCost(c) = cpuCost(c) + readCost(c) + Σ_{x ∈ children(c)} ( commCost(x, c) + writeCost(x) + cumulativeCost(x) )
The parallel cost is the duration of execution of the DQT; it is also referred to as the makespan of the DQT. It is given by the critical path (the most expensive path from the root to the leaves) of the DQT. Similar to the computation of the total cost, we recursively compute the parallel cost of the DQT. Assuming the function makespan() returns the parallel cost, it can be given as

makespan(DQT) = makespan(r), where r is the root sub-query of the DQT, and
makespan(c) = cpuCost(c) + readCost(c) + max_{x ∈ children(c)} ( commCost(x, c) + writeCost(x) + makespan(x) )

In the subsequent sections, we discuss how to obtain the four main unknowns in computing both the total cost and the parallel cost of a DQT: cpuCost(x), readCost(x), writeCost(x), and commCost(xi, xj).
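A minimal sketch of both cost functions over a DQT, mirroring the two recursions above; the SubQuery container and the example numbers are illustrative assumptions:

from dataclasses import dataclass, field

@dataclass
class SubQuery:
    cpu: float                    # cpuCost(c) on its allocated server
    read: float                   # readCost(c) for the source coverages it reads
    write: float                  # writeCost(c) on its parent's server
    comm: float                   # commCost(c, parent(c))
    children: list = field(default_factory=list)

def total_cost(c):
    # Sum of all CPU, read, transfer, and write costs in the whole subtree.
    return c.cpu + c.read + sum(x.comm + x.write + total_cost(x)
                                for x in c.children)

def makespan(c):
    # Critical path: sibling sub-queries run in parallel, so take the most
    # expensive child branch instead of the sum.
    branches = [x.comm + x.write + makespan(x) for x in c.children]
    return c.cpu + c.read + (max(branches) if branches else 0.0)

leaf_a = SubQuery(cpu=4.0, read=1.0, write=0.5, comm=2.0)
leaf_b = SubQuery(cpu=6.0, read=1.0, write=0.5, comm=1.0)
root = SubQuery(cpu=3.0, read=1.0, write=0.0, comm=0.0,
                children=[leaf_a, leaf_b])
print(total_cost(root), makespan(root))   # 20.0 and 12.5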
4.3.1 Communication Cost
Given a set of computing hosts H and a function networkSpeed(hi ∈ H, hj ∈ H) which returns the network speed from host hi to host hj, the communication cost of transferring the output of sub-query ci to the server of sub-query cj is:

commCost(ci, cj) = outputSize(ci) / networkSpeed(host(ci), host(cj)) + commOverhead(host(ci), host(cj))

where commOverhead(hi, hj) returns the constant overhead cost incurred by a server when it invokes another service and receives the response. A mesh topology is assumed for the network, and because different network paths are usually taken between two servers in the two directions, networkSpeed(hi, hj) is not necessarily equal to networkSpeed(hj, hi).
4.3.2 Intermediate Data Storage Cost
The size of a sub-query response, and the time difference between the arrivals of different responses, require temporarily writing a response to disk as opposed to holding it in memory. Each server can then find an optimal means of accessing the temporary data, e.g., through the use of the most efficient tiling strategy. Given a function writeSpeed(h ∈ H), the cost of temporarily storing the output of sub-query c on its parent cluster's server, host(parent(c)), is given as:

writeCost(c) = outputSize(c) / writeSpeed(host(parent(c))) + writeOverhead(host(parent(c)))

where writeOverhead(h) is the constant overhead incurred for temporarily storing the data.
4.3.3 Data Read Cost
We represent the cost of the data reads for processing in a sub-query c as readCost(c). Because several coverages may be read for processing in the sub-query using the data read operator, we model readCost(c) as the sum of the costs of reading each coverage read in the (sub-)query. The data read nodes of a query tree are the nodes x with operatorType(x) = "dataReader":

readCost(c) = Σ_{x: operatorType(x) = "dataReader"} coverageReadCost(x)

where coverageReadCost(x) is the cost of reading an individual coverage. Coverages are read from disk into memory in blocks of tiles4. A tile block is read if its domain5 intersects the requested coverage domain. Assuming Tc = {t1, …, tn} represents the set of tiles whose domains intersect the domain of coverage c, and a function coverageReadSpeed(h ∈ H) which returns the rate at which a server reads data into its memory, the data read cost is given as

coverageReadCost(c) = Σ_{t ∈ Tc} ( tileSize(t) / coverageReadSpeed(h) + tileReadOverhead(h) )

4 Tiles are the spatial units by which coverages are sub-divided, organized, and stored.
5 The coverage/tile domain is the set of locations contained in (the area covered by) the coverage/tile.
where tileReadOverhead(h) returns the constant overhead incurred for accessing a tile on a host h, and tileSize(t) returns the size of tile t. To obtain the tiles intersecting the coverage domain to be read, the tiling information of the whole coverage must be known. Different tiling schemes and strategies exist, and they can be broadly classified into regular and irregular tiling [7]. The data read cost model handles both types of tiling; a minimal sketch follows below. Figure 4.4 shows the area and the number of tiles read from a coverage using an irregular tiling strategy when a coverage with a known domain is requested.
Figure 4.4: Tile Based Coverage Reading
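A minimal sketch of the tile-based read cost; the tile dictionaries and the interval-overlap test are illustrative assumptions matching the formula above:

def read_cost(tiles, domain, read_speed, tile_overhead):
    # Sum, over the tiles whose domains intersect the requested coverage
    # domain, of the access overhead plus tile size divided by read speed.
    def intersects(tile):
        return all(t_lo <= d_hi and d_lo <= t_hi
                   for (t_lo, t_hi), (d_lo, d_hi) in zip(tile["domain"],
                                                         domain))
    return sum(tile_overhead + tile["size"] / read_speed
               for tile in tiles if intersects(tile))

tiles = [{"domain": [(0, 99), (0, 99)],    "size": 40_000},
         {"domain": [(100, 199), (0, 99)], "size": 40_000}]
print(read_cost(tiles, domain=[(50, 120), (10, 20)],
                read_speed=1e8, tile_overhead=0.002))   # 0.0048 s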
4.3.4 CPU Cost
The cost of executing a sub-query on a server is modelled here as the recursive cumulative cost of executing the "root" operator of the sub-query tree. Similarly, the cumulative cost of an operator is the cost of executing the operator together with all its descendant operators. One of the challenges of modelling the CPU cost of operators is their different functional semantics (as given by the broad classification of the WCPS operators in Chapter 1). For example, certain operators require several iterations of their child operator nodes over their coverage domain, as given by the query semantics, while others do not. Similarly, an operator may have an arbitrary number of children operators, although in WCPS this is usually not more than two. Therefore, taking into account the above-mentioned properties of operators, we model the cost of the sub-query c as

cpuCost(c) = cumulativeCost(r), where r is the root operator of the sub-query c.

The cumulative cost of an operator v is therefore defined as

cumulativeCost(v) = operatorCPUCost(v) + noOfChildIterations(v) × Σ_{w ∈ children(v)} cumulativeCost(w)

where operatorCPUCost(v) is the actual cost of executing operator v on its execution host, which is a function of the output data size, data type, operator type, and the server's processing speed. noOfChildIterations(v) is the number of times the child operator(s) of v is
invoked in order to obtain the output of v. Typically, the number of child operator iterations is 1 for most coverage processing operations in WCPS, except for some aggregation (coverage summarization) and coverage constructor operations. For example, focal6 coverage operations such as neighbourhood analyses (e.g., slope, edge detection, image enhancement) require iterations, over the domain of the resulting coverage, of any operation they invoke in order to generate an output pixel. Figure 4.5 shows the iteration of the max operator over the domain of the output coverage created with the coverage constructor operator.
Furthermore, the CPU cost of an operator depends on the processing speed of its execution host. However, it is difficult to obtain this speed from a system's quoted speed (in MIPS, FLOPS, QUIPS, or clock rate) because of the different computer architectures and hardware configurations available. This is further complicated by the fact that systems treat different data types (integer, floating point, double, etc.) differently. We therefore choose an application-based performance metric to measure the speed of a system for coverage processing. We define the CPU speed of a server with respect to coverage processing as the speed of copying a character-typed pixel from one memory location to another. The pixel copy operation is used because it is one of the least costly coverage operations and can, therefore, serve as a benchmark on which the costs of other operators are based. Likewise, following the theory of relative performance [45], which states that the relative performance of two servers is directly proportional to their compute capacities, the processing capacities of different systems can be compared based on this defined speed metric.
Figure 4.5: Iteration in WCPS
6 http://webhelp.esri.com/arcgiSDEsktop/9.3/index.cfm?TopicName=Operators_and_functions_of_Spatial_Analyst
Furthermore, because the cost of executing a node varies with the type of operator, we associate a weight with every operator in WCPS. The weight is given as the ratio of the cost of executing the operator on unit data to the cost of the char-typed pixel-copy operation on a server. Theoretically, this ratio is constant on every server. Hence, we model the weight of an operator as a function of the type of the operator, and represent the function returning the weight as operatorWeight(operatorType(v)). Also, we model the cost of executing an operator v as dependent on the size of the output of the operator, given as pixelOutputSize(v), and on the number of pixels of the input coverage accessed by the operator to generate a unit pixel in the output, given as inputPixelPerUnitOutput(v). Therefore, given the speed of the coverage processing host of operator v, coverageProcessingSpeed(host(v)), the cost of executing operator v on its host is given as:

operatorCPUCost(v) = operatorWeight(operatorType(v)) × inputPixelPerUnitOutput(v) × pixelOutputSize(v) / coverageProcessingSpeed(host(v))
The well-defined semantics of the query and data ensure that the input and output data size and
type, the number of input pixels per unit output, and the number of iterations per unit output of every
operator node are known. For example, induced expressions such as trigonometric, type-casting
etc. operators generate an output whose domain is the same as the domain of the input
coverage. Geometric expressions, e.g. coverage sub-setting, generate a coverage whose
output size is smaller than the input size; however, the number of input pixels per output pixel is one
because input pixels outside the subset domain need not be accessed.
Scaling and some aggregation expressions, e.g. the max operator, yield a result whose output
size is different from the input size, and all the pixels of the input coverage have to be
accessed. Therefore, the number of input pixels per output has to be determined from
the query parameters. The various output sizes and data types are obtained from the input
data's metadata and the (well-defined) semantics of the operators. The cost of format encoder
and decoder operations depends very strongly on the variability of the values of the contents
of the coverage (if there is data compression). However, for this cost model, we assume that
the values of the pixels in the coverage to be encoded are not uniform.
For example, based on its query tree representation, Figure 4.6 provides a sample CPU cost
computation of the WCPS query shown below.
Node name: v1
operatorType (v1): Binary Add
operatorWeight(“Binary Add”): 2.16
pixelOutputSize(v1): 5000
inputPixelPerUnitOutput(v1): 1
noOfChildIterations (v1): 1
coverageProcessingSpeed (host (v1)): 1
cumulativeCost (v1)= (2.16 * 1* 5000)/1 + 5000 +172200
= 188000
Node name: v2
operatorType (v2): Section
operatorWeight(“Section”): 1
pixelOutputSize (v2): 5000
inputPixelPerUnitOutput(v2): 1
noOfChildIterations (v2): 1
coverageProcessingSpeed (host(v2)): 1
cumulativeCost(v2) = (1*1*5000)/1 + 0 = 5000
Node name: v3
operatorType(v3): coverage constructor
operatorWeight(“coverage constructor”): 10.05
pixelOutputSize(v3): 5000
inputPixelPerUnitOutput(v3): 1
noOfChildIterations(v3): 5000
coverageProcessingSpeed(host(v3)): 1
cumulativeCost(v3)=(10.05* 5000*1)/1 + 5000*24.39
=172200
Node name: v4
operatorType(v4): max
operatorWeight(“max”): 1.71
pixelOutputSize (v4): 1
inputPixelPerUnitOutput (v4): 9
noOfChildIterations (v4): 1
coverageProcessingSpeed (host (v4)): 1
cumulativeCost (v4) = (1.71*1*9)/1 + 9 = 24.39
Node name: v5
operatorType(v5): Section
operatorWeight(“Section”): 1
pixelOutputSize (v5): 9
inputPixelPerUnitOutput(v5): 1
noOfChildIterations (v5): 1
coverageProcessingSpeed(host(v5)): 1
cumulativeCost(v5) = (1*1*9)/1 + 0 = 9
Figure 4.6: WCPS Query Tree CPU Cost Computation
for c in (coverageC),
d in (coverageD)
return encode(
(c[x(0:50), y(0:100)])
+
(coverage newCoverage
over px x( 0: 50 ),
py y ( 0 : 100 )
values
avg(d[x((px - 1):(px + 1)),
y((py - 1):(py + 1))]) )
, "raw")
The query performs an addition of two coverages (binary add). The first coverage is obtained
by sectioning a coverage (section). The second coverage is created (coverage
constructor) and its pixels are given the average (avg) pixel value of a 3x3-pixel kernel
window (section) bordering the corresponding cell in another coverage. Each operator node
representation (grey box) lists the properties of the operator node and the CPU cost of
processing the operator node on a host based on our model.
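To make the recursion concrete, the sketch below reproduces the computation of Figure 4.6 in Python. It is a minimal illustration of the cost model only, not part of the D-WCPS implementation; the node properties are copied from the figure, and the class and function names are ours.

from dataclasses import dataclass, field

@dataclass
class Node:
    name: str
    weight: float                  # operatorWeight(operatorType(v))
    pixel_output_size: int         # pixelOutputSize(v)
    input_pixels_per_output: int   # inputPixelPerUnitOutput(v)
    child_iterations: int          # noOfChildIterations(v)
    speed: float = 1.0             # coverageProcessingSpeed(host(v))
    children: list = field(default_factory=list)

def cost(v):
    # cost(v) = (weight * inputPixelPerUnitOutput * pixelOutputSize) / speed
    return v.weight * v.input_pixels_per_output * v.pixel_output_size / v.speed

def cumulative_cost(v):
    # cumulativeCost(v) = cost(v) + noOfChildIterations(v) * sum over children
    return cost(v) + v.child_iterations * sum(cumulative_cost(c) for c in v.children)

v5 = Node("v5 section", 1.0, 9, 1, 1)
v4 = Node("v4 max", 1.71, 1, 9, 1, children=[v5])
v3 = Node("v3 coverage constructor", 10.05, 5000, 1, 5000, children=[v4])
v2 = Node("v2 section", 1.0, 5000, 1, 1)
v1 = Node("v1 binary add", 2.16, 5000, 1, 1, children=[v2, v3])

print(cumulative_cost(v1))   # 188000.0, matching Figure 4.6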
4.4 EVALUATION
For the computation of the cost, each server in the D-WCPS framework calibrates itself and
publishes its capacities in the registry (as given in Chapter 3). Server properties which are
used for cost computation include the data write, data read, CPU and network speeds and
overheads. Similarly, the coverage's metadata, such as domain, data type, tiling
strategy etc., are also used in the cost determination.
We also experimentally determined the weights of different coverage processing operator
types on different computer architectures. These machines include:
- Server V: Ubuntu OS, 32-bit, 2 GB RAM
- Server W: Suse Linux OS, 64-bit, 16 GB RAM, Dual Core Intel, 3.0 GHz
- Server X: Ubuntu OS, 32-bit, 4 GB RAM, Intel Core Duo, 2.2 GHz
- Server Y: Ubuntu OS, 32-bit, 715 MB RAM, Intel Core 2.5 GHz
- Server Z: Ubuntu OS, 32-bit, 3 GB RAM, Intel Core Duo, 2.2 GHz
It is assumed that the variation in the weights of operators on different machines does not
significantly affect the weights' trend, as evidenced by the correlation table (Table 4.1) of the
operator weights obtained from the different servers. Appendix 1 displays the weights of
different coverage processing operators.
Table 4.1: Correlation Matrix of Operator Weights on Different Servers.

      V      W      X      Y      Z
V     1    0.867  0.830  0.915  0.896
W            1    0.755  0.934  0.890
X                   1    0.951  0.823
Y                          1    0.926
Z                                 1
An average accuracy of 71% was obtained when the average results of 100 repeated runs of
5 distributed coverage processing queries on the above servers were compared to the
predictions of the cost model. The deviation is probably due to the dynamic nature of the network
speed and server load, and the variability of the coverage operator weights from one server to the
other. The experiment also highlights the following:
- The significant cost factors vary with the query. The CPU cost becomes significant for
aggregation operations, while for induced operations the data transfer and intermediate
storage costs become significant.
- Data format encoding affects the total cost significantly. Encoding into the JPEG format is
not appropriate for coverages because its compression is lossy, although its CPU cost
and data transfer cost are small. PNG encoding is suitable for some coverage data types,
and compared to TIFF encoding its data transfer cost is smaller; however, its encoding
cost is usually greater than TIFF's encoding cost. The preference should be based on the
prevailing system and network speed.
4.5 SUMMARY AND OUTLOOK
We presented a distributed coverage processing cost model in this chapter. It consists of
the communication, data read, intermediate data storage, and CPU costs of distributed
coverage processing. We also presented in this chapter a novel method of computing the CPU
cost of processing coverages on different servers based on the semantics of the processing
operation. Experimental evaluation of the cost model shows some deviation between the measured
execution cost and the cost predicted by the model. This is due to the dynamic nature of some of
the cost factors, such as the network speed and server load. Such dynamic behaviour will be
factored into the cost model in the future.
Chapter 5
COST BASED QUERY TREE
DECOMPOSITION
In Chapter 3, we presented the infrastructure for distributed WCPS and the components of a D-
WCPS server together with their functions. The WCPS query tree was also shown to consist
of both a list processing tree and coverage processing trees, and we proposed an inter-
tuple parallelism procedure for processing the list processing part of the WCPS tree.
In this chapter, however, the focus is on the mechanisms, procedures and algorithms for
specifying the scheduling, decomposition and execution of the coverage processing component
of the WCPS tree onto a set of heterogeneous computing servers. To this end, we assume that
the cross product table from the WCPS query list processing tree consists of only one tuple.
Hence, any reference to a query tree later in this chapter implies a coverage processing query
tree. Furthermore, because a coverage processing query tree is a form of Directed Acyclic
Graph (DAG), DAG scheduling algorithms were adopted for WCPS query
decomposition. Formally, a WCPS query can be modelled as an operator tree QT = (V, E)
where
- V = {v1, v2, …, vm | m ∈ ℕ} is a set of nodes vi which represent coverage processing
operators. Note that nodes and operators are used interchangeably in this chapter when
they both refer to an operator of a coverage processing tree.
- E = {e1, e2, …, em | m ∈ ℕ, ei = (vi, vj): vi, vj ∈ V} is a set of edges such that each edge ei
= (vi, vj) represents the dependence of operator vi ∈ V on vj ∈ V.
This is shown in Figure 5.1. The parent of an operator node v, given as parent(v), is the
operator to which the operator v ships its execution result. Similarly, the children of the
operator v, given as children(v), are the set of operators which send their execution results to v.
Figure 5.1: Sample WCPS Query Tree
5.1 RELATED WORKS
The concept of scheduling has been studied extensively since the 1970s. However,
continuous developments in technology, together with their consequent constraints, and the
ever increasing user and application requirements with respect to scheduling necessitate
continuous improvements to the existing algorithms. In Chapter 2, the broad classification
schemes of different scheduling strategies and algorithms were presented. These include
static vs. dynamic scheduling, dependent vs. independent task scheduling, homogeneous
vs. heterogeneous systems scheduling, objective-function-based classification, etc.
This thesis, however, focuses on the static scheduling of dependent tasks (DAGs) onto a
heterogeneous set of resources with the aim of minimizing the execution time. Typically,
static scheduling is used when the properties of a workflow (e.g. execution and
communication costs, task dependencies, etc.) are known a priori [54]. In DAG scheduling,
tasks are optimally assigned to processors such that task dependency conditions are
satisfied and an objective function is achieved. This has been shown to be an NP-complete
problem [48]; hence, heuristic-based algorithms are used rather than an exhaustive search of all
possible scheduling scenarios. Several static scheduling heuristics exist; their general
classification includes list scheduling algorithms, clustering algorithms, arbitrary processor
network algorithms, duplication based algorithms, and guided random search algorithms [112].
We note, however, that the above classification is not exhaustive, and some
algorithms can fall into more than one category. The general classification of the algorithms
is presented below.
5.1.1 List Scheduling Algorithms
In list scheduling [48], the nodes of a DAG are placed in a list arranged in descending
order of a previously computed priority of the nodes. Nodes with a higher priority are
scheduled before nodes with a lower priority. Various methods are used to compute the
nodes' priority. These are usually based on the sum of the costs of nodes (CPU processing)
and edges (data transfer) on the path from an operator node to either the root (exit) node or the
leaf (entry) node, or some combination of the two. In a heterogeneous systems environment,
the CPU and network speeds used in computing the priority can be taken as the
average, minimum, maximum, or mode of the CPU and network speeds of all servers
in the network. After the prioritization stage, the scheduling is done using different heuristics
such as min-min, max-min, random, round robin, multiple rounds, and XSuffrage [39][50].
Based on the method used for prioritizing, the heuristic used for
scheduling and the method used for choosing the servers that will participate in a scheduling,
several list scheduling algorithms exist. These include the HLFET (Highest Level First with
Estimated Times) algorithm [113], ISH (Insertion Scheduling Heuristic) algorithm [123],
MCP (Modified Critical Path) algorithm [123], ETF (Earliest Time First) algorithm [98],
DLS (Dynamic Level Scheduling) algorithm [123], CLANS [123], FCP (Fast Critical Path)
algorithm [10], FLB (Fast Load Balancing) [11], CPR (Critical Path Reduction) [9], LLB
(List-based Load Balancing), CPOP (Critical Path On a Processor) [48], Dynamic Level
Scheduling (DLS) [113], Levelized Min Time (LMT) [47], Heterogeneous N-predecessor
Decisive Path (HNDP) [101], HEFT [47] etc. However, it has been shown that the Heterogeneous
Earliest Finish Time (HEFT) algorithm is one of the best algorithms [39][101] in systems
consisting of several heterogeneous servers.
In HEFT, tasks are prioritized using their ALAP (As-Late-As-Possible) time, which is the sum
of the costs of the nodes and edges on the path between an operator node and the root
operator (exit node). These cost computations are based on the average CPU and network
speeds of the network. Using this computed priority, the time at which each node in the priority
queue would finish execution on each server in the network is computed. This is derived from the
following:
- The time the server becomes ready for execution, given the time at which the operators
already scheduled on it finish execution.
- The latest time at which the results of all the children of the operator arrive at the
operator's execution host. This depends on the time the children finish
execution and the duration of time it takes the children's results to be transferred
to the execution host of the operator.
- The duration of time it takes the server to execute the operator.
The operator is therefore scheduled on the server on which it finishes execution the earliest, as
sketched below.
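The following minimal sketch shows this selection step; the dictionaries (ready_time, child_finish, transfer, exec_time) are hypothetical simplifications of HEFT's bookkeeping, not part of the algorithm's original formulation.

def earliest_finish_server(servers, ready_time, child_finish, transfer, exec_time):
    # ready_time[s]   : time server s becomes free
    # child_finish[c] : finish time of child task c
    # transfer[c][s]  : duration to ship c's result to server s (0 if co-located)
    # exec_time[s]    : duration of the task on server s
    best_server, best_finish = None, float("inf")
    for s in servers:
        # earliest start: the server is free AND all child results have arrived
        data_ready = max((child_finish[c] + transfer[c][s] for c in child_finish),
                         default=0.0)
        start = max(ready_time[s], data_ready)
        finish = start + exec_time[s]
        if finish < best_finish:
            best_server, best_finish = s, finish
    return best_server, best_finish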
One of the criticisms of list-based scheduling is that it follows a local and greedy approach.
Hence, [69] proposed a variant of HEFT which looks ahead by one step and predicts several
scheduling outcomes before scheduling an operator node. This, however, comes with an
increased complexity, and it does not always yield a performance gain. Similarly, because the
full server list of the distributed system is iterated over for each node scheduled, another
criticism of list-based algorithms is their efficiency when scheduling a DAG with many operator
nodes in a system with a large number of servers. [47] also shows that HEFT does not always
perform well when the computation-to-communication ratio of the scheduled DAG is low.
5.1.2 Clustered Scheduling Algorithms
Another class of scheduling algorithms is the clustering algorithms, which typically aim at
minimizing data transfer costs in a DAG by grouping heavily communicating nodes
into a cluster, because the data transfer cost between nodes in a cluster is zero. This is usually
done in two stages: a clustering stage (where the DAG is segmented into different clusters)
and a post-clustering stage (where the clusters are refined and assigned to an execution host).
Initially, each node in the DAG is assumed to be a cluster. These clusters are then
refined by iteratively merging them, without backtracking, with the aim of satisfying an
objective function. Optimal clustering of a DAG is known to be an NP-complete problem
[39], hence the use of heuristics. Such heuristics include the Dominant Sequence Clustering
(DSC) algorithm [115], CASS-II (Clustering And Scheduling System) algorithm [62], EZ
(Edge-zeroing) algorithm [118], LC (Linear Clustering) algorithm [108], MD (Mobility
Directed) algorithm [56], and DCP (Dynamic Critical Path) algorithm [56]. In the post-clustering
stage, it is assumed that the number of servers (processors) is unlimited; therefore,
simple heuristics that minimize inter-server communication are used [56] to determine
the server of a cluster.
For instance, in the Dominant Sequence Clustering (DSC) algorithm, the dominant sequence
of ready tasks (clusters) is used to determine which clusters to merge. In DSC, the dominant
sequence is the sum of the cost along the path from a cluster to the exit (root) cluster and the
cost along the path from the cluster to the entry (leaf) cluster, computed dynamically after
every cluster merging step (as merging zeroes the cost of the edges in a cluster). Ready
clusters, i.e. clusters whose predecessor clusters have all been scheduled, are
merged with their predecessor clusters if this reduces the parallel execution time.
Some criticisms of many clustering algorithms include the fact that the clustering
process is typically based on minimizing communication cost with little regard for
CPU costs. Furthermore, they are based on the assumption that there is an unlimited number of
servers, which is not always true, and they are more suited for systems consisting of a
homogeneous set of servers/processors.
5.1.3 Duplication Based Algorithms
In duplication based algorithms, tasks are redundantly duplicated on different resources in
order to reduce communication cost. Algorithms based on this idea include the PY
algorithm (named after Papadimitriou and Yannakakis), LWB (Lower Bound) algorithm,
DSH (Duplication Scheduling Heuristic) algorithm, BTDH (Bottom-Up Top-Down
Duplication Heuristic) algorithm, LCTD algorithm, CPFD (Critical Path Fast Duplication)
algorithm, TDS (Task Duplication-based Scheduling) algorithm, and TANH (Task duplication-
based scheduling Algorithm for Network of Heterogeneous systems) [23][56][39][106].
5.1.4 Arbitrary Processor Network (APN) Algorithms
APN algorithms take the architecture and topology of the network and its routing strategy
into consideration while scheduling both nodes (tasks) onto processors and edges (data
transfers) onto the network. Some of these algorithms are the MH (Mapping Heuristic) algorithm,
DLS (Dynamic Level Scheduling) algorithm, BU (Bottom-Up) algorithm, and BSA (Bubble
Scheduling and Allocation) algorithm [56].
5.1.5 Evolutionary Algorithms
These use principles of evolution and natural genetics such as inheritance, selection,
recombination, mutation and crossover to find an optimal configuration for a system, given some
specific constraints. According to [84][96], “Candidate solutions to the optimization problem
play the role of individuals in a population, and the fitness function determines the
environment within which the solutions "live"”. By using survival-of-the-fittest techniques,
combined with a structured yet randomized information exchange, a GA can mimic some of
the innovative attributes of a human search. Algorithms in this category include the Genetic
Algorithm, Simulated Annealing, Genetic Simulated Annealing, Tabu search, and A* [56].
5.1.6 Scheduling Algorithm For D-WCPS
Generally, criticisms of many of the above listed algorithms include the following:
- They do not support intra-task parallelism, i.e. a node is always executed by just one
processor. However, a node can be so expensive that splitting up its execution
by partitioning its input data (when possible) and executing the process
on several processors may pay off.
- They do not take into account that some servers may be able to execute several
independent tasks of a DAG in parallel without any significant change to their
processing speed and performance. This usually depends on the number of physical
or virtual processors on the system.
- The scheduling of each node in most of the algorithms is local (they do not
consider the global effect of a node's scheduling decision). On the other hand, global
scheduling is NP-complete. However, query trees have properties, such as the
convergence of the DAG nodes towards the root node, which can be used to improve
scheduling. Not factoring node convergence into the tree scheduling may imply that
sibling nodes (nodes sharing the same parent) are scheduled far apart from each
other, thus making the overall scheduling worse off.
Besides, the D-WCPS execution model introduces some constraints to the scheduling
procedure, which include the following:
- Coverages are typically large and not all servers may have sufficiently large
memory to process a given sub-query. Therefore, the memory available on a server vis-a-
vis the estimated memory required by a sub-query is considered in our scheduling
algorithms.
- Most of the algorithms assume that a node can be executed when all its predecessor
nodes have been executed and their results have been received. However, in D-WCPS
processing, a sub-query, which is a cluster of node operators executed as a single unit,
is only executed if all its children sub-queries have been processed and their results
have been received and saved. For instance, in the distributed WCPS query in Figure 5.2,
Cluster_X cannot be executed until Cluster_Y and Cluster_Z have finished execution.
Therefore, the fact that all the predecessor nodes of a node have finished execution does
not mean the node will be executed immediately. For example, node B cannot be
executed even if node D has finished execution; it can only be executed after
all the predecessor sub-queries of its sub-query (Cluster_Y and Cluster_Z) have been
executed and their results have been received.
Figure 5.2: Distributed Query Tree
Besides, there is an overhead, and possibly a data writing cost (for sub-queries which
have predecessor sub-queries), associated with executing a sub-query. Treating each operator
as a sub-query unit which can be executed once its predecessor operators have finished
execution (as most scheduling algorithms would assume) therefore introduces these other
costs into the distributed query execution. For example, making an operator node like B
(in Figure 5.2) a sub-query, in order for it to be able to execute independently of the other
operators in Cluster_X, introduces additional costs to the execution of B.
Therefore, we base our WCPS scheduling algorithms on the grouping of operators into
clusters which can be executed as a unit.
5.2 DECOMPOSITION ALGORITHMS
In order to model the decomposition algorithm for a WCPS query, we represent the distributed
query as a tree DQT = (C, E_C) (see Figure 5.2), where C is a set of sub-query⁷ (cluster) nodes
and E_C is composed of edges which indicate the order of execution of the
clusters. In turn, we model a cluster c ∈ C as a group of connected or adjacent coverage
processing operators, v, executed as a single unit on a server h ∈ H, where H is
a set of computing servers. In other words, a cluster c is a (sub-)query tree (Vc, Ec) where
Vc ⊆ V is the set of nodes which represents the coverage processing operators in cluster
c, and Ec ⊆ E is the set of edges in cluster c. A cluster
c can have a set of child clusters, given as children(c), which represents the set of
clusters whose execution results serve as input for c. Conversely, a cluster c can also have
a parent cluster, parent(c), which represents the cluster to which cluster c sends its
execution output. Sibling clusters are clusters which share the same parent cluster. A
scheduled cluster is a cluster which has an execution host; it is unscheduled if it
has no execution host. A DQT is ready for execution if all its clusters have been
scheduled to an execution host.
7 Cluster and sub-query are used inter-changeably in this chapter
Furthermore, the aim of this thesis is the transformation of a coverage processing tree
QT into a distributed query tree DQT. This involves the grouping of operator nodes into
sub-query clusters and scheduling them on processing servers – a procedure naturally
suited to the clustering algorithms. Clustering algorithms, however, typically aim at
minimizing data transfer costs in a DAG by grouping together heavily communicating nodes
with little or no regard for the CPU cost. Besides, clustering algorithms are usually more
suited for a homogeneous system environment with an unlimited number of processing servers.
In contrast, it has been shown that HEFT (a list scheduling algorithm) outperforms other
algorithms with respect to the quality of the schedule and the time spent scheduling the operator
nodes in a heterogeneous system [112]. However, HEFT is not directly suitable for distributed
WCPS for the reasons highlighted in Section 5.1.6 of this chapter. Therefore, we cluster
coverage processing operations using the principles of a modified HEFT.
Similar to clustering algorithms, the clustering procedure starts by initially assuming that
each operator node in the query tree is a sub-query cluster, i.e. it is assumed that only one
operator is present in each sub-query of the initial DQT (distributed query); in other
words, |Vc| = 1 for every c ∈ C. For example, consider the WCPS query shown below, which
comprises three operators (c, sin, *):
“for c in (sample_image), return encode(sin(c)*20,
“raw”)”
Based on the assumption that each operator in the query above is a sub-query cluster, and
using the D-WCPS syntax as given in Chapter 3, the new initial distributed query
generated is given below:
for c in ( for d in ( for e in (sample_image)
return encode(e, “raw” ) )
return encode (sin(d), “raw”) )
return encode(c*20, “raw”)
Similar to clustering algorithms, the initial clusters generated are then recursively merged and
scheduled without backtracking; however, we base this merging and scheduling on the
principles of HEFT. Because our scheduling algorithm is based on clustering using HEFT,
and we initially assume that each operator node is a cluster, any reference to
operator nodes with respect to HEFT implies a cluster in our WCPS query scheduling.
Furthermore, we define a cluster merging function merge(c, parent(c)) which returns a new
cluster by merging a cluster c into its parent cluster parent(c). Some properties of the cluster
newly generated by merging are presented below.
Similar to the HEFT algorithm, we also estimate the cost of the initial DQT, using the cost
model in Chapter 4, based on the assumptions that:
- The query is executed on a set of homogeneous systems whose CPU, network, and data I/O
speeds and overheads are taken to be the means of the corresponding values over all the
servers participating in the distributed query processing.
- Each of the clusters in the initial DQT is executed on a different server.
The ALAP (As-Late-As-Possible) time of each cluster, defined as the latest possible time a
cluster will finish execution, is computed from the initial DQT. It is given as the costliest
path from a cluster to any of its leaf clusters and is used to determine the order of scheduling
of the operators. Operators with a higher ALAP are scheduled first.
Several criticisms of HEFT were presented in the literature review of this chapter. Therefore,
the modifications to HEFT with respect to the identified limitations, and the use of the
modified HEFT for clustering and scheduling coverage processing operations, are
presented below.
5.2.1 Reducing the Number of Servers Used in Scheduling
Given that HEFT iterates over all servers available in a network for each cluster
scheduled, HEFT is not efficient in systems with a large number of processing servers.
Some systems address this issue by restricting the servers used in scheduling to only the servers
hosting the data to be processed [78]. This, however, does not scale with the computation cost
per server. Hence, we propose a scheme whereby the number of servers used changes
dynamically during scheduling based on the computation cost of the query. The initial set of
servers consists of the servers which host the data to be used in the query. However, at any
point during the scheduling when the time at which all servers will finish executing the tasks
scheduled on them is later than the time a cluster is ready to be executed, new servers are
added to the server list. In other words, new servers are added to the server list when all
servers are busy and all the children of some unscheduled cluster have finished execution.
The servers added to the server list are the sets of ‘preferred servers’ (Chapter 3) of each of
the servers in the current server list.
So, assuming the time a host h becomes available for execution is given by the function
hostReadyTime(h), the time a cluster c finishes execution is given by FT(c), and the set of
preferred servers of a server h is given by preferredServers(h), we define the function
servers(H), which returns the set of servers to use in the scheduling given the current set of
servers H, as

servers(H) = H ∪ { preferredServers(h) | h ∈ H }, if there is an unscheduled cluster c with min over h ∈ H of hostReadyTime(h) > max over c' ∈ children(c) of FT(c')
servers(H) = H, otherwise
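A sketch of this rule in Python, assuming simple accessor functions (hypothetical names) for the ready times, finish times, children and preferred servers:

def expand_server_list(servers, unscheduled, host_ready_time, finish_time,
                       children, preferred_servers):
    # grow the server list when every server is still busy past the point at
    # which some unscheduled cluster became ready (all of its children finished)
    earliest_free = min(host_ready_time(h) for h in servers)
    for c in unscheduled:
        kids = children(c)
        if kids and earliest_free > max(finish_time(k) for k in kids):
            extra = {p for h in servers for p in preferred_servers(h)}
            return set(servers) | extra
    return set(servers)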
5.2.2 Pre-Scheduling Unary Node Clustering
Similarly, in order to reduce the time spent on scheduling the clusters and to improve the
quality of the schedule generated, we reduce the number of clusters in the initial DQT before
scheduling by merging clusters. Because merging n-ary⁸ clusters with any of their child
clusters would reduce the benefits of parallelism, we merge only unary clusters⁹. We base
our merging decision on the Computation-to-Communication Ratio (CCR) of the clusters,
given by the function CCR(c). CCR(c) is defined as the ratio of the cost of processing the
cluster to the cost of transferring the data needed by the cluster to the cluster's processing
host, i.e.

CCR(c) = cost(c) / ∑ over c' ∈ children(c) of dataTransferCost(c', c)

8 Clusters with any number of child clusters. 9 Clusters with only one child cluster.
Because we are merging unary clusters in the initial DQT prior to their scheduling, i.e. pre-
schedule clustering, we use the average CPU and network speeds of the D-WCPS system to
compute the CCR. Our criterion for merging clusters is based on these two axioms:
- A unary cluster will usually be scheduled on the server of its child cluster in a
homogeneous server system. This is due to the fact that extra communication cost and
overhead are incurred if the unary cluster is scheduled on a server different from the
server of its child cluster.
- In a heterogeneous system environment, a unary cluster can only be scheduled on a
server different from the server of its child cluster under these conditions:
o There is a server with a processing speed much higher than the processing
speed of the server of the child cluster.
o The computation-to-communication ratio of the cluster is high enough to offset the
performance loss incurred in data transfer. Generally, the higher the CCR of a
cluster, the better the scheduling decision made on it.
However, since clusters are merged before scheduling, the processing server of
either the child or the parent cluster cannot be determined; therefore, we base our
estimations on the CCR of the clusters.
Hence, starting from the root cluster in the query tree, we recursively merge a unary cluster
with its child cluster when the CCR of the new cluster is greater than the individual CCRs of
the component clusters. The rule-based merging criterion is therefore given below:

CCR(merge(c', c)) > max(CCR(c), CCR(c'))

So, not only will merging unary clusters reduce the scheduling time, it will also help make
better scheduling decisions. This is because an increased CCR ensures that a server with a
processing capacity much higher than the one that would otherwise have been used is
selected for processing the cluster. The pseudo-code for
the clustering procedure is therefore given below:
Input: cluster c
Output: cluster c (possibly merged with its descendants)
Variables: child cluster c'
Cluster(c)
  IF |children(c)| > 1
    FOR EACH c' ∈ children(c)
      Cluster(c')
    END FOR
    Return c
  ELSE IF |children(c)| == 1
    FOR EACH c' ∈ children(c)
      IF CCR(merge(c', c)) > max(CCR(c), CCR(c'))
        Return Cluster(merge(c', c))
      ELSE
        Cluster(c')
        Return c
      END IF
    END FOR
  END IF
  Return c
END
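For illustration, an executable version of this recursion could look as follows; ccr, merge and children are hypothetical stand-ins for the CCR computation, cluster merging and tree navigation described above.

def cluster(c, ccr, merge, children):
    # recursively merge unary clusters, starting from the root, whenever
    # merging increases the computation-to-communication ratio
    kids = children(c)
    if len(kids) > 1:
        for child in kids:
            cluster(child, ccr, merge, children)
        return c
    if len(kids) == 1:
        child = kids[0]
        merged = merge(child, c)
        if ccr(merged) > max(ccr(c), ccr(child)):
            return cluster(merged, ccr, merge, children)
        cluster(child, ccr, merge, children)
    return c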
5.2.3 Multi-Processing Capabilities of Servers
In HEFT, it is assumed that a server can only execute one process (sub-query cluster) at a
time. However, depending on the number of physical or virtual processors a server has, a
server can usually execute several processes (sub-queries) concurrently without significant
loss of performance. The data I/O on the server for different processes, however, cannot be
parallelized. Hence, to remodel HEFT such that it considers the multi-processing capabilities
of systems, and for the subsequent scheduling algorithms, we define and explain the following
terms and functions.
processors(h) returns the set of processors on a server h, while server(p)
returns the server to which a processor p belongs.
The Finish Time FT(c) of a cluster c is the time the scheduled cluster finishes
execution, and it is given as

FT(c) = ST(c) + executionDuration(c) + dataReadDuration(c)

where ST(c) is the time the cluster starts executing, executionDuration(c) is the duration of the
execution of c, and dataReadDuration(c) is the duration of reading all the data
necessary for executing c.
The processor which executes a cluster c is given as processor(c).
The Processor Ready Time PRT(p) is the time the last cluster scheduled on a
processor p finishes execution.
The Host Ready Time HRT(h) returns the earliest time a server h becomes available for
processing. This is given as the earliest time one of the processors on the server
becomes available for processing, i.e.

HRT(h) = min { PRT(p) | p ∈ processors(h) }

This is the earliest time any cluster newly scheduled on the server can start
executing.
schedule(c, H) is a function which returns the scheduled cluster, say c', given an
unscheduled cluster c and a set of computing servers H. The processor with the
earliest PRT on a host is assigned any cluster newly scheduled on that host, i.e.

processor(c') = argmin { PRT(p) | p ∈ processors(host(c')) }

The Data Arrival Time DAT(c) gives the time the result of a cluster c arrives at the
calling host, where the calling host of a cluster is the server which invokes the sub-
query c on its processing server.
As stated earlier, data I/O on a server cannot be parallelized. We therefore model the time at
which the result of the execution of cluster c is received and temporarily written on c's calling host (the host
from which the sub-query c was invoked) as the Write Finish Time, given by
the function WFT(c). We define WFT(c) as the sum of the earliest time the result of
cluster c can start being written and the writing cost of the result of c. In estimating the time the
result of c can start being written, we select the latest of:
a. the time the calling host of c (the server which invokes the query) becomes
available, i.e. HRT(callingHost(c));
b. the time any other data which arrives earlier than the response of c finishes
writing on the server, i.e.
max { WFT(c'') | DAT(c'') < DAT(c) }; and
c. the time the response of the cluster c arrives, i.e. DAT(c).
Hence, WFT(c) is given as

WFT(c) = max( HRT(callingHost(c)), max { WFT(c'') | DAT(c'') < DAT(c) }, DAT(c) ) + writeDuration(c)

where writeDuration(c) is the time taken to temporarily write the output of the
cluster c.
Finally, we define the Start Time ST(c) of a cluster c as the time the execution of c can start. This
is given as the time when the host of c is available and the results of all the children of
cluster c have been temporarily written, i.e.

ST(c) = max( HRT(host(c)), max { WFT(c') | c' ∈ children(c) } )
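The interplay of these timing functions can be sketched as follows; the argument lists are simplified, hypothetical views of the scheduler's bookkeeping rather than the actual D-WCPS data structures.

def write_finish_time(calling_host_ready, earlier_wfts, dat, write_duration):
    # WFT(c): writing starts once the calling host is free, all results that
    # arrived earlier have been written, and c's own result has arrived
    start = max(calling_host_ready, max(earlier_wfts, default=0.0), dat)
    return start + write_duration

def start_time(host_ready, child_wfts):
    # ST(c): the host is free AND all child results are temporarily written
    return max(host_ready, max(child_wfts, default=0.0))

def finish_time(st, exec_duration, read_duration):
    # FT(c) = ST(c) + executionDuration(c) + dataReadDuration(c)
    return st + exec_duration + read_duration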
5.2.4 HEFT Scheduling Criteria Modification
In HEFT, scheduling is based on the criterion that operators are allocated to the server on
which they finish executing the earliest. Hence, like many other scheduling algorithms, it is
often criticized for being local, i.e. it does not consider the global effect of its decision when
scheduling a cluster. On the other hand, globally scheduling a query tree, as well as globally
re-adjusting the schedule of a query tree given a local decision, is NP-complete [55]. A tree,
however, has a property which is not considered in HEFT scheduling: clusters in a tree
converge from the leaves to the root. Using the DQT in Figure 5.3 as an example, HEFT would
schedule clusters B and C (and their descendants, e.g. cluster D) independently, without
considering the fact that the results of B and C are immediately integrated by cluster A.
Therefore, the execution servers of B and C can be so far apart that data transfer costs make
the integration more expensive than it would have been if the convergence of clusters B and
C had been considered in their scheduling. In addition, the network distance to the server
which receives the initial query from the client is not considered while scheduling a cluster,
which has the implication that clusters can be scheduled far away from the server which
receives the query. However, by taking the convergence of the tree's clusters into account
during scheduling, we ensure that, with respect to network proximity, the execution
servers of sibling clusters¹⁰ converge to a server. Likewise, the processing servers of clusters
in the query tree converge to the server which initially receives the query.
Figure 5.3: Sample WCPS Query Tree – Motivation for Criteria Change
10 Clusters which share the same parent cluster
Therefore, we modify HEFT such that the servers of sibling clusters, as well as the servers of
descendant clusters of an ancestor cluster, converge in terms of proximity (closeness) in the
network. The challenges with respect to converging the clusters' servers are:
a. The query tree is scheduled bottom-up, i.e. a cluster is always scheduled before its
parent cluster; hence, it is difficult to know which server the cluster's
server has to converge towards.
b. Back-tracking previous scheduling decisions because of a new one leads to NP-
completeness. Hence, we cannot reschedule a previously scheduled cluster
based on a later decision on the scheduling of its parent or sibling.
In our approach to the problem identified above, we assign to every cluster, during scheduling,
a server to which it will most likely ship its execution result (i.e. the most likely execution
server of the parent of the cluster). This server, called the ‘Cluster Convergence
Server’ and represented as CCS(c), is returned by the function ccs given a cluster c. We
then schedule a cluster on the server on which it executes and finishes transferring its
execution result to its CCS the earliest. The CCS of a cluster is selected at scheduling time
based on the following heuristics:
a. The CCS of the root cluster is the server which originally receives the query
(called the master server), because it is the server which is going to return the
execution result to the original client.
b. Because the sibling cluster with the highest ALAP will still be executing by
the time all its other siblings finish execution in a homogeneous computing
system, the other sibling clusters usually transfer their data output to the
server of the cluster with the highest ALAP, for these reasons:
i. Parallel time can be saved because output data from sibling clusters
with lower ALAP (which should normally finish earlier than the sibling
with the highest ALAP) can be transferred while the cluster with the highest
ALAP is being executed.
ii. The output of the sibling cluster with the highest ALAP (which is the cluster that
finishes execution the latest) will not be transferred, thereby not
increasing the parallel time.
Moreover, the sibling cluster with the highest ALAP is scheduled first in HEFT.
Therefore, the CCS of a cluster is the execution server of its sibling cluster
with the highest ALAP.
c. In case none of the sibling clusters has been scheduled, the CCS of the cluster
with the highest ALAP is taken to be the CCS of its parent cluster. This is in
order to make the servers of the descendants of a cluster converge to its likely
execution server.
Therefore, we define the function ccs, which returns the server CCS(c) to which a cluster c
converges, as

CCS(c) = masterServer, if c is the root cluster
CCS(c) = host(sibling of c with the highest ALAP), if such a sibling has already been scheduled
CCS(c) = CCS(parent(c)), otherwise

Furthermore, we introduce the term “Cluster Convergence Server Data Arrival
Time”, which is the time a cluster finishes its execution and the transfer of its result to its “Cluster
Convergence Server”. The function which returns the “Cluster Convergence Server Data
Arrival Time”, represented by CCSDAT(c), is given by

CCSDAT(c) = FT(c) + dataTransferDuration(c, CCS(c))

where dataTransferDuration(c, CCS(c)) represents the duration of the data transfer of the output of
cluster c from its execution host to its CCS.
Therefore, we schedule a cluster on the server for which the “Cluster Convergence Server Data
Arrival Time” is minimal. Compared with HEFT, which considers only the execution
finish time FT(c) of an operator c, we schedule clusters taking into consideration
their finish time as well as the convergence of the query tree clusters.
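A sketch of the CCS heuristic, with hypothetical accessors for the scheduling state:

def ccs(c, is_root, master_server, scheduled_siblings, alap, host_of, parent):
    # Cluster Convergence Server: the server to which c will most likely
    # ship its execution result
    if is_root(c):
        return master_server
    siblings = scheduled_siblings(c)
    if siblings:
        # execution server of the already-scheduled sibling with highest ALAP
        return host_of(max(siblings, key=alap))
    return ccs(parent(c), is_root, master_server,
               scheduled_siblings, alap, host_of, parent)

def ccsdat(ft, transfer_to_ccs):
    # CCSDAT(c) = FT(c) + transfer duration from execution host to CCS(c)
    return ft + transfer_to_ccs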
5.2.5 Memory Constraints Consideration
Due to the typically large sizes of coverages and the limited memory available on a
processing server, there is a restriction on the total size of the data that can be processed in a
query, beyond which the query processing will fail on the server. Hence, in an attempt to factor
this constraint into the scheduling, we model the memory requirements for executing a
coverage processing (sub-)query.
Usually, many systems read all the data to be processed into memory at once
before processing; however, this does not scale with respect to data sizes. Therefore, systems
like rasdaman address the issue of data size scalability using tile streaming techniques [56].
In this technique, data which are partitioned into arbitrary tiles are processed by pipelining
tiles from one operator to the next. In other words, each query operator outputs one processed
tile, which it forwards to its parent operator, while receiving the output tiles of its child(ren)
operators for processing. Hence, a server with limited memory can conveniently scale with
respect to the sizes of the data it processes. We also note that systems not using tile streaming
techniques are comparable to systems using tile streaming when there is a single
tile for each of the data processed. In the tile streaming model, however, the memory
requirements of a query depend on several factors, such as:
a. The type of operators: for instance, whether an operator is a blocking or non-
blocking operator, unary or binary, and whether it re-tiles the data or
not determines the size of memory used in query processing.
b. The type of data output by each operator with respect to the number of bands, data
type, and dimensionality.
c. Differences in the cost of executing different operations, which may lead to tile
accumulation at an operator with a much higher cost than its preceding operators.
d. The order or sequence of operators in the query tree.
e. The depth of the query tree (with respect to the number of operators from the root
to each of the leaves of the tree).
f. The initial tiling structure of each of the data to be processed, and how this
differs from one data set in the query to another.
g. The order in which the tiles are read; e.g. the memory requirement would be
different if very large tiles were read consecutively than if the reading of
large tiles were interspersed with the reading of small tiles.
h. The number of tiles to be processed in the query.
Because of the complex interactions between the factors listed above, it is difficult to pre-
determine the memory requirements of a query. Therefore, we focus on roughly estimating the
memory requirements using a simplified approach in which we assume the following:
a. The operators in a query are executed one at a time on a server, and an
operator finishes processing all its input data before any other operator in the
query tree can be executed.
b. An operator allocates memory for its output data while processing, and the
memory allocated for the outputs of an operator's child operators is only de-
allocated (released) when the operator finishes processing. Hence, the memory
used by an operator v while it is running, represented by runningMemory(v),
is given by

runningMemory(v) = dataOutputSize(v) + ∑ over v' ∈ children(v) of dataOutputSize(v')

where dataOutputSize(v) returns the size of the data output from an operator v.
c. Executing an operator implies recursively executing all its children operators
before the operator, which consequently means that memory is
required for the execution of its descendant operators. So, assuming
descendantsExecutionMemory(v) returns the memory required during operator v's
descendants' execution, the memory required by an operator v, represented
by memoryRequired(v), is given as

memoryRequired(v) = max( runningMemory(v), descendantsExecutionMemory(v) )

d. Each child of an n-ary operator is executed and its result is held in memory
before another child of the n-ary operator is executed. Based on this
processing procedure, the memory requirement of an operator v
during the execution of its descendant operators,
called descendantsExecutionMemory(v), can be computed. It is given as the sum of
the sizes of the outputs of the child operators already in memory and the
memory requirement of the child operator currently being executed. Since this
depends on the order of execution of the child operators, which we cannot
determine before execution, we base our prediction of descendantsExecutionMemory(v)
before execution on the worst-case scenario, and this is given as

descendantsExecutionMemory(v) = max over v' ∈ children(v) of ( memoryRequired(v') + ∑ over v'' ∈ children(v), v'' ≠ v' of dataOutputSize(v'') )

This is the general expression for n-ary operators. In the case of binary
expressions, which are typically common in WCPS, the above expression
reduces to

descendantsExecutionMemory(v) = max( memoryRequired(v1) + dataOutputSize(v2), memoryRequired(v2) + dataOutputSize(v1) )

where v1 and v2 are the two operands of the binary operation.
Overall, the memory required by a (sub-)query c, given as memoryRequired(c), is the memory
required by the root operator r of the query:

memoryRequired(c) = memoryRequired(r)

If this is more than the memory available on a server h, which is given by the
function availableMemory(h), the cluster will not be scheduled on that server.
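The worst-case recursion can be written compactly; this is a minimal sketch with hypothetical helper names, not the calibrated estimator.

def memory_required(v, output_size, children):
    # worst-case memory for executing operator v and all its descendants
    kids = children(v)
    if not kids:
        return output_size(v)
    running = output_size(v) + sum(output_size(c) for c in kids)
    # one child executing while the other children's results sit in memory
    descendants = max(
        memory_required(c, output_size, children)
        + sum(output_size(o) for o in kids if o is not c)
        for c in kids
    )
    return max(running, descendants)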
5.2.6 Intra-Operator Parallelism
In HEFT, a task (a sub-query cluster in this case) is always scheduled for execution on only one server.
However, a task can be so expensive that running it in parallel on
partitions of its input data on several servers pays off. Hence, we modify HEFT such that it
supports intra-task parallelism. This involves splitting the data to be processed,
transferring each partition to its processing host, executing the task in parallel on the
data partitions, and transferring the data back to the host where the partial results are finally merged.
Although intra-task parallelism reduces the parallel CPU time for executing the original task,
the costs of the other tasks involved, which include data splitting, data transfer,
temporary data writing, and data merging costs and overheads, can offset its gains.
In a homogeneous system, the cost of an intra-operator parallelized task can
be over-simplified as

cost(n) = A(n) + B(n)

where n is the number of servers, A(n) represents the intra-operator parallelism costs which
decrease as new servers are added, e.g. the CPU and data transfer (to and from each
server) costs, while B(n) signifies the costs that increase with the addition of a new server for
processing the task, e.g. all the overheads in the query processing, and the data splitting and
merging costs.
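As an illustration only: if one assumes the simplest such shape, A(n) = a/n and B(n) = b·n for constants a and b (our assumption here, not the thesis' calibrated model), the trade-off has a closed-form optimum:

cost(n) = a/n + b·n,  d cost(n)/dn = −a/n² + b = 0  ⇒  n* = √(a/b)

so adding servers pays off only while the parallelizable cost still dominates the per-server overheads.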
The situation is much more complicated in a heterogeneous system because we cannot
predetermine the number of servers, exactly which servers to use, and the data partitioning
strategy (the sizes of the data to be allocated to each server) that will minimize the task
execution time.
So, assuming we intra-task parallelize the execution of a cluster c, we represent the set of
clusters processing the partitions of the input data of c on the set of computing hosts H as
C_c = {c_1, c_2, …, c_n}. The results of each c_i ∈ C_c are transferred to the server where they are
merged together by the cluster denoted as c_merge; therefore, we note that

children(c_merge) = C_c

Furthermore, the partitioned results of each of the children children(c) of the un-partitioned
cluster c serve as input for each c_i ∈ C_c; hence,

children(c_i) = { partition(c', c_i) | c' ∈ children(c) }

where the partition(c', c_i) function returns the same cluster as c' but whose output is
partitioned according to the size of the data allocated for processing by the cluster c_i.
In addition, due to the heterogeneity of the server capacities, together with the fact that the servers
have different Ready Times (the times they finish the tasks already scheduled on them), data
partitions of different sizes are sent to different servers for processing. The size of the data
allocated to a server is based on the time duration between:
- The earliest time the cluster can begin executing on any server, which is given by the
time the last child of cluster c finishes execution, i.e. max over c' ∈ children(c) of FT(c'), and
- The time at which the server h, if the un-partitioned cluster c were scheduled on it, would finish the
execution and the transfer of its result to the server which executes the result-merging
cluster, i.e. host(c_merge).
The server which executes the result-merging cluster, host(c_merge), is taken to be
the server on which the un-partitioned cluster c finishes execution the earliest.
The time duration which determines the data allocated to each c_i ∈ C_c is therefore
given by the function duration(c_i), which is defined as

duration(c_i) = ( FT_h(c) + dataTransferDuration(c, host(c_merge)) ) − max over c' ∈ children(c) of FT(c')

where FT_h(c) denotes the finish time of the un-partitioned cluster c if it were scheduled on the host h of c_i.
The larger this time duration gets, the less suitable the server is for use in intra-operator
parallelism, and the less data is allocated to it. So, for each c_i ∈ C_c, the
partition of the output of c allocated to c_i for processing is returned by dataAllocated(c_i),
which is given as

dataAllocated(c_i) = pixelOutputSize(c) × ( (1 / duration(c_i)) / ∑ over c_j ∈ C_c of (1 / duration(c_j)) )
We therefore model the duration of the execution of an intra-task parallelized cluster as the sum
of the following:
- The cost of reading the partitioned data in parallel from their data hosts. Each data read
cost for a partition is associated with the overhead of
the host from which the data is read. Data on
the same host are read sequentially; however, data on different hosts are
read in parallel. Overall, the partitioned data read cost is given as

readCost(C_c) = max over data hosts d of ( ∑ over partitions p read from d of ( dataReadDuration(p) + overhead(d) ) )

This cost increases with every additional cluster added to C_c because of the
overhead involved.
- The cost of executing the sequence of operations listed below for each c_i ∈ C_c,
executed in parallel on their individual servers:
o transferring the partitions of data from the various data hosts to the host where they are
executed,
o executing the cluster c_i on its allocated server, and
o transferring the result of c_i to the server which merges the results.
This is therefore given as

parallelCost(C_c) = max over c_i ∈ C_c of ( dataTransferDuration(inputs(c_i), host(c_i)) + executionDuration(c_i) + dataTransferDuration(c_i, host(c_merge)) )

Because less data is transferred and processed per cluster with the addition of clusters
to C_c, this cost reduces as the number of clusters increases.
- The cost of merging the outputs of each c_i ∈ C_c, which is given as
executionDuration(c_merge). This cost also increases with the addition of a new cluster to the
cluster set C_c.
Overall, the cost of the intra-task execution of a cluster c, segmented into the cluster set C_c, is
given as

intraTaskCost(C_c) = readCost(C_c) + parallelCost(C_c) + executionDuration(c_merge)
Therefore, we recursively add one cluster at a time to the set of clusters C_c executing
the cluster c, and re-distribute the input data of cluster c over the new set of clusters, iff
the intra-task parallel cost is reduced by the new cluster addition,
i.e.

intraTaskCost(C_c ∪ {c_new}) < intraTaskCost(C_c)

where c_new is the cluster added to the
set of clusters for intra-task parallelism. The processing server of the new cluster is selected
from the list of servers used in the distributed processing, sorted according to their time
duration for executing the un-partitioned cluster c.
Hence, we present the pseudo-code for the function Parallelize-Task(c, H), which returns the
cluster c_merge that merges the output of the set of clusters C_c if intra-task parallelism pays off
and is used for the task execution; otherwise, it returns the original cluster c. C_c, on the
other hand, consists of the set of clusters used for intra-task parallelism which minimizes the
response time for executing the cluster c.

Input: cluster c
Output: cluster (either the output-merging cluster c_merge or the original cluster c)
Parallelize-Task(c, H)
  C_c ← {c}            // initialize the cluster set to only one cluster
  result ← c           // initialize the cluster to return to the input cluster
  FOR EACH h ∈ H SORTED BY the duration of executing the un-partitioned c on h
    IF intraTaskCost(C_c ∪ {c_new on h}) < intraTaskCost(C_c)
      H ← servers(H)   // adds more servers if all servers are busy
      C_c ← C_c ∪ {c_new on h}
      result ← c_merge // create an operator which merges the output of all c_i ∈ C_c
    END IF
  END FOR
  Return result
END
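An executable sketch of this greedy loop, with intra_task_cost and make_partition_cluster as hypothetical stand-ins for the cost model and partition-cluster construction:

def parallelize_task(c, hosts, unpartitioned_duration, intra_task_cost,
                     make_partition_cluster):
    # greedily grow the set of partition clusters while the modelled
    # intra-task cost keeps decreasing; otherwise keep the original cluster
    cluster_set = [c]
    best_cost = intra_task_cost(cluster_set)
    for h in sorted(hosts, key=unpartitioned_duration):
        candidate = cluster_set + [make_partition_cluster(c, h)]
        cost = intra_task_cost(candidate)
        if cost >= best_cost:
            break   # overheads now dominate; stop adding servers
        cluster_set, best_cost = candidate, cost
    # a caller wraps cluster_set in a result-merging cluster when len > 1
    return cluster_set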
5.2.7 Clustering and Cluster Scheduling
In our algorithm, we cluster and schedule coverage processing operators based on the
principles of HEFT. For each unscheduled cluster c whose children clusters have all been
scheduled, and given a set of hosts H, we generate a set of provisionally scheduled clusters
PSC(c, H) from which the final scheduled cluster is selected. The set of provisionally scheduled
clusters comprises:
- the cluster c scheduled on each of the processing servers, i.e. { c scheduled on h | h ∈ H };
- the clusters obtained by merging c with any one of its scheduled child clusters c', i.e.
{ merge(c', c) | c' ∈ children(c) }; and
- the clusters created by merging c with all of its children clusters which share the same
processing server.
PSC(c, H) is the union of these three sets. The CCSDAT of each provisionally scheduled
cluster is computed, and the cluster with the minimum CCSDAT
whose execution server has enough memory to process it is selected and added
to the schedule, i.e. a provisional cluster c'' ∈ PSC(c, H) is added to the schedule if

CCSDAT(c'') = min { CCSDAT(c''') | c''' ∈ PSC(c, H) } and memoryRequired(c'') ≤ availableMemory(host(c''))
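The candidate-generation and selection step could be sketched as follows (all helper names hypothetical):

def schedule_cluster(c, hosts, children, schedule_on, merge, merge_all,
                     ccsdat, memory_required, available_memory, host_of):
    # build the provisionally scheduled clusters PSC(c, H) and pick the one
    # with minimum CCSDAT that fits in its execution server's memory
    kids = children(c)
    candidates = [schedule_on(c, h) for h in hosts]      # c alone on each host
    candidates += [merge(ch, c) for ch in kids]          # c merged with one child
    if kids and all(host_of(ch) == host_of(kids[0]) for ch in kids):
        candidates.append(merge_all(kids, c))            # c merged with all co-located children
    feasible = [x for x in candidates
                if memory_required(x) <= available_memory(host_of(x))]
    return min(feasible, key=ccsdat)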
Before a cluster is scheduled, all of its child clusters must have been scheduled, the
scheduler must add more servers to the server list if all the servers are busy, and the Cluster
Convergence Server of the cluster must be determined. After a cluster is scheduled, the intra-
task parallelism algorithm is applied in order to parallelize the execution of the cluster if this
yields a performance gain. The pseudo-code for the function Schedule(c, H), which
merges and schedules a cluster c given a set of computing servers H, is therefore given below:

Input: unscheduled cluster c, set of servers H
Output: scheduled cluster c
Schedule(c, H)
  FOR EACH c' IN children(c) ORDERED BY ALAP(c') DESCENDING
    Schedule(c', H)
  END FOR
  H ← servers(H)    // tests if all servers are busy and adds more servers
  CCS(c) ← ccs(c)   // determine the server the cluster should converge towards
  c ← the cluster with minimum CCSDAT from PSC(c, H) whose
      execution server has enough memory to process it
  Return Parallelize-Task(c, H)  // intra-task parallelized cluster if it yields a performance gain
END
5.3 TAKING A STEP BACK – THE BIG PICTURE
We now summarize the procedure followed for the cost-based decomposition of
the WCPS query tree:
- Given a query tree QT = (V, E), where V is the set of operators and E is the set of
edges.
- Transform QT into an initial distributed query tree DQT = (C, E_C), where C is the set of
clusters (sub-queries) and E_C is the set of edges joining the clusters. In this initial
transformation, each operator v ∈ V is mapped to a cluster c ∈ C, and the edges
are correspondingly mapped.
- Initialize the set of hosts H to the hosts holding the data used in the query.
- Compute the CCR of each cluster.
- From the root cluster, recursively merge unary clusters based on their CCR properties,
using the pseudo-code given in Section 5.2.2.
- Compute the ALAP of each cluster c.
- Beginning from the root, recursively schedule the clusters using the function
Schedule(r, H), where r is the root cluster and H is the initial set of processing
servers.
Chapter 6
PERFORMANCE EVALUATION
In this chapter, we demonstrate the practical significance of the query optimizations,
decomposition and orchestration detailed in the previous chapters. Because the objective
of this study is the minimization of the query execution time, the performance evaluation is
based on the gains of the query optimizations, and on the duration of, together with the speedup
achieved by, the scheduling algorithms. To this end, we start this chapter by describing our
prototype evaluation framework, after which we present the performance evaluation of our
scheduling algorithms, before finally demonstrating the gains of our query optimizations and
execution model.
6.1 IMPLEMENTATION AND TEST FRAMEWORK
Our implementation and test framework consists of 23 servers whose distributed coverage
processing properties, as reported by their calibrators, are given in Table 6.1 below.
Table 6.1: Test Framework D-WCPS Coverage Processing Servers Properties

No. of   Type of Processor                 Processors   Available   Avg Network   Avg Coverage        Avg Coverage   Avg Write
Servers                                    per Server   RAM (GB)    Speed         Processing Speed    Read Speed     Speed
                                                                    (MBytes/s)    (Giga-cells/s)      (MBytes/s)     (MBytes/s)
1        Intel(R) Xeon(R) CPU 3.00GHz      4            16          9             30                  90             40
4        Intel(R) XEON(TM) CPU 2.20GHz     2            4           10            12                  60             30
13       Intel(R) XEON(TM) CPU 2.20GHz     2            1           10            12                  60             30
1        Intel(R) Pentium(R) CPU 3.00GHz   1            0.77        6             8                   60             30
4        Intel(R) Xeon(R) CPU 2.20GHz      2            4           6             21                  78             34
One of the servers was initially chosen as the D-WCPS server with which the other servers have
to register before they can recursively start registering other servers. Each server in the
framework can register a maximum of five servers; therefore, we have a two-level hierarchy
of servers with respect to our overlay network configuration, as shown in Figure 6.1.
In case any gateway¹¹ server fails, the time period before the servers can re-configure
depends on the time at which the gateway server is discovered to have failed by its children¹²
and/or gateway servers. This, in turn, depends on:
- The frequency of synchronization of updates, inserts and deletes on the servers.
- The frequency at which a child server sends keep-alive messages to its gateway server.
Theoretically, an increase in the frequency at which keep-alive messages are sent
implies that unavailable servers will be discovered more promptly. However, this
leads to higher network bandwidth usage and an increase in the work done by each server
(sending keep-alive message requests and responses).
A server can re-register with any other server in the network, but preferably with its backup
gateway server in case its gateway server fails. Similarly, the synchronization of all servers with
update, delete and insert messages depends on the size of the message, the network speed
between the servers, and the longest depth of the hierarchical network configuration. Due to
fluctuations in the network, it is difficult to accurately assess the efficiency of the
11 A gateway server is the server with which some other servers register. 12 The children servers of a server, say X, are the servers which have server X as their gateway server.
Figure 6.1: Overlay Network Configuration of the D-WCPS server
synchronization. However, in our framework, the synchronization duration typically varies
between fractions of a second and 2 seconds for messages whose sizes range between 0 and 5
MBytes (approximately 10,000 records).
6.2 EVALUATION OF SCHEDULING ALGORITHMS
To model the performance of our decomposition algorithms for distributed WCPS queries, we
assume that a scheduling function $f$ transforms a query tree $QT$ into a distributed query
tree $DQT$ by mapping $QT$'s operators into sub-query clusters, i.e. $f(QT) = DQT$.

The sequential execution duration of $QT$, which is the cost of executing $QT$ on a single
server, is given as the sum of the costs of the component operators of $QT$. We represent
this as the Finish Time of $QT$, $FT(QT)$. For example, assuming the query tree in Figure 6.2
with operator nodes A, B, C is executed on a server, then

$$FT(QT) = cost(A) + cost(B) + cost(C)$$

Similarly, the parallel execution duration of $DQT$, given as the execution Finish Time of
the distributed query, $FT(DQT)$, is the sum of the costs along the critical path of the
distributed query. Since the ALAP (see Section 5.2) of a node is the cost of all nodes and
edges on the critical path from that node to any of its leaf nodes, $FT(DQT)$ is therefore
given as the ALAP of the root operator of the $DQT$. To recap, the ALAP of an operator $v$
is given as

$$ALAP(v) = cost(v) + \max_{c \in children(v)} \big( cost(c, v) + ALAP(c) \big)$$

where $cost(c, v)$ is the cost of data transfer from $c$ to $v$.

Hence, assuming operators B and C are executed in parallel on different servers in the sample
query tree in Figure 6.2, the Finish Time of the query tree is given as

$$FT(DQT) = cost(A) + \max\big( cost(B) + cost(B, A),\; cost(C) + cost(C, A) \big)$$

where $cost(B, A)$ and $cost(C, A)$ return the cost of data transfer from node B to A, and
from C to A, respectively.

In distributed systems, the main performance evaluation criterion used when the objective of
the system is the minimization of the execution duration of a task is the speedup [29][94]. In
D-WCPS, the speedup attained by transforming query tree $QT$ to distributed query tree $DQT$,
represented by $S(QT, DQT)$, is given as

$$S(QT, DQT) = \frac{FT(QT)}{FT(DQT)}$$

Figure 6.2: Simple WCPS Query Tree (root A with children B and C)
By having the goal of minimizing the query execution duration, we aim at maximizing the
speedup of the distributed query. Using the query tree in Figure 6.2 as an example, we note
the following:
- Because the execution of A is not parallelized, an increase in the cost of A will reduce the
speedup, while an increase in the costs of B and C will increase the speedup. This
conforms to Amdahl's law [41][122] (restated after this list), which models the relationship
between the expected speedup of parallelized implementations of an algorithm relative to the
serial algorithm, under the assumption that the problem size remains the same when
parallelized. However, the law models only simple trees executed in a homogeneous
server framework. Similarly, it does not factor data transfer costs into its
computation of the expected speedup of a distributed workflow.
- Similarly, an increase in the cost of data transfer, i.e. cost(B,A) and/or cost(C,A), relative
to the costs of B and C respectively, can decrease the speedup because of the
additional costs this introduces to the distributed query execution. For instance, if the
costs of the operators B and C are significantly large compared to the cost of transferring
the data from B and C to A, then we have a situation where parallelizing pays off;
otherwise, the gains of parallelization are offset by the data transfer costs.
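For reference, Amdahl's law in its common form (quoted from the cited literature rather than derived here): if a fraction $p$ of a task can be parallelized over $n$ processors, the expected speedup is

$$S(n) = \frac{1}{(1 - p) + \frac{p}{n}}$$

so that, as $n \to \infty$, the speedup is bounded above by $1/(1 - p)$, the reciprocal of the serial fraction.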
These rules of thumb, with some modification, apply to query trees whose structures are
more complex than Figure 6.2.
Likewise, assuming $n$ servers are used in executing the distributed query $DQT$, the
efficiency of the distribution is therefore given as

$$E(DQT) = \frac{S(QT, DQT)}{n}$$

It is a value, typically between 0 and 1, which indicates how well-utilized the servers are in
solving the problem, as compared to the cost of communication and synchronization between
the systems.
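As an illustration of these definitions only, here is a minimal sketch under our own simplified assumptions (a tree of operators with per-node costs and per-edge transfer costs; Node and all function names are hypothetical, not part of D-WCPS):

```python
from dataclasses import dataclass, field

@dataclass
class Node:
    cost: float                                           # operator execution cost
    children: list["Node"] = field(default_factory=list)
    transfer: list[float] = field(default_factory=list)   # transfer cost child -> this node

def ft_sequential(v: Node) -> float:
    # FT(QT): sum of all operator costs when executed on a single server.
    return v.cost + sum(ft_sequential(c) for c in v.children)

def alap(v: Node) -> float:
    # ALAP(v) = cost(v) + max over children of (transfer cost + child ALAP);
    # FT(DQT) is the ALAP of the root operator.
    if not v.children:
        return v.cost
    return v.cost + max(t + alap(c) for c, t in zip(v.children, v.transfer))

def speedup(root: Node) -> float:
    # S(QT, DQT) = FT(QT) / FT(DQT)
    return ft_sequential(root) / alap(root)

# The three-node tree of Figure 6.2: A with children B and C.
b, c = Node(cost=4.0), Node(cost=5.0)
a = Node(cost=2.0, children=[b, c], transfer=[1.0, 1.5])

n_servers = 2
s = speedup(a)
print(f"speedup = {s:.2f}, efficiency = {s / n_servers:.2f}")
```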
Several factors affect the execution Finish Time and, consequently, the speedup of a
distributed query DQT. Some of these are based on the original query tree structure and
properties, while several others depend on the D-WCPS framework. These factors are
discussed below.
6.2.1 Tree Properties
These properties are based on the structure and characteristics of the query tree. In computing
the values of these properties for a given D-WCPS infrastructure, we assume that the
processing and network speed of each server in the network are, respectively, the average
processing speed of all the servers and the average network speed between all servers in the
network. Using a set of 2-leaf, 4-leaf, and 8-leaf query trees, which are query trees with 2, 4, and 8
leaf operators respectively, we therefore evaluate the relationship between the query tree
properties and our query decomposition algorithm. Unless otherwise stated, the size of the
initial data read from each server (i.e. the size of data processed at each leaf operator) for
performance evaluation is 200 gigabytes.
6.2.1.1 Tree Width
Assuming there is no intra-operator parallelism, the width of the tree is the maximum number
of operators whose execution can be parallelized. This is given by the number of leaf
operators in the tree. Hence, the maximum speedup achievable is limited to this value.
6.2.1.2 Sequential Factor
In simple terms, the sequential factor [122] of a binary operator can be viewed as the ratio of
the cost of the least-cost child operator to the cost of the operator itself. The least-cost
child operator is used because it determines the maximum performance gain obtained by
parallelizing the execution of the two children operators. For example, in Figure 6.3, the
sequential factor of node A is the lesser of the costs of operators B and C relative to the cost
of node A. If the cost of A is very high compared to the least cost of either B or C,
parallelizing the execution of B and C will yield very little performance gain, and vice versa.

The definitions of sequential factor in literature typically apply only to a simple "three-node
graph with one binary operator" [3][4]. Therefore, for our performance evaluation purposes,
we extend the definition to cover more complex query trees. For this extension, we define
the sequential factor of a node as the ratio of the minimum ALAP of its child operators to the
recursive sum of the costs of its parent operators. This recursive sum of the costs of all
operators from a node up to the root operator, referred to as the As-Soon-As-Possible time of
the operator, $ASAP(v)$, is given as

$$ASAP(v) = cost(v) + ASAP(parent(v)), \quad ASAP(root) = cost(root)$$

Likewise, in order to make this node's sequential factor value comparable with the sequential
factors of other nodes in the tree or in other trees, we scale the value to between 0 and 1.
Therefore, the sequential factor of a binary operator node $v$, given as $seqf(v)$, is defined as

$$seqf(v) = \frac{ASAP(v)}{ASAP(v) + \min_{c \in children(v)} ALAP(c)}$$

A sequential factor value of 1 implies that $\min_{c} ALAP(c)$ is infinitely smaller than
$ASAP(v)$, while a value of 0 indicates the opposite.

Figure 6.3: Simple WCPS Query Tree (root A with children B and C)
Furthermore, since
- only the critical path of a query tree contributes to the execution duration of the query
tree, and
- the contribution of the sequential factor of each of the operators in the query to the
overall sequential factor of the tree varies with the cumulative cost of the operator
(defined in the cost model chapter as the cost of an operator plus the costs of all its
descendant operators); this contribution of a node $v$ is given as the weight $w(v)$ of the
sequential factor of the node,
we therefore define the sequential factor of a tree as the weighted sum of the sequential
factors of all the nodes on the critical path. So, assuming $CP$ represents the set of nodes on
a critical path, then the sequential factor of a query tree $QT$ is

$$seqf(QT) = \frac{\sum_{v \in CP} cumcost(v) \cdot seqf(v)}{\sum_{v \in CP} cumcost(v)}$$

For example, assuming the critical path in the query tree QT in Figure 6.4 below is A-C-F,
the sequential factor of QT is given as

$$seqf(QT) = \frac{cumcost(A) \cdot seqf(A) + cumcost(C) \cdot seqf(C) + cumcost(F) \cdot seqf(F)}{cumcost(A) + cumcost(C) + cumcost(F)}$$

The sequential factor of node C is given as

$$seqf(C) = \frac{ASAP(C)}{ASAP(C) + \min_{c \in children(C)} ALAP(c)}, \quad \text{where } ASAP(C) = cost(C) + cost(A)$$

A sequential factor value of 0 implies the cost is concentrated in the leaves of the query tree,
while a value of 1 indicates that the cost is concentrated in the root.

Figure 6.4: Sample Query Tree for Sequential Factor Computation
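A minimal sketch of these tree-level definitions, again for illustration only (a variant of the hypothetical Node representation from the earlier sketch, extended with a parent pointer; leaf operators, having no children, carry no sequential factor and are skipped in the weighted sum, which is our own simplification):

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Node:
    cost: float
    children: list["Node"] = field(default_factory=list)
    transfer: list[float] = field(default_factory=list)   # transfer cost child -> this node
    parent: Optional["Node"] = None

def alap(v: Node) -> float:
    # Cost of all nodes and edges on the critical path from v to a leaf.
    if not v.children:
        return v.cost
    return v.cost + max(t + alap(c) for c, t in zip(v.children, v.transfer))

def asap(v: Node) -> float:
    # Recursive sum of operator costs from v up to the root.
    return v.cost + (asap(v.parent) if v.parent else 0.0)

def seqf(v: Node) -> float:
    # seqf(v) = ASAP(v) / (ASAP(v) + min child ALAP): values near 1 mean
    # the cost sits above v; values near 0 mean it sits in the leaves.
    a = asap(v)
    return a / (a + min(alap(c) for c in v.children))

def cumcost(v: Node) -> float:
    # Cost of an operator plus the costs of all its descendant operators.
    return v.cost + sum(cumcost(c) for c in v.children)

def tree_seqf(critical_path: list[Node]) -> float:
    # Weighted sum of node sequential factors along the critical path,
    # weighted by normalized cumulative costs.
    inner = [v for v in critical_path if v.children]
    total = sum(cumcost(v) for v in inner)
    return sum(cumcost(v) * seqf(v) for v in inner) / total
```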
We therefore experimentally determine the performance of our decomposition algorithm
relative to the sequential factor. The queries used in this experimental evaluation are chosen
such that they minimize the effects of the other execution-duration determining factors (which
are further listed in this section). To minimize the data transfer costs, we typically use WCPS
aggregation operations in many of the test queries. The graph in Figure 6.5 below therefore
shows how the speedup of our query scheduling and decomposition algorithm varies with
the sequential factor of a 2-leaf, 4-leaf, and 8-leaf tree. Furthermore, the speedup
achieved with respect to the sequential factor of each of the test queries does not vary with
different initial data sizes. This is due to the homogeneity of the servers, and the fact that the
sequential factor is agnostic to data sizes, as it is a ratio of computing costs.
Figure 6.5: Performance Evaluation Based on Sequential Factor
An increase in the value of the sequential factor indicates that the cost is concentrated more
in the parent operators than in the children. An implication of this is that the chance of
parallelizing, and consequently the speedup of the query, is reduced (as shown in our
graph), and vice versa. The speedup of our decomposition, however, approaches the maximum
achievable with a decrease in the sequential factor. Therefore, an optimum tree structure, with
reference to the sequential factor, is obtained if all the costs are concentrated in the leaf
operators. Such trees have a potential speedup equal to the number of leaf
operators.
6.2.1.3 Cost Skew
In oversimplified terms, and using Figure 6.6 as an example, the cost skew [122] of a binary
operator A can be viewed as the ratio of the difference between the execution costs of its
highest-cost child operator, say B, and its lowest-cost child operator, say C, to the cost of the
highest-cost child operator B. A ratio of 1 implies a very high skew, while a ratio of 0 implies
that the children are balanced. However, in more general terms, we define the skew of an
operator $v$ as

$$skew(v) = \frac{\max_{c \in children(v)} ALAP(c) - \min_{c \in children(v)} ALAP(c)}{\max_{c \in children(v)} ALAP(c)}$$

The decision to use the ALAP of operators instead of the cost of the operator is due to the fact
that the skew is an indicator of the imbalance in the relative execution finish times of the child
operators of an operator, and these finish times are determined by their ALAP. A skew value
of zero indicates there is no skew, while a value of 1 indicates that the execution finish time of
one operator is infinitely larger than the execution finish time of the other operator.
Similar to the sequential factor computation, we define the skew of a tree as the weighted
skew of the operators along the critical path, where the weight is given as the ratio of the
cumulative cost of an operator to the sum of the cumulative costs of all operators on the
critical path. So, assuming $CP$ represents the set of nodes on a critical path, then the cost
skew of a query tree $QT$ is

$$skew(QT) = \frac{\sum_{v \in CP} cumcost(v) \cdot skew(v)}{\sum_{v \in CP} cumcost(v)}$$
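As a short sketch (reusing the hypothetical Node dataclass and alap function from the sequential factor sketch above), the node-level skew can be computed as:

```python
def skew(v: Node) -> float:
    # (max child ALAP - min child ALAP) / max child ALAP:
    # 0 means balanced children, values near 1 mean one child's
    # execution finish time dwarfs the other's.
    finish_times = [alap(c) for c in v.children]
    return (max(finish_times) - min(finish_times)) / max(finish_times)
```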
Likewise, we determine the effects of the skew factor on our query decomposition algorithms
based on the assumption that the other distributed query execution duration factors have their
optimum values, thereby minimizing their effects. The result is presented in Figure 6.7.
Figure 6.6: Simple WCPS Query Tree (root A with children B and C)
Figure 6.7: Performance Evaluation Based on Cost Skew
Similar to the sequential factor, our decomposition algorithm achieves the maximum
attainable speedup when the cost skew is as low as possible; however, in instances where the
skew approaches 1, the speedup does not fall below the value of 1. Also, the speedup
achieved with respect to the cost skew of each of the test queries does not vary with different
initial data sizes. This is due to the homogeneity of the servers, and the fact that cost skew is
agnostic to data sizes, as it is a ratio of computing costs.
6.2.1.4 Computation to Communication Ratio (CCR)
This is the ratio of the computation costs (operator node costs) to the data transfer costs (costs
of the edges) in a query tree. The CCR of a query tree $QT$ is

$$CCR(QT) = \frac{\sum_{v \in QT} cost(v)}{\sum_{(u, v) \in QT} cost(u, v)}$$

where the numerator sums over the operator nodes and the denominator sums over the data
transfer edges.
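Again as an illustration only (same hypothetical Node representation as in the sketches above):

```python
def cost_sums(v: Node) -> tuple[float, float]:
    # Returns (total node cost, total edge cost) for the subtree at v.
    nodes, edges = v.cost, 0.0
    for child, t in zip(v.children, v.transfer):
        cn, ce = cost_sums(child)
        nodes += cn
        edges += ce + t
    return nodes, edges

def ccr(root: Node) -> float:
    nodes, edges = cost_sums(root)
    return nodes / edges   # high CCR: transfer costs are insignificant
```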
Data transfer is one of the major sources of the drawbacks of distributed processing, as it can
drastically reduce the gain of distributed processing and, in some cases, increase the
distributed execution duration above the single-server execution duration. This is of even
greater concern in distributed geo-processing, which typically involves the transfer of
large-sized data. Using the query tree in Figure 6.8 as an example, if the costs of both edges
AC and AB are greater than the cost of B or C, then the execution duration for a query tree
where B and C are distributed will be more than the execution duration if the query is not
distributed. A high CCR indicates that the costs of data transfer are insignificant compared to
the computation costs; hence, computation can be distributed with little concern about
communication costs. Furthermore, not only does a high CCR ensure that the fastest servers
can be used in a system comprising a heterogeneous set of servers, it also guarantees that the
costs of data transfer for the integration of data from sibling operators (i.e., operators which
share the same parent or ancestor operators) executed in parallel on different servers are
minimized. Figure 6.9 shows the graph of CCR plotted against the speedup obtained after a
set of 2-leaf, 4-leaf, and 8-leaf trees is decomposed and executed.

Figure 6.8: Simple WCPS Query Tree (root A with children B and C)
Figure 6.9: Performance Evaluation Based on Computation to Communication Ratio
We therefore present the experimental evaluation of how the speedup of our algorithm varies
with the CCR, based on the assumption that the cost skew and sequential factor approach
zero, and that the data processed by each leaf operator is located on a different server.
From the graph, the speedup increases with the CCR. In addition, a very low CCR degrades
the speedup of the query to values below 1, which implies that the execution duration for the
distributed query is much more than the execution duration if the query were executed on a
single server. However, this is mainly due to the distributed initial locations of the input data.
In our scheduling algorithm, data is usually transferred only:
- if its transfer will reduce the execution finish time of the operator node which
processes the data, or
- if data from more than one server has to be integrated. In this scenario, performance
gain is not the main criterion. However, even in this scenario, data is transferred in
such a manner that the finish time of the operator that processes the data is minimized.
Overall, this graph shows that our decomposition algorithm achieves a speedup close to the
maximum achievable speedup where the CCR is large; however, a speedup of less than 1
can occur with a low CCR.
6.2.2 D-WCPS Framework Properties
These include the properties of the D-WCPS framework which determine the speedup of the
schedule. These are given below.
6.2.2.1 Average Coverage Processing Speed To Network Speed Ratio (PS/NS-R)
High processing speeds of servers and high network speeds between servers in the network
generally imply a smaller execution duration. In addition, the higher the average processing
speed of the servers in the infrastructure, the higher the influence it will have on queries with
a high CCR. So also, the higher the average network speed between the servers in the
network, the better the query response time for queries with a much lower CCR. Therefore,
the coverage processing speed, the network speed, and their ratio determine the rate of
processing and the suitability of the network for the processing of certain forms of query.
Due to the fact that it is practically difficult to experimentally control the network speed, we
hypothetically evaluate the effects of PS/NS-R using synthesized data for a simple two-leaf,
three-node tree. In computing the value of PS/NS-R for this evaluation, the coverage
processing speed is held constant and the network speed is varied. Figure 6.10 therefore
highlights the effects of varying the PS/NS-R on the query execution duration for a query tree
with different CCRs. For each of the CCRs, there is a PS/NS-R window within which
increasing the PS/NS-R will yield a performance gain, and above which it will yield no
performance gain.
Figure 6.10: Performance Evaluation Based on Average Coverage Processing Speed to Network Speed Ratio
6.2.2.2 Servers Available
The maximum number of simultaneous processes that can be run in the network is given as
the sum of the concurrent processes each server in the network can run, and this serves as the
upper limit of the speedup achievable in query decomposition. Hence, the maximum
speedup attainable for a given query is limited to the lesser of either the number of leaf
operators in the tree or the number of simultaneous processes that can run in the network. In
cases where the total number of simultaneous processes that can run is less than the width of
the tree, the speedup is usually determined by the data distribution of the leaf operators, the
effects of which are described later in Section 6.2.2.4. The load of other services running on a
server can also affect the speedup of queries; however, the D-WCPS infrastructure assumes a
set of dedicated servers.
6.2.2.3 Server Heterogeneity
By server heterogeneity, we mean the variability of the coverage processing speeds of, and
network speeds between, the servers in the network. Theoretically, these are, respectively, the
standard deviations of the processing speeds and network speeds in the network. Data is more
likely to be transferred from slower servers to faster servers when the processing speed
heterogeneity, the processing speed to network speed ratio of the network, and the CCR of the
query are high. Based on simulated data for a 4-leaf tree with a high CCR (approximately
10), whose initial input data is distributed on the slowest servers in a network, we present the
performance of our decomposition algorithm with respect to server heterogeneity. As shown
in Figure 6.11, an increase in heterogeneity increases the speedup if the data is initially
distributed on the slowest set of servers.

Figure 6.11: Performance Evaluation Based on Server Heterogeneity

However, if the initial data is distributed on the fastest set of servers, server heterogeneity
will, most likely, not affect the query decomposition efficiency.
6.2.2.4 Initial Data Distribution
Lastly, the initial data distribution affects the distributed query execution duration and
speedup. Its effects, however, cannot be determined in isolation, as they are dependent on the
other D-WCPS framework and query tree properties. In determining the implications of the
initial data distribution, we note the following:
- The initial data server typically determines the servers for many operators in the query,
most especially the leaf operators, although its significance decreases up the tree
depending on the CCR of the operators.
- The processing speed of the initial data host, the size of the output data from the leaf
operators, the availability of a server with a higher processing speed and high network
speed, and the overall CCR of the tree determine whether the data is transferred from the
initial processing server or not. This, consequently, affects the finish time of the
distributed query. For example, if the data to be processed by nodes B and C in the query
tree in Figure 6.2 is on very slow servers, and there is a much faster server in the network,
and the CCR of A is very high, then the data may be transferred from B's and C's servers
to the very fast server for processing A; otherwise, it is not transferred.
- The load on the initial server and the CCR of the leaf operator can determine whether its
data is transferred out to another server or not.
- The CCRs of two leaf operators having the same parent operator can determine whether it
is better to have the initial data on the same server or not.
- The presence of more than one leaf operator which are not immediately integrated (e.g.
nodes D and H in Figure 6.13), and whose data are on the same server, can also determine
whether the distribution pays off or not.
- The initial data sizes present on each server and the output sizes of the leaf operators also
determine the execution duration.
Since various scenarios can play out with respect to query and framework properties,
determining the distribution strategy of initial data is a research area of its own in
distributed databases [1][91]. These distributions are typically based on statistics
collected with regard to popular query types and framework properties. This is, however,
beyond the scope of this research.

Figure 6.12: Simple WCPS Query Tree (root A with children B and C)

Figure 6.13: Sample Query Tree
6.3 PERFORMANCE EVALUATION OF HEFT MODIFICATIONS
Having examined the overall effects of our decomposition algorithm, in this section we
present the performance gains of the HEFT modifications we introduced in Chapter 5.
However, to give an idea of the contributions of each cost component of a distributed query
tree, in Figure 6.14 we present the cost of each of the constituents of the execution duration
of a two-leaf distributed query tree having a CCR value of 1 and an average cost of 0.2
seconds per operator. The initial data size for each leaf operator in this test query is
200MB. For the given query tree, the parsing, optimizing, and scheduling have very minimal
effects on the cost of the distributed query tree. The relative values of these cost components,
however, vary with changes in the properties of the query.
Figure 6.14: Cost Constituents of Distributed Query
Furthermore, the evaluation of the HEFT modifications is presented below.
6.3.1 Node Clustering And Reduction of Number of Servers
From [47][29], the time complexity of HEFT is given as $O(v^2 \cdot p)$, where $v$ is the number
of operators and $p$ is the number of servers. Therefore, the server reduction and unary node
clustering techniques (Chapter 5) reduce the time spent during the scheduling operation.
As such, Figure 6.15 shows the performance gains in scheduling duration when scheduling
query trees with different numbers of nodes and with different average numbers of operators
per cluster.
Figure 6.15: Scheduling Duration Gains Due to Unary Node Clustering
Furthermore, the CCR of a set of induced operators in WCPS is typically very small (as low as
0.1 in some cases), and as shown in Figure 6.9, a low CCR degrades the quality of the
schedule, most especially in a heterogeneous server environment. Clustering of unary nodes,
however, increases the CCR of the cluster and, consequently, the quality of the schedule. In
Figure 6.16, we present the performance of clustering by varying the number of unary
operators per cluster for a 4-leaf query tree with a constant cost, and whose initial data are on
the slowest servers in the network. From Figure 6.16, when the CCR reaches a certain value
due to the increased number of operators per cluster, coverages can be transferred from slower
servers to faster servers without loss of performance. As a result, more operators can be
executed on faster servers, which, consequently, implies a smaller query execution duration.
A minimal sketch of this clustering step is given below Figure 6.16.
Figure 6.16: Distributed Execution Performance Gains of Clustering
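As an illustration of the clustering idea only (not the thesis' actual scheduler code; Node and cluster_unary_chains are hypothetical), the sketch below collapses each maximal chain of unary operators into a single cluster, so the cluster carries the summed computation cost while the chain's internal edges disappear, raising its CCR:

```python
from dataclasses import dataclass, field

@dataclass
class Node:
    cost: float
    children: list["Node"] = field(default_factory=list)
    transfer: list[float] = field(default_factory=list)   # transfer cost child -> this node

def cluster_unary_chains(v: Node) -> Node:
    # Merge each maximal chain of unary operators into one node whose
    # cost is the sum of the chain's costs; the intermediate edges are
    # removed, so the cluster's computation-to-communication ratio rises.
    while len(v.children) == 1:
        child = v.children[0]
        v = Node(cost=v.cost + child.cost,
                 children=child.children,
                 transfer=child.transfer)
    return Node(cost=v.cost,
                children=[cluster_unary_chains(c) for c in v.children],
                transfer=list(v.transfer))
```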
6.3.2 Multi-Processing Server Capabilities
Similarly, we evaluate our modification to HEFT such that it includes the multi-processing
capabilities of servers in its scheduling assumptions. However, it is difficult to model the
gains of this modification against any single performance criterion. Therefore, for evaluation
purposes, we compare the query execution duration when the modification is used to when it
is not used. For this experiment, we used sample queries with a wide range of different
structural properties (width, sequential factor, cost skew, and CCR). The result is presented in
Figure 6.17. The thick red line in the figure indicates where the execution Finish Time
without the HEFT modification is the same as with the modification.

Figure 6.17: Server Multiprocessing Capability Performance Evaluation

From the figure, the execution duration is reduced for more queries as the average number of
processors per server increases. This is because the faster servers in the network can run
several simultaneous processes. Besides, data transfer costs are reduced, since data need not
be transferred merely because one of the processors on a server is busy.
6.4 OPTIMIZATION AND EXECUTION MODEL GAIN
In Chapter 3, we detailed the query tree optimizations used in our framework, which include
both single-node and multi-node optimizations. Single-node optimizations were based on the
optimizations presented in [7], and the evaluation of those optimizations is as presented in [7].
In addition, however, we examined the effects of the coverage subsetting operator push-down
optimization, the "left-deep tree to bushy tree" optimization, and inter-tuple parallelism with
respect to distributed processing.

Due to the fact that the effects of the "left-deep tree to bushy tree" optimization are random,
i.e. there is no systematic way of associating their effects with a cause, we compare the
execution durations when there is optimization to when there is no optimization. Figure 6.18
therefore highlights the effects of this optimization. The red line on the graph indicates points
where the execution duration is the same whether there is optimization or not, and points
below and above the red line respectively indicate the performance gain and loss due to the
optimization.

Figure 6.18: Left Deep Tree to Bushy Tree Optimization Evaluation

Similarly, the coverage subsetting operator push-down optimization has the effect of
reducing the size of data processed by each server and transferred from one server to another.
This optimization gain is dependent on the percentage of the original data that is subset.
Figure 6.19 highlights the gains of pushing down subsetting operators in a distributed
processing system, given the percentage of the total initial data retrieved by the subsetting
operator. Due to the large processing and inter-server data transfer costs, the smaller this
percentage gets, the smaller the distributed execution duration, and the larger the performance
gain.
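As a sketch of the push-down idea only (the tree representation and operator names are hypothetical, not the thesis' implementation), a subsetting operator sitting directly above a cell-wise unary operator can be swapped below it, so that the unary operator, and any subsequent transfer, touches only the subset:

```python
from dataclasses import dataclass, field

@dataclass
class Op:
    name: str                                        # e.g. "subset", "log", "sqrt"
    children: list["Op"] = field(default_factory=list)

CELLWISE = {"log", "sqrt", "abs"}                    # ops that commute with subsetting

def push_down_subsets(op: Op) -> Op:
    # If a subset operator sits directly above a cell-wise unary operator,
    # swap them: subsetting first shrinks the data the unary operator
    # (and any later inter-server transfer) has to process.
    op.children = [push_down_subsets(c) for c in op.children]
    if (op.name == "subset" and len(op.children) == 1
            and op.children[0].name in CELLWISE):
        child = op.children[0]
        op.children = child.children                 # subset now wraps child's input
        child.children = [push_down_subsets(op)]     # keep pushing further down
        return child                                 # unary op becomes the new root
    return op

# subset(log(coverage)) becomes log(subset(coverage)):
tree = Op("subset", [Op("log", [Op("coverage")])])
print(push_down_subsets(tree).name)                  # -> "log"
```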
Figure 6.19: Performance of Subsetting Operator's Push Down
Lastly, we studied the performance of inter-tuple parallelism over the Cartesian product table
created from the WCPS query, and the graph in Figure 6.20 highlights the gains of this
processing model for scenarios in which no resources are shared in the execution of the
individual tuples of the table. A minimal sketch of this execution model follows Figure 6.20.
Figure 6.20: Performance of Inter-Tuple Parallelism
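A minimal sketch of this tuple-level parallelism, under our own simplifying assumptions (each tuple's processing tree is independent and is handled by a hypothetical process_tuple function):

```python
from concurrent.futures import ProcessPoolExecutor
from itertools import product

def process_tuple(tup):
    # Placeholder for optimizing, scheduling, and executing the processing
    # tree instantiated for one tuple of the Cartesian product table.
    a, b = tup
    return a * b                      # stand-in for real coverage processing

if __name__ == "__main__":
    # Cross product of the coverages bound by the query's iterators.
    table = list(product([1, 2, 3], [10, 20]))
    # With no shared resources, the tuples can be processed fully in
    # parallel; the ideal speedup equals the number of tuples.
    with ProcessPoolExecutor() as pool:
        results = list(pool.map(process_tuple, table))
    print(results)
```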
6.5 CONCLUSION
In this chapter, we described our prototype evaluation framework, and demonstrated the
efficiency of our scheduling algorithms and the gains of our query optimizations and
execution model. The query structure and D-WCPS framework properties which affect the
query execution duration were also presented and, in some cases, defined. Their effects were
also observed with respect to our query scheduling and orchestration algorithms. Overall, the
performance evaluation indicates that reasonable speedups can be achieved with our
decomposition algorithms. The scenarios whereby the distributed execution is worse off than
a single-node execution are those in which the initial data distribution does not permit the
processing to scale properly. Furthermore, we noted that varying the initial data sizes while
the performance gain determinants remain constant does not affect the performance gains.
Hence, it can be inferred that the algorithm scales with data sizes, with the only limitations to
scaling being tile sizes and available memory (assuming all performance factors discussed in
this chapter remain constant).
Chapter 7
CONCLUSION AND OUTLOOK
The OGC WCPS offers a multidimensional raster processing query language with formal
semantics which contain sufficient information for automatic orchestration. Based on this, we
implemented D-WCPS (Distributed WCPS), a framework in which a coverage processing
query can be dynamically distributed among several WCPS servers. In this chapter, we
summarize the work that we did, and we highlight the knowledge gaps that need to be
filled.
7.1 CONCLUSION
Using the components of the D-WCPS server, we provide a broad overview of this thesis.
7.1.1 The WCPS Registry
The registry, which is the information store of all servers and coverages in the D-WCPS, is
mirrored across all servers in the network. Server information stored in the registry includes
the server properties specific to distributed coverage processing as given in our proposed
cost model. These include the coverage processing speed (a benchmark we defined), data I/O
speed, average network speed, etc., and they are obtained by the calibrator which each server
implements and runs periodically. Similarly, changes in the information in a registry due to
update, insert, and delete operations are propagated to all servers such that the transaction
message is only sent once to a server, using the overlay network of the D-WCPS. In the
performance evaluation, it takes around 0-5 seconds to synchronize the 23 servers in our test
framework with 10,000 records of information. Furthermore, because the registry is locally
available on all servers, querying it is much more efficient than when the registry is on an
external service. Similarly, the risk of a single point of failure and the processing bottleneck
created by a single server (if the registry were centralised) are removed. Also, due to the fact
that each server holds the locations of all the servers in the network in its registry, servers in
the network can more easily reconfigure themselves by registering with another server in case
their gateway server becomes unavailable.
7.1.2 Query Optimization
Each D-WCPS server also has a component which optimizes a query tree both for single-node
and multi-node execution. Single-node optimizations are based on algorithms and rules
in [99], while the multi-node algorithm transforms a tree from a left-deep tree to a bushy
tree. In the evaluation of these algorithms, distributed coverage processing showed
significant gains when the algorithms were used compared to when they were not. For
example, the push-down of the coverage subsetting operation implies that less data will be
transferred between servers, which in turn means that less processing will be done by the
servers. Similarly, transforming a tree from a left-deep tree to a bushy tree implies that more
operations can be parallelized, thereby minimizing the execution duration.
7.1.3 Query Scheduling
[47] shows that clustering algorithms perform well when the CCR is low (a common
occurrence in geo-processing due to its large-sized datasets); however, clustering algorithms
are not very suitable in a heterogeneous server environment. On the other hand, HEFT has
been proven to be one of the best scheduling algorithms [39]. Hence, we cluster query
operators using some modified principles of HEFT. The HEFT modifications were introduced
to improve on HEFT with regard to query tree processing with large-sized data. These
modifications include pre-schedule clustering, multi-processor server capability, memory
constraint consideration, etc. As shown in the performance evaluation, the speedup obtained
by our decomposition algorithm is dependent on several factors such as the width, sequential
factor, cost skew, and CCR of the query tree, together with the D-WCPS framework
properties. However, overall, the query execution time is reduced except in some extreme
cases where the initial data is distributed and the CCR of the tree is extremely low. Similarly,
query execution durations improve when the HEFT modifications we proposed are used
compared to when they are not.
7.1.4 Adaptive Query Orchestration
A P2P orchestration model, whereby WCPS servers recursively invoke other servers with a
distributed query request, is used for executing D-WCPS queries. After a server receives a
distributed query request, it invokes other servers with partial query requests as specified in
the query, executes its local query requests, and integrates the results. The integration may
involve writing a temporary copy of the data, due to the fact that partial query results can be
larger than the main memory. The distributed query request is composed by the server which
receives the initial global query. The WCPS query syntax is modified to support such
distributed execution. With the introduced modifications, a coverage iterator of a WCPS
query will not only bind to coverages, but can also bind to partial queries with a specified
execution host.
7.1.5 Parallelism in WCPS
We propose the use of three forms of parallelism at various stages during query execution.
These include inter-operator parallelism for parallelizing the execution of different operators
in a coverage processing tree, and inter-tuple parallelism for parallelizing the optimization,
scheduling, and execution of a set of processing trees on a tuple-by-tuple basis. The
evaluation of inter-tuple parallelism shows a speedup of the magnitude of the number of
tuples in the cross-product table of the WCPS query tree. We also conceptually describe
intra-operator parallelism with regard to distributed coverage processing in a heterogeneous
server framework. This involves splitting up the execution of an operator node in the query
tree by partitioning the data, transferring the partitions to their execution hosts, executing the
operator on the input data partitions, transferring the results back to a server, and merging
them.
7.2 BENEFITS OF D-WCPS
In this thesis, we presented D-WCPS, a framework for resource-aware decomposition of
geoprocessing services based on a declarative request language. Some of the benefits of the
D-WCPS infrastructure are listed below:
- Coverage Metadata Sharing: The infrastructure allows servers to share their
coverage metadata with other servers, and to synchronize modifications to the data
more efficiently. Coverages can therefore be more easily located, with less risk of a
single point of failure or of a service registry bottleneck.
- Efficient Data Integration: For less compute-intensive tasks, data from several
servers can be efficiently and optimally combined in a way that minimizes the task
running time. For example, the query tree optimization algorithm ensures that not all
data is transferred from one server to the next for data integration.
- Computation Sharing: For compute-intensive queries, geo-processing tasks can be
optimally distributed among servers, based on their coverage processing capabilities,
in a way that minimizes the task execution duration. The task scheduling algorithms
ensure that the tasks are properly allocated.
- Dynamic Composition of Services: Complex services can be composed automatically,
without human intervention, for executing geo-processing queries. Hence, amateur
GIS users and mobile devices can issue simple queries which invoke complex
interactions between different services to generate a response to the query request.
Power users can also write and execute distributed WCPS queries themselves.
- Efficient Orchestration of Services: The P2P orchestration model we used ensures
less data transfer between servers and fewer distributed processing bottlenecks
compared with centralized orchestration.
7.3 WHEN D-WCPS INFRASTRUCTURE IS BENEFICIAL
In the performance evaluation (Chapter 6) of this thesis, we presented the various factors that
affect the performance gains of distributed coverage processing. These include the query tree
properties [6.2.1] such as the tree width [6.2.1.1], the query tree's sequential factor [6.2.1.2],
the query tree's cost skew [6.2.1.3], the computation to communication ratio of the query tree
[6.2.1.4], etc.; and the framework's properties [6.2.2] such as the number of available servers
[6.2.2.2], server heterogeneity [6.2.2.3], initial data distribution [6.2.2.4], etc. The complex
interactions between these factors make it impossible to consistently enjoy performance gains
for every type of query in every network configuration of the WCPS servers. Our proposed
decomposition and orchestration algorithms, however, ensure that the worst-case solution is
always avoided in the instances where a performance gain is elusive. Moreover, every
D-WCPS infrastructure has several other benefits (as listed in Section 7.2) beyond
performance-based ones.
Typically, the use of D-WCPS would be recommended in instances when:
- Only raster datasets are involved in the computation. In situations where vector spatial
data is involved, it would have to be rasterized. Similarly, when other types of coverages
different from rasters, e.g. curvilinear grids, are to be used in the computation, then the
cost model proposed in this thesis would need to be modified to suit the coverage type.
- There is a need for several institutions to share their spatial data with one another
efficiently.
- The applications and/or the datasets used in some geospatial analysis tasks are available
across several institutions, and these need to be efficiently integrated.
- A server needs to share its coverage processing load with other servers that are willing to
collaborate, either within or across institutions.
- There is a need to speed up query processing time within an institution. This can be
achieved, for instance, by using several off-the-shelf commodity hardware servers or by
instantiating servers on demand using a cloud computing infrastructure such as Amazon
EC2.
7.4 OUTLOOK
With regard to distributed coverage processing, we identify some knowledge gaps that need
to be filled; some of these include:
- Distributed Tile Stream Processing: In this processing model, it is envisaged that
coverages would be processed by pipelining coverage tiles between operators
scheduled on different servers. Using pipelined parallelism for distributed query
processing introduces new research challenges, such as cost modelling for tile
stream processing, scheduling of operators on a set of heterogeneous servers based on
the cost model, investigating the possible infrastructures to use for the distributed
processing, researching how this can be integrated into the sensor web, etc.
- Coverage Processing on Non-Dedicated Sets of Servers: Another goal in distributed
coverage processing is executing the processes on non-dedicated servers, most
especially mobile devices.
- Non-Gridded Coverages: Another research endeavour would be to adapt this study
to suit non-gridded coverages.
Appendix 1 – Coverage Operators Weights
The table in this appendix displays the experimentally determined weights of different
coverage processing operators.

Operation Description | Input Data Type | Value
Average cell values   | Char            | 1.706
Average cell values   | Float           | 1.848
Average cell values   | Double          | 1.96
Condense              | Char            | 6.654
Condense              | Float           | 8.062
Condense              | Double          | 7.588
Coverage Constructor  | Char            | 8.614
Coverage Constructor  | Float           | 9.354
Coverage Constructor  | Double          | 9.508
Natural Log           | Double          | 6.734
Natural Log           | Float           | 6.666
Natural Log           | Char            | 7.54
Section               | Char            | 1
Section               | Double          | 1.8842
Square root           | Float           | 2.422
Square root           | Double          | 2.982
Square root           | Char            | 3.28
REFERENCES
1. A. Alves et al.: Business Process Execution Language for Web Services, Version
2.0. (2007) http://docs.oasis-open.org/wsbpel/2.0/OS/wsbpel-v2.0-OS.pdf
2. A. Brunstrom, S.T. Leutenegger, R. Simha, Experimental evaluation of dynamic
data allocation strategies in a distributed database with changing workloads. In:
Proceedings of CIKM ’95, 1995
3. A. Doyle, and C. Reed, 2001. Introduction to OGC webservices: OGC
interoperability program white paper. www.opengis.org
4. A. Friis-Christensen, L. Bernard, I. Kanellopoulos, J. Nogueras-Iso, S. Peedell, S.
Schade, C. Thorne (2006): Building service oriented applications on top of a spatial
data infrastructure — a forest fire assessment example. AGILE 2006.
5. A. Gounaris(2005). Resource aware query processing on the grid. Thesis report,
University of Manchester, Faculty of Engineering and Physical Sciences
6. A. Kassim, B. Esfandiari, S. Majumdar, and L. Serghi, A flexible hybrid architecture
for management of distributed web service registries, in Communication Networks
and Services Research (CNSR), vol. 5, 2007
7. A.M.A. Ghanem, A.I Saleh, H.A. Ali, High performance adaptive framework for
scheduling Grid Workflow applications. Computer Engineering and Systems
(ICCES), p. 52-57, 2010
8. A. Oram. Peer-to-Peer: Harnessing the Power of Disruptive Technologies. O'Reilly,
2001.
9. A. Radulescu, C. Nicolescu, A. J. C. van Gemund, and P. P. Jonker. CPR:
Mixed task and data parallel scheduling for distributed systems. In IPDPS, 2001
10. A. Radulescu and A. J. C. van Gemund. Fast and Effective Task Scheduling in
Heterogeneous Systems. In Heterogeneous Computing Workshop, 2000
11. A. Radulescu and A. J. C. van Gemund. FLB: Fast load balancing for distributed-
memory machines. In Proc. Int’l Conf. on Parallel Processing, 1999
12. A. Serge, O. Benjelloun, and M. Milo 2004. The ActiveXML project: an overview.
Gemo research report no. 344.
13. A. Tomasic, L. Rashid, and P. Valduriez, Scaling heterogeneous database and design
of DISCO, in Proceedings of the 16th International Conference on Distributing
Computing Systems (ICDCS), Hong Kong, May 1996
14. A. Weiser, A. Zipf, 2007 , Managing Earth Observation Data with Distributed
Geoprocessing Services. Web Service Orchestration (WSO) of OGC Web Services
(OWS) for Disaster Management
15. A. Whiteside (ed.), "Web Coverage Service (WCS) -- Transaction operation
extension 1.1.4". OGC document 07-068r3
16. B. Baranski, T. Deelmann, and B. Schäffer (2010). Pay-per-Use Revenue Models for
Geoprocessing Services in the Cloud. 1st International Workshop on Pervasive Web
Mapping, Geoprocessing and Services (WebMGS 2010). Como, Italy
17. B. Baranski, , A grid-enabled OGC Web Processing Service (WPS). Presentation at
OGC-OGF Collaboration Workshop at The Open Grid Forum (OGF-22), 2008,
Boston, USA
18. B. Bastian, R.Richard, Geoprocessing in the Clouds. FOSS4G Sydney, 2009
19. B. Benatallah, M. Dumas, Q.Z. Sheng and A.H. Ngu, Declarative Composition and
Peer-to-Peer Provisioning of Dynamic Web Services, Proc. Int'l Conf. Data Eng.
(ICDE), pp. 297-308, Feb. 2002.
20. B. Benatallah, Q.Z. Sheng, and M. Dumas, "The Self-Serv environment for web
services composition," IEEE Internet Computing, vol. 7, no. 1, pp. 40-48, 2003.
21. B. Schaeffer, Towards a Transactional Web Processing Service (WPS-T). GI-Days
2008, Münster, Germany
22. C. Schmidt, and M. Parashar, “A Peer-to-Peer Approach to Web Service
Discovery,” World Wide Web Journal, Vol. 7, No. 2, June 2004, pp. 211-229.
23. C.H. Papadimitriou and M. Yannakakis, "Towards an Architecture-Independent
Analysis of Parallel Algorithms," SIAM J. of Comp., vol. 19, no. 2, pp. 322-328, Apr.
1990.
24. Chafle, G., Chandra, S., Mann, V., Nanda, M.G. Decentralized Orchestration of
Composite Web Services. In: WWW 2004 Conference, pp. 134–143 (2004)
25. Condor Team, "Dagman (directed acyclic graph manager),"
http://www.cs.wisc.edu/condor/dagman/
26. D. A. Menascé, QoS Issues in Web Services, IEEE Internet Computing, v.6 n.6,
p.72-75, November 2002 [doi>10.1109/MIC.2002.1067740]
27. D. Berardi, D. Calvanese, G. De Giacomo, M. Lenzerini, and M. Mecella.
Automatic composition of e-services that export their behavior. In Proc. 1st Int.
Conf. on Service Oriented Computing (ICSOC), volume 2910 of LNCS
28. D. D. Gajski, and J. Pier, 1985. Essential issues in multiprocessors. IEEE Computer
18, 6 (June)
29. D. Kossmann. The state of the art in distributed query processing. ACM Comput.
Surv., 32(4):422–469, 2000.
30. D. Martin et al(2004). OWL-S: Semantic Markup for Web Services. W3C
31. D. Thain, T. Tannenbaum, and M. Livny. Condor and the grid. In Fran Berman,
Geoffrey Fox, and Tony Hey, editors, Grid Computing: Making the Global
Infrastructure a Reality. JohnWiley & Sons Inc., 2003.
32. D. Tracy, et al, A Comparison of eleven Static Heuristics for Mapping a Class of
Independent Tasks onto Heterogeneous Distributed Computing Systems. J. Parallel
and Distributed Computing, 61, pp.810-837, 2001
33. D.J. Lilja 2000. Measuring computer performance: a practitioner's guide, Cambridge
University Press, New York.
34. F. Baligand, N. Rivierre, T. Ledoux. A Declarative Approach for QoS-Aware Web
Service Compositions. B. Kramer, K.-J. Lin, and P. Narasimhan (Eds.): ICSOC
2007, LNCS 4749, pp. 422-428, 2007.
35. F. Berman et al. 2005. New grid scheduling and rescheduling methods in the GrADS
project, International Journal of Parallel Programming 33 (2005), pp. 209–229.
36. F. Berman, 1998. High-performance schedulers. In The Grid: Blueprint for a New
Computing Infrastructure, edited by I.Foster and C. Kesselman, 279-309. San
Francisco: Morgan Kaufmann.
37. F. Berman, R. Wolski 1997. The AppLeS project: A status report, in: 8th NEC
Research Symposium, Berlin, Germany, 1997.
38. F. Dong, "WORKFLOW SCHEDULING ALGORITHMS IN THE GRID". PhD
Thesis,Queen’s University Kingston, Ontario, Canada , 2009.
39. F. Dong, S.K. Akl, Scheduling algorithms for grid computing: State of the art and
open problems, Technical Report No. 2006-504, School of Computing, Queen’s
University, Kingston, Ontario, Canada, January 2006.
40. F. Mackert, and M. Lohman, R* Optimizer Validation and Performance Evaluation
for Distributed Queries, Proceedings of the 12th International Conference on Very
Large Data Bases, p.149-159, August 25-28, 1986.
41. G. Amdahl (1967). "Validity of the Single Processor Approach to Achieving Large-
Scale Computing Capabilities" (PDF). AFIPS Conference Proceedings (30): 483–
485.
42. G. Bonham-Carter, "Geographic Information Systems for Geoscientists: Modelling
with GIS". Pergamon Publication, 1994.
43. G. Canfora , M. Di Penta , R. Esposito and M. L. Villani, An approach for QoS-
aware service composition based on genetic algorithms, Proceedings of the 2005
conference on Genetic and evolutionary computation, June 25-29, 2005, Washington
DC, USA [doi>10.1145/1068009.1068189]
44. G. Hobona, D. Fairbairn & P. James. "Workflow Enactment of GridEnabled
Geospatial Web Services". In Proceedings of the 2007 UK eScience
45. GIS Wiki http://wiki.gis.com/wiki/index.php/Platform_Performance
46. H. Andrade, T. Kurc, A. Sussman, and J. Saltz. Active Proxy-G: Optimizing the
query execution process in the Grid. In Proceedings of the 2002 ACM/IEEE
SC Conference (SC 2002). ACM Press, Nov. 2002.
47. H. Topcuoglu, S Hariri and M.-Y Wu, Task scheduling algorithms for
heterogeneous processors, 8th IEEE Heterogeneous Computing Workshop (HCW
'99) (1999)
48. H. Topcuoglu, S. Hariri, and M.-Y. Wu. Performance-effective and low-complexity
task scheduling for heterogeneous computing. IEEE Transactions on Parallel and
Distributed Systems, 13(3):260-274, 2002.
49. H. Cao, H. Jin, S. Wu, L. Qi, ServiceFlow: QoS Based Service Composition in
CGSP. Proceedings of IEEE EDOC'06.
50. H. Casanova, A. Legrand, D. Zagorodnov and F. Berman, Heuristics for Scheduling
Parameter Sweep Applications in Grid Environments, in Proc. of the 9th hetero-
geneous Computing Workshop (HCW'00), pp. 349-363, Cancun, Mexico, May 2000
51. H. Dail, O. Sievert, F. Berman, H. Casanova, A. YarKhan, S. Vadhiyar, J. Dongarra,
C. Liu, L. Yang, D. Angulo, and I. Foster. Scheduling in the grid application
development software project. In J. Nabrzyski, J. Schopf, and J. Weglarz, editors,
Grid resource management: state of the art and future trends. Kluwer Academic
Publishers Group, 2003.
52. H. El-Rewini, T. Lewis, and H. Ali, Task Scheduling in Parallel and Distributed
Systems, ISBN: 0130992356, PTR Prentice Hall, 1994.
53. H. Garcia-Molina, Y. Papakonstantinou, D. Quass, A. Rajaraman, Y. Sagiv, J. D.
Ullman, V. Vassalos, and J. Widom. The tsimmis approach to mediation: Data
models and languages. J. Intell. Inf. Syst., 8(2), 1997
54. H. Rotithor, Taxonomy of Dynamic Task Scheduling Schemes in Distributed
Computing Systems, IEE Proc. on Computer and Digital Techniques, vol. 141, no.
1, pages: 1-10, January 1994.
55. http://www.fgdc.gov/nsdi/nsdi.html as seen on (Dec 05, 2011)
56. I. Ahmad, Y. Kwok; M. Wu, Analysis, evaluation, and comparison of algorithms
for scheduling task graphs on parallel processors, Proceedings of the 1996
International Symposium on Parallel Architectures, Algorithms and Networks,
p.207, June 12-14, 1996
57. I. Foster, The globus toolkit for grid computing Proceedings of the 1st International
Symposium on Cluster Computing and the Grid (2001).
58. inspire.jrc.ec.europa.eu, seen on (Dec 05, 2011)
59. J. Beaujardiere. OpenGIS Web Map Service (WMS) Implementation Specification,
Version 1.3.0, OGC 06-042, 2006
60. J. Hu, C. Guo, H. Wang, Zou H. 2005. Quality Driven Web Services Selection, E-
Business Engineering, IEEE International Conference on, pp. 681-688, IEEE
International Conference on e-Business Engineering (ICEBE'05)
61. J. Lee,J. Park, M. Chung , J. Min, 2010. An intelligent query processing for
distributed ontologies, Journal of Systems and Software, Volume 83, Issue 1, SI:
Top Scholars, January 2010, Pages 85-95, ISSN 0164-1212,
62. J. Liou and M. A. Palis, An Efficient Task Clustering Heuristic for Scheduling
DAGs on Multiprocessors, in Proc. of Workshop on Resource Management,
Symposium of Parallel and Distributed Processing, 1996.
63. J. Smith, A. Gounaris, P. Watson, N. Paton, A Fernandes, and R. Sakellariou.
Distributed query processing on the grid. International Journal of High Performance
Computing Applications, 17(4), 2003.
64. J. Subhlok, P. Lieu and B. Lowekamp, Automatic Node Selection for High
Performance Applications on Networks, Proc. the 7th ACM SIGPLAN Symposium
on Principles and Practice of Parallel Programming, pages: 163-172, Atlanta,
Georgia USA, May 1999.
65. J. Yu and R. Buyya. A taxonomy of scientific workflow systems for grid computing.
SIGMOD Record, 34(3), 2005.
66. J.L. Gustafson, Q.O. Snell. 1995. HINT: A New Way To Measure Computer
Performance, Proceedings of the Twenty- Eighth Hawaii International Conference
on System Sciences HICSS-95
67. K. Amin, G.v. Laszewski, M. Hategan, N.J. Zaluzec, S. Hampton and A. Rossi,
“GridAnt: A client-controllable Grid workflow system”, in D. King and A. Dennis
(eds.), Proc. of the 37th Annual Hawaii International Conference on System
Sciences (HICSS'04) – Track 7. IEEE Computer Society, 2004, p. 70210c.
68. L. Di , A. Chen , W.Yang , Y. Liu , Y. Wei , P. Mehrotra , C. Hu ,and D. Williams,
The development of a geospatial data Grid by integrating OGC Web services with
Globus-based Grid technology, Concurrency and Computation: Practice &
Experience, v.20 n.14, p.1617-1635, September 2008
69. L. F. Bittencourt , R. Sakellariou , E. R. M. Madeira, DAG Scheduling Using a
Lookahead Variant of the Heterogeneous Earliest Finish Time Algorithm,
Proceedings of the 2010 18th Euromicro Conference on Parallel, Distributed and
Network-based Processing, p.27-34, February 17-19, 2010
[doi>10.1109/PDP.2010.56]
70. M. Alrifai , D. Skoutas , T. Risse, "Selecting skyline services for QoS-based web
service composition", Proceedings of the 19th international conference on World
wide web, April 26-30, 2010, Raleigh, North Carolina, USA
[doi>10.1145/1772690.1772693]
71. M. Alrifai, T. Risse, P. Dolog, and W. Nejdl. A scalable approach for qos-based web
service selection. In Service-Oriented Computing 2008 Workshops, pages 190-199.
Springer-Verlag, 2009.
72. M. Gone, and S. Shade (2008). “Towards semantic composition of geospatial web
services using WSMO in comparison to BPEL”. International Journal of Spatial
Data Infrastructures Research, Vol. 3, 192-214.
http://ijsdir.jrc.ec.europa.eu/index.php/ijsdir/article/view/72/103
73. M. J. Carey , L. M. Haas , P. M. Schwarz , M. Arya , W. F. Cody , R. Fagin , M.
Flickner , A. W. Luniewski , W. Niblack , D. Petkovic , J. Thomas , J. H. Williams ,
and E. L. Wimmers, Towards heterogeneous multimedia information systems: the
Garlic approach, Proceedings of the 5th International Workshop on Research Issues
in Data Engineering-Distributed Object Management (RIDE-DOM'95), p.124,
March 06-07, 1995
74. M. Nicola, I. Kogan, and B. Schiefer, 2007. An xml transaction processing
benchmark. in SIGMOD Conference, pp. 937–948.
75. M. Ouzzani , A. Bouguettaya, Query Processing and Optimization on the Web,
Journal of Distributed and Parallel Databases, v.15 n.3, p.187-218, May 2004
[doi>10.1023/B:DAPD.0000018574.71588.06]
76. M. Pistore, F. Barbon, P. Bertoli, D. Shaparau, and P. Traverso, “Planning and
Monitoring Web Service Composition”, The 2nd ICAPS International Workshop on
Planning and Scheduling for Web and Grid Services, 2004. pp. 106-115.
77. M.C. Shan, Pegasus architecture and design principles, in Proceedings of the
ACMSIGMOD International Conference on Management of Data, Washington, DC,
USA, June 1993
78. M.N. Alpdemir, A. Mukherjee, A. Gounaris, N.W. Paton, P. Watson, and A.A.A
Fernandes, OGSA-DQP: A Grid Service for Distributed Querying on the Grid.
EDBT 2004. LNCS, vol. 2992, pp. 858–861. Springer, Heidelberg (2004).
79. N. Cova Suazo, and J.O.O Aguirre, (2005). Aspect-oriented Web services
orchestration., Proc. 2nd International Conference on Electrical and Electronics
Engineering(pp. 72-76).
80. N. Sandeep, P. Biswajeet, Surface area processing in GIS,
https://www.gisdevelopment.net/technology/gis/pdf/ma03191.pdf
81. N.n. 2007 "Abstract Specification Topic 6: Schema for coverage geometry and
functions". OGC 07-011.
82. N.n., 2008 "Geographic Information - Coverage Geometry and Functions". ISO
19123:2005
83. Oracle Berkeley DB: Performance Metrics and Benchmarks. An Oracle White Paper,
August 2006.
84. P. Bajpal, M. Kumar, “Genetic Algorithm – an Approach to Solve Global
Optimization Problems”, Indian Journal of Computer Science and Engineering. Vol
1. No 3. pp.199-206, 2010.
85. P. Baumann (ed.), "WCS 2.0 Core Interface Standard". OGC document 09-146r1
86. P. Baumann (ed.), "Web Coverage processing Service (WCPS) Interface Standard".
OGC document 08-068r2
87. P. Baumann , A. Dehmel , P. Furtado , R. Ritsch , N. Widmann, The
multidimensional database system RasDaMan, Proceedings of the 1998 ACM
SIGMOD international conference on Management of data, p.575-577, June 01-04,
1998, Seattle, Washington, United States [doi>10.1145/276304.276386]
88. P. Baumann, P. Furtado, R. Ritsch, N. Widmann: The RasDaMan Approach to
Multidimensional Database Management. Proc. 12th Annual Symposium on Applied
Computing (SAC'97), San Jose/USA, February 28 - March 2,1997, pp. 166-173
89. P. Baumann and S. Keens, "OWS-4 Workflow IPR Workflow descriptions and
lessons learned OGC 06-187r1 version0.0.9". OGC Discussion Paper, OGC. 2007
90. P. Bernstein , N. Goodman , E. Wong , C Reeve , and J Rothnie, Jr., Query
processing in a system for distributed databases (SDD-1), ACM Transactions on
Database Systems (TODS), v.6 n.4, p.602-625, Dec. 1981
[doi>10.1145/319628.319650].
91. P. M. G. Apers, Data allocation in distributed database systems, ACM Transactions
on Database Systems (TODS), v.13 n.3, p.263-304, Sept. 1988
[doi>10.1145/44498.45063]
92. P. Yue, L. Di, Wenli Yang, G. Yu, and P. Zhao. 2007. Semantics-based automatic
composition of geospatial Web service chains. Comput. Geosci. 33, 5 (May 2007),
649-665. DOI=10.1016/j.cageo.2006.09.003
http://dx.doi.org/10.1016/j.cageo.2006.09.003
93. P.Schut (ed.), "OpenGIS Web Processing Service" OGC 05-007r7 version 1.0.0.
94. R. Elmasri and S.B Navathe, Fundamentals of Database Systems, Reading, MA,
Addison-Wesley, 2000
95. R. Hull and S. Jiawen (2005). Tools for Composite Web Services: A Short
Overview. ACM Sigmod , 34, 86-95.
96. R. Hwang , M. Gen , H. Katayama, A comparison of multiprocessor task scheduling
algorithms with communication costs, Computers and Operations Research, v.35
n.3, p.976-993, March, 2008 [doi>10.1016/j.cor.2006.05.013]
97. R. Lemmens, A. Wytzisk, R. de By, C. Granell, M. Gould and P. van Oosterom,
Integrating semantic and syntactic descriptions to chain geographic services. IEEE
Internet Computing, 10 5 (2006), pp. 42–52.
98. R. Milner. Communicating and Mobile Systems: The pi-calculus. Cambridge
University Press, 1999
99. R. Ritsch, "Optimization and Evaluation of Array Queries in Database Management
Systems". PhD Thesis, TU Muenchen, 1999.
100. S. Adali, K.S. Candan, Y. Papakonstantinou, and V.S. Subrahmanian, “Query
caching and optimization in distributed mediator systems,” in Proceeedings of ACM
SIGMOD International Conference on Management of Data, Montreal, Canada,
June 1996
101. S. Baskiyar, C. Dickinson, Scheduling Directed Acyclic Task Graphs on
Heterogeneous Processors Using Task Duplication, Lecture Notes in Computer
Science, vol. 2913, Springer, Berlin, 2003, pp. 259–267.
102. S. Hu , V. Muthusamy , G. Li , H. Jacobsen, Distributed automatic service
composition in large-scale systems, Proceedings of the second international
conference on Distributed event-based systems, July 01-04, 2008, Rome, Italy
[doi>10.1145/1385989.1386019]
103. S. Liu, H.A. Karimi, "Grid query optimizer to improve query processing in grids",
Future Generation Computer Systems, 2008, in Press.
104. S. Narayanan, T. Kurc, U. Catalyurek, and J. Saltz, Database support for data-driven
scientific applications in the grid. Parallel Processing Letters. v13 i2. 245-271
105. S. Ninsawat, V. Raghavan and S. Masumoto, Development of Distributed Web
Service for Geoprocessing and 3D Visualization in Web-GIS Clients, Proceedings of
GIS-IDEAS 2008, 4-6 December 2008, Hanoi, Vietnam, pp. 233-237.
106. S. Ranaweera and D. P. Agrawal, A Task Duplication Based Scheduling Algorithm
for Heterogeneous Systems, in Proc. of 14th International Parallel and Distributed
Processing Symposium (IPDPS'00), pp. 445-450, Cancun, Mexico, May 2000.
107. S. Yu, J. Liu, J. Le, "Decentralized web service organization combining semantic
web and peer to peer computing," Springer-Verlag, Lecture Notes in Computer
Science, vol. 3250, pp. 116-127, 2004.
108. S. J. Kim and J. C. Browne, "A General Approach to Mapping of Parallel
Computation upon Multiprocessor Architectures," Proc. Int'l Conference on
Parallel Processing, vol. II, pp. 1-8, Aug. 1988.
109. B. Srivastava and J. Koehler (2003) Web Service Composition – current solutions and
open problems. ICAPS 2003 Workshop on Planning for Web Services, Trento, Italy
110. T. Bultan, X. Fu, R. Hull, and J. Su. Conversation specification: A new approach to
design and analysis of e-service composition. In Proc. Int'l World Wide Web Conf.
(WWW), May 2003
111. T. Fleuren, P. Müller (2008) BPEL workflows combining standard OGC web
services and grid-enabled OGC web services. Proceedings of the 34th Euromicro
Conference on Software Engineering and Advanced Applications, Parma, Italy
112. T. Hagras, J. Janeček, Static vs. Dynamic List-Scheduling Performance Comparison,
Acta Polytechnica, Vol. 43, No. 6/2003
113. T. L. Adam, K. M. Chandy and J. R. Dickson, "A comparison of list schedules for
parallel processing systems", Communications of the ACM, vol. 17, p. 685, 1974.
114. T. Malik, A. Szalay, T. Budavari, and A. Thakar. SkyQuery: A web service approach
to federate databases. In CIDR, 2003.
115. T. Yang and A. Gerasoulis, “DSC: Scheduling Parallel Tasks on an Unbounded
Number of Processors,” IEEE Trans. Parallel and Distributed Systems, vol. 5, no. 9,
pp. 951-967, Sept. 1994.
116. T. Yu and K.-J. Lin, 2004. Service Selection Algorithms for Web Services with End-to-End
QoS Constraints. In: Proc. of the IEEE Intl. Conference on E-Commerce
Technology, pp. 129-136
117. V. Fontes, B. Schulze, M. Dutra, F. Porto, and A. Barbosa, CoDIMS-G: a data
and program integration service for the grid, Proceedings of the 2nd Workshop on
Middleware for Grid Computing, pp. 29-34, October 18-22, 2004, Toronto, Ontario,
Canada [doi>10.1145/1028493.1028498]
118. V. Sarkar, Partitioning and Scheduling Parallel Programs for Multiprocessors, MIT
Press, Cambridge, MA, 1989.
119. W. Kuhn, "Introduction to Spatial Data Infrastructures". Presentation held on March
14, 2005. Available from: http://www.docstoc.com/docs/2697206/Introduction-to-
Spatial--Data-Infrastructures
120. W. M. P. van der Aalst, M. Dumas, A. H. M. ter Hofstede, and P. Wohed, Pattern-
Based Analysis of BPML (and WSCI). QUT Technical Report FIT-TR-2002-05,
Queensland University of Technology, Brisbane, 2002
121. Web Services Business Process Execution Language Version 2.0 (2007). OASIS.
122. X. Li and M. Malek, "Analysis of speedup and communication/computation ratio in
multiprocessor systems", Proc. Real-Time Systems Symposium, 1988, pp. 282-288.
123. Y. Kwok, I. Ahmad, Static scheduling algorithms for allocating directed task graphs
to multiprocessors, ACM Computing Surveys (CSUR), v.31 n.4, pp. 406-471, Dec.
1999 [doi>10.1145/344588.344618]
124. Y. Liu, A. H. H. Ngu and L. Zeng, QoS Computation and Policing in Dynamic Web
Service Selection, Proc. 13th Int'l Conf. World Wide Web (WWW), May 2004.
125. Z. R. Peng, M.-H. Tsou (2003) Internet GIS: Distributed Geographic Information
Services for the Internet and Wireless Networks. John Wiley & Sons, New York