Distributed, Scalable Platform for Stream Processing

May 11, 2012. Molnár András ([email protected]) http://s4.io




Motivation

● “The emergence of new applications such as real-time search, high frequency trading, and social networks is pushing the limits of what can be accomplished with traditional data processing systems.

● There is a clear need for highly scalable stream computing solutions that can operate at high data rates and process massive amounts of data.

● For example, to personalize search advertising, we need to process thousands of queries per second from millions of unique users in real-time, which typically involves analyzing recent user activity such as queries and clicks.”

● Neumeyer et al.: “S4: Distributed Stream Computing Platform” (paper)

● “Other typical uses for computing over continuous streams of data include: listening for stock trading signals, watching for fraud in transactions, and monitoring process logs to look for signs of trouble.”

http://www.infoq.com/news/2010/11/yahoo-releases-s4


Motivation

● “With the surge of open source projects such as Hadoop, adoption of the MapReduce programming model has accelerated and is moving from the research labs into real-world applications as diverse as web search, fraud detection, and online dating.

● Despite these advances, there is no similar trend for general purpose distributed stream computing software. There are various projects and commercial engines, but their use is still restricted to highly specialized applications.”

● Neumeyer et al.: “S4: Distributed Stream Computing Platform” (paper)


Stream considerations

● “Analysis on streaming data should not rely on storing the data, as the amount of required disk space is unknown. Additionally, the processing of the data is likely to take longer than the rate of transmission would allow. Since the data is not stored, special algorithms must be developed for aggregating and analyzing data.”

– http://www.bytemining.com/2010/11/exciting-tools-for-big-data-s4-sawzall-and-mrjob/


What does it say about itself?

● Simple Scalable Streaming System

– “S4 is a general-purpose, distributed, scalable, partially fault-tolerant, pluggable platform that allows programmers to easily develop applications for processing continuous unbounded streams of data.”

– “We aim to develop a high performance computing platform that hides the complexity inherent in parallel processing system from the application programmer.”

– http://incubator.apache.org/s4


What does it say about itself?

● “The drivers to read from and write to the platform can be implemented in any language making it possible to integrate with legacy data sources and systems.”

● “S4 was released by Yahoo! Inc. in October 2010 under the Open Source Apache 2.0 license”

– http://incubator.apache.org/s4


What does it say about itself? Overview 1

● “proven”

– at Yahoo!: processes several thousand search queries per second

● “decentralized”

– no single point of failure; all nodes are peers

● “scalable”

– no upper limit; throughput grows linearly as new nodes are added

● ...

– http://incubator.apache.org/s4


What does it say about itself? Overview 2

● ...

● “extensible”

– simple API, ready-made applications

● “cluster management”

– based on ZooKeeper

● “partial fault-tolerance”

– if a server fails, a stand-by server takes over its tasks

– the server's state may be lost, but it is rebuilt from the new input data

– http://incubator.apache.org/s4


S4 vs. Hadoop & Map/Reduce

● “We considered extending the open source Hadoop platform to support computation of unbound streams but we quickly realized that the Hadoop platform was highly optimized for batch processing.

– MapReduce systems typically operate on static data by scheduling batch jobs.

– In stream computing, the paradigm is to have a stream of events that flow into the system at a given data rate over which we have no control.

– The processing system must keep up with the event rate or degrade gracefully by eliminating events, this is typically called load shedding.

● Neumeyer et al.: “S4: Distributed Stream Computing Platform” (paper)
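The load shedding mentioned in the quote can be illustrated with a small sketch. It is not taken from S4: the class name LoadSheddingDemo and its methods are assumptions made for this illustration, and only the JDK's ArrayBlockingQueue is used. When the bounded buffer is full, new events are counted and dropped instead of blocking the producer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo: a bounded buffer that sheds load when the consumer lags.
final class LoadSheddingDemo {
    private final BlockingQueue<String> buffer;
    private long dropped = 0;

    LoadSheddingDemo(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // offer() returns false immediately when the queue is full,
    // so the producer never blocks; the event is counted and discarded.
    boolean accept(String event) {
        boolean queued = buffer.offer(event);
        if (!queued) {
            dropped++;
        }
        return queued;
    }

    // Takes the next buffered event, or null if none is waiting.
    String next() {
        return buffer.poll();
    }

    long droppedCount() { return dropped; }
    int pending() { return buffer.size(); }

    public static void main(String[] args) {
        LoadSheddingDemo shedder = new LoadSheddingDemo(3);
        for (int i = 0; i < 5; i++) {
            shedder.accept("event-" + i); // events 3 and 4 arrive while the buffer is full
        }
        System.out.println("pending=" + shedder.pending() + " dropped=" + shedder.droppedCount());
    }
}
```

Dropping the newest events is only one possible policy; a real system might prefer to drop the oldest ones or to sample.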


S4 vs. Hadoop & Map/Reduce

● ...

● The streaming paradigm dictates a very different architecture than the one used in batch processing. Attempting to build a general-purpose platform for both batch and stream computing would result in a highly complex system that may end up not being optimal for either task.”

– It is worth mentioning that many real world systems implement a streaming strategy of partitioning the input data into fixed-size segments that are processed by a MapReduce platform. The disadvantage of this approach is that the latency is proportional to the length of the segment plus the overhead required to do the segmentation and initiate the processing jobs.”

● Rather than trying to fit a square peg into a round hole we decided to explore a programming paradigm that is simple and can operate on data streams in real-time.”

● Neumeyer et al.: “S4: Distributed Stream Computing Platform” (paper)

- S4 is not “real-time Map/Reduce”!


Further characteristics

● “Actors” programming model

– encapsulation & location transparency

● minimize latency

– by using the local memory of nodes and avoiding disk I/O bottlenecks

● lossy failover is acceptable

– state is lost, but it can be rebuilt from the input stream;

– “downstream systems must degrade gracefully”

● cluster nodes can NOT be added/removed while running


Further characteristics

● “Elastic: computing load automatically gets distributed

● Expandable: a simple API has been provided

● Object Oriented: POJOs used for internode communication.”

http://jayatiatblogs.blogspot.com/2011/02/introduction-to-s4.html


Relation to other systems

● “S4 represents a free- to low-cost alternative [to] presently available proprietary real-time processing options like multiple IBM InfoSphere products and SAP’s new in-memory HANA appliance”

– November 3, 2010. http://gigaom.com/cloud/is-yahoo-set-to-open-source-real-time-mapreduce/

● Comparison with IBM Stream Processing Core (SPC):

– SPC – subscription model

– S4 – combination of actors model and Map/Reduce (no centralized control, simplicity)

– Neumeyer et al.: “S4: Distributed Stream Computing Platform” (paper)

● ...


Relation to other systems

● “Such systems have been around for many years from vendors like StreamBase, Oracle, Tibco and Sybase as well as in some open source projects like Active Insight Esper and OpenESB. Many of these vendor systems have been folded into larger Enterprise Service Bus offerings.”

● “S4's sweet spot is in processing huge volumes of short-lived data where most of what the business wants is aggregation, not keeping every detail. The way S4 works, it keeps track of data locality and fault detection and lets the developer concentrate on only writing logic.”

– November 23, 2010. http://www.infoq.com/news/2010/11/yahoo-releases-s4

● “The S4 design is not new in the industry as it implements the Actor framework. Erlang and Scala already have a similar implementation. But the power of mixing in Zookeeper and a pluggable architecture can set S4 apart from previous frameworks.”

– February 27, 2011. http://jayatiatblogs.blogspot.com/2011/02/introduction-to-s4.html


Development status

● alpha version v0.3.0, released in Aug 2011

– This is an alpha version and should not be used for production. There's no guarantee of backwards-compatibility until the 1.0 release.

● the developers report benchmarks with real-world examples

● directions for further development, e.g. garbage collection with prioritization...


Stream Model

● Stream: sequence of (KEY, VAL) data events

– KEY: tuple-valued key (optional)

– VAL: attribute tuple

– data events are typed (EV)

● events are represented by Java objects

[Figure: example event]
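A minimal sketch of how a typed (KEY, VAL) event might look as a plain Java object. The class StreamEvent and its fields are assumptions made for this illustration; S4's own event classes are not shown on the slides.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event type; S4's real event classes are not shown in the slides.
final class StreamEvent {
    private final String type;               // EV: the event type
    private final List<String> key;          // KEY: tuple-valued key, empty when absent
    private final Map<String, Object> value; // VAL: attribute tuple

    StreamEvent(String type, List<String> key, Map<String, Object> value) {
        this.type = type;
        this.key = Collections.unmodifiableList(key);
        this.value = Collections.unmodifiableMap(value);
    }

    String type() { return type; }
    List<String> key() { return key; }
    Map<String, Object> value() { return value; }
    boolean isKeyed() { return !key.isEmpty(); }

    @Override
    public String toString() {
        return type + " " + key + " " + value;
    }

    public static void main(String[] args) {
        Map<String, Object> attrs = new LinkedHashMap<>();
        attrs.put("word", "stream");
        attrs.put("count", 4);
        StreamEvent keyed = new StreamEvent("WordEvent", Arrays.asList("word=stream"), attrs);
        StreamEvent keyless = new StreamEvent("QuoteEvent", Collections.<String>emptyList(), attrs);
        System.out.println(keyed);
        System.out.println(keyless.isKeyed());
    }
}
```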


Processing Model

http://www.slideshare.net/alekbr/s4-stream-computing-platform

[Figure: data event stream]


Processing Model

● Data events routed (emitted) to PEs (Processing Elements) with internal state

● PEs consume events and

– emit other events consumed by other PEs

– or publish results

● events can be routed to appropriate PEs

● new PE instances can be created

http://www.slideshare.net/alekbr/s4-stream-computing-platform


Example: WordCount

● Neumeyer et al.: “S4: Distributed Stream Computing Platform” (paper)

[Figure: WordCount example; annotations on the figure: constant key, no key, random key]
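The idea behind the WordCount example can be imitated in a single process. The following is only a sketch under stated assumptions: WordCountSketch, WordCounter and onText are names made up here and no S4 classes are involved. A keyless step splits the text into words, and every distinct word gets its own counter instance, which stands in for one keyed PE.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-process imitation of the WordCount flow; no S4 classes are used.
final class WordCountSketch {
    // Stands in for one keyed PE instance: it only ever sees events for its own word.
    static final class WordCounter {
        private int count;
        void processEvent() { count++; }
        int count() { return count; }
    }

    // One counter per distinct key value, created on first use.
    private final Map<String, WordCounter> countersByWord = new HashMap<>();

    // Stands in for the keyless splitter: one incoming text event becomes one event per word.
    void onText(String text) {
        for (String word : text.toLowerCase().split("\\W+")) {
            if (word.isEmpty()) {
                continue; // split() yields an empty first token when the text starts with a separator
            }
            countersByWord.computeIfAbsent(word, w -> new WordCounter()).processEvent();
        }
    }

    int countOf(String word) {
        WordCounter counter = countersByWord.get(word);
        return counter == null ? 0 : counter.count();
    }

    int instanceCount() { return countersByWord.size(); }

    public static void main(String[] args) {
        WordCountSketch app = new WordCountSketch();
        app.onText("I meant what I said");
        app.onText("and I said what I meant");
        System.out.println("said=" + app.countOf("said") + " instances=" + app.instanceCount());
    }
}
```

In a real cluster the counters would live on different nodes; here a single map plays that role.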


Processing Model

● “Each instance of PE is uniquely identified by four components:

– its functionality as defined by PE class and associated configuration

– the types of events that it consumes

– the keyed attribute in those events

– the value of the keyed attribute in events which it consumes”

● That is, every key value needs its own PE instance; S4 creates it automatically from a prototype (when a new value appears)

● Keyless PE: processes every event of the given type

● “Garbage collection is a challenge to the platform”: e.g. a TTL can be assigned to PEs, so that a PE may be removed if no data arrives with its key value for a certain time. It then loses its state!

http://www.slideshare.net/alekbr/s4-stream-computing-platform

[Figure label: PE prototype]
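Prototype-based creation and TTL-based cleanup of PE instances can be sketched as follows. PeRegistry, lookup and evictIdle are hypothetical names chosen for this illustration; the slides do not show how S4 implements either mechanism. Timestamps are passed in explicitly so that the behaviour is easy to follow.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical registry; it imitates prototype-based PE creation and TTL-based cleanup.
final class PeRegistry<P> {
    private static final class Entry<P> {
        final P pe;
        long lastSeenMillis;
        Entry(P pe, long now) { this.pe = pe; this.lastSeenMillis = now; }
    }

    private final Supplier<P> prototype; // builds a fresh instance for an unseen key value
    private final long ttlMillis;
    private final Map<String, Entry<P>> instances = new HashMap<>();

    PeRegistry(Supplier<P> prototype, long ttlMillis) {
        this.prototype = prototype;
        this.ttlMillis = ttlMillis;
    }

    // Returns the instance for this key value, creating it when the value is new.
    P lookup(String keyValue, long nowMillis) {
        Entry<P> entry = instances.get(keyValue);
        if (entry == null) {
            entry = new Entry<>(prototype.get(), nowMillis);
            instances.put(keyValue, entry);
        }
        entry.lastSeenMillis = nowMillis;
        return entry.pe;
    }

    // Drops instances idle for longer than the TTL; their state is gone for good.
    int evictIdle(long nowMillis) {
        int removed = 0;
        Iterator<Entry<P>> it = instances.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().lastSeenMillis > ttlMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() { return instances.size(); }

    public static void main(String[] args) {
        PeRegistry<int[]> registry = new PeRegistry<>(() -> new int[1], 1_000);
        registry.lookup("hadoop", 0)[0]++;
        registry.lookup("s4", 900)[0]++;
        System.out.println("evicted=" + registry.evictIdle(1_500)); // only "hadoop" is idle > 1000 ms
        System.out.println("count after eviction=" + registry.lookup("hadoop", 1_600)[0]);
    }
}
```

The second line printed by main shows the cost noted on the slide: after eviction the counter for the same key value starts again from zero.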


Standard and custom PEs

● Basic processing operations are available as prebuilt PEs that only need to be configured, e.g.

– io.s4.processor.AbstractWindowingPE

– io.s4.processor.JoinPE

– io.s4.processor.PrintEventPE

– io.s4.processor.ReroutePE

– io.s4.processor.SimpleCountingPE

● Any PE can be built by programming

– Java, Spring framework

● io.s4.processor.AbstractPE

– Eclipse project generation


PE programming

● “developers essentially implement two primary handlers:

– input event handler processEvent() - invoked for each incoming event of the types the PE has subscribed to

– output mechanism output() - optional method that implements the output of PE to an external system. Can be configured to be invoked in a variety of ways – at regular time intervals t or on receiving n input events.”
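The two handlers named in the quote can be imitated with a self-contained sketch. SketchPE, QueryCounterPE and deliver are assumptions made for this illustration and do not reproduce the io.s4 classes; the base class triggers output() after every n input events, one of the options the quote mentions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical PE that counts the events it receives for its key value.
final class QueryCounterPE extends SketchPE<String> {
    private int count;
    final List<String> published = new ArrayList<>(); // stands in for an external sink

    QueryCounterPE(int outputEveryNEvents) { super(outputEveryNEvents); }

    @Override void processEvent(String queryString) { count++; }
    @Override void output() { published.add("count=" + count); }

    public static void main(String[] args) {
        QueryCounterPE pe = new QueryCounterPE(2);
        for (int i = 0; i < 5; i++) {
            pe.deliver("s4 streaming");
        }
        System.out.println(pe.published); // output() ran after the 2nd and the 4th event
    }
}

// Hypothetical base class; it mirrors the two handlers named in the quote, not the real io.s4 API.
abstract class SketchPE<E> {
    private final int outputEveryNEvents;
    private int eventsSinceOutput;

    SketchPE(int outputEveryNEvents) {
        this.outputEveryNEvents = outputEveryNEvents;
    }

    // Called by the (imaginary) container for every incoming event.
    final void deliver(E event) {
        processEvent(event);
        if (++eventsSinceOutput >= outputEveryNEvents) {
            eventsSinceOutput = 0;
            output();
        }
    }

    abstract void processEvent(E event); // update internal state
    abstract void output();              // publish state to an external system
}
```

A time-based trigger would replace the event counter in deliver() with a timer; the handlers themselves would stay the same.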


PE programming

http://www.slideshare.net/alekbr/s4-stream-computing-platform

e.g. query counter: how many times each query is issued (queryString)


PE programming

the key (queryString) no longer has to be handled here; it is specified in the configuration instead

output is sent every 10 minutes (it could also be sent on every event or after a given number of events)


Processing Model

● PN – Processing Node

– the logical execution unit (host) of the PEs

– performs local processing (invoking PEs) and event handling (receiving, forwarding)

– the Dispatcher forwards events locally or to other nodes, and partitions them (by hash)

● Communication layer

– cluster management, physical-to-logical node mapping, failover handling

– API for sending input events

– selectable network protocol: guaranteed or non-guaranteed delivery (control events are always guaranteed; for data it is configurable)

● ZooKeeper – coordination; alternatively “red-button” mode (single computer)

http://www.slideshare.net/alekbr/s4-stream-computing-platform
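Hash partitioning by the Dispatcher can be sketched like this. HashDispatcher and nodeFor are hypothetical names, and the exact hash function S4 uses is not given on the slides; the point is only that the same event type and key value always map to the same node.

```java
// Hypothetical partitioner: maps a key value to one of a fixed number of processing nodes.
final class HashDispatcher {
    private final int nodeCount;

    HashDispatcher(int nodeCount) {
        if (nodeCount <= 0) {
            throw new IllegalArgumentException("nodeCount must be positive");
        }
        this.nodeCount = nodeCount;
    }

    // floorMod keeps the result in [0, nodeCount) even when the hash is negative.
    int nodeFor(String eventType, String keyValue) {
        int hash = 31 * eventType.hashCode() + keyValue.hashCode();
        return Math.floorMod(hash, nodeCount);
    }

    public static void main(String[] args) {
        HashDispatcher dispatcher = new HashDispatcher(4);
        for (String word : new String[] {"stream", "hadoop", "stream"}) {
            System.out.println(word + " -> node " + dispatcher.nodeFor("WordEvent", word));
        }
    }
}
```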


Adapter

● “The events being fed to the S4 cluster for processing need to be translated into S4 compatible events and similarly the events received from an S4 cluster have to be made understandable to the client.

● The Client I/O Stub solves this purpose [e.g. JSON or other conversion], whereas the Adapter injects events into the S4 cluster and receives from it via the Communication Layer.”

http://jayatiatblogs.blogspot.com/2011/02/introduction-to-s4.html


Adapter

http://docs.s4.io/manual/client_adapter.html


Sample application

● TwitterTopicCount

– “This sample application listens to the Twitter Spritzer and keeps track of the top 10 hash tags”

● TwitterFeedListener: converts the JSON feed to Java events for S4 to use

● TopicExtractorPE: pulls hash tags out of each tweet and creates one new event per hash tag (key)

● TopicCountAndReportPE (one per hash tag) counts the number of times its hash tag has been seen, emitting a new event with the hash tag and the count

● single TopNTopicPE consumes all hash tag counts and keeps a sorted list of the top 10

– It has to be corrected before it works! (TwitterFeedListener.java)

● https is needed instead of http in the Twitter urlString

● a log4j basic configuration is needed in main():

org.apache.log4j.BasicConfigurator.configure();
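The data flow of the sample can be imitated in one process. This is a sketch under stated assumptions, not the sample's source code: TopTopicsSketch and its methods are made-up names, tweets are plain strings instead of JSON, and a single map replaces the per-hash-tag PE instances.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical single-process imitation of the three PEs; it is not the sample's source code.
final class TopTopicsSketch {
    private static final Pattern HASH_TAG = Pattern.compile("#(\\w+)");
    private final Map<String, Integer> countsByTag = new HashMap<>();

    // Role of TopicExtractorPE + TopicCountAndReportPE: one count update per hash tag found.
    void onTweet(String text) {
        Matcher matcher = HASH_TAG.matcher(text);
        while (matcher.find()) {
            countsByTag.merge(matcher.group(1).toLowerCase(), 1, Integer::sum);
        }
    }

    // Role of TopNTopicPE: the n most frequent tags, ties broken alphabetically.
    List<String> top(int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(countsByTag.entrySet());
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue()); // higher counts first
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : entries.subList(0, Math.min(n, entries.size()))) {
            result.add(entry.getKey() + "=" + entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        TopTopicsSketch app = new TopTopicsSketch();
        app.onTweet("Trying #S4 for #streaming");
        app.onTweet("#s4 vs #hadoop");
        System.out.println(app.top(2)); // prints [s4=2, hadoop=1]
    }
}
```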


Installation, running the sample

● See http://docs.s4.io/tutorials/getting_started.html

● gradlew build system

● e.g. installed into the ~/s4 directory:

– cd ~/s4/s4/build/s4-image/

– export S4_IMAGE=`pwd`

– cd ~/s4/twittertopiccount/build/install/twitter_feed_listener

– export TWIT_LISTENER=`pwd`

– $S4_IMAGE/scripts/start-s4.sh -r client-adapter & [start S4]

– $S4_IMAGE/scripts/run-client-adapter.sh -s client-adapter -g s4 -d $S4_IMAGE/s4-core/conf/default/client-stub-conf.xml & [start the adapter]

– $TWIT_LISTENER/bin/twitter_feed_listener <<twitter_user_name>> <<twitter_password>> [start loading events]

– cat /tmp/top_n_hashtags

– tail -f ~/s4/s4/build/s4-image/s4-core/logs/s4-core/s4-core_<<pid>>.log

– kill `ps x|grep s4|awk '{ print $1 }'`


Benchmark

● Benchmark #1 from the S4 paper:

– Streaming click-through rate (CTR) computation

● estimate of the probability that a user will click on an item

● sliding window of 24 hours, split into 1-hour / 5-minute slots that are aggregated and further processed. If a PN fails, state is lost and the aggregate is substituted with long-term estimates

● 250 000 users / day, 2 weeks, peak event rate 1600/sec, 16 servers, each with four 32-bit processors and 2 GB RAM

● 3% improvement in CTR computation (detecting low quality ads quickly and filtering them out)

● + offline stress test ...

● Neumeyer et al.: “S4: Distributed Stream Computing Platform” (paper)
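A slotted sliding window of the kind described for the CTR benchmark might be sketched as follows. SlottedCtrWindow and its methods are assumptions made for this illustration, not the benchmark's code; the sketch keeps per-slot impression and click counters, reuses expired slots, and assumes non-negative timestamps. The substitution with long-term estimates after a failure is left out.

```java
import java.util.Arrays;

// Hypothetical slotted sliding window for a click-through rate (clicks / impressions).
final class SlottedCtrWindow {
    private final long slotMillis;
    private final long[] impressions;
    private final long[] clicks;
    private final long[] slotNumber; // absolute slot number currently stored in each cell

    SlottedCtrWindow(int slots, long slotMillis) {
        this.slotMillis = slotMillis;
        this.impressions = new long[slots];
        this.clicks = new long[slots];
        this.slotNumber = new long[slots];
        Arrays.fill(slotNumber, -1); // -1 marks a cell that has never been used
    }

    void record(long timestampMillis, boolean clicked) {
        long slot = timestampMillis / slotMillis;
        int cell = (int) (slot % impressions.length);
        if (slotNumber[cell] != slot) { // the cell still holds an expired slot: reuse it
            slotNumber[cell] = slot;
            impressions[cell] = 0;
            clicks[cell] = 0;
        }
        impressions[cell]++;
        if (clicked) {
            clicks[cell]++;
        }
    }

    // CTR over the slots inside the window ending at nowMillis; 0 when there is no data.
    double ctr(long nowMillis) {
        long newest = nowMillis / slotMillis;
        long oldest = newest - impressions.length + 1;
        long shown = 0;
        long clicked = 0;
        for (int cell = 0; cell < impressions.length; cell++) {
            if (slotNumber[cell] >= oldest && slotNumber[cell] <= newest) {
                shown += impressions[cell];
                clicked += clicks[cell];
            }
        }
        return shown == 0 ? 0.0 : (double) clicked / shown;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        SlottedCtrWindow window = new SlottedCtrWindow(24, hour);
        window.record(0, true);
        window.record(hour, false);
        window.record(hour, false);
        window.record(hour, true);
        System.out.println(window.ctr(hour)); // 2 clicks out of 4 impressions: prints 0.5
    }
}
```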


Benchmark

● Benchmark #2 from the S4 paper:

– On-line parameter optimization

● process:

– measurement: for the duration of a slot

– comparator: significant differences are flagged

– optimizer: adaptation strategy

● 200 000 users / day, 2 weeks

● revenue increased by 0.25% and click yield by 1.4%

● Neumeyer et al.: “S4: Distributed Stream Computing Platform” (paper)


Own experience

● So far only on one machine, with a single node

● Without ZooKeeper (“red-button” mode)

● Running the sample applications

● ... to be continued ...


Summary: impressions

● Looks promising at first sight

● A new paradigm, but not entirely

● Still an alpha version

● Yahoo is behind it

● I have not yet found many concrete references to it or applications of it

● Keep in mind that “lossy failover is acceptable”

– i.e. short-term data may be lost when a node fails or a PE is released (this probably does not affect the general trend)

● Trying it out: so far I have only got as far as the sample application

– installation on a single machine was easy and trouble-free

– the bugs in the sample application had to be fixed

● It should be experimented with further, possibly on several machines, and later with a concrete big-data task, etc.


The end.

Thank you for your attention!
