Meandre 2.0 CTB-Alpha Preview
Cargol Treu Banya
Xavier Llorà
Data-Intensive Technologies and Applications, National Center for Supercomputing Applications
University of Illinois at Urbana-Champaign
Motivation
• Great feedback and lessons learned from 1.4.X series
• Hot topics on 1.4.X
• Complex concurrency model based on traditional semaphores written in Java
• Server performance bounded by JENA’s persistent model implementation
• State caching on individual servers increases the complexity of single-image clusters
• Cloud-deployable, but not cloud-friendly
Motivation
• How did the 1.5 efforts turn into 2.0?
• Cloud-friendly infrastructure required rethinking core functionalities
• Drastic redesign of backend state storage
• Revisited execution engine to support distributed flow execution
• Changes to the API that render the returned JSON documents incompatible with 1.4.X
Quick Overview
What's New in the 2.0 Series?
• Rewritten from scratch in Scala
• RDBMS backend via Jena/JDBC has been dropped
• MongoDB for state management and scalability
• Meandre 2.0 server is stateless
• Meandre API revised
• Revised response documents
• Simplified API (reduced the number of services)
• New job API
What's New Since the 1.4.X Series?
• New HTML interaction interface
• Off-the-shelf full-fledged single-image cluster
• Revised flow execution lifecycle: Queued, Preparing, Running, Done, Failed, Killed, Aborted
• Flow execution as a separate spawned process. Multiple execution engines are available
• Running flows can be killed on demand
• Rewritten execution engine (Snowfield)
• Support for distributed flow fragment execution
Four New Supporting Technologies
• Cloud-ready Meandre 2.0 clusters via:
• Solid and reliable tools
• Scala’s actors
• MongoDB (http://www.mongodb.org/)
• REST access and distributed communication layers
• Crochet (http://github.com/xllora/Crochet)
• Snare (http://github.com/xllora/Snare)
Meandre 2.0 - MongoDB Store
• MongoDB bridges the gap between
• Key-value stores (which are fast and highly scalable)
• Traditional RDBMS systems (which provide rich queries and deep functionality)
• MongoDB supports replication of data between servers for failover and redundancy
• MongoDB is designed to scale horizontally via auto-sharding, permitting the development of large-
Meandre 2.0 - Crochet
• Fast REST API prototyping and development for Scala
• Built on top of Jetty (http://jetty.codehaus.org/jetty/)
• Enables quick prototyping of REST APIs
• Provides a simple DSL built on Scala
• Developed to support the development of Meandre 2.0
• http://github.com/xllora/Crochet
Meandre 2.0 - A Crochet Example
import crochet._
new Crochet {
  get("/message","text/plain") { "Hello World!" }
} serving "./static_content" as "/static" on 8080
Get your server up and running with:
$ scala \
    -cp crochet-0.1.4.jar:crochet-3dparty-libraries-0.1.X.jar \
    hello-world-with-static.scala
Meandre 2.0 - Snare
• Notification fabric for distributed Scala applications
• Backed by MongoDB for scalability
• Snare monitors developed for Meandre 2.0
• Track activity via heartbeat
• Provide messaging between monitors and global broadcasting of BSON objects
• Basic monitoring over HTTP via Crochet
• http://github.com/xllora/Snare
Meandre 2.0 - A Snare Example
scala> import snare.tools.Implicits._
scala> val monitors = (1 to 3).toList.map( i => snare.Snare( "me_"+i, "my_pool", (o)=>{println(o);true} ) )
scala> monitors.map(_.activity=true)
2010.01.28 16:47:05.222::INFO:[EVTL] Notification event loop engaged for 230815e0-30cc-3afe-99ac-936d497d1282
2010.01.28 16:47:05.231::INFO:[EVTL] Notification event loop engaged for baec232f-d74d-3fd1-ad3a-caf362f58b7d
2010.01.28 16:47:05.236::INFO:[EVTL] Notification event loop engaged for d057fcde-fd10-3edd-9fd2-cfe464c6971c
2010.01.28 16:47:08.136::INFO:[HRTB] Heartbeat engaged for baec232f-d74d-3fd1-ad3a-caf362f58b7d
2010.01.28 16:47:08.136::INFO:[HRTB] Heartbeat engaged for 230815e0-30cc-3afe-99ac-936d497d1282
2010.01.28 16:47:08.136::INFO:[HRTB] Heartbeat engaged for d057fcde-fd10-3edd-9fd2-cfe464c6971c
scala> monitors(0).broadcast("""{"msg":"Fooo!!!"}""")
scala> monitors(0).notifyPeer( "230815e0-30cc-3afe-99ac-936d497d1282", """{"msg":"Fooo!!!"}""" )
Meandre 2.0 Architecture
Meandre 2.0 = Scala + MongoDB ++ Snare + Crochet
• Meandre 2.0 requires at least 2 separate services running
• A MongoDB instance for shared state storage and management
• A Meandre server to provide services (via Crochet) and facilitate execution (customizable execution engines)
• A single-image Meandre cluster scales horizontally by adding new Meandre servers and sharding the MongoDB store
The Minimal Meandre Cluster
• Can be broken into three basic functional units
1. The Meandre server (main activity coordinator)
2. The MongoDB store (holds all server state, job related information, and system information)
3. Meandre customizable executor (in charge of flow execution allowing selection of multiple execution engines)
[Diagram: the minimal Meandre cluster]
• Meandre 2.0 server: a Crochet server exposing the State API, the Snare Monitor, and the Job Manager API
• MongoDB: User Info, Profiles & Roles; Repositories; Unified Job Queue; Job Consoles and Logs; Snare Cluster Status & Heartbeat
• Meandre Execution: execution coordination; spawns external jobs for execution; customizable execution engine; one job running per server; allows a job to consume all server resources
Scaling via a Single-Image Cluster
• A cluster is formed by one or more Meandre servers
• A single MongoDB instance can support tens of Meandre servers
• Adding more Meandre servers provides:
• Web service load balancing
• Fault tolerance
• Higher job execution throughput (the number of concurrent jobs equals the number of Meandre servers in the cluster)
[Diagram: single-image cluster. Three Meandre 2.0 servers (each a Crochet server with State API, Snare Monitor, Job Manager API, and Meandre Execution) sit behind load balancing and share one MongoDB store holding User Info, Profiles & Roles; Repositories; Unified Job Queue; Job Consoles and Logs; Snare Cluster Status & Heartbeat.]
Large-Scale Single Image Cluster
• A single image cluster can be scaled out by relying on MongoDB
• MongoDB is the key to a single-image cluster
• Starting with 1.6.X, MongoDB provides production-ready auto-sharding
• State scalability via sharded collections allows a large-scale single-image Meandre cluster to keep scaling
[Diagram: large-scale single-image cluster. Ten Meandre 2.0 servers, each with its own Meandre Execution, sit behind load balancing and reach a sharded MongoDB deployment through MongoS routers, backed by replicated Mongo configuration servers and a set of Mongod shards.]
Changes in Meandre 2.0 Web Services
Goal: Get a Leaner REST API
• The response messages have been revised
• Homogenized the structure of the response contents
• Revisited execution mechanics
• Introduced a new job API that helps:
• Submit jobs for execution
• Track them (monitor state, kill, etc.)
• Inspect console and logs in real time
Basic Web Service REST APIs
• Repository API: Manage the user repository of components and flows
• Location API: Manage the locations from which components and flows can be imported into a user repository
• Security API: Allow administrators to manage users and their profiles and roles in a given cluster
• Publish API: Manage the components and flows that get published to the publicly shared global repository
• Cluster management & logs API: Focuses mostly on cluster monitoring (via the Snare web monitor), selective server/cluster shutdown, and access to server/cluster logs
• Job API: Submit, monitor, and control jobs submitted for execution to a cluster
• Public API: Miscellaneous public services providing access to the public repository, the demo repository, and pinging services (targeted to specific servers)
Basic Web Service REST APIs
• The prefix of the REST API is configurable
• Each call specifies the response format using a simple file extension convention
• The next few slides provide a raw list of the revised API (see the Meandre documentation website for further details)
Repository API
• """<PREFIX>/services/repository/regenerate\.(json|xml|html)""".r
• """<PREFIX>/services/repository/list_components\.(json|xml|html)""".r
• """<PREFIX>/services/repository/list_flows\.(json|xml|html)""".r
• """<PREFIX>/services/repository/clear\.(json|xml|html)""".r
• """<PREFIX>/services/repository/tags\.(json|xml|html)""".r
• """<PREFIX>/services/repository/tags_components\.(json|xml|html)""".r
• """<PREFIX>/services/repository/tags_flows\.(json|xml|html)""".r
• """<PREFIX>/services/repository/search_components\.(json|xml|html)""".r
• """<PREFIX>/services/repository/search_flows\.(json|xml|html)""".r
• """<PREFIX>/services/repository/search\.(json|xml|html)""".r
• """<PREFIX>/services/repository/describe\.(rdf|ttl|nt|html)""".r
• """<PREFIX>/services/repository/remove\.(json|xml|html)""".r
• """<PREFIX>/services/repository/add\.(json|xml|html)""".r
• """<PREFIX>/services/repository/integrity\.(json|xml|html)""".r
Location API
• """<PREFIX>/services/locations/add\.(json|xml|html)""".r
• """<PREFIX>/services/locations/remove\.(json|xml|html)""".r
• """<PREFIX>/services/locations/remove_all\.(json|xml|html)""".r
• """<PREFIX>/services/locations/list\.(json|xml|html)""".r
Security API
• """<PREFIX>/services/security/valid_roles\.(json|xml|html)""".r
• """<PREFIX>/services/security/users\.(json|xml|html)""".r
• """<PREFIX>/services/security/add\.(json|xml|html)""".r
• """<PREFIX>/services/security/remove\.(json|xml|html)""".r
• """<PREFIX>/services/security/grant_roles\.(json|xml|html)""".r
• """<PREFIX>/services/security/revoke_roles\.(json|xml|html)""".r
• """<PREFIX>/services/security/update\.(json|xml|html)""".r
Publish API
• """<PREFIX>/services/publish/publish\.(json|xml|html)""".r
• """<PREFIX>/services/publish/unpublish\.(json|xml|html)""".r
• """<PREFIX>/services/publish/list_published\.(json|xml|html)""".r
• """<PREFIX>/services/publish/publish_all\.(json|xml|html)""".r
• """<PREFIX>/services/publish/unpublish_all\.(json|xml|html)""".r
• """<PREFIX>/services/publish/force_unpublish_all\.(json|xml|html)""".r
Cluster Management & Logs API
• """<PREFIX>/services/server/shutdown\.(json|xml|html)""".r
• """<PREFIX>/services/server/shutdown_cluster\.(json|xml|html)""".r
• """<PREFIX>/services/logs/global\.(json|xml|html)""".r
• """<PREFIX>/services/logs/server\.(json|xml|html)""".r
Job API
• """<PREFIX>/services/jobs/submit\.(json|xml|html)""".r
• """<PREFIX>/services/jobs/list\.(json|xml|html)""".r
• """<PREFIX>/services/jobs/ids\.(json|xml|html)""".r
• """<PREFIX>/services/jobs/console\.(txt)""".r
• """<PREFIX>/services/jobs/log\.(txt)""".r
• """<PREFIX>/services/jobs/clean\.(json|xml|html)""".r
• """<PREFIX>/services/jobs/kill\.(json|xml|html)""".r
Public API
• """<PREFIX>/public/services/ping\.(json|xml|html)""".r
• """<PREFIX>/public/services/repository\.(rdf|ttl|nt)""".r
• """<PREFIX>/public/services/demo_repository\.(rdf|ttl|nt)""".r
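The API patterns listed on these slides are Scala regular expressions (triple-quoted strings followed by .r), and the capture group holds the file extension that selects the response format. The following is a minimal sketch of how such a pattern can be matched and the format extracted. It uses only the Scala standard library, not Crochet itself; the object name, the helper name, and the "/meandre" prefix are hypothetical choices made for illustration.

```scala
import java.util.regex.Pattern

object ApiPathSketch {
  // Hypothetical value standing in for the configurable <PREFIX>.
  val prefix = "/meandre"

  // One of the listed patterns, with the prefix quoted so it is matched literally.
  private val ListFlows =
    (Pattern.quote(prefix) + """/services/repository/list_flows\.(json|xml|html)""").r

  // Returns the requested response format, or None if the path does not match.
  def responseFormat(path: String): Option[String] = path match {
    case ListFlows(extension) => Some(extension)
    case _                    => None
  }
}
```

A request path ending in list_flows.json would yield Some("json"), while an extension outside the listed alternatives yields None.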
Customized Execution Engines
Changes in Flow Execution
• As already mentioned, flows in Meandre 2.0 are spawned in a separate process
• The execution process is a wrapper
• STDIN: Reads the repository RDF to execute
• STDOUT: Outputs the flow console
• STDERR: Outputs the flow logs
• Console and logs are streamed and archived by the Meandre server in real time
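The stream contract above can be sketched from the wrapper's side. The following is a minimal illustration, not one of the wrappers shipped with Meandre: it reads the repository RDF from STDIN, reprints it on STDOUT as console output, and writes start and end markers to STDERR as logs. The object name, method name, and log messages are hypothetical.

```scala
import java.io.{InputStream, PrintStream}
import scala.io.Source

object EchoWrapperSketch {
  // Copies the RDF read from stdin to stdout (console) and logs to stderr.
  def execute(stdin: InputStream, stdout: PrintStream, stderr: PrintStream): Unit = {
    stderr.println("Execution started")   // log line (hypothetical wording)
    val rdf = Source.fromInputStream(stdin, "UTF-8").mkString
    stdout.print(rdf)                      // console output: the RDF, unchanged
    stdout.flush()
    stderr.println("Execution finished")  // log line (hypothetical wording)
    stderr.flush()
  }

  // Entry point wiring the sketch to the real process streams.
  def main(args: Array[String]): Unit =
    execute(System.in, System.out, System.err)
}
```

Taking the streams as parameters keeps the wrapper logic testable without spawning a process.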
Changes in Flow Execution
• Console and logs are linked to job submission
• Users can query anytime for consoles and logs and they will get the current contents
• Once flow execution finishes, consoles and logs are compacted but remain available on demand
External Execution Engine
[Diagram: the Meandre 2.0 server (a Crochet server with State API, Snare Monitor, and Job Manager API) controls a spawned flow execution process through Meandre Execution. It feeds the flow and components RDF via STDIN, reads the console from STDOUT and the logs from STDERR, and stores consoles, logs, and job tracking in MongoDB.]
Spawn Mechanics
• The Meandre 2.0 server does not provide any execution facility. Instead, it spawns a separate process
• The process is passed a command-line parameter (the port number for the WebUI)
• The process is assumed to read the repository to execute (flow and required components RDF)
• The server reads the console (STDOUT) and logs (STDERR) and pushes them into MongoDB
• The server is able to terminate a spawned job on demand
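The spawn mechanics above can be sketched with the standard scala.sys.process package. This is an illustration under stated assumptions, not the Meandre server's implementation: the object, case class, and method names are hypothetical, and the console and logs are collected in memory instead of being pushed into MongoDB.

```scala
import scala.io.Source
import scala.sys.process.{Process, ProcessIO}

object SpawnSketch {
  // Hypothetical result holder; the real server stores this data in MongoDB.
  final case class JobOutput(exitCode: Int, console: String, logs: String)

  def spawn(command: Seq[String], rdf: String): JobOutput = {
    val console = new StringBuffer
    val logs    = new StringBuffer
    val io = new ProcessIO(
      // Feed the repository RDF to the process on STDIN, then signal end of input.
      in  => { in.write(rdf.getBytes("UTF-8")); in.close() },
      // Collect the console from STDOUT.
      out => { console.append(Source.fromInputStream(out, "UTF-8").mkString); out.close() },
      // Collect the logs from STDERR.
      err => { logs.append(Source.fromInputStream(err, "UTF-8").mkString); err.close() }
    )
    val process = Process(command).run(io)
    // exitValue() blocks until the process ends; process.destroy() would kill it on demand.
    val exitCode = process.exitValue()
    JobOutput(exitCode, console.toString, logs.toString)
  }
}
```

On a POSIX system, a stand-in command such as Seq("sh", "-c", "cat; echo \"webui port $0\" 1>&2", "1714") exercises all three streams: the RDF comes back on the console, and the port passed as a command-line parameter (1714 is an arbitrary value here) shows up in the logs.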
Customizable Execution
• The Job API submit service accepts a parameter (“wrapper”) that allows you to request specific execution engines.
• The default execution engines provided in 2.0 are
• echo: Just reprints the input RDF to the console and logs beginning and end of execution
• 1.4.x: The latest execution engine released on the 1.4.x series
• snowfield: The revamped Meandre 2.0 execution engine (also the basic execution piece of distributed execution of flows)
Adding Customized Execution Engines
• All execution engines are placed in the <MEANDRE_HOME>/scripts directory
• All execution engines are launched via Scala scripts using the naming convention execution_<NAME>.scala
• The provided execution engines are named
• execution_echo.scala
• execution_1.4.x.scala
• execution_snowfield.scala
Adding Customized Execution Engines
• You can add an execution engine by adding a script that follows the previous naming convention. For instance, execution engine my_engine will require a Scala wrapper placed in the <MEANDRE_HOME>/scripts folder named execution_my_engine.scala
• You can request your customized execution engine by submitting jobs via the REST API and adding the parameter &wrapper=my_engine
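The naming convention and the wrapper parameter can be captured in two small helpers. This is a sketch only: the object and method names are hypothetical, and the submit URL passed in is whatever your cluster exposes (no other submit parameters are assumed here).

```scala
import java.net.URLEncoder

object WrapperSketch {
  // Script path for an engine name, following execution_<NAME>.scala
  // inside the <MEANDRE_HOME>/scripts directory.
  def scriptFor(meandreHome: String, engine: String): String =
    meandreHome + "/scripts/execution_" + engine + ".scala"

  // Appends wrapper=<engine> to a submit URL, using '?' or '&'
  // depending on whether the URL already carries a query string.
  def withWrapper(submitUrl: String, engine: String): String = {
    val separator = if (submitUrl.contains("?")) "&" else "?"
    submitUrl + separator + "wrapper=" + URLEncoder.encode(engine, "UTF-8")
  }
}
```

For example, scriptFor("/opt/meandre", "my_engine") gives the path where the wrapper for my_engine is expected, with /opt/meandre standing in for <MEANDRE_HOME>.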
Flow Lifecycle
Meandre 2.0 Flow Lifecycle
• The introduction of the Job API has refined the flow lifecycle
• 1.4.X execution was on demand (potentially overloading the box)
• 2.0.X introduces refined execution states
[Diagram: flow lifecycle. A user request puts a flow in the Submitted state; it moves to Preparing when a server is available, to Running when the execution engine is ready, and to Done when execution completes successfully. Further transitions, labeled "User request", "Infrastructure failure", and "Bad-behaved flow", lead to the Aborted, Failed, and Killed states.]
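The lifecycle can be modeled as a small state machine. The sketch below is one reading of the diagram labels, not the server's code: it assumes that a user request kills a job, an infrastructure failure aborts it, and a bad-behaved flow fails it, and it also assumes which states each of those transitions may start from. All type and event names are hypothetical.

```scala
object LifecycleSketch {
  sealed trait State
  case object Submitted extends State
  case object Preparing extends State
  case object Running   extends State
  case object Done      extends State
  case object Failed    extends State
  case object Killed    extends State
  case object Aborted   extends State

  // Hypothetical event names derived from the diagram labels.
  sealed trait Event
  case object ServerAvailable       extends Event
  case object EngineReady           extends Event
  case object Completed             extends Event
  case object UserKill              extends Event
  case object InfrastructureFailure extends Event
  case object BadBehavedFlow        extends Event

  // Returns the next state, or None when the event does not apply
  // (for instance, in the terminal states Done, Failed, Killed, Aborted).
  def step(state: State, event: Event): Option[State] = (state, event) match {
    case (Submitted, ServerAvailable) => Some(Preparing)
    case (Preparing, EngineReady)     => Some(Running)
    case (Running, Completed)         => Some(Done)
    // Assumed source states for the three non-success transitions:
    case (Submitted | Preparing | Running, UserKill)              => Some(Killed)
    case (Submitted | Preparing | Running, InfrastructureFailure) => Some(Aborted)
    case (Preparing | Running, BadBehavedFlow)                    => Some(Failed)
    case _ => None
  }
}
```

Returning an Option makes invalid transitions explicit instead of silently leaving the state unchanged.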
Snowfield: Distributed Flow Execution
The 1.4.x Execution Engine
• Data-driven execution
• No centralized control
• Designed with multi and many cores in mind
• The underlying assumption
• One thread per component
• Finite buffers on input ports to decouple production/consumption
• Ideal for shared-memory machines (e.g. Cobalt)
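The two assumptions above (one thread per component, finite buffers on input ports) can be illustrated with a bounded blocking queue. This is a sketch of the idea using java.util.concurrent, not the 1.4.x engine code: the names are hypothetical, and a None marker is used here to signal the end of the stream.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ListBuffer

object BufferedPortSketch {
  // Runs a producer component and a consumer component, each on its own
  // thread, connected by a finite buffer of the given capacity.
  def run(items: List[String], capacity: Int): List[String] = {
    val port     = new ArrayBlockingQueue[Option[String]](capacity)
    val received = ListBuffer.empty[String]

    val producer = new Thread(new Runnable {
      // put() blocks while the buffer is full, throttling the producer.
      def run(): Unit = { items.foreach(i => port.put(Some(i))); port.put(None) }
    })
    val consumer = new Thread(new Runnable {
      def run(): Unit = {
        var open = true
        // take() blocks while the buffer is empty.
        while (open) port.take() match {
          case Some(item) => received += item.toUpperCase
          case None       => open = false
        }
      }
    })

    producer.start(); consumer.start()
    producer.join(); consumer.join()
    received.toList
  }
}
```

Even with a capacity of 1 the consumer sees every item in order; the buffer size only changes how far the producer can run ahead.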
The 1.4.x Execution Engine
[Diagram: example flow with components C1 to C6]
The 1.4.x Execution Engine
• Two other threads are created
• Mr. Proper
• This thread monitors the status of the component threads
• If no thread is running and no data remains, the flow is done and it is time to clean up
• Mr. Probe can record
• Firing events
• Data in the buffers
• Component state
The 1.4.x Execution Engine
[Diagram: components C1 to C6 together with the Mr. Proper and Mr. Probe threads, all inside a single JVM]
Scala, Concurrency, and Meandre
• Key benefit for Meandre after the Scala transition
• High level parallel constructs
• Simple concurrency model
• Actors modeled after Erlang
• Actors are lightweight compared to threads
• Configurable scheduling for actors
Actor-based Concurrency Model?
• Actors are the primitive of concurrent computation
• Actors respond to messages they receive
• Actors perform local operations
• Actors send messages to other actors
• Actors can create new actors
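The actor behaviors listed above can be shown with a hand-rolled stand-in. This sketch deliberately does not use the Scala actors library that Meandre 2.0 relies on; it is a hypothetical MiniActor built on a java.util.concurrent executor, meant only to show components exchanging messages while sharing a single scheduler thread.

```scala
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
import scala.collection.mutable.ListBuffer

object ActorSketch {
  // Hypothetical minimal actor: each message becomes a task on a shared scheduler.
  // With a single-threaded scheduler, messages are handled one at a time, in order.
  final class MiniActor[A](scheduler: ExecutorService)(handler: A => Unit) {
    def !(msg: A): Unit =
      scheduler.execute(new Runnable { def run(): Unit = handler(msg) })
  }

  // Two components (a doubler and a sink) share one scheduler thread.
  def runPipeline(inputs: List[Int]): List[Int] = {
    val scheduler = Executors.newSingleThreadExecutor()
    val results   = ListBuffer.empty[Int]
    val done      = new CountDownLatch(inputs.size)

    // The sink performs a local operation: it records what it receives.
    val sink = new MiniActor[Int](scheduler)(x => { results += x; done.countDown() })
    // The doubler responds to a message by sending a message to another actor.
    val doubler = new MiniActor[Int](scheduler)(x => sink ! (x * 2))

    inputs.foreach(x => doubler ! x)
    done.await(5, TimeUnit.SECONDS) // wait until the sink has seen every message
    scheduler.shutdown()
    results.toList
  }
}
```

Because neither component owns a thread, the number of components is decoupled from the number of threads, which is the property the actor model brings to the execution engine.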
Snowfield: Actor-Based Execution
[Diagram: a single JVM in which each component C1 to C6 is paired with an actor A1 to A6; an ActorScheduler runs them together with the Mr. Probing Proper actor A0.]
Actor-based Concurrency Model?
• Abstraction
• Break the relation between components and threads
• Minimize context switching between threads
• Main benefit
• Simple communication model
• Trivial to distribute!
Distributed Snowfield Execution
[Diagram: the component/actor pairs C1/A1 to C6/A6 are spread over JVM1 to JVM4, each JVM with its own ActorScheduler, while the Mr. Probing Proper actor A0 runs in JVM0 with its own ActorScheduler.]
Snowfield Distribution
• JVMs can now be placed on different machines
• Questions?
• How do I group components in JVMs?
• Where do I place the JVMs?
• Scheduling and mapping relies on 3rd parties
• Manually by user
• Model 1 job and let the grid do the allocation (e.g. Abe, Blue Waters)
• Cloud orchestrated