accelerator for apache spark functional specification · accelerator for apache spark –...

46
http://www.tibco.com Global Headquarters 3303 Hillview Avenue Palo Alto, CA 94304 Tel: +1 650-846-1000 Toll Free: 1 800-420-8450 Fax: +1 650-846-1005 © 2006, TIBCO Software Inc. All rights reserved. TIBCO, the TIBCO logo, The Power of Now, and TIBCO Software are trademarks or registered trademarks of TIBCO Software Inc. in the United States and/or other countries. All other product and company names and marks mentioned in this document are the property of their respective owners and are mentioned for identification purposes only. Accelerator for Apache Spark Functional Specification 23 August 2016 Version 1.0.0 This document outlines the functional specification of the components of the Accelerator for Apache Spark

Upload: others

Post on 14-Mar-2020

42 views

Category:

Documents


0 download

TRANSCRIPT

http://www.tibco.com

Global Headquarters

3303 Hillview Avenue

Palo Alto, CA 94304

Tel: +1 650-846-1000

Toll Free: 1 800-420-8450

Fax: +1 650-846-1005

© 2006, TIBCO Software Inc. All rights

reserved. TIBCO, the TIBCO logo, The

Power of Now, and TIBCO Software are

trademarks or registered trademarks of

TIBCO Software Inc. in the United States

and/or other countries. All other product and

company names and marks mentioned in

this document are the property of their

respective owners and are mentioned for

identification purposes only.

Accelerator for Apache Spark

Functional Specification

23 August 2016

Version 1.0.0

This document outlines the functional specification of the components of the

Accelerator for Apache Spark

Document

Accelerator for Apache Spark – Functional Specification 2

Revision History

Version Date Author Comments

0.1 18/04/2016 Piotr Smolinski Initial version

0.2 24/04/2016 Piotr Smolinski

0.3 03/06/2016 Piotr Smolinski

0.4 10/06/2016 Ana Costa e Silva

1.0.0 23/08/2016 Piotr Smolinski Version for release

Document

Accelerator for Apache Spark – Functional Specification 3

Copyright Notice

COPYRIGHT© 2016 TIBCO Software Inc. This document is unpublished and the foregoing notice is

affixed to protect TIBCO Software Inc. in the event of inadvertent publication. All rights reserved. No

part of this document may be reproduced in any form, including photocopying or transmission

electronically to any computer, without prior written consent of TIBCO Software Inc. The information

contained in this document is confidential and proprietary to TIBCO Software Inc. and may not be used

or disclosed except as expressly authorized in writing by TIBCO Software Inc. Copyright protection

includes material generated from our software programs displayed on the screen, such as icons, screen

displays, and the like.

Trademarks

Technologies described herein are either covered by existing patents or patent applications are in

progress. All brand and product names are trademarks or registered trademarks of their respective

holders and are hereby acknowledged.

Confidentiality

The information in this document is subject to change without notice. This document contains

information that is confidential and proprietary to TIBCO Software Inc. and may not be copied,

published, or disclosed to others, or used for any purposes other than review, without written

authorization of an officer of TIBCO Software Inc. Submission of this document does not represent a

commitment to implement any portion of this specification in the products of the submitters.

Content Warranty

The information in this document is subject to change without notice. THIS DOCUMENT IS PROVIDED

"AS IS" AND TIBCO MAKES NO WARRANTY, EXPRESS, IMPLIED, OR STATUTORY, INCLUDING

BUT NOT LIMITED TO ALL WARRANTIES OF MERCHANTABILITY OR FITNESS FOR A

PARTICULAR PURPOSE. TIBCO Software Inc. shall not be liable for errors contained herein or for

incidental or consequential damages in connection with the furnishing, performance or use of this

material.

For more information, please contact:

TIBCO Software Inc.

3303 Hillview Avenue

Palo Alto, CA 94304

USA

Document

Accelerator for Apache Spark – Functional Specification 4

Table of Contents

TABLE OF CONTENTS .............................................................................................................................4

TABLE OF FIGURES .................................................................................................................................7

TABLE OF TABLES ..................................................................................................................................9

1 PREFACE ....................................................................................................................................... 10

1.1 PURPOSE OF DOCUMENT .......................................................................................................... 10

1.2 SCOPE ..................................................................................................................................... 10

1.3 REFERENCED DOCUMENTS ....................................................................................................... 10

2 ARCHITECTURE ............................................................................................................................ 11

2.1 COMPONENTS .......................................................................................................................... 11

2.2 EVENT PROCESSOR FLOWS (FAST DATA STORY) ....................................................................... 13

2.3 SPOTFIRE COMPONENTS (BIG DATA STORY) ............................................................................... 14

2.4 LIVEVIEW COMPONENTS (OPERATIONS STORY) .......................................................................... 14

3 EVENT SEQUENCING ................................................................................................................... 15

3.1 REGULAR EVENT FLOW ............................................................................................................. 15

3.2 DATA PROCESSING FLOW .......................................................................................................... 16

3.3 SIMULATION ............................................................................................................................. 16

4 EVENT PROCESSOR - STREAMBASE........................................................................................ 17

4.1 CORE LOGIC ............................................................................................................................. 17

4.1.1 ProcessTransactionsAndScore .......................................................................................... 17

4.1.2 ProcessTransaction ........................................................................................................... 19

4.1.3 CategorizeTransactions ..................................................................................................... 20

4.1.4 CategorizeTransaction (DefaultCategorizeTransaction) ................................................... 20

4.1.5 FeaturizeTransactions ....................................................................................................... 21

4.1.6 EvaluateModel (H2OEvaluateModel) ................................................................................ 21

4.2 TRANSPORT BINDING ................................................................................................................ 21

4.2.1 KafkaWiredProcessTransaction ......................................................................................... 21

4.2.2 KafkaConsumeTransactions .............................................................................................. 23

4.2.3 KafkaProduceNotifications ................................................................................................. 23

4.2.4 KafkaAcknowledgeTransaction .......................................................................................... 24

4.3 PERSISTENT RUNTIME STATE ..................................................................................................... 24

4.3.1 HBaseCustomerHistory ..................................................................................................... 24

4.3.2 HBaseAddCustomerTransaction ....................................................................................... 25

Document

Accelerator for Apache Spark – Functional Specification 5

4.4 CONFIGURATION LOADING AND CHANGE MONITORING ................................................................. 25

4.4.1 MaintainCategories ............................................................................................................ 25

4.4.2 MaintainFeatures ............................................................................................................... 25

4.4.3 H2OMaintainModel ............................................................................................................ 26

4.4.4 CoordinateStartup .............................................................................................................. 26

5 DATA ANALYTICS - SPOTFIRE ................................................................................................... 27

5.1 DISCOVER BIG DATA ................................................................................................................. 27

5.1.1 Totals ................................................................................................................................. 27

5.1.2 Discover Big Data: Drill-down ............................................................................................ 28

5.1.3 Categories .......................................................................................................................... 28

5.1.4 Basket Analysis .................................................................................................................. 29

5.1.5 Client Cross-Sell ................................................................................................................ 30

5.1.6 Geos ................................................................................................................................... 30

5.1.7 Play-page ........................................................................................................................... 31

5.2 MODEL BIG DATA ..................................................................................................................... 32

5.2.1 Preparation ......................................................................................................................... 32

5.2.2 Training .............................................................................................................................. 33

5.2.3 Model quality check ........................................................................................................... 34

5.2.4 Variable importance ........................................................................................................... 34

5.2.5 Discrimination threshold selection ..................................................................................... 36

5.3 DESIGN AND EXECUTE MARKETING CAMPAIGNS .......................................................................... 37

5.3.1 Campaign bundling ............................................................................................................ 37

5.3.2 Campaign deployment ....................................................................................................... 38

6 DATA ACCESS - SPARK AND H2O ............................................................................................. 39

6.1 DATA ACCESS AND PROCESSING IN SPARK ................................................................................. 39

6.2 MODEL TRAINING IN SPARKLING WATER / H2O .......................................................................... 39

7 EVENTS TO DATA - FLUME ......................................................................................................... 40

7.1 INFORMATION TO BE STORED ..................................................................................................... 40

7.2 FROM EVENTS TO DATA ............................................................................................................. 41

7.3 WHEN MY DATA IS AVAILABLE .................................................................................................... 42

7.3.1 Events ................................................................................................................................ 42

7.3.2 Runtime context ................................................................................................................. 42

7.3.3 Intermediary storage .......................................................................................................... 42

7.3.4 Target storage .................................................................................................................... 42

Document

Accelerator for Apache Spark – Functional Specification 6

7.4 DATA FOR ANALYTICS ................................................................................................................ 43

7.4.1 Data format ........................................................................................................................ 43

7.4.2 Data organization ............................................................................................................... 43

7.4.3 Enrichment ......................................................................................................................... 43

7.4.4 Tools .................................................................................................................................. 43

8 INSIGHT TO ACTION - ZOOKEEPER AND H2O ......................................................................... 45

9 EVENT FLOW SIMULATOR .......................................................................................................... 46

Document

Accelerator for Apache Spark – Functional Specification 7

Table of Figures

Figure 1: Solution Component Diagram ........................................................................ 11

Figure 2: Regular event flow ......................................................................................... 15

Figure 3: Data processing activities ............................................................................... 16

Figure 4: ProcessTransactionAndScore ........................................................................ 17

Figure 5: ProcessTransactionAndScore transactions .................................................... 17

Figure 6: ProcessTransactionAndScore notifications .................................................... 18

Figure 7: ProcessTransactionAndScore acks ................................................................ 18

Figure 8: ProcessTransactionAndScore acks ................................................................ 18

Figure 9: ProcessTransactionAndScore transactionsOut .............................................. 18

Figure 10: ProcessTransactionAndScore categories ..................................................... 19

Figure 11: ProcessTransaction ..................................................................................... 19

Figure 12: ProcessTransaction Transactions ................................................................ 19

Figure 13: ProcessTransaction Predictions ................................................................... 20

Figure 14: CategorizeTransactions ............................................................................... 20

Figure 15: DefaultCategorizeTransaction ...................................................................... 20

Figure 16: FeaturizeTransactions .................................................................................. 21

Figure 17: H2OEvaluateModel ...................................................................................... 21

Figure 18: KafkaWiredProcessTransaction ................................................................... 22

Figure 19: KafkaWiredProcessTransaction Transactions .............................................. 22

Figure 20: KafkaWiredProcessTransaction Categories ................................................. 23

Figure 21: KafkaConsumeTransactions ........................................................................ 23

Figure 22: KafkaProduceNotifications ........................................................................... 24

Figure 23: KafkaAcknowledgeTransaction .................................................................... 24

Figure 24: HBaseCustomerHistory ................................................................................ 25

Figure 25: HBaseAddCustomerTransaction .................................................................. 25

Figure 26: MaintainCategories ...................................................................................... 25

Figure 27: MaintainFeatures ......................................................................................... 26

Figure 28: H2OMaintainModel ...................................................................................... 26

Figure 29: CoordinateStartup ........................................................................................ 26

Figure 30: Spotfire: Discover: Totals ............................................................................. 27

Document

Accelerator for Apache Spark – Functional Specification 8

Figure 31: Drill-down ..................................................................................................... 28

Figure 32: Spotfire: Discover: Categories ...................................................................... 29

Figure 33: Spotfire: Discover: Basket Analysis .............................................................. 29

Figure 34: Spotfire: Discover: Client CrossSell .............................................................. 30

Figure 35: Spotfire: Discover: Geos .............................................................................. 31

Figure 36: Spotfire: Discover: Play Page ....................................................................... 31

Figure 37: Spotfire: Model: Prep ................................................................................... 32

Figure 38: Spotfire: Model: Training in Spark ................................................................ 33

Figure 39: Spotfire: Model: Evaluate Quality ................................................................. 34

Figure 40: Spotfire: Model: Variable Importance ........................................................... 36

Figure 41: Spotfire: Model: Custom Threshold .............................................................. 37

Figure 42: Spotfire: Deploy: Bundle Models into Campaigns ......................................... 37

Figure 43: Spotfire: Model: Launch Your Campaigns .................................................... 38

Document

Accelerator for Apache Spark – Functional Specification 9

Table of Tables

Table 1: Accelerator for Apache Spark Components ..................................................... 12

Table 2: Event Processor Modules................................................................................ 13

Table 3: Spotfire and Spark components ...................................................................... 14

Table 4: LVW and LDM components ............................................................................. 14

Document

Accelerator for Apache Spark – Functional Specification 10

1 Preface

1.1 Purpose of Document

This document addresses dynamic aspects of the Accelerator for Apache Spark. It describes the

applied solutions as planned, repeatable in concrete customer projects and realization in the accelerator

demo.

The Accelerator for Apache Spark addresses the growing market of analytics solutions (Big Data) with

strong focus on the event processing (Fast Data). The accelerator goal is to highlight the TIBCO added

value to the Big Data world. We have acknowledged that the Big Data solutions already exist. The

missing point is getting value from Big Data analytics. It is possible to explore data, process it and build

the models. The challenge arises when the data is no longer static. The events flow through the system

and the event processing goal is to capture them in analytics optimal form. Once the results from

analytics are available they should be converted into value. The accelerator covers the full cycle from

event capture through analytics to predictive and prescriptive model execution against observations.

1.2 Scope

The document covers the following aspects:

Scalable event capture and processing (Kafka and StreamBase)

Event persistence in Big Data storages (Kafka, Flume, Spark)

o minimal event processing layer impact

o data processing efficiency

Numerical model training in Big Data processing clusters (Spotfire, Spark, H2O)

Model deployment to scaled out event processors (Spotfire, ZooKeeper and StreamBase)

Operational monitoring (LiveView DataMart and LiveView Web)

Artificial data generation and injection

1.3 Referenced Documents

Document Reference

Accelerator for Apache Spark Quick Start Guide

Accelerator for Apache Spark Interface Specification

Document

Accelerator for Apache Spark – Functional Specification 11

2 Architecture

2.1 Components

The accelerator architecture focuses on commonly applied open source Big Data products. The key

solutions are:

Kafka - extremely scalable message bus for Fast Data

HDFS - de facto standard for Big Data storage

These two products have been confronted to TIBCO products:

StreamBase - event processing solution

Spotfire - analytics platform

To gaps in the architecture have been filled with:

HBase - for scalable event context storage

Flume - for Fast Data to Big Data transition

Spark - for data access and transformation

H2O - for clustered model training and lightning-fast scoring

ZooKeeper - for cluster coordination

Figure 1: Solution Component Diagram

What's important, the accelerator is not limited to Big Data. The problem of getting the value from

analytics exists also in traditional applications.

Document

Accelerator for Apache Spark – Functional Specification 12

Table 1: Accelerator for Apache Spark Components

Component Software Description

Messaging Firehose Apache Kafka Highly scalable messaging bus. The core of Fast

Data system is messaging bus capable of passing

thousands of messages per second and still

expandable. With Kafka it is possible to add nodes

on demand to support more traffic.

Data Storage Apache Hadoop HDFS The Big Data systems rely on the efficient and reliable

storage for enormous amounts of data. Hadoop

framework provides two components, one for the data

(HDFS) and one for programs (YARN).

Event Processor TIBCO StreamBase StreamBase is a CEP and ESP platform for event

processing. It combines visual programming with high

efficiency for reactive event handling. The component

provides integration and event processing

capabilities.

Data Analytics TIBCO Spotfire Spotfire is a data visualization and analytics platform.

In the accelerator the access patterns to the data

stored in the cluster were evaluated. The accelerator

shows also sample flow for model building in the Big

Data cluster and runtime model deployment.

Runtime Context Store Apache HBase NoSQL columnar database used with HDFS.

Data Writer Apache Flume Event persistence framework.

Data Access Service Apache Spark Big Data processing framework. Apache Spark is

current state-of-the-art solution for processing data in

Big Data clusters. It offers much better throughput

and latency than the original Hadoop Map-Reduce.

Model Training Engine H2O Cluster-oriented numerical modelling software.

Traditional numerical modelling algorithms in R or

NumPy/SciPy are implemented with simple

architecture in mind. When the dataset significantly

exceeds a single node capacity reimplementation of

such algorithms is needed. H2O is a successful

attempt to train models and it generates effective real-

time processing models.

Simulation Publisher StreamBase

Jython

Kafka

Simulation framework. The component injects the

messages into the system for the demo purposes.

There component uses customer modelling and data

injection parts.

Real-Time Dashboard Live DataMart

LiveView Web

StreamBase

Visualization component presenting recent changes in

the system in the real-time.

Document

Accelerator for Apache Spark – Functional Specification 13

2.2 Event Processor Flows (Fast Data story)

The Fast Data focuses on the data flowing through the system. The operating data unit is customer.

The event processing layer captures new transactions, builds customer history and prepares offers.

Table 2: Event Processor Modules

Module Component Description

Kafka transaction

binding

Event Processor The integration binding to the messaging firehose. It

contains example of Kafka adapter usage and

complex XML handling.

Context binding Event Processor Each transaction is processed in scope of previous

transactions executed by the same customer. The

state is externalized to HBase.

Enrichment Event Processor The context contains only the raw facts. It this case it

is list of transactions with just product ids. For the

model processing this information has to be enriched

with category cross-referencing.

Transaction featurization Event Processor Before transactions can be processed by model, the

transaction and history must be converted into model

input. The typical model input is a list of categorical or

numerical values.

Model execution Event Processor The models are external deployable artifacts

produced by the data analytics layer. The result of

event processing in this case is a score for each

deployed model.

Live DataMart binding Event Processor The LVW is provided as real-time monitoring

dashboard. The underlying LDM is fed by the event

processing component.

Flume binding Event Processor Binding for secure sending of the data to HDFS.

Offer acceptance

tracking

Event Processor Process of tracking customer response.

Configuration

notifications

Event Processor Binding for the configuration changes provided by

ZooKeeper.

Document

Accelerator for Apache Spark – Functional Specification 14

2.3 Spotfire components (Big Data story)

The Big Data store uses holistic view on the data. It aggregates customers and builds statistical models.

The operating unit is dataset.

Table 3: Spotfire and Spark components

Module Component Description

ETL Data Access Service

Data Analytics

Transformation from Avro to Parquet.

Data discovery Data Analytics

Data Access Service

Access to the underlying tables for data discovery.

Model training Data Analytics

Data Access Service

Model preparation and assessment

Model deployment Data Analytics Model submission to the event processing layer.

2.4 LiveView components (operations story)

The LiveView shows the current state of the system. It presents the currently relevant information about

running processes. That means it contains only small fraction of the data or heavily reduced

information.

Table 4: LVW and LDM components

Module Component Description

Transaction

TransactionItems

Real-Time Dashboard Recent transactions with their content. The tables

form master-child structure.

ModelStatus Real-Time Dashboard Status of the models.

StoreSummary Real-Time Dashboard Current store information. Includes static store

information, like geographic position, and aggregate

information derived from transaction.

Document

Accelerator for Apache Spark – Functional Specification 15

3 Event Sequencing

3.1 Regular event flow

Originator Kafka StreamBase FlumeKafkaHBase H2O HDFS

deliver

GET

score

Live DataMart

notify

update

publish

select best offer

track acceptance

insert

PUT

collect

write batch

acknowledge

remove batch

acknowledge

Figure 2: Regular event flow

The Fast Data story is automated process. It focuses on transaction event processing. The sequence of

events happening:

1. The originator publishes transaction event (XML) to Kafka Transactions topic

2. StreamBase event flow retrieves event

3. The past customers transactions are retrieved from HBase

4. The past transactions are filtered by date (to limit to the recent transactions) and deduplicated

5. The built customer context is converted into features

6. The data is scored by deployed H2O models

7. The results are filtered according to the model metadata (cut-off, validity dates and so on)

8. From the remaining results the winning one is selected

9. The transaction data with scoring result is published to Kafka as JSON

a. Flume Source consumes batches of messages

b. Once all messages are accepted by agent's Channel, the batch is acknowledged

c. The batches are delivered to HDFS Sink

d. Once the Sink flushes buffers, it removes data from Channel

10. The result is published to Kafka Notifications topic as XML

11. The message is delivered to originator (it may or may not contain offer)

12. The transaction is published to LDM for runtime tracking

13. The past transactions for current customer are scanned for pending offers

14. The pending offers with categories matching the incoming transaction are marked succeeded

15. The past transactions are scanned for outdated offers (based on message timestamp)

16. The pending offers with missed deadline are marked unsucceeded

Document

Accelerator for Apache Spark – Functional Specification 16

3.2 Data processing flow

Collect data ETL data Discover data Build models Bundle models Deploy models

Figure 3: Data processing activities

The Big Data story is human driven process. The focus here is exploration of the data stored in Big

Data cluster (HDFS+Spark). The process eventually produces models executed in the event processing

layer.

The high-level procedure follows:

1. The data is collected in HDFS as Avro

2. The ETL process turns many relatively small Avro files into Parquet

a. transaction deduplication

b. category info enrichment

c. data flattening

3. The data scientist explores the data and provides candidate algorithms (partially covered by the

accelerator)

4. The data analyst builds the candidate algorithms and assesses their performance (for example

using ROC curves). The accepted models are described and passed to the operations

5. The system operator combines the models into bundles

6. The bundles are deployed to event processing

The side activities happening in the event processing layer are:

1. The events sent to Flume are accompanied with model evaluation data

2. The customer purchases are tracked for offer acceptance and sent in the real-time to LDM

3. The offer acceptance and model efficiency can be transformed in ETL process

3.3 Simulation

The traffic simulator is a StreamBase component generating test transaction flow. The component

publishes transaction messages using configured transaction rate, reference data and time

compression factor. The module is also capable of simulating customer response on the presented

offers.

There are two implementations of the component. One implementation uses real-time transaction

generation model. This variation uses stateful process of tracking large number of customers and

generates random transactions using reference data probability modifiers. The process tries keeping

uniform time distribution between subsequent transactions of the given customer. The advantage is that

the realtime data generation may adapt customer behaviour to the system responses (offers).

Alternative implementation reads pregenerated data and sends messages. The data is stored in a flat

tab-separated file. The file is ordered by timestamp and transaction id. The ordering guarantees that

transaction lines for the same transaction are stored as single block.

The generator process builds random customer history. Single iteration creates a customer with some

demographic profile. For this customer a series of transactions is built. The transactions are written out

as flattened transaction item list.

Document

Accelerator for Apache Spark – Functional Specification 17

4 Event Processor - StreamBase

4.1 Core logic

4.1.1 ProcessTransactionsAndScore

The event flow handles the main context related logic.

Figure 4: ProcessTransactionAndScore

In the flow the transactions are processed for customer offering and for hot item (category) tracking.

The ProcessTransaction module executes the logic related to customer offering. It loads the customer

context, transforms it into model consumable feature set and does the model output final interpretation.

In this particular case the winning offer is selected. The TrackCategories expands the incoming

transaction into transaction lines with category info. What's important, a single line may have 0 or many

categories. The resulting categories are then processed as individual signals.

The module provides also external configuration wiring. The control submodules are responsible for

maintenance of the reference tables and deployed models.

The transactionsIn input stream carries the raw transaction information passed from the originator.

The capture group supports arbitrary external content to be passed transparently to the output streams.

This feature is used to retain the Kafka consumption context information.

Figure 5: ProcessTransactionAndScore transactions

The notifications output stream emits the ultimate result of the processing logic. This result is used to

send the offer to customer. The events contain input event transport-related fields.

Document

Accelerator for Apache Spark – Functional Specification 18

Figure 6: ProcessTransactionAndScore notifications

After all logic is executed the messages are acknowledged to the input topic. With Kafka this means the

last consumed offsets are saved in Zookeeper. Because the acknowledgement protocol is transport

related and logic independent, the acks events carry only transport information.

Figure 7: ProcessTransactionAndScore acks

The notifications output stream emits the ultimate result of the processing logic. This result is used to

send the offer to customer. The events contain input event transport-related fields.

Figure 8: ProcessTransactionAndScore acks

From the same structure as the notifications the audit information is derived and published as

transactionsOut. The events are used to update the LDM tables and to store the transactions and

evaluation results in HDFS for auditing and effectiveness tracking purposes.

Figure 9: ProcessTransactionAndScore transactionsOut

The categories output stream emits category tracking tuples. These are later consumed for category

performance check and for customer to detect the offer responses.

Document

Accelerator for Apache Spark – Functional Specification 19

Figure 10: ProcessTransactionAndScore categories

4.1.2 ProcessTransaction

This is the main working horse for the CEP-style processing. The flow implements stateful context for

customer's transactions. The past transactions are retrieved from dedicated storage solution (pluggable)

and the new transaction is appended to the ledger. All the transactions in the retrieved history are

classified according to the product to category mapping. Subsequently the enriched customer context is

converted into feature vector, i.e. data structure corresponding to the customer description in the

applied modelling. The result is then processed by all currently deployed models.

Figure 11: ProcessTransaction

The Transactions input stream carries essential information about the transaction. The flow in this

module is responsible for information enrichment and adaptation.

Figure 12: ProcessTransaction Transactions

The Predictions output stream strips the locally collected state. It emits the originally input information

with accepted model results.

Document

Accelerator for Apache Spark – Functional Specification 20

Figure 13: ProcessTransaction Predictions

4.1.3 CategorizeTransactions

The flow just iterates over transactions and applies category resolution to each of them.

Figure 14: CategorizeTransactions

4.1.4 CategorizeTransaction (DefaultCategorizeTransaction)

The transaction categorization uses pluggable logic. In the applied case it uses query table to load all

the categories assigned to product SKU.

Figure 15: DefaultCategorizeTransaction

Document

Accelerator for Apache Spark – Functional Specification 21

4.1.5 FeaturizeTransactions

Context featurization is typically complex task. The CEP context information (enriched by the known

state and reference data) has to be converted into a structure that matches the one used to train the

models. In many of the cases there is no perfect mapping between the static data used by a data

scientist and runtime state available during event processing. The featurization tries to build the input

sample description as close as possible to the one used in the model training process.

Figure 16: FeaturizeTransactions

4.1.6 EvaluateModel (H2OEvaluateModel)

Once the incoming transaction is transformed into features, it can be processed by the models. In the

accelerator case the featurized transactions are processed by ultra-fast models generated with H2O. In

generic case there could be even several alternative model deployable at the same time for routed or

broadcasted execution.

Figure 17: H2OEvaluateModel

The logic in the flow is simple. The incoming features are adapted to the model operator interface.

4.2 Transport binding

4.2.1 KafkaWiredProcessTransaction

The event processor core logic is related to the transaction processing. The top level event flow

orchestrates Kafka message exchange and exposes notification flows for other features.

Document

Accelerator for Apache Spark – Functional Specification 22

Figure 18: KafkaWiredProcessTransaction

The transaction processor consumes messages from Kafka bus. The transactions are evaluated using

core logic to obtain offers for customer and to categorize the transaction items. The processing results

are sent as offering to the caller.

The KafkaWiredProcessTransaction is top-level event flow orchestrating the transport binding and

actual logic execution. The event flow calls the main processing logic and passes the transport-related

information as capture group. This data is transparent to the underlying implementation, but it is

required to properly send responses to the incoming messages and to commit the transactions.

The event flow offers two output streams intended for synchronous event consumption:

Transactions

Categories

The Transactions output stream emits the transaction information with model evaluation results.

Figure 19: KafkaWiredProcessTransaction Transactions

The output stream is used to:

update LDM tables

report events to Flume

track prepared offers

The Categories output stream captures categorized transaction information. It emits tuples for each

transaction line.

Document

Accelerator for Apache Spark – Functional Specification 23

Figure 20: KafkaWiredProcessTransaction Categories

The stream is consumed by:

offer acceptance tracking

hot categories tracking (currently not implemented)

4.2.2 KafkaConsumeTransactions

The Kafka consumption has been simplified in this version of accelerator. There is single consumer

handling all the partitions of the Transactions topic. The consumer is statically configured to connect to

known broker list. At the startup the flow is inactive. The subscription is opened once the coordination

module decides that all the models and configuration settings have been read. This was made in order

to avoid processing of the data with partial configuration.

The process reads topic metadata from ZooKeeper. Then for each partition it retrieves recent

consumption offset and activates subscriber.

The flow reads messages from the broker and before emitting events for processing does interpretation

of the opaque content:

the XML payload is adapted to StreamBase compliant format and then to tuple

the header is parsed and exposed for transport-related handling

Figure 21: KafkaConsumeTransactions

4.2.3 KafkaProduceNotifications

The message sending is much simpler than consuming. The flow renders payload XML the StreamBase

style and transforms it to the interface defined schema. Then the message is sent out according to the

data passed in transport header provided by the consuming module.

Document

Accelerator for Apache Spark – Functional Specification 24

Figure 22: KafkaProduceNotifications

4.2.4 KafkaAcknowledgeTransaction

Transaction acknowledgement in Kafka is simple. One has to save the last processed offset in the

shared location, in this case in ZooKeeper node.

Figure 23: KafkaAcknowledgeTransaction

4.3 Persistent runtime state

In the accelerator the runtime state for the main transaction processing logic is maintained in HBase.

This is pluggable component and, as long as the contract is respected, the HBase may be replaced with

any technology. A similar feature can be implemented for example with TIBCO ActiveSpaces. The main

advantage of HBase over AS is durability focus. Also the product API allows for much lighter

communication protocol and lower coupling between components.

4.3.1 HBaseCustomerHistory

In order to retrieve the customer past transactions a Get operation is executed. The operation is done

with MaxVersions attribute set to high value; therefore all transactions stored by HBase are retrieved. It

has been assumed that the solutions should be duplicate-tolerant. There could be multiple entries for

the same transaction, but the initial design states that the content for given transaction id is same. This

way it is enough to retrieve only unique records.

The lookup by primary key uses region server routing, therefore the operation scales linearly with the

HBase cluster size.

Document

Accelerator for Apache Spark – Functional Specification 25

Figure 24: HBaseCustomerHistory

4.3.2 HBaseAddCustomerTransaction

The counterpart of past transactions retrieval is appending of transaction to the customer's history. In

HBase it is made simple. Updating a field with version tracking is equivalent to appending an entry to

the version log.

The update by primary key uses region server routing. Similarly as the lookup the operation scales

linearly with the HBase cluster size.

Figure 25: HBaseAddCustomerTransaction

4.4 Configuration loading and change monitoring

The solution uses ZooKeeper to store the global configuration. ZooKeeper is a cluster-wide source of

truth. It prevents from uncontrolled knowledge corruption that may happen during split-brain. All node

changes are atomic, i.e. the consumers can see only full updates. The important characteristic of

ZooKeeper is that the consumer can see the last value, but may miss the intermediate ones. In case of

global setting management it is perfectly acceptable.

In the solution the asynchronous API was used to retrieve the data. That means the process registers

for change notification and reads the value. If the node does not exist it is treated as if it were empty. In

this release the configuration is monitored using separate connection for each monitored node.

4.4.1 MaintainCategories

The categories are kept in a file in HDFS. The file is pointed by content of z-node. During startup and

whenever the z-node changes (even for the same content), the associated query table is cleaned and

filled with the content from the product catalogue.

Figure 26: MaintainCategories

4.4.2 MaintainFeatures

Features follow the same structure as product-category mapping. The z-node points to the location in

HDFS where the current feature list is defined. On startup and whenever the observed node changes,

the shared query table is cleaned and filled with content.

Document

Accelerator for Apache Spark – Functional Specification 26

Figure 27: MaintainFeatures

4.4.3 H2OMaintainModel

The model maintenance is realized slightly different way than category mapping and feature list. The z-

node keeps a list of model sets as file pointer per line. The observer process reads all the files and

builds the metadata list. This list is them passed to the H2OEvaluateModel that updates the operator.

Figure 28: H2OMaintainModel

4.4.4 CoordinateStartup

The ZooKeeper observers are asynchronous. That means there is no guarantee that the system is fully

configured during init phase. In order to avoid processing messages with partially configured solution,

the subscription should be started only once the configuration has been applied. To achieve this we

need a coordination of messages coming from independent parts. The process is connected to the

maintenance flows via container connections. Once all three inputs report success, the ready state is

released.

Figure 29: CoordinateStartup

Document

Accelerator for Apache Spark – Functional Specification 27

5 Data Analytics - Spotfire

TIBCO's Accelerator for Apache Spark meets a customer service use case, where we want to

understand our sales and to create models that we can later deploy in real time to send promotions for

specific products to our customers. For this we run a Classification Model to identify customers who are

likely to say "Yes" to an offer of a particular product. However, this type of model adapts for many other

use-cases, for example financial crime detection or prediction of machine failure or in general any time

you want to distinguish between two types of records from each other. You can use this accelerator in

those use cases as well.

The example file aims at aiding 3 different tasks. The tasks are made simple by easy to use controls. In

the demonstration scenario all parts are handled by single visualization. In real projects there will be

most likely separate sites dedicated to various task executions.

5.1 Discover Big Data

The first section is called Discover Big Data and it serves as an environment that enables answering

business questions in a visual way, including needs of Big Data Reporting and Discovery. This section

is composed of 6 pages. All aggregations are delegated to the Big Data cluster running Apache Spark.

5.1.1 Totals

The top of this page shows a preview of the data, which has a set of 10K lines and the respective

content. Such a preview can be useful for inspiring strategies for analysing the data. Below, we show

some KPIs and how they evolve over time. By clicking on the X and Y axes selectors, the user can

choose different KPIs.

Figure 30: Spotfire: Discover: Totals

Document

Accelerator for Apache Spark – Functional Specification 28

5.1.2 Discover Big Data: Drill-down

Figure 31: Drill-down

This section proposes a drill into the data. There are four market segments in the data. When the user

chooses some or all of them in the pie chart, a tree map subdividing the respective total revenue by

product appears. When selecting one or many products, below appears a time series of the respective

revenues. The user may as such navigate the different dimensions of the data or choose different

dimensions in any of the visualisations.

5.1.3 Categories

To achieve the better understanding of the data some more details are required. As again a way of

discovering the shape of the data, here is offered a drill-down by product categories. At the top, a tree

map shows the importance on sales and price of each product. The visualisations at the bottom show

some KPIs now and over time. By default, they encompass the whole business, but they respond to

choices of one or many products in tree map.

Document

Accelerator for Apache Spark – Functional Specification 29

Figure 32: Spotfire: Discover: Categories

5.1.4 Basket Analysis

Here, upon making a choice on the left hand list, we get a tree map that show the importance of all

other products that were sold in the same baskets that contained the product of choice. This is a nice

way of perceiving how customers buy products together and can help understand which variables

should be included in models. The controls on the right allow choosing different metrics and

dimensions.

Figure 33: Spotfire: Discover: Basket Analysis

Document

Accelerator for Apache Spark – Functional Specification 30

5.1.5 Client Cross-Sell

Understand customer taste. What types of products do clients buy, regardless of whether in the same

basket or not. Similar to the previous page, here are shown the products that clients who bought the

chosen product have also bought, whether in the same basket or in any moment in time. This is useful

when drawing cross/up-sell campaigns.

Figure 34: Spotfire: Discover: Client CrossSell

5.1.6 Geos

The geospatial analysis it important aspect of data processing. Spotfire allows users to display

aggregated data in order to understand the spatial relationships and geographical coverage. It is

possible to locate the shops, which sell better give products, analyse the customer trends by region,

and understand performance. This page shows how revenue and average price are distributed by shop

and by region. It leverages Spotfire’s ability to draw great maps.

Document

Accelerator for Apache Spark – Functional Specification 31

Figure 35: Spotfire: Discover: Geos

5.1.7 Play-page

This page provides a custom playground for users. Load one month of data into memory. You can

choose which month you want by using our prompt. Use our recommendation engine to pick the right

visualisation to answer new business questions. Replicate the desired visualisation on in-database

data. This page can be duplicated as many times as required.

Figure 36: Spotfire: Discover: Play Page

Document

Accelerator for Apache Spark – Functional Specification 32

5.2 Model Big Data

The second section of the Spotfire component is called Model Big Data and supports the business in

the task of Modelling Big Data. This part is made of 5 pages that support the business in the task of

Modelling Big Data. The Accelerator supports the Random Forest Classification model, which is a type

of model that is valid on any type of data. Therefore, it can be run by a business person. The goal is to

make models that support promotions of a particular product or groups of products.

In the accelerator the H2O DRF algorithm was applied. H2O is particularly effective for the presented

case because it is able to train models on Big Data scale datasets, integrates nicely with Spark and

produces extremely fast runtime models.

5.2.1 Preparation

Before the models can be trained, the user has to define the input data for the model. The model

training algorithms expect the data in reduced form, so called features. Every sample (in our case

customer) is described by uniform set of variables. The calculation of these variables is parameterized

by user selected settings. In the provided example the customer is described by past purchases in each

category and response label that in our case tells if customer made any purchase in interesting

categories in the following months.

In the provided example the user decides which months he/she wants to use for training the model and

which months contain response purchases. For training, we recommend to take enough months to

encompass at least one full seasonal cycle, for example one full year. Maybe very old data is less

relevant to current customer behaviour. If that is the case, one may not want to include much old data.

For testing, at least 1 to 3 months of data should be used, preferably the more recent.

Figure 37: Spotfire: Model: Prep

Document

Accelerator for Apache Spark – Functional Specification 33

5.2.2 Training

Once the model training reference data was selected, the actual model training is executed. The user

names the groups of products that will be modelled. He/she then uses the central list to selects the

products to make a promotion for. The user selects the products to make the promotion and launches

the training job in the Spark cluster. The data defined by the user are collected and passed to the

cluster for execution.

The actual process can be long. The user may check the job’s progress on the Spark web UI and track

the checkpoints in the dashboard. In the presented demonstration, the model training job produces

POJOs (H2O real-time execution models) and collects the information provided by H2O engine. When

the process is finished and the job is done, the models are available for inspection and deployment. The

user should press the Refresh results button. When this button is pressed, Spotfire reaches to Spark

via TERR to obtain the latest results of model building exercise. As the outcome of the training process

the following datasets are created:

results - model training job results; for each model training job there is tab-separated text file

containing information line for each generated models

pojo - generated H2O POJO classes; the directory contains subdirectory for each model

training job

roc - directory stores ROC points generated by H2O for each training job

varimp - variable importance scores obtained from model training jobs

sets - directory containing model metadata as tab-separated files describing deployable model

and its parameters

These results are analysed in the following pages.

Figure 38: Spotfire: Model: Training in Spark

Document

Accelerator for Apache Spark – Functional Specification 34

5.2.3 Model quality check

On the left hand pane, the user chooses which model to evaluate. One can choose the current model or

any past model. The choice populates the chart with the respective ROC curve.

Evaluating model quality involves seeing if its results allow better decisions than a random choice, e.g.

than tossing a coin. The model in the accelerator aims at separating one type of clients from the

remainder, namely the ones who may be interested in the chosen product. For any given client, if we

chose what type they were at random, the model’s ROC curve (Receiver Operating Characteristic)

would likely be close to the red line in the chart. If the model were perfect and got every decision right,

the model’s ROC Curve would be given by the green line. The blue line gives the position of the actual

ROC Curve of the chosen model. The total area below the blue line is called AUC (Area Under the

Curve) and gives a measure of how much better the current model is when compared with making a

choice at random (represented by the red line). The left hand table shows the AUC of all models, which

gives the user an idea of how good models are expected to be. Models with large enough AUC can be

user approved. Previously approved models can also have their approval revoked in this page. All

following pages just show approved models. It is important to bear in mind that approval of a model

should not be final before the variable importance page is analysed, which happens on the next page.

Figure 39: Spotfire: Model: Evaluate Quality

5.2.4 Variable importance

On the left hand pane, the user chooses which model to continue analysing. Only previously approved

models appear here. By default, the models will use all available data to understand what drives the

purchases of the modelled product. Some products are better drivers of a specific promotion than

others. The chart is used to understand the relationship between your products and customer

preferences by identifying the most important predictors. Go back to your Discover Clients' Taste page

to validate the discovery.

Document

Accelerator for Apache Spark – Functional Specification 35

This type of considerations is more important in some usecases than in others. In more sophisticated

cases the variable importance discovered using one model may be used to provide better training

parameters for another model. In fact, a combination of visualizing the ranking of the features as well as

the detail of the individual features is important for a number of reasons:

Validation of the model’s quality. Maybe your best feature is so good because it is part of the

answer and should therefore be excluded.

Validation of the data’s quality. Were you expecting a different feature to have more power than

what is showing? Perhaps there are data quality issues causing a lack of relevance, or maybe

outliers introduced a bias. These quality issues can be quickly spotted in visualization, for

example a histogram of the numeric features.

Correlation is not causation. It is necessary to ask questions that lead to a better understanding

of the reality being predicted.

Surprising top features. Sometimes predictors expected to be irrelevant turn out to have huge

predictive ability. This knowledge, when shared with the business, will inevitably lead to better

decisions.

Inspiration for new features. Sometimes the most informative features are the reason to delve

into new related information as a source of other rich features.

Computational efficiency. Features with very low predictive power can be removed from the

model as long as the prediction accuracy on the test dataset stays high. This ensures a more

lightweight model with a higher degree of freedom, better interpretability, and potentially faster

calculations when applying it to current data, in batch or real time.

It is important to bear in mind that approval of a model should not be final before the variable

importance page is analysed. If any issues are spotted, the user can revoke previously approved

models.

Document

Accelerator for Apache Spark – Functional Specification 36

Figure 40: Spotfire: Model: Variable Importance

5.2.5 Discrimination threshold selection

This page is entirely optional. When a model run is in real time, a measure is calculated of how likely a

given customer is to say yes to a promotion of your specific product. In order to decide to send him or

her a promotion, this metric is compared against a Threshold. This Threshold is defined by default to

maximise the F1-score. The F1-score balances two types of desirable characteristics this type of

models can have:

Precision: of all the people the model would send a promotion to, what proportion accepts it;

Recall: of all the people that would have said yes to a promotion, how many did the model

recognise.

F1 weighs these two dimensions equally. If you are happy with this choice, you can ignore this page.

However, the user may have their own way of defining a desired Threshold and can use this page to set

it. For example, they may want to maximise just precision or just recall, or to weigh them differently.

Table 2a can be used to select other known model performance metrics.

In 2b, one may select a Threshold manually. This is useful if it is important to control the proportion of

customers that are identified as target, in case this must be weighed against the size of a team who will

manually treat each case (e.g. telemarketing calls). The Proportion Selection (% of customer base)

figure populates against this choice.

In 2c, you may create your own model performance metric. For example, attribute a monetary cost to

sending a promotion that is not converted and/or a monetary gain to a promotion that is converted. You

can do this by typing your own formula in "here" on the Y-axis of the chart and then selecting the

Threshold that maximises it. All the data needed for a custom calculation is available in the data that

feeds the chart.

In area 3, the user chooses the Threshold of choice and saves it by pressing Use.

Document

Accelerator for Apache Spark – Functional Specification 37

Figure 41: Spotfire: Model: Custom Threshold

5.3 Design and execute marketing campaigns

This final part is made of 2 pages that support the business in the task of running marketing campaigns

that deploy the models learnt in the previous sections. Each model targets one product. The models are

deployed to the event processing layer are model sets that we call campaigns or marketing campaigns.

Campaigns launch promotions for groups of products at once by bundling models and their respective

thresholds together.

5.3.1 Campaign bundling

The produced models can be composed together to form a co-deployed bundle. Here you can bundle

existing models into a new campaign and name your campaign. Alternatively, you can load a past

campaign and revise it, by adding new models or thresholds to it or by removing past models.

Sections 1 and 2 of this page require user action, whilst the remainder just provide information. In

Section 1, the user chooses to either create a new campaign which takes on the name he/she chooses

to give it just below, or chooses to load an existing campaign for analysis by choosing one from table a)

to the immediate right. The models that are part of the new or existing campaign appear in table b) on

the right hand middle section of the page. The user can now use Section 2 to change the models that

are part of a campaign. This can be done by choosing to add new models, which he/she collects from

table c). Or by deleting existing models from the current campaign. When done, the user can save the

new settings of the existing campaign. The button at the bottom “Refresh available model list” ensures

that all the more recently run models appear in list c).

Figure 42: Spotfire: Deploy: Bundle Models into Campaigns

Document

Accelerator for Apache Spark – Functional Specification 38

5.3.2 Campaign deployment

This page connects you to the real time event processing engine. Here you can see the name of the

campaigns that are now running in real time and inspect their underlying models and thresholds. You

can also launch a new campaign.

The left hand side of this page allows user action, whilst the right presents resulting information. The

button “Which campaigns are currently running in real time?” show the names of the campaigns that are

running now. These names appear as Streambase will see them. The button “Refresh list of available

campaigns” will update table a) so it includes all past campaigns, including the ones that have just been

created. When the user chooses a campaign from this table, table b) reflects the models that are part of

it. Finally, the button “Deploy the new selected campaign” can be pressed to launch a new campaign in

real time.

Figure 43: Spotfire: Model: Launch Your Campaigns

Document

Accelerator for Apache Spark – Functional Specification 39

6 Data Access - Spark and H2O

The Spotfire provides an effective data exploration and visualization layer. The actual data processing

in this accelerator is done in the Big Data processing cluster implemented using Apache Spark.

6.1 Data access and processing in Spark

Apache Spark is a general purpose data processing framework. It reimplements the Map/Reduce

approach by leveraging falling memory costs and by combining the processing steps. The goals of

Spark are similar to the original Hadoop's Map/Reduce. That means Spark is used to process the data

in Big Data systems. The typical tasks are data mining, transformations, model training etc. In the

accelerator the Spark component provides the main gateway to the data. Spotfire cannot access the

data stored in HDFS directly and Spark provides a convenient abstraction layer. Some of the commonly

accessed data has been exposed for direct consumption in Spotfire via HTTP/REST interface. The

same interface is used to coordinate the long-running jobs: ETL and model training.

Spark provides the transformation capability from event-oriented Avro files to analytics-friendly Parquet.

The data transformation is expressed as a series of steps extracting the event-provided information and

eventually saving the transformation result in the dataset optimized for the use-case.

6.2 Model training in Sparkling Water / H2O

H2O was selected as the model training and execution engine. The major advantage of H2O is easy

integration with Spark via the Sparkling Water project. Sparkling Water provides a feature of starting

H2O worker nodes within running Spark executors. Once the H2O cluster is ready, Sparkling Water

optimizes the data transfer from Spark to H2O runtime. Both in typical H2O and Spark applications the

data exchange is done through the single process on both sides. In Big Data systems it is undesirable

as this approach does not scale. In Sparkling Water the data is copied directly from Spark executors to

H2O workers. This approach minimalizes processing effort and avoids unnecessary network traffic

between components.

H2O is not capable of data transformation. It can load the data frames and process the models, but in

H2O it is not possible to convert arbitrary data into data frames. The input data formats are also limited.

Spark and H2O is a very powerful combination. Spark can load the data from many sources and can be

further extended to support other formats. H2O provides proven distributed model training algorithms.

In the accelerator Spark component orchestrates the model training. First it loads the data and

featurizes it according to the provide parameters. The featurized transaction data is then passed to the

H2O cluster as data frames. In H2O the dataset is split into training and validation subsets and then the

desired binomial classification models are built. The model training job results in a set of models

(POJOs), one for each requested response category. For every model the summary is provided as AUC

value and threshold values maximizing standard metrics. In order to assess the models the variable

importance table is collected. To select the most effective working point of the model (cut-off value) the

ROC points are collected and exposed to Spotfire.

Document

Accelerator for Apache Spark – Functional Specification 40

7 Events to Data - Flume

The important aspect of Fast Data to Big Data transformation is persisting events. Even if HDFS offers

capacity to store much more data than it was available before, the solution has very poor performance

when the data has to be written frequently.

This problem is typical to the event processing systems. The major challenges are:

what should be stored

how the data movement speed is reduced

what is the desired format of the data

what is data visibility latency

7.1 Information to be stored

The first question that arrives is: What should be stored? The arrival of Hadoop opened new

possibilities to the enterprises. With relatively low cost it is possible to store enormous amounts of data.

The problem is the relatively low cost. Even if the cost is lower than before, it is still greater than null.

Small unit price multiplied by large number of units may result in a considerable cost.

The decision what to store should be driven by the value of the data. Today's trend in the Big Data

solutions is to store everything and think later about building a value from collected information. When

this approach is used directly it may increase the operational cost of the solution.

StreamBase is a perfect tool for data ingress. It can both filter the events and enrich them with

additional value. In the accelerator case the incoming information is enriched by applying the deployed

models. Before the models can be applied, the incoming transaction must be enriched by loading past

transactions for the customer and classification of the customer history content. The cross-referencing

of the data may change over time. Therefore even if the transaction line to category mapping is

resolved during transaction processing, it may be different once the transaction content is used for other

purposes, like model training. Later the same information can be easily recreated. Other information

built in the event processing layer has significant value for the future. Information that customer was

given a particular offer is important both for audit and for model efficiency tracking.

In the demonstration scenario the following information is stored:

customer id

transaction id

transaction date and time

transaction originator (store id)

transaction content as delivered from originator

the offering sent to customer with reference data for generating model(s)

Information not stored in the Big Data storage:

Document

Accelerator for Apache Spark – Functional Specification 41

previous transactions used to build the customer history (should be already stored)

category to transaction item assignment (can be recreated any time and it is only valid when

used to featurize customer)

effective feature vector

all model responses

Depending on the use-case this data selection may change. Some of the information coming from

enrichment process can be used as data to be stored. For example the event processing layer may add

demographic information to the events that are going to be stored in the Big Data system. Alternatively

one may choose not to store transaction lines for particular products.

7.2 From events to data

The process of storing the data in Big Data system can be easily broken. The APIs to store the data

exist and are relatively easy to use. The problem is the right understanding of the architectural

constraints applied to the Big Data system. The distributed file systems like HDFS are optimized for

reading and processing of the massive amounts of data. The write operations of relatively small data

chunks are highly ineffective.

StreamBase offers direct adapters to write the data to HDFS. In order to guarantee data safety the write

buffers have to be flushed after every operation. This heavily impacts the event processing

performance.

The alternative approach is staging process. The event flow pushes the events to the next layer that

aggregates the data in larger blocks and routes it to the target location. The data is eventually safely

written in HDFS in relatively large blocks.

The recommended design is to start aggregation in event processing layer. The consumption from

Kafka allows the event processing job to be restarted from the last offset with guarantee of the same

message sequence delivery. This strong semantics supports building solutions that do emit information

in batches. This further reduces processing cost in the subsequent layers.

The important fact to acknowledge is data duplication. Failure scenarios assume repeating of the same

tasks, which inevitably leads to data duplicate creation. Event processing may be effectively used (and

should be used) to detect duplicates. In the scalable systems, however, the duplicates may reappear

after event processing layer. Building the solution with duplicate awareness prevents the problems from

appearing in the production scenarios.

In the accelerator Apache Flume was used. To guarantee safe data delivery the StreamBase event flow

communicates with Flume via Kafka. In the current version of accelerator each incoming event is a

separate job unit and sends the single message to Flume. The Flume agent expects the incoming

messages to contain the data to be stored as JSON text content. If more than one event per message

were sent, they are just concatenated JSON strings. Flume aggregates the data in large batches and

saves them to HDFS files in Avro format. The message aggregation and compact format usage reduce

the IO impact on the cluster.

Document

Accelerator for Apache Spark – Functional Specification 42

7.3 When my data is available

The staging approach to data storage raises another problem. The data is available once the batch it

belongs to is committed. The batch sizes and aggregation periods are result of the compromise

between data availability and efficiency. The larger are the batches, the more efficient data can be

processed. On the other hand large batches mean the time between the events were captured and

eventually were made available for consumption is very long.

Depending on the data availability requirements various approaches can be applied. In the accelerator

the transaction data exists in four forms.

7.3.1 Events

The transaction data in form of events is the raw information coming from transaction originator. The

XML messages can be directly consumed from the Kafka topic as they arrive. The consumers may

apply various processing flows like:

customer offering (core logic)

hot item tracking

7.3.2 Runtime context

The example scenario assumes the transactions are processed in context of customer history. The past

transactions are available for direct access from HBase. The new transactions are appended to the

storage immediately after they are processed. As HBase is optimized for direct key access and lock-

free updates, other consumers may also access the same data as it arrives. The information for given

customer has very low availability latency. The downside is dataset processing. While HBase supports

free form queries, frequent analytic queries would have significant negative impact on the event

processing layer.

7.3.3 Intermediary storage

Flume stores the incoming transactions in a compact binary format Avro. The information stored by

event aggregation layer is available for dataset processing once the batches are committed. In the

demonstration scenario the files are rolled every 10 minutes. That means the data is available no later

than 10 minutes from arrival. The problem is that 10 minutes leads to 144 files per Flume agent per day,

that is more than 4000 per agent in a month. In typical Big Data processing solution, each file is

separate task. Large number of relatively small files leads to huge coordination overhead and effectively

to large processing latency.

7.3.4 Target storage

The same information as stored in Avro can be subsequently transformed into more data processing

friendly manner. The accelerator uses Parquet as target data format. The ETL process aggregates the

small Avro files produced by Flume into large Parquet files partitioned by commonly used filters (like

month). The ETL process is executed infrequently. This can be once or twice per day. The effectiveness

of the process can be further improved by the data partitioning. If the partitioning schema uses data

production related fields, the ETL process may use only incremental data. In particular it may process

only data that may have changed recently and keep the results for the previous periods.

Document

Accelerator for Apache Spark – Functional Specification 43

7.4 Data for analytics

Data analytics may create additional challenges for the data. Once the data has been aggregated in the

ETL stage, it is generally available for processing.

7.4.1 Data format

In typical queries only a subset of the data is required. Parquet is columnar format. That means the

continuous blocks of file store the data for single column from multiple rows. In order to show efficiency

of the format a simple aggregation query can be used. For example, in order to aggregate the sales per

month only the month and revenue fields are needed. To execute this query only about 20% of the

whole transaction file has to be read. This significantly reduces the IO cost and effectively lowers

latency.

7.4.2 Data organization

Common task types in data analytics are grouping and joining. Both types are expensive operations in

the clusters. When particular operations are repeatedly used, it is worth optimizing the data to support

that kind of queries.

The first strategy is data partitioning. With file location partitioning it is possible to remove from the

processing set the files that do not contain the requested data.

The second possible strategy is data unit selection. In the retail transaction processing case the basic

information unit is transaction. The problem is that this information is hierarchical, i.e. it contains a

substructure in the form of transaction items. To leverage SQL support, however, the selected data

representation unit is transaction item. Parquet supports nested structures. The problem with them is,

however, that the nested fields no longer benefit from columnar data orientation.

As the third strategy, the data can be denormalized. In traditional OLAP solutions the processing relies

on fast joins between tables. In Big Data solutions it is sometimes better to precompute the join or

grouping to avoid unnecessary runtime processing.

7.4.3 Enrichment

Data enrichment was discussed before in the event storage. The event ingestion time enriched

information may be irrelevant for the long time storage. This does not mean, however, that the enriched

information is not needed. In the accelerator demo case the category information resolved in the event

processing layer is not stored in HDFS. This information is required to do the model training, though. In

order to provide it, the categories are resolved again in the ETL process. The advantage of this

approach is that the same mapping can be executed any time after the event was processed,

7.4.4 Tools

There is a plethora of tools available to access the data stored in Big Data systems. In our case the

data is stored in flat Parquet structure in HDFS and it needs to be accessed by Spotfire.

7.4.4.1 Apache Hadoop Map/Reduce

The early approach to data access in Hadoop was processing the data in using Map/Reduce jobs.

While this approach revolutionized the industry, the data could be transformed from one form to

another, but it was still missing convenient access interfaces.

Document

Accelerator for Apache Spark – Functional Specification 44

7.4.4.2 Apache Hive

Hive was one of the first approaches to provide uniform API to the data. The data language is SQL and

Hive provided SQL/JDBC APIs and metadata management to execute SQL statements as Map/Reduce

jobs. The Map/Reduce implementation caused a lot of runtime inefficiencies, but a common API layer

was provided.

7.4.4.3 Cloudera Impala

Impala is Cloudera approach to reimplement the Hive. Unlike original Hive the execution engine was

implemented in C++, but the access interface and metadata management was retained. Therefore the

former Hive clients could be used with little changes or no changes at all. Impala offers much better

performance than original Hive. The downside is that it is offered only as part of one Hadoop distribution

(CDH).

7.4.4.4 Apache Spark

Apache Spark does not directly address the data access problem. It fixes instead the performance

problems from the original Map/Reduce. Spark is general-purpose data processing framework. It

efficiency comes from consolidating processing steps and intermediary transient result caching. As one

of the modules spark provides efficient SQL execution module with Hive-compliant endpoint.

The strong advantages of Spark are:

possibility to mix SQL with custom processing steps in Java/Scala

ability to expose processing as temporary tables (advanced views)

support for arbitrary custom functions

ability to expose dedicated interfaces to the running system

Accelerator heavily leverages Spark as the data access layer. It uses Spark's data access engine to

efficiently query and process Parquet files. For performance purposes some of the data structures have

been exposed as lightweight HTTP endpoints. In the end the same running component is used to

access the data and train the models.

The disadvantage of Spark is that the external communication goes through the driver program. For

larger systems this central point may become both the single point of failure and bottleneck.

7.4.4.5 Apache Drill

Apache Drill is one of the newest approaches to the data access in Big Data systems. It addresses

similar use-cases as Hive/Impala, i.e. regular data access using SQL. Contrary to previous approaches

Drill promises full ANSI SQL support. The big advantage of Drill is arbitrary data source support. That

means Drill is able to provide a common view over various databases and offer single API to join

Parquet file form HDFS with dimension data coming from Oracle and MSSQL.

Document

Accelerator for Apache Spark – Functional Specification 45

8 Insight to Action - ZooKeeper and H2O

The major problem in Big Data systems is monetizing the value of knowledge generated from data

analytics. The Accelerator for Apache Spark addresses this problem by providing a path from data

analytics to event processing. The applied approach assumes that the data analytics layer is able to

produce artifacts that may be used in the event processing layer to generate value. Typically these

artifacts are statistical models predicting some kind of behaviour.

The major challenge going from data analytics to event processing is common language between the

data scientists and real-time oriented developers. The accelerator shows practical example of the

information exchange between data analytics and event processing teams.

The shared components are models with some use-case specific metadata. In the data processing flow

the data analysts explore the data and discover the significant relations. In the accelerator case an

example of such relation is product inventory with product categorization. Data scientists evaluate the

available datasets in order to find significant statistical relations and provide the recipes to the build the

models. The models are built and assessed for efficiency. When approved, they are annotated with use-

case specific metadata and passed to the event processing.

In the provided retail transaction processing scenario Distributed Random Forest models are built using

H2O and Spark. The models that pass acceptance are bundled into marketing campaigns and sent for

deployment to the event processing system.

The major issue here is that the Data is not only Big. It is also Fast. In order to support large data

streams the provided solutions must be horizontally scalable, which implies multiple nodes processing

their parts of the data streams. The parallel delivery of the configuration change to multiple nodes is

challenging. In the accelerator it has been solved by storing the configuration parameters in ZooKeeper.

When a process starts it gets from the ZK node the current configuration value. Whenever the

configuration changes, the update is delivered to all the listening processes. This way the disconnected

processes can still benefit from shared configuration.

Another important challenge is ability to execute the statistical models and to reload them on demand

without restarting the process. To support this requirement a set of model operators have been provided

in StreamBase. In the demonstration scenario H2O operator is used with binomial classification models.

The accelerator provides examples how the StreamBase processes may connect to the ZooKeeper to

obtain the current configuration and runtime updates. The event processing layer implementation shows

runtime context retrieval, data enrichment, event featurization and eventually model execution. The

ability to process the models completes the full cycle of the events to data and insight to action story.

Document

Accelerator for Apache Spark – Functional Specification 46

9 Event flow simulator

The event flow simulator provides a convenient way of inserting large number of messages to

demonstrate the functionality of the solution and to stress it for performance.

The simulator is a simple Jython program that processes transaction log (sorted by timestamp and

transactionId) and sends the XML messages to Kafka topic with desired production rate.

The transaction data is artificially generated one. It contains randomly generated customer transactions

with some hidden demographics related trends. The data uses probabilistic model of customer

behaviour. In this model the probability of given product purchase changes depending on the context.

Some products have higher propensity to be purchased by women, some are unlikely to be bought by

people living in Florida and some are bought mostly by teenagers.

The transactions events contain just customer identity and the executed baskets. Even if the customer

demographics data is available in the system, the whole demonstration focuses on showing the ability

to predict customer behaviour based on the observations.

The generated 100K customers dataset is preloaded in HDFS and can be accessed via Spark, either

with SQL or HTTP. The same data is injected directly into Kafka in order to show the system reacting on

the incoming events.

What's important, the accelerator demonstrates the technical capabilities of TIBCO products in

connection to the currently available open source Big Data solutions. The data used by simulator may

be any time changed to support another use case, but the general principles remain the same. TIBCO

StreamBase is capable of consuming data from messaging firehoses like Kafka and effectively storing

the data in distributed filesystems for analytics. TIBCO Spotfire helps to understand the data and build

the knowledge.

The created knowledge is in turn used to create the runtime execution artifacts supporting the business

use-case. The use-case can be marketing campaign target selection, fraud detection based on past

observations, process irregularity check or even image classification. All these cases are difficult to be

implemented in traditional solutions, but are commonly addressed by data science and statistical

techniques. The accelerator provides an end-to-end scenario when the events are converted to data,

the data is converted to insight and insight is turned into value.