Orchestrating Tasks from the Cloud with Groovy and AWS SWF
DESCRIPTION
Speaker: Clay McCoy
Some of the most difficult problems in applications today involve coordinating and distributing work in a resilient way. Use cases include communicating with unreliable remote services, parallelization, scheduling critical timers, and being immune to server failures. Even if you handle these concerns, how do you test this sort of orchestration glue code? Groovy's powerful metaprogramming capabilities and AWS's robust Simple Workflow Service can be used to solve these problems. In this talk you will see real-world, pragmatic examples that can be used even if your business logic is not running in the cloud.
TRANSCRIPT
ORCHESTRATING TASKS FROM THE CLOUD WITH GROOVY AND AWS SWF
Created by Clay McCoy
ABOUT ME
Sr. Software Engineer @ Netflix
Engineering Tools Team
Contributes to Asgard
www.claymccoy.com
@ClayMcCoy
ASGARD
Web app written in Grails
Used for deployment to AWS
Open sourced on GitHub
ASGARD NEEDED A WORKFLOW SERVICE
Task coordination
Long lived tasks
Distributed tasks
AMAZON SWF
"Amazon Simple Workflow (Amazon SWF) is a task coordination and state management service for cloud applications. With Amazon SWF, you can stop writing complex glue-code and state machinery and invest more in the business logic that makes your applications unique."
AMAZON SWF CONCEPTS
domains
task lists
decision tasks
activity tasks
long polling
timeouts
heartbeats
workflow execution
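The concepts above can be pictured with a toy in-memory model. This is only a sketch: it makes no AWS calls, the queue and variable names are hypothetical, and the real service records more history events than shown here. It assumes a decider and an activity worker that each poll their own task list, with the execution history carrying the state between them.

```groovy
// Toy in-memory model of SWF's task flow; no AWS calls are made.
// All variable names are hypothetical and chosen for illustration.
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

def decisionTaskList = new LinkedBlockingQueue<Map>()
def activityTaskList = new LinkedBlockingQueue<Map>()
def history = []  // stands in for the workflow execution history

// Starting a workflow execution records an event and queues a decision task.
history << 'WorkflowExecutionStarted'
decisionTaskList.put([history: history.clone()])

// Decider: polls its task list (with a timeout), reads the history,
// and schedules the next activity.
def decisionTask = decisionTaskList.poll(1, TimeUnit.SECONDS)
if (decisionTask.history.last() == 'WorkflowExecutionStarted') {
    history << 'ActivityTaskScheduled'
    activityTaskList.put([name: 'goTo', input: 'GoldenGateBridge'])
}

// Activity worker: polls its own task list, does the work, reports back.
def activityTask = activityTaskList.poll(1, TimeUnit.SECONDS)
def result = "went to ${activityTask.input}".toString()
history << 'ActivityTaskCompleted'
decisionTaskList.put([history: history.clone()])

// Decider again: sees the completed activity and closes the execution.
decisionTask = decisionTaskList.poll(1, TimeUnit.SECONDS)
if (decisionTask.history.last() == 'ActivityTaskCompleted') {
    history << 'WorkflowExecutionCompleted'
}
```

The point of the model is that the decider holds no state of its own: every decision is derived from the history it receives with each decision task.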
SHOW AWS API
AMAZON FLOW FRAMEWORK
higher level API
removes boilerplate
decider logic looks like typical Java
promises
pollers
AspectJ and generated code
SHOW FLOW EXAMPLE
GROOVY IS GOOD AT HIDING BOILERPLATE CODE
no AspectJ or generated code
still uses core Flow objects
typesafe calls to schedule activities and start workflows
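One way to get typesafe calls without AspectJ or generated code is a dynamic proxy behind the activities interface. The sketch below is an assumption about the general technique, not the talk's actual implementation: `TripActivities`, `scheduled`, and `handler` are hypothetical names, and a real version would hand the call to SWF and return a promise rather than a string.

```groovy
import java.lang.reflect.InvocationHandler
import java.lang.reflect.Method
import java.lang.reflect.Proxy

// Hypothetical activities interface, standing in for a real one.
interface TripActivities {
    String goTo(String name, String location)
    String hike(String somewhere)
}

// Records what would be scheduled instead of calling SWF.
def scheduled = []

// A closure coerced to InvocationHandler intercepts every interface call.
def handler = { Object proxy, Method method, Object[] args ->
    scheduled << [activity: method.name, input: args as List]
    "scheduled ${method.name}".toString()  // a real version would return a promise
} as InvocationHandler

def activities = (TripActivities) Proxy.newProxyInstance(
        TripActivities.classLoader, [TripActivities] as Class[], handler)

// Callers use the plain interface; the proxy decides what actually happens.
def reply = activities.goTo('Clay', 'Redwoods')
activities.hike('through redwoods')
```

Because callers only see the interface, the compiler and IDE can still check method names and argument types while the scheduling mechanics stay hidden.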
Abstract away workflow concerns
interface Workflow<A> {
    Promise<Void> status(String message)

    AndPromise allPromises(Promise... promises)
    OrPromise anyPromises(Promise... promises)

    A getActivities()

    <T> Promise<T> waitFor(Promise promise, Closure<Promise<T>> work)
    <T> Promise<T> promiseFor(T value)

    <T> DoTry<T> doTry(Closure<Promise<T>> work)
    <T> Promise<T> retry(RetryPolicy retryPolicy, Closure<Promise<T>> work)

    Promise<Void> timer(long delaySeconds)
}
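To illustrate how a local (non-SWF) implementation of part of this interface might look, here is a minimal sketch. It is an assumption, not the implementation shown in the talk: `LocalWorkflowSketch` is a hypothetical class, and `java.util.concurrent.CompletableFuture` stands in for Flow's `Promise` so the example runs without the AWS SDK.

```groovy
import java.util.concurrent.CompletableFuture

// Hypothetical local stand-in: CompletableFuture plays the role of Flow's Promise.
class LocalWorkflowSketch {
    List<String> log = []

    // Records a status message and returns an already-completed promise.
    CompletableFuture<Void> status(String message) {
        log << message
        CompletableFuture.completedFuture(null)
    }

    // Wraps a plain value in a completed promise.
    public <T> CompletableFuture<T> promiseFor(T value) {
        CompletableFuture.completedFuture(value)
    }

    // Runs `work` with the promised value once it is available.
    public <T> CompletableFuture<T> waitFor(CompletableFuture promise, Closure<CompletableFuture<T>> work) {
        promise.thenCompose { value -> work(value) }
    }

    // Completes when every given promise has completed.
    CompletableFuture<Void> allPromises(CompletableFuture... promises) {
        CompletableFuture.allOf(promises)
    }
}

def wf = new LocalWorkflowSketch()
def done = wf.waitFor(wf.promiseFor('Clay went to Redwoods.')) { String msg ->
    wf.status(msg)
}
```

With an implementation like this behind the same interface, decider logic can be exercised in a plain unit test with no network access.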
SHOW SWF AND LOCAL IMPLEMENTATIONS
NOW FOR AN EXAMPLE
DECIDER INTERFACE
@Workflow
@WorkflowRegistrationOptions(defaultExecutionStartToCloseTimeoutSeconds = 60L)
interface BayAreaTripWorkflow {

    @Execute(version = '1.0')
    void start(String name, Collection<BayAreaLocation> previouslyVisited)

    @GetState
    List<String> getLogHistory()
}
ACTIVITIES INTERFACE
@Activities(version = "1.0")
@ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = -1L,
        defaultTaskStartToCloseTimeoutSeconds = 300L)
interface BayAreaTripActivities {
MORE ACTIVITIES INTERFACE
interface BayAreaTripActivities {

    @ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = -1L,
            defaultTaskStartToCloseTimeoutSeconds = 86400L)
    boolean askYesNoQuestion(String question)

    String goTo(String name, BayAreaLocation location)

    @ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = -1L,
            defaultTaskStartToCloseTimeoutSeconds = 300L,
            defaultTaskHeartbeatTimeoutSeconds =
    String hike(String somewhere)

    String enjoy(String something)

    String win(String game)
}
DECIDER IMPLEMENTATION
Promise<BayAreaLocation> destinationPromise = doTry {
    if (!previouslyVisited.contains(BayAreaLocation.GoldenGateBridge)) {
        return promiseFor(BayAreaLocation.GoldenGateBridge)
    }
    if (!previouslyVisited.contains(BayAreaLocation.Redwoods)) {
        return promiseFor(BayAreaLocation.Redwoods)
    } else {
        waitFor(activities.askYesNoQuestion('Do you like roller coasters?')) { boolean isThrillSeeker ->
            if (isThrillSeeker) {
                return promiseFor(BayAreaLocation.Boardwalk)
            }
            promiseFor(BayAreaLocation.Monterey)
        }
    }
}.result
ACTIVITIES IMPLEMENTATION
@ManualActivityCompletion
boolean askYesNoQuestion(String question) {
    sendManualActivityCompletionInfo(activity.taskToken, activity.workflowExecution)
    true // does not matter what is returned here because
         // the result will be supplied manually
}
DECIDER IMPLEMENTATION
waitFor(activities.goTo(name, destination)) {
    status it
    waitFor(activities.hike('across the bridge')) {
        status it
    }
}
ACTIVITIES IMPLEMENTATION
String goTo(String name, BayAreaLocation location) {
    "${name} went to ${location}."
}

String hike(String somewhere) {
    int stepsTaken = 0
    int totalStepsForHike = hikeNameToLengthInSteps[somewhere] ?: 100
    while (stepsTaken < totalStepsForHike) {
        activity.recordHeartbeat("Took ${++stepsTaken} steps.")
    }
    "And hiked ${somewhere}."
}
DECIDER IMPLEMENTATION
waitFor(activities.goTo(name, destination)) {
    status it
    status 'And stretched for 10 seconds before hiking.'
    waitFor(timer(10)) {
        DoTry<String> hiking = doTry {
            promiseFor(activities.hike('through redwoods'))
        }
        DoTry<Void> countDown = cancellableTimer(30)
        waitFor(anyPromises(countDown.getResult(), hiking.getResult())) {
            if (hiking.getResult().isReady()) {
                countDown.cancel(null)
                status "${hiking.getResult().get()}"
            } else {
                hiking.cancel(null)
                status 'And ran out of time when hiking.'
            }
        }
    }
}
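The timer-versus-activity race in the slide above can be tried out locally with plain JDK futures. This is a sketch under stated assumptions: `CompletableFuture` stands in for Flow's promises and `DoTry`, the variable names mirror the slide for readability only, and the timer is simulated by completing `countDown` by hand.

```groovy
import java.util.concurrent.CompletableFuture

// Stand-ins for the hiking activity result and the cancellable timer.
def hiking = new CompletableFuture<String>()
def countDown = new CompletableFuture<Void>()
def log = []

// Whichever future finishes first wins; the loser is cancelled.
def race = CompletableFuture.anyOf(countDown, hiking).thenRun {
    if (hiking.isDone()) {
        countDown.cancel(false)
        log << hiking.get()
    } else {
        hiking.cancel(false)
        log << 'And ran out of time when hiking.'
    }
}

// Simulate the timer firing before the hike finishes.
countDown.complete(null)
```

Completing `hiking` first instead would take the other branch, cancel the timer, and log the hike's result.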
DECIDER IMPLEMENTATION
int numberOfTokensGiven = 3
int numberOfTokens = numberOfTokensGiven
RetryPolicy retryPolicy = new ExponentialRetryPolicy(60).
        withMaximumAttempts(numberOfTokens).
        withExceptionsToRetry([IllegalStateException])
DoTry<String> tryToWin = doTry {
    retry(retryPolicy) {
        if (numberOfTokens <= 0) {
            null
        }
        numberOfTokens--
        return promiseFor(activities.win('a carnival game'))
    }
} withCatch { Throwable e ->
    status "${e.message} ${numberOfTokensGiven} times."
}
waitFor(tryToWin.result) {
    status it
    if (numberOfTokens > 0) {
        waitFor(activities.enjoy('a roller coaster')) {
            status it
        }
    }
    Promise.Void()
}
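The general idea behind retrying with an exponential policy can be sketched in plain Groovy. This is not Flow's `ExponentialRetryPolicy`; `retryWithBackoff` and its parameters are hypothetical, and the sketch assumes a doubling delay and retries only on `IllegalStateException`, as in the slide above. The `sleeper` closure is injectable so the example records delays instead of actually waiting.

```groovy
// Hypothetical helper: retries `work` with a delay that doubles after each failure.
def retryWithBackoff(int maxAttempts, long initialDelayMillis, Closure work,
                     Closure sleeper = { long ms -> Thread.sleep(ms) }) {
    long delay = initialDelayMillis
    int attempt = 1
    while (true) {
        try {
            return work(attempt)
        } catch (IllegalStateException e) {
            if (attempt >= maxAttempts) {
                throw e  // out of attempts: surface the last failure
            }
            sleeper(delay)
            delay *= 2
            attempt++
        }
    }
}

// Lose twice, then win on the third token; record delays instead of sleeping.
def delays = []
def outcome = retryWithBackoff(3, 100L, { int attempt ->
    if (attempt < 3) {
        throw new IllegalStateException('And lost a carnival game.')
    }
    'And won a carnival game.'
}, { long ms -> delays << ms })
```

In the workflow version the waiting is done by durable SWF timers rather than a sleeping thread, which is what lets a retry survive a server failure.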
ACTIVITIES IMPLEMENTATION
String enjoy(String something) {
    "And enjoyed ${something}."
}

String win(String game) {
    if (isWinner()) {
        return "And won ${game}."
    } else {
        throw new IllegalStateException("And lost ${game}.")
    }
}
DECIDER IMPLEMENTATION
Promise<String> eating = promiseFor(activities.enjoy('eating seafood'))
Promise<String> watching = promiseFor(activities.enjoy('watching sea lions'))
waitFor(allPromises(eating, watching)) {
    status "${eating.get()} ${watching.get()}"
    doTry {
        promiseFor(activities.enjoy('looking for sea glass on the beach'))
    } withCatch { Throwable t ->
        status t.message
        promiseFor(activities.enjoy('the aquarium'))
    } withFinally { String result ->
        status result
        waitFor(activities.enjoy('the 17-Mile Drive')) {
            status it
        }
    }
    Promise.Void()
}
SHOW UNIT TESTS
SHOW WORKFLOW EXECUTION HISTORY
SHOW ASGARD'S USE OF SWF
QUESTIONS