RDF Stream Processing Tutorial: RSP Implementations

Post on 14-Feb-2017


Tutorial on RDF Stream Processing 2016
M.I. Ali, J-P Calbimonte, D. Dell'Aglio, E. Della Valle, and A. Mauri
http://streamreasoning.org/events/rsp2016

RDF Stream Processing Implementations
Jean-Paul Calbimonte

jean-paul.calbimonte@hevs.ch http://jeanpi.org @jpcik


Share, Remix, Reuse: Legally
This work is licensed under the Creative Commons Attribution 3.0 Unported License.

You are free:
• to Share: to copy, distribute and transmit the work
• to Remix: to adapt the work

Under the following conditions:
• Attribution: You must attribute the work by inserting
– “[source http://streamreasoning.org/rsp2014]” at the end of each reused slide
– a credits slide stating: These slides are partially based on “RDF Stream Processing 2014” by M. Balduini, J-P Calbimonte, O. Corcho, D. Dell'Aglio, E. Della Valle http://streamreasoning.org/rsp2014

To view a copy of this license, visit http://creativecommons.org/licenses/by/3.0/


RSP for developers

• RDF Streams in practice

• RSP Query Engines

• Developing with an RSP Engine

• Handling Results

• RSP Services


RDF Streams in Practice


RSP: Keep the data moving


• Process data in-stream
• Not required to store
• Active processing model

[Diagram: input streams (RDF streams) → RSP (queries/rules) → output streams/events]


RDF Stream

…, Gi, Gi+1, Gi+2, …, Gi+n, … (unbounded sequence)

Gi = {(s1,p1,o1), (s2,p2,o2), …} [ti]

• 1+ triples
• implicit/explicit timestamp/interval
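One possible way to code this model is sketched below in Java. It is only an illustration: the class and method names (`RdfStreamSketch`, `TimestampedGraph`, `stream`) and the generated triples are assumptions made for this example, not the API of any RSP engine.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration of the stream model; not the API of any RSP engine.
class RdfStreamSketch {

    // One stream element G_i: one or more triples plus a timestamp t_i.
    static final class TimestampedGraph {
        final List<List<String>> triples; // each triple is [subject, predicate, object]
        final long timestamp;             // t_i, here in milliseconds

        TimestampedGraph(List<List<String>> triples, long timestamp) {
            if (triples.isEmpty()) {
                throw new IllegalArgumentException("a graph needs at least one triple");
            }
            this.triples = triples;
            this.timestamp = timestamp;
        }
    }

    // Lazily generated, unbounded sequence G_1, G_2, ... with timestamps 1000, 2000, ...
    static Stream<TimestampedGraph> stream() {
        return Stream.iterate(1, i -> i + 1).map(i -> new TimestampedGraph(
                List.of(List.of(":obs" + i, ":value", String.valueOf(30 + i))),
                i * 1000L));
    }

    public static void main(String[] args) {
        // Only a finite prefix of an unbounded stream can ever be materialized.
        List<TimestampedGraph> prefix = stream().limit(3).collect(Collectors.toList());
        for (TimestampedGraph g : prefix) {
            System.out.println(String.join(" ", g.triples.get(0)) + " [" + g.timestamp + "]");
        }
    }
}
```

The stream is lazy, so consumers decide how much of the unbounded sequence to pull; the timestamp here is explicit, whereas an engine could equally assign it on arrival.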

RDF streams in theory

How do I code this?

Use Web standards?


Linked Data on the Web

Web of Data
Linked Data

W3C Standards: RDF, SPARQL, etc.


Linked Data principles for RDF streams?

e.g. publish sensor data as RDF/Linked Data?

• URIs as names of things
• HTTP URIs
• Useful information when a URI is dereferenced
• Link to other URIs

[Diagram: users and applications accessing the Web]

Use RDF model to continuously query real-time data streams?

static vs. streams

one-off vs. continuous


(Sensor) Data Streams on the Web

http://mesowest.utah.edu/
http://earthquake.usgs.gov/earthquakes/feed/v1.0/
http://swiss-experiment.ch

• Monitoring
• Alerts
• Notifications
• Hourly/daily updates

• Myriad of formats
• Ad-hoc access points
• Informal description
• Convention-semantics
• Uneven use of standards
• Manual exploration


RDF Streams before RDF Streams

[Figure: Linked Open Data cloud, 2011 (http://richard.cyganiak.de/2007/10/lod/), with labels: Linked Sensor Data, MetOffice, AEMET]


Sensor Data & Linked Data

An example: Linked Sensor Data
http://wiki.knoesis.org/index.php/LinkedSensorData

[Table: Zip Files, Number of Triples]

Example: Nevada dataset
• 7.86 GB in N-Triples format
• 248 MB zipped


Sensor Data & Linked Data

<http://knoesis.wright.edu/ssw/MeasureData_Precipitation_4UT01_2003_3_31_5_10_00> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#MeasureData> .
<http://knoesis.wright.edu/ssw/MeasureData_Precipitation_4UT01_2003_3_31_5_10_00> <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#floatValue> "30.0"^^<http://www.w3.org/2001/XMLSchema#float> .
<http://knoesis.wright.edu/ssw/MeasureData_Precipitation_4UT01_2003_3_31_5_10_00> <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#uom> <http://knoesis.wright.edu/ssw/ont/weather.owl#centimeters> .
<http://knoesis.wright.edu/ssw/Observation_Precipitation_4UT01_2003_3_31_5_10_00> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://knoesis.wright.edu/ssw/ont/weather.owl#PrecipitationObservation> .
<http://knoesis.wright.edu/ssw/Observation_Precipitation_4UT01_2003_3_31_5_10_00> <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#observedProperty> <http://knoesis.wright.edu/ssw/ont/weather.owl#_Precipitation> .
<http://knoesis.wright.edu/ssw/Observation_Precipitation_4UT01_2003_3_31_5_10_00> <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#procedure> <http://knoesis.wright.edu/ssw/System_4UT01> .
<http://knoesis.wright.edu/ssw/Observation_Precipitation_4UT01_2003_3_31_5_10_00> <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#samplingTime> <http://knoesis.wright.edu/ssw/Instant_2003_3_31_5_10_00> .
<http://knoesis.wright.edu/ssw/Instant_2003_3_31_5_10_00> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2006/time#Instant> .
<http://knoesis.wright.edu/ssw/Instant_2003_3_31_5_10_00> <http://www.w3.org/2006/time#inXSDDateTime> "2003-03-31T05:10:00-07:00^^http://www.w3.org/2001/XMLSchema#dateTime" .

What do we get in these datasets?

Nice triples:
• What is measured
• Measurement
• Unit
• Sensor
• When it is measured


RDF Streams before RDF Streams

i.e. just use RDF

:observation1 rdf:type om-owl:Observation .
:observation1 om-owl:observedProperty weather:_AirTemperature .
:observation1 om-owl:procedure :sensor1 .
:observation1 om-owl:result :obsresult1 .
:observation1 om-owl:resultTime "2015-01-01T10:00:01" .
:obsresult1 om-owl:floatValue 35.4 .

Plain triples
Where is the timestamp?

:observation2 rdf:type om-owl:Observation .
:observation2 om-owl:observedProperty weather:_AirTemperature .
:observation2 om-owl:procedure :sensor1 .
:observation2 om-owl:result :obsresult2 .
:observation2 om-owl:resultTime "2015-01-01T10:00:02" .
:obsresult2 om-owl:floatValue 36.4 .

What is the order in the RDF graph?

Appended to a file?Or to some RDF dataset?

How to store it?

Feed an RDF Stream to an RSP engine

What is currently done in most RSPs:
• Live non-RDF streams: ad-hoc conversion to RDF
• RDF datasets: continuous additions
• The RSP adds an (internal) timestamp on insertion: RDF + timestamps


Feed an RDF Stream to C-SPARQL

public class SensorsStreamer extends RdfStream implements Runnable {
  public void run() {
    ..
    while (true) {
      ...
      RdfQuadruple q = new RdfQuadruple(subject, predicate, object, System.currentTimeMillis());
      this.put(q);
    }
  }
}

C-SPARQL

something to run on a thread

timestamped triple

the stream is “observable”

Data structure, execution and callbacks are mixed

Observer pattern
Tightly coupled listener

Added timestamp


Actor Model

[Diagram: Actor1 sends a message m to Actor2; each actor has a mailbox, state and behavior]

• No shared mutable state
• Avoid blocking operators
• Lightweight objects
• Loose coupling
• Communicate through messages
• Non-blocking response
• Send: fire-and-forget

Implementations: e.g. Akka for Java/Scala


RDF Stream

Ideas using Akka actors (disclaimer: ideas in progress)

Data structure: infinite triple iterator

object DemoStreams {
  ...
  def streamTriples = {
    Iterator.from(1) map { i =>
      ...
      new Triple(subject, predicate, object)
    }
  }
}

Execution: asynchronous iteration

val f = Future(DemoStreams.streamTriples)
f.map { a =>
  a.foreach { triple =>
    // do something
  }
}

Message passing: send triple to actor

f.map { a =>
  a.foreach { triple =>
    someSink ! triple
  }
}

Immutable RDF stream
• avoid shared mutable state
• avoid concurrent writes
• unbounded sequence

Futures
• non-blocking composition
• concurrent computations
• work with not-yet-computed results

Actors
• message-based
• share-nothing
• async
• distributable
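For Java readers, the same ideas can be approximated with the standard library alone. The sketch below is not Akka: a single-threaded executor stands in for an actor's mailbox, and the names `MailboxSketch` and `sendToSink`, as well as the generated triples, are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Stream;

// A single-threaded executor plays the role of an actor mailbox (assumption: not Akka).
class MailboxSketch {

    // Sends the first n triples of an unbounded stream to the sink and
    // returns them in the order the sink processed them.
    static List<String> sendToSink(int n) {
        ExecutorService sink = Executors.newSingleThreadExecutor();
        List<String> received = new ArrayList<>(); // state touched only by the sink thread
        try {
            Stream.iterate(1, i -> i + 1)                 // unbounded, immutable source
                  .map(i -> ":obs" + i + " :value " + i)
                  .limit(n)                               // truncated so the demo terminates
                  .forEach(triple -> sink.execute(() -> received.add(triple))); // fire-and-forget
            // A last message asks the sink for a snapshot of its state.
            Future<List<String>> snapshot = sink.submit(() -> List.copyOf(received));
            return snapshot.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            sink.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendToSink(3));
    }
}
```

Because messages are processed one at a time by the sink's own thread, the sender never touches the sink's state directly, which is the share-nothing property listed above.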


RDF Stream

… other issues:
• Graph implementation?
• Timestamps: application vs system?
• Serialization?

• Loose coupling
• Immutable data streams
• Asynchronous message passing
• Well defined input/output
• Event-driven


Data stream characteristics

Data regularity
• Raw data typically collected as time series
• Very regular structure
• Patterns can be exploited

E.g. mobile NO2 sensor readings (columns: timestamp, sensor, observed value, coordinates)

29-02-2016T16:41:24,47,369,46.52104,6.63579
29-02-2016T16:41:34,47,358,46.52344,6.63595
29-02-2016T16:41:44,47,354,46.52632,6.63634
29-02-2016T16:41:54,47,355,46.52684,6.63729
...

Data order
• Order of data is crucial
• Time is the key attribute for establishing an order among the data items
• Important for indexing
• Enables efficient time-based selection, filtering and windowing

Not always best to RDF-ize everything

Feed an RDF Stream to an RSP engine

Adding mappings to the data flow:
• Live non-RDF streams: mapping-based conversion to RDF
• RDF datasets: continuous additions
• RDF + timestamps: graph-based, with configurable timestamps
• Add (internal) timestamp on insertion


R2RML Mappings

:ObsValueMap
  rr:subjectMap [
    rr:template "http://opensense.epfl.ch/data/ObsResult_NO2_{sensor}_{time}" ];
  rr:predicateObjectMap [
    rr:predicate qu:numericalValue;
    rr:objectMap [ rr:column "no2"; rr:datatype xsd:float; ]];
  rr:predicateObjectMap [
    rr:predicate obs:uom;
    rr:objectMap [ rr:parentTriplesMap :UnitMap; ]].

:ObservationMap
  rr:subjectMap [
    rr:template "http://opensense.epfl.ch/data/Obs_NO2_{sensor}_{time}" ];
  rr:predicateObjectMap [
    rr:predicate ssn:observedProperty;
    rr:objectMap [ rr:constant opensense:NO2 ]];

• rr:subjectMap: URI of subject
• rr:predicate: URI of predicate
• rr:column: object taken from a column name
• rr:template: column names in a template

Can be used for mapping databases, CSVs, JSON, etc.
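To make the template mechanism concrete, the following Java sketch expands `{column}` placeholders with values from one input row and prints a triple shaped like the one `:ObsValueMap` describes. It is a simplified illustration, not an R2RML processor: `TemplateSketch`, `expand` and `toTriple` are hypothetical names, the sample row is invented, and the IRI-safe encoding that R2RML requires for template values is deliberately left out.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified, hypothetical template expansion; real R2RML also IRI-encodes the values.
class TemplateSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{([^}]+)\\}");

    // Replaces each {column} placeholder with the value of that column in the row.
    static String expand(String template, Map<String, String> row) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = row.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("no column named " + m.group(1));
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Builds one triple from a row: templated subject, fixed predicate, typed literal from "no2".
    static String toTriple(Map<String, String> row) {
        String subject = expand("http://opensense.epfl.ch/data/ObsResult_NO2_{sensor}_{time}", row);
        return "<" + subject + "> qu:numericalValue \"" + row.get("no2") + "\"^^xsd:float .";
    }

    public static void main(String[] args) {
        // Invented sample row; the column names follow the mapping above.
        Map<String, String> row = Map.of("sensor", "47", "time", "20160229T164124", "no2", "369");
        System.out.println(toTriple(row));
    }
}
```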


An example: TripleWave

[Figures: TripleWave running modes and sources]


RDF Streams in W3C RSP

:g1 {:axel :isIn :RedRoom. :darko :isIn :RedRoom}
{:g1, prov:generatedAtTime, "2015-06-18T12:00:00Z"^^xsd:dateTime}
:g2 {:axel :isIn :BlueRoom. }
{:g2, prov:generatedAtTime, "2015-06-18T12:00:35"^^xsd:dateTime}
:g3 {:minh :isIn :RedRoom. }
{:g3, prov:generatedAtTime, "2015-06-18T12:02:07"^^xsd:dateTime}
...

https://www.w3.org/community/rsp/
http://streamreasoning.github.io/RSP-QL/RSP_Requirements_Design_Document/

Graph-based

Flexible time property

RDF-friendly

Flexible metadata

Intervals

:g_1 :startsAt "2015-06-18T12:00:00"^^xsd:dateTime .
:g_1 :endsAt "2015-06-18T13:00:00"^^xsd:dateTime .

:g_2 :validBetween [ :startsAt "2015-06-18T12:00:00"^^xsd:dateTime ; :endsAt "2015-06-18T13:00:00"^^xsd:dateTime ] .


RSP Engine Implementations


Existing RSP systems (oversimplified!)

C-SPARQL: RDF Store + Stream processor
• Combined architecture
[Diagram: C-SPARQL query → translator → RDF Store (static) and Stream processor (streaming) → continuous results]

CQELS: Implemented from scratch. Focus on performance
• Native + adaptive joins for static data and streaming data
[Diagram: CQELS query → Native RSP → continuous results]


Existing RSP systems (oversimplified!)

SPARQLStream: Ontology-based stream query answering
• Virtual RDF views, using R2RML mappings
• SPARQLStream queries over the original data streams
[Diagram: SPARQLStream query → rewriter (R2RML mappings) → DSMS/CEP → continuous results]

EP-SPARQL: Complex-event detection
• SEQ, EQUALS operators
[Diagram: EP-SPARQL query → translator → Prolog engine → continuous results]

Instans: RETE-based evaluation


Classification of existing systems

System | Model | Continuous execution | Union, Join, Optional, Filter | Aggregates | Time window | Triple window | R2S operator | Sequence, Co-occurrence
TA-SPARQL | TA-RDF | ✗ | ✔ | Limited | ✗ | ✗ | ✗ | ✗
tSPARQL | tRDF | ✗ | ✔ | ✗ | ✗ | ✗ | ✗ | ✗
Streaming SPARQL | RDF Stream | ✔ | ✔ | ✗ | ✔ | ✔ | ✗ | ✗
C-SPARQL | RDF Stream | ✔ | ✔ | ✔ | ✔ | ✔ | Rstream only | time function
CQELS | RDF Stream | ✔ | ✔ | ✔ | ✔ | ✔ | Istream only |
SPARQLStream | (Virtual) RDF Stream | ✔ | ✔ | ✔ | ✔ | ✗ | ✔ | ✗
EP-SPARQL | RDF Stream | ✔ | ✔ | ✔ | ✗ | ✗ | ✗ | ✔
Instans | RDF | ✔ | ✔ | ✔ | ✗ | ✗ | ✗ | ✗

Disclaimer: other features may be missing


C-SPARQL


A Reminder of SPARQL


Where C-SPARQL Extends SPARQL

C-SPARQL Language: Query and Stream Registration

All C-SPARQL queries over RDF streams are continuous
• Registered through the REGISTER statement

The output of queries is in the form of
• Instantaneous tables of variable bindings
• Instantaneous RDF graphs
• RDF stream

Only queries in the CONSTRUCT form can be registered as generators of RDF streams

Composability:
• Query results registered as streams can feed other registered queries just like every other RDF stream

C-SPARQL Language: Query registration - Example

Using the social stream fb, who is where?

REGISTER QUERY QWhoIsWhereOnFb AS
PREFIX : <http://…/sr4ld2014-onto#>
SELECT ?room ?person
FROM STREAM <http://…/fb> [RANGE 1m STEP 10s]
WHERE { ?person1 :posts [ :who ?person ; :where ?room ] . }

The resulting variable bindings have to be interpreted as instantaneous: they expire as soon as the query is recomputed.

C-SPARQL Language: Stream registration - Example

Results of a C-SPARQL query can be streamed out for downstream queries

REGISTER STREAM SWhoIsWhereOnFb AS
PREFIX : <http://…/sr4ld2014-onto#>
CONSTRUCT { ?person :isIn ?room }
FROM STREAM <http://…/fb> [RANGE 1m STEP 10s]
WHERE { ?person1 :posts [ :who ?person ; :where ?room ] . }

The resulting RDF triples are streamed out on an RDF stream
• More details in the C-SPARQL Engine hands-on session

C-SPARQL Language: Stream Registration - Notes

The output is constructed in the format of an RDF stream.
Every query execution may produce from a minimum of zero triples to a maximum of an entire graph.
The timestamp always depends on the query execution time only, and is not taken from the triples that match the patterns in the WHERE clause.

C-SPARQL Language: FROM STREAM Clause

FROM STREAM clauses are similar to SPARQL datasets
• They identify RDF stream data sources
• They represent windows over an RDF stream

They define the RDF triples available for querying and filtering.

C-SPARQL Language: FROM STREAM Clause - windows

• physical: a given number of triples
• logical: a variable number of triples which occur during a given time interval (e.g., 1 hour)
– Sliding: they are progressively advanced by a given STEP (e.g., 5 minutes)
– Tumbling: they are advanced by exactly their time interval

C-SPARQL Language: FROM STREAM Clause - Example

Using the social stream fb, how many people are in the same room? Count on a window of 1 minute that slides every 10 seconds

REGISTER QUERY HowManyPeopleAreInTheSameRoom AS
PREFIX : <http://…/sr4ld2014-onto#>
SELECT ?room (COUNT(DISTINCT ?person) AS ?numPersons)
FROM STREAM <http://…/fb> [RANGE 1m STEP 10s]
WHERE { ?person1 :posts [ :who ?person ; :where ?room ] . }
GROUP BY ?room

C-SPARQL Language: C-SPARQL reports only snapshots

[Figure: timeline from t to t+80 in steps of 10, showing the incoming timestamped RDF triples d1, d2, d3 and the time window [RANGE 40s STEP 10s] sliding over them]

Window content:
t+40: d1
t+50: d1, d2
t+60: d1, d2
t+70: d1, d2, d3
t+80: d2, d3
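The snapshot behaviour can be imitated with a few lines of Java. This is a sketch under stated assumptions: the arrival times of d1, d2 and d3 (35, 45 and 65 seconds after t) are invented so that the output reproduces the window contents on the slide, the window is taken as the half-open interval (evalTime - range, evalTime], and `WindowSketch`, `demoArrivals` and `contentAt` are hypothetical names rather than C-SPARQL engine code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical time-window sketch; boundary handling is an assumption.
class WindowSketch {

    // Invented arrival times (seconds after t) for the three items on the slide.
    static Map<String, Long> demoArrivals() {
        Map<String, Long> arrivals = new LinkedHashMap<>();
        arrivals.put("d1", 35L);
        arrivals.put("d2", 45L);
        arrivals.put("d3", 65L);
        return arrivals;
    }

    // Items whose timestamp lies in (evalTime - range, evalTime].
    static List<String> contentAt(Map<String, Long> arrivals, long evalTime, long range) {
        List<String> content = new ArrayList<>();
        for (Map.Entry<String, Long> e : arrivals.entrySet()) {
            long ts = e.getValue();
            if (ts > evalTime - range && ts <= evalTime) {
                content.add(e.getKey());
            }
        }
        return content;
    }

    public static void main(String[] args) {
        long range = 40, step = 10; // [RANGE 40s STEP 10s]
        for (long t = 40; t <= 80; t += step) {
            System.out.println("t+" + t + ": " + contentAt(demoArrivals(), t, range));
        }
    }
}
```

Each evaluation reports the whole window content again, so an item such as d1 shows up in several consecutive snapshots until it falls out of the range.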

C-SPARQL Language: Multiple FROM STREAM Clause - Example

Using the social streams fb and fs, how many people are in the same room? Count on a window of 1 minute that slides every 10 seconds

REGISTER QUERY HowManyPeopleAreInTheSameRoom AS
PREFIX : <http://…/rsp2014-onto#>
SELECT ?room (COUNT(DISTINCT ?person) AS ?numPersons)
FROM STREAM <http://…/fb> [RANGE 1m STEP 10s]
FROM STREAM <http://…/fs> [RANGE 1m STEP 10s]
WHERE { ?person1 :posts [ :who ?person ; :where ?room ] . }
GROUP BY ?room

C-SPARQL Language: Query Chaining

A C-SPARQL query Q1 registered using the STREAM clause streams results on an RDF stream

A downstream C-SPARQL query Q2 can open a window on the RDF stream of Q1 using the FROM STREAM clause

E.g.,

[Diagram: the 4 stream feeds the "Is in on 4" query and the f stream feeds the "Is with on f" query; the streams they produce feed the "Is In across f and 4" query]

:Bob :posts [ :who :Bob ; :where :BlueRoom ] .

:Carl :posts [ :who :Carl , :Bob ] .

:Bob :isIn :BlueRoom .

:Carl :isWith :Bob .

:Carl :isIn :BlueRoom .

C-SPARQL Language: TimeStamp Function – Syntax and Semantics

The timestamp of a triple can be bound to a variable using a timestamp() function

Syntax
• timestamp(variable|IRI|bn, variable|IRI, variable|IRI|bn|literal)

Semantics

Triple | Result of evaluation
It is not in the window | Type Error
It appears once in the window | Timestamp of triple
It appears multiple times in the window | The timestamp of the most recent triple
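The three cases of the semantics table can be written down directly. The Java sketch below is an illustration only: `TimestampSketch`, `Stamped` and `demoWindow` are hypothetical names, triples are compared as plain strings, and the "Type Error" case is modelled as an exception.

```java
import java.util.List;

// Hypothetical model of the timestamp() semantics; not C-SPARQL engine code.
class TimestampSketch {

    // A triple (kept as one string for brevity) with its timestamp.
    static final class Stamped {
        final String triple;
        final long timestamp;

        Stamped(String triple, long timestamp) {
            this.triple = triple;
            this.timestamp = timestamp;
        }
    }

    // Invented window content for the demo.
    static List<Stamped> demoWindow() {
        return List.of(
                new Stamped(":alice :isIn :RedRoom", 10L),
                new Stamped(":bob :isIn :RedRoom", 15L),
                new Stamped(":alice :isIn :RedRoom", 25L));
    }

    // Not in window: error. Once: its timestamp. Several times: the most recent timestamp.
    static long timestamp(List<Stamped> window, String triple) {
        return window.stream()
                .filter(s -> s.triple.equals(triple))
                .mapToLong(s -> s.timestamp)
                .max()
                .orElseThrow(() -> new IllegalStateException("type error: triple not in window"));
    }

    public static void main(String[] args) {
        System.out.println(timestamp(demoWindow(), ":alice :isIn :RedRoom"));
        System.out.println(timestamp(demoWindow(), ":bob :isIn :RedRoom"));
    }
}
```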

C-SPARQL Language: TimeStamp Function - Example

Who is “following” whom?

REGISTER QUERY FindFollowers AS
PREFIX f: <http://larkc.eu/csparql/sparql/jena/ext#>
PREFIX : <http://…/sr4ld2014-onto#>
SELECT ?someOne ?someOneElse ?room
FROM STREAM <http://…/isIn> [RANGE 1m STEP 10s]
WHERE {
  ?someOne :isIn ?room .
  ?someOneElse :isIn ?room .
  FILTER (?someOne != ?someOneElse)
  FILTER (f:timestamp(?someOne, :isIn, ?room) < f:timestamp(?someOneElse, :isIn, ?room))
}

C-SPARQL Language: Accessing background Information

C-SPARQL allows asking the engine to also issue the query against RDF graphs, using FROM clauses.

E.g., Where else can Alice go?

REGISTER QUERY WhereElseCanAliceGo AS
PREFIX : <http://…/sr4ld2014-onto#>
SELECT ?room
FROM STREAM <http://…/isIn> [RANGE 10m STEP 10m]
FROM <http://…/bgInfo>
WHERE {
  :Alice :isIn ?someRoom .
  ?someRoom :isConnectedTo ?room .
}

FROM <http://…/bgInfo>: IRI identifying the graph containing the background information

C-SPARQL Language: C-SPARQL queries and reasoning - example

Memo
• posts is a sub property of observes

Data
RDF graph | Time-stamp | Stream
:RedSensor :observes [ :who :Alice; :where :RedRoom ] . | t1 | sensors
:Bob :posts [ :who :Bob ; :where :RedRoom] . | t2 | fs

Query under RDFS entailment regime
REGISTER QUERY QueryUnderRDFSEntailmentRegime AS
PREFIX : <http://…/sr4ld2014-onto#>
SELECT ?x ?room ?person
FROM STREAM <http://…/fs> [RANGE 1m STEP 10s]
FROM STREAM <http://…/sensors> [RANGE 1m STEP 10s]
WHERE { ?x :observes [ :who ?person ; :where ?room ] . }

Results at t2 + 10s
?x | ?room | ?person
:RedSensor | :RedRoom | :Alice
:Bob | :RedRoom | :Bob
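Why :Bob appears in the results can be seen with a tiny entailment step. The Java sketch below applies only the RDFS sub-property rule to a list of triples; the names `SubPropertySketch` and `entail` are hypothetical, and this is not how the C-SPARQL engine implements reasoning (the slides mention the Jena Generic Rule Reasoner for that).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the RDFS sub-property rule only.
class SubPropertySketch {

    // Returns the input triples plus the triples entailed through subPropertyOf.
    static List<List<String>> entail(List<List<String>> triples, Map<String, String> subPropertyOf) {
        List<List<String>> out = new ArrayList<>(triples);
        for (List<String> t : triples) {
            String p = t.get(1);
            Set<String> seen = new HashSet<>(); // guards against cyclic declarations
            while (subPropertyOf.containsKey(p) && seen.add(p)) {
                p = subPropertyOf.get(p);
                out.add(List.of(t.get(0), p, t.get(2)));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // :posts is a sub property of :observes, as in the memo above.
        Map<String, String> tbox = Map.of(":posts", ":observes");
        List<List<String>> data = List.of(List.of(":Bob", ":posts", "_:b1"));
        // The entailed triple (:Bob :observes _:b1) is what the :observes pattern matches.
        System.out.println(entail(data, tbox));
    }
}
```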

Introduction: C-SPARQL Engine Architecture

Simple, modular architecture

It relies entirely on existing technologies

Integration of
• DSMSs (Esper) and
• SPARQL engines (Jena-ARQ)

Introduction: C-SPARQL Engine Features at a glance 1/3

In-memory RDF stream processing
• Continuous queries, filtering, aggregations, joins, sub-queries via C-SPARQL
• Push based
• Reactive

C-SPARQL Engine 0.9.5 supports
• SPARQL 1.1 (tested with http://www.w3.org/wiki/SRBench)
• query chaining
• background RDF graph access and update (via SPARQL 1.1 Update)
• naïve stream reasoning (via Jena Generic Rule Reasoner)
• time aware matching via timestamp function

Introduction: C-SPARQL Engine Features at a glance 2/3

Extensible Middleware
• Runtime management of
– RDF streams
– C-SPARQL queries
– Result listeners
• API driven

Quick start available
• C-SPARQL Engine
– http://streamreasoning.org/download/csparqlreadytogopack

Source code is released as open source under Apache 2.0
• C-SPARQL Engine
– https://github.com/streamreasoning/CSPARQL-engine
– https://github.com/streamreasoning/CSPARQL-ReadyToGoPack

Introduction: C-SPARQL Engine Features at a glance 3/3

Known limitations
• large background data and timestamp function can spoil performance
• no support for named graphs and named streams
• no support for multiple windows on the same stream
• triple based windows are buggy

Introduction: C-SPARQL Engine as general RSP

RSP-services proposes a unified interface for RDF stream processors and offers REST services to interact with them.

RSP-services-csparql is the specific implementation of RSP-services for the C-SPARQL engine (more detailed information in the hands-on session)

Quick start available
• RDF Stream Processing RESTful Interface (RSP-service) for C-SPARQL Engine
– http://streamreasoning.org/download/rsp-service4csparql

Source code is released as open source under Apache 2.0
• RSP-services
– https://github.com/streamreasoning/rsp-services-csparql
– https://github.com/streamreasoning/rsp-services-api
– https://github.com/streamreasoning/rsp-services-client-example

Resources

Read more
• C-SPARQL semantics
– Davide Francesco Barbieri, Daniele Braga, Stefano Ceri, Emanuele Della Valle, Michael Grossniklaus: C-SPARQL: a Continuous Query Language for RDF Data Streams. Int. J. Semantic Computing 4(1): 3-25 (2010)
• Most recent syntax
– D. F. Barbieri, D. Braga, S. Ceri, E. Della Valle, M. Grossniklaus: Querying RDF streams with C-SPARQL. SIGMOD Record 39(1): 20-26 (2010)
• RSP-services
– M. Balduini, E. Della Valle: A Restful Interface for RDF Stream Processors. International Semantic Web Conference (Posters & Demos) 2013: 209-212

Downloads
• http://streamreasoning.org/download/csparqlreadytogopack
• http://streamreasoning.org/download/rsp-service4csparql

Contact points
• marco.balduini@polimi.it
• emanuele.dellavalle@polimi.it


SPARQL Stream & Morph-streams

Morph-streams: Overview

Morph-streams processing SPARQLStream queries
https://github.com/jpcik/morph-streams

[Diagram: Client → SPARQLStream query → Query rewriting (R2RML mappings) → algebra expression → Query processing on SNEE, Esper, GSN, Cosm or other (pull/push) → [tuples] → Data translation → [triples/bindings] → Client]

SPARQLStream query:
SELECT ?proximity
FROM STREAM <http://streamreasoning.org/SensorReadings.srdf> [NOW-5 S]
WHERE { ?obs a ssn:ObservationValue; qudt:numericalValue ?proximity; FILTER (?proximity>10) }

Algebra expression: projection π (timed, prox), selection σ (prox>10), window ω (5 Seconds) over sens

Rewritten query:
SELECT prox FROM sens.win:time(5 sec) WHERE prox >10


SPARQLStream Language

• FROM NAMED STREAM
• ISTREAM, DSTREAM, RSTREAM
• WINDOW

Underlying data source restrictions

SPARQLStream Language

NamedStream → ‘FROM’ [‘NAMED’] ‘STREAM’ StreamIRI ‘[’ Window ‘]’
Window → ‘NOW-’ Integer TimeUnit [UpperBound] [Slide]
UpperBound → ‘TO NOW-’ Integer TimeUnit
Slide → ‘SLIDE’ Integer TimeUnit
TimeUnit → ‘MS’ | ‘S’ | ‘MINUTES’ | ‘HOURS’ | ‘DAY’

Select → ‘SELECT’ [Xstream] [Distinct | Reduced] …
Xstream → ‘RSTREAM’ | ‘ISTREAM’ | ‘DSTREAM’

SELECT ISTREAM ?room
FROM NAMED STREAM <http://www.streamreasoning.org/streams/socialsensor.srdf> [NOW-10 S]
WHERE {…
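The slides only name the three Xstream operators. Under the usual CQL-style reading (an assumption here, not something stated in the grammar), RSTREAM emits everything in the current evaluation, ISTREAM only what is new compared to the previous one, and DSTREAM only what has disappeared. A minimal Java sketch with hypothetical names (`R2SSketch`, room names invented):

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of relation-to-stream operators over two consecutive evaluations.
class R2SSketch {

    // RSTREAM: everything that holds in the current evaluation.
    static Set<String> rstream(Set<String> previous, Set<String> current) {
        return new LinkedHashSet<>(current);
    }

    // ISTREAM: results that are in the current evaluation but were not in the previous one.
    static Set<String> istream(Set<String> previous, Set<String> current) {
        Set<String> inserted = new LinkedHashSet<>(current);
        inserted.removeAll(previous);
        return inserted;
    }

    // DSTREAM: results that were in the previous evaluation but are no longer present.
    static Set<String> dstream(Set<String> previous, Set<String> current) {
        Set<String> deleted = new LinkedHashSet<>(previous);
        deleted.removeAll(current);
        return deleted;
    }

    public static void main(String[] args) {
        Set<String> previous = Set.of(":RedRoom", ":BlueRoom");
        Set<String> current = Set.of(":BlueRoom", ":GreenRoom");
        System.out.println("ISTREAM " + istream(previous, current));
        System.out.println("DSTREAM " + dstream(previous, current));
    }
}
```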

SPARQLStream: examples

All rooms where something was observed in the last 10s

PREFIX sr4ld: <http://www.streamreasoning.org/ontologies/socialsensor,owl#>
SELECT ?room
FROM NAMED STREAM <http://www.streamreasoning.org/streams/socialsensor.srdf> [NOW-10 S]
WHERE {
  ?obs sr4ld:observedBy ?sensor.
  ?obs sr4ld:where ?room.
}

Number of persons observed in each room in the last 10s

PREFIX sr4ld: <http://www.streamreasoning.org/ontologies/socialsensor,owl#>
SELECT (COUNT(?person) AS ?nmb) ?room
FROM NAMED STREAM <http://www.streamreasoning.org/streams/socialsensor.srdf> [NOW-10 S]
WHERE {
  ?obs sr4ld:who ?person.
  ?obs sr4ld:where ?room.
}
GROUP BY ?room


Underlying Query Processors

Esper
• CEP/DSMS
• EPL language
SELECT prox FROM sensors.win:time(5 minute) WHERE prox >10

SNEE
• DSMS/Sensor Network Query Evaluator
• Compile queries to sensor code
SELECT prox FROM sensors [FROM NOW-5 MINUTES TO NOW] WHERE prox >10

GSN
• Sensor middleware
• REST API
http://montblanc.slf.ch:22001/multidata?vs[0]=sensors&field[0]=proximity_field&c_min[0]=10&from=15/05/2012+05:00:00&to=15/05/2012+10:00:00

Cosm/Xively
• Sensor middleware
• Open platform
• REST API
http://api.cosm.com/v2/feeds/14321/datastreams/4?start=2012-05-15T05:00:00Z&end=2012-05-15T10:00:00Z


3rd: Mapping the two models

[Diagram: the ontology model (Observation, Sensor, Person, Room; properties where, who, observes; subClassOf) mapped to the stream schema detections(person, room, …)]

Define mappings


R2RML – There is a recommendation!


We can use the W3C recommendation


R2RML - Overview


Encoding in R2RML

Mapping definition

:triplesMap a rr:TriplesMap;
  rr:logicalTable [ rr:tableName "sensors" ];
  rr:subjectMap [
    rr:template "http://streamreasoning.org/data/Observation/{person}{timed}";
    rr:class sr4ld:Observation;
    rr:graph sr4ld:socialstream.srdf ];
  rr:predicateObjectMap [
    rr:predicate sr4ld:who ;
    rr:objectMap [ rr:template "http://streamreasoning.org/data/Person/{person}" ]] .

• rr:tableName: the stream name
• rr:subjectMap template: subject URI, built from stream attributes
• rr:predicateObjectMap: triple predicate + object
• rr:objectMap template: the object (a URI in this case)


Underlying Query Processors

SPARQLStream query:
SELECT ?proximity
FROM STREAM <http://streamreasoning.org/SensorReadings.srdf> [NOW-5 S]
WHERE { ?obs a ssn:ObservationValue; qudt:numericalValue ?proximity; FILTER (?proximity>10) }

Query rewriting (R2RML) yields an algebra expression: projection π (timed, prox), selection σ (prox>10), window ω (5 Seconds) over sensors

The expression is then translated for each underlying processor:
• SNEE (DSMS): SELECT prox FROM sensors [FROM NOW-5 MINUTES TO NOW] WHERE prox >10
• Esper (CEP): SELECT prox FROM sensors.win:time(5 minute) WHERE prox >10
• GSN (middleware): http://montblanc.slf.ch:22001/multidata?vs[0]=sensors&field[0]=proximity_field&c_min[0]=10&from=15/05/2012+05:00:00&to=15/05/2012+10:00:00
• Cosm/Xively: http://api.cosm.com/v2/feeds/14321/datastreams/4?start=2012-05-15T05:00:00Z&end=2012-05-15T10:00:00Z


Underlying query processors

Features | Esper | SNEE | GSN | Cosm/Xively
Projection | ✔ | ✔ | ✔ | Fixed
Proj expression | ✔ | ✔ | ✖ | ✖
Joins | ✔ | ✔✖ only window | ✖ | ✖
Union | ✖ | ✔✖ not windows | ✔ | ✖
Selection | ✔ | ✔ | ✔ | ✖✔ limited
Aggregates | ✔ | ✔ | ✔✖ | ✖
Time window | ✔ | ✔ | ✔ | ✔
Tuple window | ✔ | ✔ | ✔ | ✖
R2S | ✔ | ✔ | ✖ | ✖
Conjunction, Disj | ✔ | ✖ | ✖ | ✖
Repetition pattern | ✔ | ✖ | ✖ | ✖
Sequence | ✔ | ✖ | ✖ | ✖

Morph-streams: With reasoning!

Morph-streams processing SPARQLStream queries
https://github.com/jpcik/morph-streams

[Diagram: the Morph-streams architecture (Client, query rewriting with R2RML mappings, query processing on SNEE, Esper, GSN, Cosm or other, data translation), now with an ontology TBox as an additional input to query rewriting]

Rewrite taking into account the ontology TBox

Reasoning with Morph-streams

Rewriting the SPARQLStream queries:

SELECT ?x
FROM NAMED STREAM <http://linkeddata.es/s/obs.srdf> [NOW - 5 MINUTES]
WHERE { ?x ssn:observedBy ?y }

SELECT ?x
FROM NAMED STREAM <http://linkeddata.es/s/obs.srdf> [NOW - 5 MINUTES]
WHERE {
  {?x ssn:observedBy ?y}
  UNION {?x a ssn:Observation}
  UNION {?x a aws:TemperatureObservation}
  UNION {?x a aws:HumidityObservation}
}
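A rough idea of how such a UNION could be generated from a TBox is sketched below in Java. The axioms are assumptions made for the example (that every ssn:Observation has an ssn:observedBy value, and that the two aws: classes are its direct subclasses); the slides do not state the TBox, and `RewriteSketch` is not part of Morph-streams. Only one level of subclasses is followed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical TBox-driven expansion of a single triple pattern into a UNION.
class RewriteSketch {

    // Assumed axioms: classes whose instances always have the given property.
    static final Map<String, List<String>> HAS_PROPERTY =
            Map.of("ssn:observedBy", List.of("ssn:Observation"));

    // Assumed axioms: direct subclasses of each class.
    static final Map<String, List<String>> SUBCLASSES =
            Map.of("ssn:Observation",
                   List.of("aws:TemperatureObservation", "aws:HumidityObservation"));

    static String rewrite(String subjectVar, String property, String objectVar) {
        List<String> branches = new ArrayList<>();
        branches.add("{" + subjectVar + " " + property + " " + objectVar + "}");
        for (String cls : HAS_PROPERTY.getOrDefault(property, List.of())) {
            branches.add("{" + subjectVar + " a " + cls + "}");
            for (String sub : SUBCLASSES.getOrDefault(cls, List.of())) {
                branches.add("{" + subjectVar + " a " + sub + "}");
            }
        }
        return String.join(" UNION ", branches);
    }

    public static void main(String[] args) {
        System.out.println(rewrite("?x", "ssn:observedBy", "?y"));
    }
}
```

Since the TBox does not change while the query runs, an expansion of this kind only has to be computed when the query is registered.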

Morph-streams: With reasoning!

• Rewrite only happens once (query rewriting + translation)
• Then the continuous query is registered

[Diagram: the Morph-streams architecture with the ontology TBox feeding query rewriting]

https://github.com/jpcik/morph-streams

Now some code

Morph-streams:
• Coded in Scala
• JAR bundle, use it from Scala or Java code
• Maven, Sbt

Examples
• One off query
• Register continuous query
• Pull data
• Push
• Basic REST

https://github.com/jpcik/morph-streams
https://github.com/jpcik/morph-web

Code examples

Parse SPARQLStream

val query = "PREFIX sr4ld: <…>. SELECT ?a …"
val syntax = StreamQueryFactory.create(query);

Execute one-off query

val query = "PREFIX sr4ld: <…>. SELECT ?a …"
val mapping = Mapping(new URI("mappings/social.ttl"))   // Mapping
val adapter: QueryEvaluator = Application.adapter(system)
val results = adapter.executeQuery(query, mapping)      // Bindings

Code examples

Register and Pull

val queryid = adapter.registerQuery(query, mapping)   // Query identifier
val results1 = adapter.pull(queryid)
val results2 = adapter.pull(queryid)

Register and Push

// Implement receiver
class ExampleReceiver extends StreamReceiver {
  override def receiveData(s: SparqlResults): Unit =
    Logger.debug("got: " + s)
}

val receiver = new ExampleReceiver
val queryid = adapter.listenToQuery(query, mapping, receiver)

For Java users: Exactly the same interface!


Querying RSPs in Practice

RSP Querying

Example with CQELS (code.google.com/p/cqels)

ExecContext context = new ExecContext(HOME, false);

// register query
String queryString = " SELECT ?person ?loc …";
ContinuousSelect selQuery = context.registerSelect(queryString);

// adding listener
selQuery.register(new ContinuousListener() {
  // get result updates
  public void update(Mapping mapping) {
    String result = "";
    for (Iterator<Var> vars = mapping.vars(); vars.hasNext();)
      result += " " + context.engine().decode(mapping.get(vars.next()));
    System.out.println(result);
  }
});

CQELS continuous query:

SELECT ?person ?loc
WHERE { STREAM <http://deri.org/streams/rfid> [RANGE 3s] {?person :detectedAt ?loc} }

Tightly coupled listeners
Results delivery: push & pull?

CQELS fed by a TripleWave WebSocket

val conf = ConfigFactory.load.getConfig("experiments.rsp")

val qq = """CONSTRUCT { ?s ?p ?o } WHERE { STREAM <ws://localhost:4040/primus> [RANGE 0ms] {?s ?p ?o} }"""
val sys = new RspSystem("wstreams")
val cqels = new CqelsEngine
sys.startStream(Props(new WebSocketStream(cqels, "ws://localhost:4040/primus", conf)))
cqels.registerQuery(qq, cqels.createListener(lissy))

def lissy(triples: TripleList): Unit = {
  println("tikki: " + triples)
}

Similar models, similar (not equal!) query languages

SPARQLStream

SELECT ?sensor
FROM NAMED STREAM <http://www.cwi.nl/SRBench/observations> [NOW-3 HOURS SLIDE 10 MINUTES]
WHERE {
  ?observation om-owl:procedure ?sensor ;
    om-owl:observedProperty weather:WindSpeed ;
    om-owl:result [ om-owl:floatValue ?value ] .
}
GROUP BY ?sensor HAVING ( AVG(?value) >= "74"^^xsd:float )

CQELS

SELECT ?sensor
WHERE {
  STREAM <http://www.cwi.nl/SRBench/observations> [RANGE 10800s SLIDE 600s] {
    ?observation om-owl:procedure ?sensor ;
      om-owl:observedProperty weather:WindSpeed ;
      om-owl:result [ om-owl:floatValue ?value ] .
  }
}
GROUP BY ?sensor HAVING ( AVG(?value) >= "74"^^xsd:float )

C-SPARQL

SELECT ?sensor
FROM STREAM <http://www.cwi.nl/SRBench/observations> [RANGE 1h STEP 10m]
WHERE {
  ?observation om-owl:procedure ?sensor ;
    om-owl:observedProperty weather:WindSpeed ;
    om-owl:result [ om-owl:floatValue ?value ] .
}
GROUP BY ?sensor HAVING ( AVG(?value) >= "74"^^xsd:float )

Query using SQL on Streams

SPARQL

| | Model | Continuous execution | Union, Join, Optional, Filter | Aggregates | Time window | Triple window | R2S operator | Sequence, Co-occurrence | Time function |
|---|---|---|---|---|---|---|---|---|---|
| TA-SPARQL | TA-RDF | ✗ | ✔ | Limited | ✗ | ✗ | ✗ | ✗ | ✗ |
| tSPARQL | tRDF | ✗ | ✔ | ✗ | ✗ | ✗ | ✗ | ✗ | ✗ |
| Streaming SPARQL | RDF Stream | ✔ | ✔ | ✗ | ✔ | ✔ | ✗ | ✗ | ✗ |
| C-SPARQL | RDF Stream | ✔ | ✔ | ✔ | ✔ | ✔ | ✗ | ✗ | ✔ |
| CQELS | RDF Stream | ✔ | ✔ | ✔ | ✔ | ✔ | ✗ | ✗ | ✗ |
| SPARQLStream | (Virtual) RDF Stream | ✔ | ✔ | ✔ | ✔ | ✗ | ✔ | ✗ | ✗ |
| EP-SPARQL | RDF Stream | ✔ | ✔ | ✔ | ✗ | ✗ | ✗ | ✔ | ✗ |
| Instans | RDF | ✔ | ✔ | ✔ | ✗ | ✗ | ✗ | ✗ | ✗ |

Not exhaustive!

W3C RSP
• review features in existing systems
• agree on fundamental operators
• discuss possible semantics
https://www.w3.org/community/rsp/wiki/RSP_Query_Features

• RSP is not always/only SPARQL-like querying
• SPARQL protocol is not enough
• RSP RESTful interfaces?

Powerful languages for continuous query processing

W3C RSP-CG: RSP-QL

Continuously look for bars where people are falling in love (because of a book)

    PREFIX e: <http://somevocabulary.org/>
    PREFIX s: <http://someinvasivesensornetwork.org/streams#>
    PREFIX g: <http://somesocialnetwork.org/graphs#>
    PREFIX : <http://acrasycompany.org/rsp>
    REGISTER STREAM :GallehaultWasTheBar
    UNDER ENTAILMENT REGIME <http://www.w3.org/ns/entailment/RIF>
    AS
    CONSTRUCT ISTREAM {
      ?poi rdf:type :Gallehault ;
           :count ?howmanycouples ;
           :for (?somebody ?someoneelse)
    }
    FROM NAMED WINDOW :veryLongWindow ON s:1 [RANGE PT4H STEP PT1H]
    FROM NAMED WINDOW :longWindow ON s:1 [FROM NOW-PT35M TO NOW-PT5M STEP PT5M]
    FROM NAMED WINDOW :shortWindow ON s:1 [RANGE PT10M STEP PT5M]
    FROM NAMED GRAPH g:SocialGraph
    FROM GRAPH g:POIs
    WHERE {
      ?poi rdf:type e:bar .
      # They entered the same bar
      WINDOW :veryLongWindow {
        {?somebody e:enters ?poi} BEGIN AT ?t3
        {?someoneelse e:enters ?poi} BEGIN AT ?t4
        FILTER(?t3>?t4)
      }
      # They are close to each other, with no-one else
      WINDOW :longWindow {
        { ?somebody e:isCloseTo ?someoneelse
          MINUS { ?somebody e:isCloseTo ?yetanotherone .
                  FILTER (?yetanotherone != ?someoneelse) }
        } WITH DURATION ?duration
        FILTER (?duration>="PT30M"^^xsd:duration)
      }
      # They get out together
      WINDOW :shortWindow {
        { ?somebody e:exits ?bar} BEGIN AT ?t1
        { ?someoneelse e:exits ?bar } BEGIN AT ?t2
        FILTER (abs(?t2-?t1)<"PT1M"^^xsd:duration )
      }
      # Didn't know each other
      GRAPH g:SocialGraph {
        FILTER NOT EXIST { ?somebody e:knows ?someoneelse }
      }
      FILTER (?somebody != ?someoneelse)
    }
    AGGREGATE { GROUP BY ?poi COUNT(?somebody) AS ?howmanycouples }

• Register stream
• Time windows
• Sequencing
• Duration
• Stored Graphs
• Aggregates
• Access to time
• Reasoning

RSP Communication

RDF Stream Processing

[Diagram: stream producers send input RDF streams to the RSP engine, which evaluates continuous queries over them together with background knowledge (RDF graphs) and emits streams of results to stream consumers.]

[Diagram: the RSP engine subscribes to a producer and is notified of new data; a consumer subscribes to a continuous query and the engine pushes results to it.]

RSP Implementations

Reactive Systems

React to:
• Events: Event-Driven
• Load: Scalable
• Failure: Resilient
• Users: Responsive

Jonas Boner. Go Reactive: Event-Driven, Scalable, Resilient & Responsive Systems. 2013.

Actor Model

• No shared mutable state
• Avoid blocking operators
• Lightweight objects
• Loose coupling

[Diagram: Actor1 sends a message m to Actor2. Actors communicate through messages; each actor has a mailbox, state and behavior; send is fire-forget, with a non-blocking response.]

Implementations: e.g. Akka for Java/Scala

[Diagram: supervision hierarchy with a Parent actor above Actor1, Actor2 and Actor4; Actor2 is marked as failed (X).]

[Diagram: remoting, with Actor1, Actor2, Actor3 and Actor4 exchanging messages m.]
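The mailbox, behavior and fire-forget send described above can be sketched without any actor library. The class MiniActor below is a hypothetical illustration in plain Java, not Akka's API: a single-threaded executor stands in for the mailbox, so messages are processed one at a time and the behavior needs no locks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical minimal actor: one mailbox, one behavior, no shared mutable state. */
class MiniActor<M> {
    // The single-threaded executor plays the role of the mailbox: it queues
    // messages and processes them one at a time, in arrival order.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final Consumer<M> behavior;

    MiniActor(Consumer<M> behavior) { this.behavior = behavior; }

    /** Fire-forget send: enqueues the message and returns immediately. */
    void tell(M message) {
        mailbox.execute(() -> behavior.accept(message));
    }

    /** Stops accepting messages; returns true if the queued ones finished in time. */
    boolean stop(long timeoutMillis) {
        mailbox.shutdown();
        try {
            return mailbox.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>(); // touched only by the actor's thread until stop()
        MiniActor<String> consumer = new MiniActor<>(seen::add);
        consumer.tell("triple 1");
        consumer.tell("triple 2");
        consumer.stop(1000);
        System.out.println(seen);
    }
}
```

Supervision and remoting are left out here; those are the parts an actor toolkit such as Akka adds on top of this basic message loop.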

RDF Streams: Actors

    val sys = ActorSystem.create("system")
    val consumer = sys.actorOf(Props[RdfConsumer])

    // RDF producer: async message passing
    class Streamer extends StreamRDF {
      override def triple(triple: Triple): Unit = {
        consumer ! triple
      }
    }

    // RDF consumer: the actor's receive method implements its behavior
    class RdfConsumer extends Actor {
      def receive = {
        case t: Triple =>
          if (t.predicateMatches(RDF.`type`))
            println(s"received triple $t")
      }
    }

• Message-passing model

RSP Producer & Consumer

[Diagram: several Producers send RDF streams to a Processor, which serves several Consumers; the processor is marked "overload" and "unresponsive".]

faster producers >> slower processor/consumer

• Overload of the processor/receiver
• Unresponsiveness in stream processor

Dynamic Push-Pull

[Diagram: Producer and Consumer; messages m follow the data flow from producer to consumer, while the demand flow runs from consumer to producer.]

• Push when consumer is faster
• Pull when producer is faster
• Dynamically switch modes

Communication is dynamic depending on demand vs supply

• Event-driven
• Responsive
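The switch between push and pull can be sketched as a channel that tracks outstanding demand. The class DemandChannel below is a hypothetical, single-threaded illustration in plain Java and not the implementation evaluated in this tutorial: items are pushed straight through while the consumer has signalled demand, and buffered until the next request otherwise. The buffer here is unbounded, which a real system would have to limit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical sketch of demand-driven delivery; single-threaded for clarity. */
class DemandChannel<T> {
    private final Queue<T> pending = new ArrayDeque<>(); // items waiting for demand
    private final Consumer<T> consumer;
    private long demand = 0; // number of items the consumer is willing to accept

    DemandChannel(Consumer<T> consumer) { this.consumer = consumer; }

    /** Producer side: delivered at once if there is demand (push), else buffered. */
    void offer(T item) {
        pending.add(item);
        drain();
    }

    /** Consumer side: signals capacity for n more items (pull). */
    void request(long n) {
        if (n <= 0) throw new IllegalArgumentException("n must be positive");
        demand += n;
        drain();
    }

    /** Number of items produced but not yet delivered. */
    int buffered() { return pending.size(); }

    private void drain() {
        while (demand > 0 && !pending.isEmpty()) {
            demand--;
            consumer.accept(pending.poll());
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        DemandChannel<String> channel = new DemandChannel<>(received::add);
        channel.offer("t1");   // producer is faster: buffered until requested
        channel.request(2);    // consumer pulls t1 and keeps one unit of demand
        channel.offer("t2");   // consumer is faster: pushed immediately
        System.out.println(received + " buffered=" + channel.buffered());
    }
}
```

The same demand-signalling idea underlies the request(n) call of java.util.concurrent.Flow.Subscription in the Java standard library.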

Evaluation: throughput

• Basic dynamic pull push
• On top of CQELS
• Limitations of Thread model
• Not yet fully async

Reactive RSP workflows

[Diagram: a workflow connecting MorphStreams, CSPARQL, Etalis, TrOWL, CQELS and Dynamite through streams (s).]

• Minimal agreements: standards, serialization, interfaces
• Formal models for RSPs and reasoning
• Working prototypes/systems!

Reactive RSPs

• Event-driven message passing
• Async communication
• Immutable streams
• Transparent Remoting
• Parallel and distributed
• Supervised Failure Handling
• Responsive processing

RSPs and the Linked Data Principles

URIs as Names of Things

http://mysensorweb.me/mytemperature
• a stream?
• an observation?
• many observations?

Different concepts:
http://mysensorweb.me/mytemperature/20151110Z10:00:00
http://mysensorweb.me/mytemperature/latest

Different granularities:
http://mysensorweb.me/mytemperature/lastMinute
http://mysensorweb.me/mytemperature/lastMonth

Different uses:
http://mysensorweb.me/mytemperature/avgLastMonth

HTTP URIs

http://mysensorweb.me/mytemperature/latest

Internet of Things

• How about XMPP, CoAP, MQTT?
• Websockets?

De-referenceable URIs

GET http://mysensorweb.me/mytemperature/latest

    :Obs1 a my:TemperatureObservation;
      my:hasValue 33.5 ;
      my:hasUnit u:Celsius;
      my:atTime "20151110Z10:00:00".

GET http://mysensorweb.me/mytemperature
Get the whole stream?

GET http://mysensorweb.me/mytemperature/lastMonth
Get continuous updates?

Link to other URIs

• Broken links?
• Mix streaming and stored data
• Persist or not persist?
• Volatile links?

http://mysensorweb.me/mytemperature/20151110Z10:00:00

For Java/Scala developers

Semantic Web Devs

• Have fun
• Love challenges
• Need cool tools

RDF4J

Scala: Functions and Objects

• JVM language
• Both object-oriented and functional
• Easy Java interop
• Reuse Java libraries
• Growing community

RDF in Jena: in Scala

[Graph: sw:JohnSmith, vcard:FN, "John Smith"]

Java:

    String personURI = "http://somewhere/JohnSmith";
    Model model = ModelFactory.createDefaultModel();
    model.createResource(personURI).addProperty(VCARD.FN,"John Smith");

Scala:

    val personURI = "http://somewhere/JohnSmith"
    val model = ModelFactory.createDefaultModel
    model.createResource(personURI).addProperty(VCARD.FN,"John Smith")

• Type inference
• Not too useful: ; and ()
• boilerplate

Scala:

    val personURI = "http://somewhere/JohnSmith"
    implicit val model = createDefaultModel
    add(personURI,VCARD.FN->"John Smith")

• String converted to Resource
• Terser & compact code
• Type-safe DSL
• Compiler takes care

Some more RDF

[Graph: sw:JohnSmith has vcard:FN "John Smith" and vcard:N a blank node _:n, which has vcard:Given "John" and vcard:Family "Smith".]

Java:

    String personURI = "http://somewhere/JohnSmith";
    String givenName = "John";
    String familyName = "Smith";
    String fullName = givenName + " " + familyName;
    Model model = ModelFactory.createDefaultModel();
    model.createResource(personURI)
      .addProperty(VCARD.FN,fullName)
      .addProperty(VCARD.N,model.createResource()
        .addProperty(VCARD.Given,givenName)
        .addProperty(VCARD.Family,familyName));

Scala:

    val personURI = "http://somewhere/JohnSmith"
    val givenName = "John"
    val familyName = "Smith"
    val fullName = s"$givenName $familyName"
    implicit val model = createDefaultModel
    add(personURI,VCARD.FN->fullName,
                  VCARD.N ->add(bnode,VCARD.Given -> givenName,
                                      VCARD.Family->familyName))

• Blank node
• Predicate-objects are pairs
• Scala DSLs customizable

Some more RDF in Jena

Scala:

    implicit val m=createDefaultModel
    val ex="http://example.org/"
    val alice=iri(ex+"alice")
    val bob=iri(ex+"bob")
    val charlie=iri(ex+"charlie")

    alice+(RDF.`type`->FOAF.Person,
           FOAF.name->"Alice",
           FOAF.mbox->iri("mailto:alice@example.org"),
           FOAF.knows->bob,
           FOAF.knows->charlie,
           FOAF.knows->bnode)
    bob+ (FOAF.name->"Bob",
          FOAF.knows->charlie)
    charlie+(FOAF.name->"Charlie",
             FOAF.knows->alice)

• Still valid Jena RDF
• You can do it even nicer

Exploring an RDF Graph

Java:

    ArrayList<String> names=new ArrayList<String>();
    NodeIterator iter=model.listObjectsOfProperty(VCARD.N);
    while (iter.hasNext()){
      RDFNode obj=iter.next();
      if (obj.isResource())
        names.add(obj.asResource()
          .getProperty(VCARD.Family).getObject().toString());
      else if (obj.isLiteral())
        names.add(obj.asLiteral().getString());
    }

• Imperative iteration of collections
• Type-based conditional execution
• Type casting

Scala:

    val names=model.listObjectsOfProperty(VCARD.N).map{
      case r:Resource=>
        r.getProperty(VCARD.Family).obj.toString
      case l:Literal=>
        l.getString
    }

• Case type
• Map applied to operators


Query with SPARQL

    val queryStr = """select distinct ?Concept where {[] a ?Concept} LIMIT 10"""
    val query = sparql(queryStr)

    query.serviceSelect("http://dbpedia.org/sparql").foreach{implicit qs=>
      println(res("Concept").getURI)
    }

• Remote SPARQL endpoint
• Simplified access to Query solutions

    val f=Future(query.serviceSelect("http://es.dbpedia.org/sparql")).fallbackTo(
      Future(query.serviceSelect("http://dbpedia.org/sparql")))

    f.recover{
      case e=> println("Error "+e.getMessage)
    }
    f.map(_.foreach{implicit qs=>
      println(res("Concept").getValue)
    })

• Futures: async execution
• Non-blocking code
• Fallback alternative execution
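For Java developers, a comparable fallback can be sketched with CompletableFuture from the standard library. The class FallbackQuery and the method withFallback are hypothetical names used only for illustration, and the suppliers stand in for the two endpoint calls. One difference from the Scala snippet is an assumption of this sketch: the fallback is only started after the primary call fails, whereas the Scala code above creates both futures up front.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper showing async execution with a fallback in plain Java. */
class FallbackQuery {
    /**
     * Runs primary asynchronously; if it completes exceptionally,
     * the result of fallback is used instead.
     */
    static <T> CompletableFuture<T> withFallback(Supplier<T> primary, Supplier<T> fallback) {
        CompletableFuture<T> first = CompletableFuture.supplyAsync(primary);
        return first.exceptionally(error -> fallback.get());
    }

    public static void main(String[] args) {
        // The suppliers are placeholders for queries against two endpoints.
        CompletableFuture<String> result = withFallback(
            () -> { throw new IllegalStateException("primary endpoint unavailable"); },
            () -> "answer from fallback endpoint");

        // Non-blocking: register a callback instead of waiting for the value.
        result.thenAccept(System.out::println).join(); // join only keeps the demo alive
    }
}
```

If the fallback supplier throws as well, the returned future completes exceptionally, which corresponds to the case handled by recover in the Scala version.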

Thank you very much!
