RDF Stream Processing Tutorial: RSP Implementations
Tutorial on RDF Stream Processing 2016
M.I. Ali, J-P Calbimonte, D. Dell'Aglio, E. Della Valle, and A. Mauri
http://streamreasoning.org/events/rsp2016
RDF Stream Processing Implementations
Jean-Paul Calbimonte
http://jeanpi.org @jpcik
Share, Remix, Reuse: Legally
This work is licensed under the Creative Commons Attribution 3.0 Unported License.
You are free:
• to Share: to copy, distribute and transmit the work
• to Remix: to adapt the work
Under the following conditions:
• Attribution: you must attribute the work by inserting
  – “[source http://streamreasoning.org/rsp2014]” at the end of each reused slide
  – a credits slide stating: These slides are partially based on “RDF Stream Processing 2014” by M. Balduini, J-P Calbimonte, O. Corcho, D. Dell'Aglio, E. Della Valle http://streamreasoning.org/rsp2014
To view a copy of this license, visit http://creativecommons.org/licenses/by/3.0/
RSP for developers
• RDF Streams in practice
• RSP Query Engines
• Developing with an RSP Engine
• Handling Results
• RSP Services
RDF Streams in Practice
RSP: Keep the data moving
• Process data in-stream
• Not required to store
• Active processing model
[Diagram: input streams (RDF streams) → RSP (queries/rules) → output streams/events]
RDF Stream
An unbounded sequence of timestamped RDF graphs: …, Gi, Gi+1, Gi+2, …, Gi+n, …
Gi = {(s1,p1,o1), (s2,p2,o2), …} [ti]
• 1+ triples
• implicit/explicit timestamp/interval
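One way to read the definition above in code, as a minimal sketch: each stream element is an immutable graph (one or more triples) with an explicit timestamp, and the stream is a lazily generated unbounded sequence. All class names and the sample triple are hypothetical; they do not come from any RSP engine.

```java
import java.time.Instant;
import java.util.List;
import java.util.stream.Stream;

// All names here are hypothetical; none come from an RSP library.
final class Triple {
    final String s, p, o;
    Triple(String s, String p, String o) { this.s = s; this.p = p; this.o = o; }
    @Override public String toString() { return "(" + s + " " + p + " " + o + ")"; }
}

// One stream element Gi: one or more triples plus an explicit timestamp ti.
final class TimestampedGraph {
    final List<Triple> triples;
    final Instant timestamp;
    TimestampedGraph(List<Triple> triples, Instant timestamp) {
        this.triples = List.copyOf(triples); // immutable snapshot of the graph
        this.timestamp = timestamp;
    }
}

class RdfStreamSketch {
    // Lazily generated, unbounded sequence G0, G1, G2, ...: one graph per second.
    static Stream<TimestampedGraph> stream(Instant start) {
        return Stream.iterate(0L, i -> i + 1).map(i -> new TimestampedGraph(
            List.of(new Triple(":observation" + i, ":procedure", ":sensor1")),
            start.plusSeconds(i)));
    }

    public static void main(String[] args) {
        stream(Instant.parse("2015-01-01T10:00:00Z"))
            .limit(3) // the consumer has to bound the stream itself, e.g. with a window
            .forEach(g -> System.out.println(g.timestamp + " " + g.triples));
    }
}
```

The sketch uses an explicit (application) timestamp; an engine that stamps on insertion would assign it when the graph arrives instead.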
RDF streams in theory
How do I code this?
Use Web standards?
Linked Data on the Web
Web of Data, Linked Data
W3C Standards: RDF, SPARQL, etc.
Linked Data principles for RDF streams?
e.g. publish sensor data as RDF/Linked Data?
• URIs as names of things
• HTTP URIs
• Useful information when URI is dereferenced
• Link to other URIs
users
applications
WEB
Use RDF model to continuously query real-time data streams?
static vs. streams
one-off vs. continuous
(Sensor) Data Streams on the Web
http://mesowest.utah.edu/
http://earthquake.usgs.gov/earthquakes/feed/v1.0/
http://swiss-experiment.ch

• Monitoring
• Alerts
• Notifications
• Hourly/daily updates

• Myriad of formats
• Ad-hoc access points
• Informal description
• Convention-semantics
• Uneven use of standards
• Manual exploration
RDF Streams before RDF Streams
http://richard.cyganiak.de/2007/10/lod/
2011
Linked Sensor Data
MetOffice, AEMET
Sensor Data & Linked Data
Zip Files
Number of Triples
Example: Nevada dataset
- 7.86GB in N-Triples format
- 248MB zipped
An example: Linked Sensor Data
http://wiki.knoesis.org/index.php/LinkedSensorData
<http://knoesis.wright.edu/ssw/MeasureData_Precipitation_4UT01_2003_3_31_5_10_00> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#MeasureData> .
<http://knoesis.wright.edu/ssw/MeasureData_Precipitation_4UT01_2003_3_31_5_10_00> <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#floatValue> "30.0"^^<http://www.w3.org/2001/XMLSchema#float> .
<http://knoesis.wright.edu/ssw/MeasureData_Precipitation_4UT01_2003_3_31_5_10_00> <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#uom> <http://knoesis.wright.edu/ssw/ont/weather.owl#centimeters> .
<http://knoesis.wright.edu/ssw/Observation_Precipitation_4UT01_2003_3_31_5_10_00> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://knoesis.wright.edu/ssw/ont/weather.owl#PrecipitationObservation> .
<http://knoesis.wright.edu/ssw/Observation_Precipitation_4UT01_2003_3_31_5_10_00> <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#observedProperty> <http://knoesis.wright.edu/ssw/ont/weather.owl#_Precipitation> .
<http://knoesis.wright.edu/ssw/Observation_Precipitation_4UT01_2003_3_31_5_10_00> <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#procedure> <http://knoesis.wright.edu/ssw/System_4UT01> .
<http://knoesis.wright.edu/ssw/Observation_Precipitation_4UT01_2003_3_31_5_10_00> <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#samplingTime> <http://knoesis.wright.edu/ssw/Instant_2003_3_31_5_10_00> .
<http://knoesis.wright.edu/ssw/Instant_2003_3_31_5_10_00> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2006/time#Instant> .
<http://knoesis.wright.edu/ssw/Instant_2003_3_31_5_10_00> <http://www.w3.org/2006/time#inXSDDateTime> "2003-03-31T05:10:00-07:00^^http://www.w3.org/2001/XMLSchema#dateTime" .
What do we get in these datasets?
Nice triples:
• What is measured
• Measurement
• Unit
• Sensor
• When is it measured
RDF Streams before RDF Streams
i.e. just use RDF
:observation1 rdf:type om-owl:Observation .
:observation1 om-owl:observedProperty weather:_AirTemperature .
:observation1 om-owl:procedure :sensor1 .
:observation1 om-owl:result :obsresult1 .
:observation1 om-owl:resultTime "2015-01-01T10:00:01" .
:obsresult1 om-owl:floatValue 35.4 .

:observation2 rdf:type om-owl:Observation .
:observation2 om-owl:observedProperty weather:_AirTemperature .
:observation2 om-owl:procedure :sensor1 .
:observation2 om-owl:result :obsresult2 .
:observation2 om-owl:resultTime "2015-01-01T10:00:02" .
:obsresult2 om-owl:floatValue 36.4 .

Plain triples. Where is the timestamp?
What is the order in the RDF graph?
Appended to a file? Or to some RDF dataset?
How to store it?
Feed an RDF Stream to a RSP engine
What is currently done in most RSPs:
• Live non-RDF streams: ad-hoc conversion to RDF
• RDF datasets: continuous additions
• The RSP adds an (internal) timestamp on insertion: RDF + timestamps
Feed an RDF Stream to C-SPARQL
public class SensorsStreamer extends RdfStream implements Runnable {  // the stream is “observable”
  public void run() {  // something to run on a thread
    ..
    while (true) {
      ...
      // timestamped triple: the timestamp is added here
      RdfQuadruple q = new RdfQuadruple(subject, predicate, object, System.currentTimeMillis());
      this.put(q);
    }
  }
}
C-SPARQL
• Data structure, execution and callbacks are mixed
• Observer pattern
• Tightly coupled listener
• Added timestamp
Actor Model
[Diagram: Actor1 sends a message m to Actor2; each actor has a mailbox, state and behavior; actors communicate through messages; send is fire-and-forget; the response is non-blocking]
• No shared mutable state
• Avoid blocking operators
• Lightweight objects
• Loose coupling
Implementations: e.g. Akka for Java/Scala
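To make the mailbox idea concrete, here is a minimal sketch using only the JDK. It is deliberately not the Akka API: MiniActor, tell and stop are hypothetical names, and a single-threaded executor stands in for the mailbox so that messages are handled one at a time, in arrival order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical mini-actor, NOT the Akka API.
class MiniActor<M> {
    // The single worker thread plays the role of the mailbox.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final Consumer<M> behavior;

    MiniActor(Consumer<M> behavior) { this.behavior = behavior; }

    // Fire-and-forget: enqueue the message and return immediately.
    void tell(M message) { mailbox.execute(() -> behavior.accept(message)); }

    // Stop accepting messages and wait until the queued ones are processed.
    void stop() {
        mailbox.shutdown();
        try {
            if (!mailbox.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("mailbox not drained in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}

class ActorSketch {
    // Sends three messages to a sink actor and returns what the sink received.
    static List<String> run() {
        List<String> received = new ArrayList<>(); // touched only by the actor thread until stop()
        MiniActor<String> sink = new MiniActor<>(received::add);
        for (String triple : List.of(":t1", ":t2", ":t3")) {
            sink.tell(triple);
        }
        sink.stop();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The sender never touches the sink's state directly; it only enqueues messages, which is the loose coupling the slide refers to.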
RDF Stream
Ideas using Akka actors (disclaimer: ideas in progress)

Data structure: infinite triple iterator
object DemoStreams {
  ...
  def streamTriples = {
    Iterator.from(1) map { i =>
      ...
      new Triple(subject, predicate, object)
    }
  }
}

Execution: asynchronous iteration
val f = Future(DemoStreams.streamTriples)
f.map { a =>
  a.foreach { triple =>
    // do something
  }
}

Message passing: send triple to actor
f.map { a =>
  a.foreach { triple =>
    someSink ! triple
  }
}

Immutable RDF stream
• avoid shared mutable state
• avoid concurrent writes
• unbounded sequence
Futures
• non-blocking composition
• concurrent computations
• work with not-yet-computed results
Actors
• message-based
• share-nothing
• async
• distributable
RDF Stream
• Loose coupling
• Immutable data streams
• Asynchronous message passing
• Well defined input/output
• Event-driven
… other issues:
• Graph implementation?
• Timestamps: application vs system?
• Serialization?
Data stream characteristics
Data regularity
• Raw data typically collected as time series
• Very regular structure
• Patterns can be exploited
E.g. mobile NO2 sensor readings (columns: timestamp, sensor, observed value, coordinates)
29-02-2016T16:41:24,47,369,46.52104,6.63579
29-02-2016T16:41:34,47,358,46.52344,6.63595
29-02-2016T16:41:44,47,354,46.52632,6.63634
29-02-2016T16:41:54,47,355,46.52684,6.63729
...
Data order
• Order of data is crucial
• Time is the key attribute for establishing an order among the data items
• Important for indexing
• Enables efficient time-based selection, filtering and windowing
Not always best to RDF-ize everything
Feed an RDF Stream to a RSP engine
Adding mappings to the data flow:
• Live non-RDF streams: conversion to RDF
• Mapping-based: configurable, timestamps, graph-based
• RDF datasets: continuous additions
• The RSP adds an (internal) timestamp on insertion: RDF + timestamps
R2RML Mappings
:ObsValueMap
  rr:subjectMap [ rr:template "http://opensense.epfl.ch/data/ObsResult_NO2_{sensor}_{time}" ];
  rr:predicateObjectMap [
    rr:predicate qu:numericalValue;
    rr:objectMap [ rr:column "no2"; rr:datatype xsd:float; ] ];
  rr:predicateObjectMap [
    rr:predicate obs:uom;
    rr:objectMap [ rr:parentTriplesMap :UnitMap; ] ] .

:ObservationMap
  rr:subjectMap [ rr:template "http://opensense.epfl.ch/data/Obs_NO2_{sensor}_{time}" ];
  rr:predicateObjectMap [
    rr:predicate ssn:observedProperty;
    rr:objectMap [ rr:constant opensense:NO2 ] ];

• rr:template: URI of subject, with column names in a template
• rr:predicate: URI of predicate
• rr:column: object taken from a column name
Can be used for mapping databases, CSVs, JSON, etc.
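A minimal sketch of what "column names in a template" means in practice: every {column} placeholder is replaced by the value of that column in the current row. TemplateSketch and expand are hypothetical names, the row values are taken from the NO2 sample readings, and the sketch skips the percent-encoding that R2RML requires for IRI-unsafe characters.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of any R2RML library.
class TemplateSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{([^}]+)\\}");

    // Replaces each {column} in the template with the row's value for that column.
    // Assumption: values are inserted as-is (no IRI-safe encoding).
    static String expand(String template, Map<String, String> row) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String column = m.group(1);
            String value = row.get(column);
            if (value == null) {
                throw new IllegalArgumentException("no column named " + column);
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("time", "29-02-2016T16:41:24");
        row.put("sensor", "47");
        row.put("no2", "369");
        // prints http://opensense.epfl.ch/data/Obs_NO2_47_29-02-2016T16:41:24
        System.out.println(expand("http://opensense.epfl.ch/data/Obs_NO2_{sensor}_{time}", row));
    }
}
```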
An example: TripleWave
[Figure: TripleWave sources and running modes]
RDF Streams in W3C RSP
:g1 {:axel :isIn :RedRoom. :darko :isIn :RedRoom} {:g1, prov:generatedAtTime, "2015-06-18T12:00:00Z"^^xsd:dateTime}
:g2 {:axel :isIn :BlueRoom. } {:g2, prov:generatedAtTime, "2015-06-18T12:00:35"^^xsd:dateTime}
:g3 {:minh :isIn :RedRoom. } {:g3, prov:generatedAtTime, "2015-06-18T12:02:07"^^xsd:dateTime}
...
https://www.w3.org/community/rsp/
http://streamreasoning.github.io/RSP-QL/RSP_Requirements_Design_Document/
• Graph-based
• Flexible time property
• RDF-friendly
• Flexible metadata
Intervals:
:g_1 :startsAt "2015-06-18T12:00:00"^^xsd:dateTime
:g_1 :endsAt "2015-06-18T13:00:00"^^xsd:dateTime
:g_2 :validBetween [:startsAt "2015-06-18T12:00:00"^^xsd:dateTime; :endsAt "2015-06-18T13:00:00"^^xsd:dateTime]
RSP Engine Implementations
Existing RSP systems (oversimplified!)
C-SPARQL: RDF Store + Stream processor
• Combined architecture
[Diagram: C-SPARQL query → translator → RDF store (static) + stream processor (streaming) → continuous results]
CQELS: Implemented from scratch. Focus on performance
• Native + adaptive joins for static data and streaming data
[Diagram: CQELS query → native RSP → continuous results]
SPARQLstream: Ontology-based stream query answering
• Virtual RDF views, using R2RML mappings
• SPARQLStream queries over the original data streams
[Diagram: SPARQLStream query → rewriter (R2RML mappings) → DSMS/CEP → continuous results]
EP-SPARQL: Complex-event detection
• SEQ, EQUALS operators
[Diagram: EP-SPARQL query → translator → Prolog engine → continuous results]
Instans: RETE-based evaluation
Classification of existing systems
System | Model | Continuous execution | Union, Join, Optional, Filter | Aggregates | Time window | Triple window | R2S operator | Sequence, Co-occurrence
TA-SPARQL | TA-RDF | ✗ | ✔ | Limited | ✗ | ✗ | ✗ | ✗
tSPARQL | tRDF | ✗ | ✔ | ✗ | ✗ | ✗ | ✗ | ✗
Streaming SPARQL | RDF Stream | ✔ | ✔ | ✗ | ✔ | ✔ | ✗ | ✗
C-SPARQL | RDF Stream | ✔ | ✔ | ✔ | ✔ | ✔ | Rstream only | time function
CQELS | RDF Stream | ✔ | ✔ | ✔ | ✔ | ✔ | Istream only | ✗
SPARQLStream | (Virtual) RDF Stream | ✔ | ✔ | ✔ | ✔ | ✗ | ✔ | ✗
EP-SPARQL | RDF Stream | ✔ | ✔ | ✔ | ✗ | ✗ | ✗ | ✔
Instans | RDF | ✔ | ✔ | ✔ | ✗ | ✗ | ✗ | ✗
Disclaimer: other features may be missing
C-SPARQL
A Reminder of SPARQL
Where C-SPARQL Extends SPARQL
C-SPARQL Language: Query and Stream Registration
All C-SPARQL queries over RDF streams are continuous
• Registered through the REGISTER statement
The output of queries is in the form of
• Instantaneous tables of variable bindings
• Instantaneous RDF graphs
• RDF stream
Only queries in the CONSTRUCT form can be registered as generators of RDF streams
Composability:
• Query results registered as streams can feed other registered queries just like every other RDF stream
C-SPARQL Language: Query registration - Example
Using the social stream fb, who is where?
REGISTER QUERY QWhoIsWhereOnFb AS
PREFIX : <http://…/sr4ld2014-onto#>
SELECT ?room ?person
FROM STREAM <http://…/fb> [RANGE 1m STEP 10s]
WHERE {
  ?person1 :posts [ :who ?person ; :where ?room ] .
}
The resulting variable bindings have to be interpreted as instantaneous: they expire as soon as the query is recomputed.
C-SPARQL Language: Stream registration - Example
Results of a C-SPARQL query can be streamed out for downstream queries
REGISTER STREAM SWhoIsWhereOnFb AS
PREFIX : <http://…/sr4ld2014-onto#>
CONSTRUCT { ?person :isIn ?room }
FROM STREAM <http://…/fb> [RANGE 1m STEP 10s]
WHERE {
  ?person1 :posts [ :who ?person ; :where ?room ] .
}
The resulting RDF triples are streamed out on an RDF stream
• More details in the C-SPARQL Engine hands-on session
C-SPARQL Language: Stream Registration - Notes
• The output is constructed in the format of an RDF stream.
• Every query execution may produce from a minimum of zero triples to a maximum of an entire graph.
• The timestamp always depends on the query execution time only, and is not taken from the triples that match the patterns in the WHERE clause.
C-SPARQL Language: FROM STREAM Clause
FROM STREAM clauses are similar to SPARQL datasets
• They identify RDF stream data sources
• They represent windows over an RDF stream
They define the RDF triples available for querying and filtering.
C-SPARQL Language: FROM STREAM Clause - windows
• Physical: a given number of triples
• Logical: a variable number of triples which occur during a given time interval (e.g., 1 hour)
  – Sliding: they are progressively advanced by a given STEP (e.g., 5 minutes)
  – Tumbling: they are advanced by exactly their time interval
C-SPARQL Language: FROM STREAM Clause - Example
Using the social stream fb, how many people are in the same room? Count on a window of 1 minute that slides every 10 seconds
REGISTER QUERY HowManyPeopleAreInTheSameRoom AS
PREFIX : <http://…/sr4ld2014-onto#>
SELECT ?room (COUNT(DISTINCT ?person) AS ?people)
FROM STREAM <http://…/fb> [RANGE 1m STEP 10s]
WHERE {
  ?person1 :posts [ :who ?person ; :where ?room ] .
}
GROUP BY ?room
C-SPARQL Language: C-SPARQL reports only snapshots
Incoming timestamped RDF triples: d1, d2, d3 (time axis: t, t+10, …, t+80)
Time window [RANGE 40s STEP 10s]
Window content:
t+40: d1
t+50: d1, d2
t+60: d1, d2
t+70: d1, d2, d3
t+80: d2, d3
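The snapshot behaviour can be sketched in a few lines. This is an illustration only, not C-SPARQL engine code: WindowSketch is a hypothetical name, the window at report time T is assumed to cover (T-40, T], and the arrival offsets 35, 45 and 65 are assumed values chosen so that the reported contents match the slide.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of [RANGE 40s STEP 10s]; not engine code.
class WindowSketch {
    // Items whose timestamp lies in (reportTime - range, reportTime].
    static List<String> content(Map<String, Integer> arrivals, int reportTime, int range) {
        List<String> inWindow = new ArrayList<>();
        for (Map.Entry<String, Integer> e : arrivals.entrySet()) {
            int ts = e.getValue();
            if (ts > reportTime - range && ts <= reportTime) {
                inWindow.add(e.getKey());
            }
        }
        return inWindow;
    }

    // Assumed arrival offsets in seconds after t (not stated on the slide).
    static Map<String, Integer> demoArrivals() {
        Map<String, Integer> arrivals = new LinkedHashMap<>();
        arrivals.put("d1", 35);
        arrivals.put("d2", 45);
        arrivals.put("d3", 65);
        return arrivals;
    }

    public static void main(String[] args) {
        // One snapshot per STEP (10s), starting once the first window is full.
        for (int t = 40; t <= 80; t += 10) {
            System.out.println("t+" + t + ": " + content(demoArrivals(), t, 40));
        }
    }
}
```

Each report is computed from scratch over the current window, so an item such as d1 simply disappears from the result once it falls out of range.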
C-SPARQL Language: Multiple FROM STREAM Clause - Example
Using the social streams fb and fs, how many people are in the same room? Count on a window of 1 minute that slides every 10 seconds
REGISTER QUERY HowManyPeopleAreInTheSameRoom AS
PREFIX : <http://…/rsp2014-onto#>
SELECT ?room (COUNT(DISTINCT ?person) AS ?people)
FROM STREAM <http://…/fb> [RANGE 1m STEP 10s]
FROM STREAM <http://…/fs> [RANGE 1m STEP 10s]
WHERE {
  ?person1 :posts [ :who ?person ; :where ?room ] .
}
GROUP BY ?room
C-SPARQL Language: Query Chaining
• A C-SPARQL query Q1 registered using the STREAM clause streams results on an RDF stream
• A downstream C-SPARQL query Q2 can open a window on the RDF stream of Q1 using the FROM STREAM clause
E.g.,
[Diagram: the 4 stream feeds the "Is in on 4" query and the f stream feeds the "Is with on f" query; their output streams feed the "Is In across f and 4" query]
:Bob :posts [ :who :Bob ; :where :BlueRoom ] .
:Carl :posts [ :who :Carl , :Bob ] .
:Bob :isIn :BlueRoom .
:Carl :isWith :Bob .
:Carl :isIn :BlueRoom .
C-SPARQL Language: TimeStamp Function - Syntax and Semantics
The timestamp of a triple can be bound to a variable using a timestamp() function
Syntax
• timestamp(variable|IRI|bn, variable|IRI, variable|IRI|bn|literal)
Semantics
Triple | Result of evaluation
It is not in the window | Type error
It appears once in the window | Timestamp of the triple
It appears multiple times in the window | Timestamp of the most recent triple
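The three cases in the semantics table can be mirrored in a small sketch. It is not the C-SPARQL implementation: TimedTriple and TimestampFnSketch are hypothetical names, only fully bound triples are handled, and an empty OptionalLong stands in for the type error.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical window element: a triple plus the timestamp it arrived with.
final class TimedTriple {
    final String s, p, o;
    final long timestamp;
    TimedTriple(String s, String p, String o, long timestamp) {
        this.s = s; this.p = p; this.o = o; this.timestamp = timestamp;
    }
}

class TimestampFnSketch {
    // Empty result: the triple is not in the window (the "type error" case).
    // Otherwise: the timestamp of the triple, or of its most recent occurrence.
    static OptionalLong timestamp(List<TimedTriple> window, String s, String p, String o) {
        return window.stream()
            .filter(t -> t.s.equals(s) && t.p.equals(p) && t.o.equals(o))
            .mapToLong(t -> t.timestamp)
            .max();
    }

    // Sample window: Bob is reported in the red room twice, Carl once.
    static List<TimedTriple> demoWindow() {
        return List.of(
            new TimedTriple(":Bob", ":isIn", ":RedRoom", 10),
            new TimedTriple(":Carl", ":isIn", ":RedRoom", 20),
            new TimedTriple(":Bob", ":isIn", ":RedRoom", 30));
    }

    public static void main(String[] args) {
        List<TimedTriple> w = demoWindow();
        System.out.println(timestamp(w, ":Bob", ":isIn", ":RedRoom"));   // most recent occurrence
        System.out.println(timestamp(w, ":Carl", ":isIn", ":RedRoom"));  // single occurrence
        System.out.println(timestamp(w, ":Alice", ":isIn", ":RedRoom")); // not in the window
    }
}
```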
C-SPARQL Language: TimeStamp Function - Example
Who is “following” whom?
REGISTER QUERY FindFollowers AS
PREFIX f: <http://larkc.eu/csparql/sparql/jena/ext#>
PREFIX : <http://…/sr4ld2014-onto#>
SELECT ?someOne ?someOneElse ?room
FROM STREAM <http://…/isIn> [RANGE 1m STEP 10s]
WHERE {
  ?someOne :isIn ?room .
  ?someOneElse :isIn ?room .
  FILTER (?someOne != ?someOneElse)
  FILTER (f:timestamp(?someOne, :isIn, ?room) < f:timestamp(?someOneElse, :isIn, ?room))
}
C-SPARQL Language: Accessing background information
C-SPARQL allows asking the engine to also issue the query against RDF graphs, using FROM clauses.
E.g., Where else can Alice go?
REGISTER QUERY WhereElseCanAliceGo AS
PREFIX : <http://…/sr4ld2014-onto#>
SELECT ?room
FROM STREAM <http://…/isIn> [RANGE 10m STEP 10m]
FROM <http://…/bgInfo>
WHERE {
  :Alice :isIn ?someRoom .
  ?someRoom :isConnectedTo ?room .
}
FROM <http://…/bgInfo>: IRI identifying the graph containing the background information
C-SPARQL Language: C-SPARQL queries and reasoning - example
Memo
• posts is a sub property of observes
Data
RDF graph | Timestamp | Stream
:RedSensor :observes [ :who :Alice; :where :RedRoom ] . | t1 | sensors
:Bob :posts [ :who :Bob ; :where :RedRoom] . | t2 | fs
Query under RDFS entailment regime
REGISTER QUERY QueryUnderRDFSEntailmentRegime AS
PREFIX : <http://…/sr4ld2014-onto#>
SELECT ?x ?room ?person
FROM STREAM <http://…/fs> [RANGE 1m STEP 10s]
FROM STREAM <http://…/sensors> [RANGE 1m STEP 10s]
WHERE {
  ?x :observes [ :who ?person ; :where ?room ] .
}
Results at t2 + 10s
?x | ?room | ?person
:RedSensor | :RedRoom | :Alice
:Bob | :RedRoom | :Bob
Introduction: C-SPARQL Engine Architecture
• Simple, modular architecture
• It relies entirely on existing technologies
• Integration of
  – DSMSs (Esper) and
  – SPARQL engines (Jena-ARQ)
Introduction: C-SPARQL Engine Features at a glance 1/3
In-memory RDF stream processing
• Continuous queries, filtering, aggregations, joins, sub-queries via C-SPARQL
• Push based
• Reactive
C-SPARQL Engine 0.9.5 supports
• SPARQL 1.1 (tested with http://www.w3.org/wiki/SRBench)
• query chaining
• background RDF graph access and update (via SPARQL 1.1 Update)
• naïve stream reasoning (via Jena Generic Rule Reasoner)
• time-aware matching via timestamp function
Introduction: C-SPARQL Engine Features at a glance 2/3
Extensible middleware
• Runtime management of
  – RDF streams
  – C-SPARQL queries
  – Result listeners
• API driven
Quick start available
• C-SPARQL Engine
  – http://streamreasoning.org/download/csparqlreadytogopack
Source code is released as open source under Apache 2.0
• C-SPARQL Engine
  – https://github.com/streamreasoning/CSPARQL-engine
  – https://github.com/streamreasoning/CSPARQL-ReadyToGoPack
Introduction: C-SPARQL Engine Features at a glance 3/3
Known limitations
• large background data and the timestamp function can degrade performance
• no support for named graphs and named streams
• no support for multiple windows on the same stream
• triple-based windows are buggy
Introduction: C-SPARQL Engine as general RSP
RSP-services proposes a unified interface for RDF stream processors and offers REST services to interact with them.
RSP-services-csparql is the specific implementation of RSP-services for the C-SPARQL engine (more detailed information in the hands-on session)
Quick start available
• RDF Stream Processing RESTful Interface (RSP-service) for C-SPARQL Engine
  – http://streamreasoning.org/download/rsp-service4csparql
Source code is released as open source under Apache 2.0
• RSP-services
  – https://github.com/streamreasoning/rsp-services-csparql
  – https://github.com/streamreasoning/rsp-services-api
  – https://github.com/streamreasoning/rsp-services-client-example
Resources
Read more
• C-SPARQL semantics
  – Davide Francesco Barbieri, Daniele Braga, Stefano Ceri, Emanuele Della Valle, Michael Grossniklaus: C-SPARQL: a Continuous Query Language for RDF Data Streams. Int. J. Semantic Computing 4(1): 3-25 (2010)
• Most recent syntax
  – D. F. Barbieri, D. Braga, S. Ceri, E. Della Valle, M. Grossniklaus: Querying RDF streams with C-SPARQL. SIGMOD Record 39(1): 20-26 (2010)
• RSP-services
  – M. Balduini, E. Della Valle: A Restful Interface for RDF Stream Processors. International Semantic Web Conference (Posters & Demos) 2013: 209-212
Downloads
• http://streamreasoning.org/download/csparqlreadytogopack
• http://streamreasoning.org/download/rsp-service4csparql
SPARQLStream & Morph-streams
Morph-streams: Overview
Morph-streams processing SPARQLStream queries
https://github.com/jpcik/morph-streams
[Diagram: the client sends a SPARQLStream query; query rewriting uses the R2RML mappings to produce an algebra expression; query processing runs on SNEE, Esper, GSN, Cosm or other engines (pull/push); data translation turns the resulting [tuples] into [triples/bindings]]
SPARQLStream query:
SELECT ?proximity
FROM STREAM <http://streamreasoning.org/SensorReadings.srdf> [NOW-5 S]
WHERE {
  ?obs a ssn:ObservationValue;
       qudt:numericalValue ?proximity;
  FILTER (?proximity>10) }
Algebra expression operators: π timed,prox; σ prox>10; ω 5 Seconds; over sens
Rewritten query:
SELECT prox FROM sens.win:time(5 sec) WHERE prox >10
SPARQLStream Language
• FROM NAMED STREAM
• ISTREAM, DSTREAM, RSTREAM
• WINDOW
• Underlying data source restrictions

SPARQLStream Language
NamedStream ::= ‘FROM’ [‘NAMED’] ‘STREAM’ StreamIRI ‘[’ Window ‘]’
Window ::= ‘NOW-’ Integer TimeUnit [UpperBound] [Slide]
UpperBound ::= ‘TO NOW-’ Integer TimeUnit
Slide ::= ‘SLIDE’ Integer TimeUnit
TimeUnit ::= ‘MS’ | ‘S’ | ‘MINUTES’ | ‘HOURS’ | ‘DAY’
Select ::= ‘SELECT’ [Xstream] [Distinct | Reduced] …
Xstream ::= ‘RSTREAM’ | ‘ISTREAM’ | ‘DSTREAM’

SELECT ISTREAM ?room
FROM NAMED STREAM <http://www.streamreasoning.org/streams/socialsensor.srdf> [NOW-10 S]
WHERE {…
SPARQLStream: examples
All rooms where something was observed in the last 10s:
PREFIX sr4ld: <http://www.streamreasoning.org/ontologies/socialsensor.owl#>
SELECT ?room
FROM NAMED STREAM <http://www.streamreasoning.org/streams/socialsensor.srdf> [NOW-10 S]
WHERE {
  ?obs sr4ld:observedBy ?sensor.
  ?obs sr4ld:where ?room.
}
Number of persons observed in each room in the last 10s:
PREFIX sr4ld: <http://www.streamreasoning.org/ontologies/socialsensor.owl#>
SELECT (COUNT(?person) AS ?nmb) ?room
FROM NAMED STREAM <http://www.streamreasoning.org/streams/socialsensor.srdf> [NOW-10 S]
WHERE {
  ?obs sr4ld:who ?person.
  ?obs sr4ld:where ?room.
}
GROUP BY ?room
Underlying Query Processors
Esper
• CEP/DSMS
• EPL language
SELECT prox FROM sensors.win:time(5 minute) WHERE prox >10
SNEE
• DSMS/Sensor Network Query Evaluator
• Compile queries to sensor code
SELECT prox FROM sensors [FROM NOW-5 MINUTES TO NOW] WHERE prox >10
GSN
• Sensor middleware
• REST API
http://montblanc.slf.ch:22001/multidata?vs[0]=sensors&field[0]=proximity_field&c_min[0]=10&from=15/05/2012+05:00:00&to=15/05/2012+10:00:00
Cosm/Xively
• Sensor middleware
• Open platform
• REST API
http://api.cosm.com/v2/feeds/14321/datastreams/4?start=2012-05-15T05:00:00Z&end=2012-05-15T10:00:00Z
3rd: Mapping the two models
[Diagram: ontology with classes Observation, Sensor, Person, Room and properties who, where, observes, subClassOf; stream schema detections(person, room, …)]
Define mappings
R2RML – There is a recommendation!
We can use the W3C recommendation
R2RML - Overview
Encoding in R2RML
Mapping definition:
:triplesMap a rr:TriplesMap;
  rr:logicalTable [ rr:tableName "sensors"; ];    # the stream name
  rr:subjectMap [
    # subject URI, built from stream attributes
    rr:template "http://streamreasoning.org/data/Observation/{person}{timed}";
    rr:class sr4ld:Observation;
    rr:graph sr4ld:socialstream.srdf ];
  # triple predicate + object
  rr:predicateObjectMap [
    rr:predicate sr4ld:who ;
    # the object (a URI in this case)
    rr:objectMap [ rr:template "http://streamreasoning.org/data/Person/{person}" ] ] .
Underlying Query Processors
SPARQLStream query:
SELECT ?proximity
FROM STREAM <http://streamreasoning.org/SensorReadings.srdf> [NOW-5 S]
WHERE {
  ?obs a ssn:ObservationValue;
       qudt:numericalValue ?proximity;
  FILTER (?proximity>10) }
Query rewriting (R2RML) yields the algebra operators π timed,prox; σ prox>10; ω 5 Seconds; over sensors. The result is translated for each target:
SNEE (DSMS):
SELECT prox FROM sensors [FROM NOW-5 MINUTES TO NOW] WHERE prox >10
Esper (CEP):
SELECT prox FROM sensors.win:time(5 minute) WHERE prox >10
GSN (middleware):
http://montblanc.slf.ch:22001/multidata?vs[0]=sensors&field[0]=proximity_field&c_min[0]=10&from=15/05/2012+05:00:00&to=15/05/2012+10:00:00
Cosm/Xively:
http://api.cosm.com/v2/feeds/14321/datastreams/4?start=2012-05-15T05:00:00Z&end=2012-05-15T10:00:00Z
Underlying query processors
Features | Esper | SNEE | GSN | Cosm/Xively
Projection | ✔ | ✔ | ✔ | Fixed
Proj expression | ✔ | ✔ | ✖ | ✖
Joins | ✔ | ✔✖ only window | ✖ | ✖
Union | ✖ | ✔✖ not windows | ✔ | ✖
Selection | ✔ | ✔ | ✔ | ✖✔ limited
Aggregates | ✔ | ✔ | ✔✖ | ✖
Time window | ✔ | ✔ | ✔ | ✔
Tuple window | ✔ | ✔ | ✔ | ✖
R2S | ✔ | ✔ | ✖ | ✖
Conjunction, Disj | ✔ | ✖ | ✖ | ✖
Repetition pattern | ✔ | ✖ | ✖ | ✖
Sequence | ✔ | ✖ | ✖ | ✖
Morph-streams: With reasoning!
[Diagram: the Morph-streams architecture from the overview, with an ontology TBox as an additional input to query rewriting]
Rewrite taking into account the ontology TBox
https://github.com/jpcik/morph-streams
Reasoning with Morph-streams
Rewriting the SPARQLStream queries:
Original query:
SELECT ?x
FROM NAMED STREAM <http://linkeddata.es/s/obs.srdf> [NOW - 5 MINUTES]
WHERE {
  ?x ssn:observedBy ?y
}
Rewritten query:
SELECT ?x
FROM NAMED STREAM <http://linkeddata.es/s/obs.srdf> [NOW - 5 MINUTES]
WHERE {
  {?x ssn:observedBy ?y}
  UNION {?x a ssn:Observation}
  UNION {?x a aws:TemperatureObservation}
  UNION {?x a aws:HumidityObservation}
}
Morph-streams: With reasoning!
• Rewrite only happens once (query rewriting + translation)
• Then the continuous query is registered
Now some code
Morph-streams:
• Coded in Scala
• JAR bundle, use it from Scala or Java code
• Maven, Sbt
Examples
• One-off query
• Register continuous query
• Pull data
• Push
• Basic REST
https://github.com/jpcik/morph-streams
https://github.com/jpcik/morph-web
Code examples
Parse SPARQLStream
val query = "PREFIX sr4ld: <…>. SELECT ?a …"
val syntax = StreamQueryFactory.create(query)

Execute one-off query
val query = "PREFIX sr4ld: <…>. SELECT ?a …"
val mapping = Mapping(new URI("mappings/social.ttl"))      // mapping
val adapter: QueryEvaluator = Application.adapter(system)
val results = adapter.executeQuery(query, mapping)         // bindings
Code examples
Register and Pull
val queryid = adapter.registerQuery(query, mapping)   // query identifier
val results1 = adapter.pull(queryid)
val results2 = adapter.pull(queryid)

Register and Push
// implement receiver
class ExampleReceiver extends StreamReceiver {
  override def receiveData(s: SparqlResults): Unit =
    Logger.debug("got: " + s)
}
val receiver = new ExampleReceiver
val queryid = adapter.listenToQuery(query, mapping, receiver)

For Java users: Exactly the same interface!
Querying RSPs in Practice
RSP Querying
Example with CQELS (code.google.com/p/cqels)
CQELS continuous query:
SELECT ?person ?loc WHERE {
  STREAM <http://deri.org/streams/rfid> [RANGE 3s] {?person :detectedAt ?loc}
}

ExecContext context = new ExecContext(HOME, false);
String queryString =" SELECT ?person ?loc …
// register query
ContinuousSelect selQuery = context.registerSelect(queryString);
// adding listener
selQuery.register(new ContinuousListener() {
  // get result updates
  public void update(Mapping mapping) {
    String result = "";
    for (Iterator<Var> vars = mapping.vars(); vars.hasNext();)
      result += " " + context.engine().decode(mapping.get(vars.next()));
    System.out.println(result);
  }
});

CQELS
• Tightly coupled listeners
• Results delivery: push & pull?
CQELS fed by a TripleWave WebSocket
val conf = ConfigFactory.load.getConfig("experiments.rsp")

val qq = """CONSTRUCT { ?s ?p ?o }
  WHERE {
    STREAM <ws://localhost:4040/primus> [RANGE 0ms] {?s ?p ?o}
  }"""
val sys = new RspSystem("wstreams")
val cqels = new CqelsEngine
sys.startStream(Props(
  new WebSocketStream(cqels, "ws://localhost:4040/primus", conf)))
cqels.registerQuery(qq, cqels.createListener(lissy))
def lissy(triples: TripleList): Unit = {
  println("tikki: " + triples)
}
Similar models, similar (not equal!) query languages
SPARQLStream:
SELECT ?sensor
FROM NAMED STREAM <http://www.cwi.nl/SRBench/observations> [NOW-3 HOURS SLIDE 10 MINUTES]
WHERE {
  ?observation om-owl:procedure ?sensor ;
               om-owl:observedProperty weather:WindSpeed ;
               om-owl:result [ om-owl:floatValue ?value ] .
}
GROUP BY ?sensor HAVING ( AVG(?value) >= "74"^^xsd:float )

CQELS:
SELECT ?sensor
WHERE {
  STREAM <http://www.cwi.nl/SRBench/observations> [RANGE 10800s SLIDE 600s] {
    ?observation om-owl:procedure ?sensor ;
                 om-owl:observedProperty weather:WindSpeed ;
                 om-owl:result [ om-owl:floatValue ?value ] .
  }
}
GROUP BY ?sensor HAVING ( AVG(?value) >= "74"^^xsd:float )

C-SPARQL:
SELECT ?sensor
FROM STREAM <http://www.cwi.nl/SRBench/observations> [RANGE 1h STEP 10m]
WHERE {
  ?observation om-owl:procedure ?sensor ;
               om-owl:observedProperty weather:WindSpeed ;
               om-owl:result [ om-owl:floatValue ?value ] .
}
GROUP BY ?sensor HAVING ( AVG(?value) >= "74"^^xsd:float )
Query using SQL on Streams
SPARQL
System | Model | Continuous execution | Union, Join, Optional, Filter | Aggregates | Time window | Triple window | R2S operator | Sequence, Co-occurrence | Time function
TA-SPARQL | TA-RDF | ✗ | ✔ | Limited | ✗ | ✗ | ✗ | ✗ | ✗
tSPARQL | tRDF | ✗ | ✔ | ✗ | ✗ | ✗ | ✗ | ✗ | ✗
Streaming SPARQL | RDF Stream | ✔ | ✔ | ✗ | ✔ | ✔ | ✗ | ✗ | ✗
C-SPARQL | RDF Stream | ✔ | ✔ | ✔ | ✔ | ✔ | ✗ | ✗ | ✔
CQELS | RDF Stream | ✔ | ✔ | ✔ | ✔ | ✔ | ✗ | ✗ | ✗
SPARQLStream | (Virtual) RDF Stream | ✔ | ✔ | ✔ | ✔ | ✗ | ✔ | ✗ | ✗
EP-SPARQL | RDF Stream | ✔ | ✔ | ✔ | ✗ | ✗ | ✗ | ✔ | ✗
Instans | RDF | ✔ | ✔ | ✔ | ✗ | ✗ | ✗ | ✗ | ✗
Not exhaustive!
W3C RSP
• review features in existing systems
• agree on fundamental operators
• discuss possible semantics
https://www.w3.org/community/rsp/wiki/RSP_Query_Features
RSP is not always/only SPARQL-like queryingSPARQL protocol is not enoughRSP RESTful interfaces?
Powerful languages for continuous query processing
W3C RSP-CG: RSP-QL
PREFIX e: <http://somevocabulary.org/>
PREFIX s: <http://someinvasivesensornetwork.org/streams#>
PREFIX g: <http://somesocialnetwork.org/graphs#>
PREFIX : <http://acrasycompany.org/rsp>
REGISTER STREAM :GallehaultWasTheBar
UNDER ENTAILMENT REGIME <http://www.w3.org/ns/entailment/RIF>
AS
CONSTRUCT ISTREAM {
  ?poi rdf:type :Gallehault ;
       :count ?howmanycouples ;
       :for (?somebody ?someoneelse)
}
FROM NAMED WINDOW :veryLongWindow ON s:1 [RANGE PT4H STEP PT1H]
FROM NAMED WINDOW :longWindow ON s:1 [FROM NOW-PT35M TO NOW-PT5M STEP PT5M]
FROM NAMED WINDOW :shortWindow ON s:1 [RANGE PT10M STEP PT5M]
FROM NAMED GRAPH g:SocialGraph
FROM GRAPH g:POIs
WHERE {
  ?poi rdf:type e:bar .
  WINDOW :veryLongWindow {
    {?somebody e:enters ?poi} BEGIN AT ?t3
    {?someoneelse e:enters ?poi} BEGIN AT ?t4
    FILTER(?t3>?t4)
  }
  WINDOW :longWindow {
    {
      ?somebody e:isCloseTo ?someoneelse
      MINUS { ?somebody e:isCloseTo ?yetanotherone . FILTER (?yetanotherone != ?someoneelse) }
    } WITH DURATION ?duration
    FILTER (?duration>="PT30M"^^xsd:duration)
  }
  WINDOW :shortWindow {
    { ?somebody e:exits ?bar} BEGIN AT ?t1
    { ?someoneelse e:exits ?bar } BEGIN AT ?t2
    FILTER (abs(?t2-?t1)<"PT1M"^^xsd:duration )
  }
  GRAPH g:SocialGraph {
    FILTER NOT EXISTS { ?somebody e:knows ?someoneelse }
  }
  FILTER (?somebody != ?someoneelse)
}
AGGREGATE { GROUP BY ?poi COUNT(?somebody) AS ?howmanycouples }
Continuously look for bars where people are falling in love (because of a book)
Features used:
• Register stream
• Time windows
• Sequencing
• Duration
• Stored Graphs
• Aggregates
• Access to time
• Reasoning
What the query matches:
• They entered the same bar
• They are close to each other, with no-one else
• They get out together
• Didn’t know each other
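The window clauses in the RSP-QL example use ISO 8601 durations (PT4H, PT35M, PT5M). The sketch below shows one plausible reading of how such a clause maps to concrete time bounds at a given evaluation instant, using only `java.time`. The class and method names are hypothetical, and treating `[RANGE r]` as `[FROM NOW-r TO NOW]` is an assumption for illustration, not a statement of the RSP-QL semantics.

```java
import java.time.Duration;
import java.time.Instant;

final class WindowBoundsSketch {
    // Bounds of a window written as [FROM NOW-<fromOffset> TO NOW-<toOffset>], evaluated at 'now'.
    static Instant[] bounds(Instant now, String fromOffset, String toOffset) {
        return new Instant[] {
            now.minus(Duration.parse(fromOffset)),
            now.minus(Duration.parse(toOffset))
        };
    }

    // Assumption: [RANGE r] is read as the special case [FROM NOW-r TO NOW].
    static Instant[] rangeBounds(Instant now, String range) {
        return bounds(now, range, "PT0S");
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2016-05-29T12:00:00Z");
        Instant[] longWindow = bounds(now, "PT35M", "PT5M");
        System.out.println(longWindow[0] + " .. " + longWindow[1]);
        // STEP PT5M: the next evaluation instant is one step later
        System.out.println(now.plus(Duration.parse("PT5M")));
    }
}
```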
RSP Communication
RDF Stream Processing
RSP Engine
• Inputs: input RDF streams, background knowledge (RDF graphs), continuous queries
• Output: streams of results
RSP Implementations
• Stream producers: the RSP engine subscribes to a producer, and the producer notifies the engine
• Stream consumers: a consumer subscribes to a continuous query, and the engine pushes results to it
Reactive Systems
Reactive systems react to:
• Events: Event-Driven
• Load: Scalable
• Failure: Resilient
• Users: Responsive
Jonas Boner. Go Reactive: Event-Driven, Scalable, Resilient & Responsive Systems. 2013.
Actor Model
Actor1 sends a message (m) to Actor2
• No shared mutable state
• Avoid blocking operators
• Lightweight objects
• Loose coupling
• Communicate through messages
• Each actor has a mailbox, state and behavior
• Non-blocking response
• Send: fire-forget
Implementations: e.g. Akka for Java/Scala
Supervision
• Supervision hierarchy (diagram: Parent, Actor1, Actor2, Actor4 marked X)
Remoting
• Messages (m) exchanged between actors (diagram: Actor1, Actor2, Actor3, Actor4)
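The mailbox, behavior and fire-and-forget send can be sketched without any actor library. The example below is a deliberately small approximation in plain Java, assuming a single-threaded executor as the mailbox; `MiniActor`, `tell` and `stop` are hypothetical names and are not Akka's API. Supervision and remoting are not covered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical minimal actor: illustrates the idea only, NOT Akka's API.
final class MiniActor<M> {
    // The single-threaded executor acts as the mailbox: messages queue up
    // and are processed one at a time, in arrival order.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final Consumer<M> behavior;

    MiniActor(Consumer<M> behavior) {
        this.behavior = behavior;
    }

    // Fire-and-forget send: enqueues the message and returns immediately.
    void tell(M message) {
        mailbox.execute(() -> behavior.accept(message));
    }

    // Stop accepting messages and wait until the queued ones are processed.
    boolean stop(long timeoutMs) {
        mailbox.shutdown();
        try {
            return mailbox.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

final class ActorSketch {
    // Sends every triple (here just a string) to a consumer actor that keeps rdf:type triples.
    static List<String> run(List<String> triples) {
        // Synchronised only so the calling thread can safely read it after the actor stops.
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        MiniActor<String> consumer = new MiniActor<>(t -> {
            if (t.contains(" rdf:type ")) received.add(t);
        });
        for (String t : triples) consumer.tell(t);
        consumer.stop(2000);
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(
            ":alice rdf:type foaf:Person", ":alice foaf:name \"Alice\"")));
    }
}
```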
RDF Streams: Actors
val sys = ActorSystem.create("system")
val consumer = sys.actorOf(Props[RdfConsumer])

// RDF producer: async message passing
class Streamer extends StreamRDF {
  override def triple(triple: Triple) {
    consumer ! triple
  }
}

// RDF consumer: the Actor receive method implements the behavior (message-passing model)
class RdfConsumer extends Actor {
  def receive = {
    case t: Triple =>
      if (t.predicateMatches(RDF.`type`))
        println(s"received triple $t")
  }
}
RSP Producer & Consumer
Producers send RDF streams to a Processor, which feeds Consumers
Faster producers >> slower processor/consumer
• Overload of the processor/receiver
• Unresponsiveness in stream processor
Dynamic Push-Pull
Producer and Consumer: data flow (messages m) from producer to consumer, demand flow from consumer to producer
• Push when consumer is faster
• Pull when producer is faster
• Dynamically switch modes
Communication is dynamic depending on demand vs supply
• Event-driven
• Responsive
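The data-flow/demand-flow split can be illustrated with the demand signalling of `java.util.concurrent.Flow` (Java 9+), which follows the Reactive Streams model. This is a related, swapped-in technique and not the implementation evaluated in this tutorial: the consumer requests items as it has capacity, and the producer delivers only against that demand. The class names and the batch size are assumptions for the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Consumer that signals demand upstream and receives data downstream.
final class DemandDrivenConsumer implements Flow.Subscriber<String> {
    final List<String> received = Collections.synchronizedList(new ArrayList<>());
    final CountDownLatch done = new CountDownLatch(1);
    private final long initialDemand;
    private Flow.Subscription subscription;

    DemandDrivenConsumer(long initialDemand) {
        this.initialDemand = initialDemand;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(initialDemand);   // demand flow: "I can take this many items"
    }

    @Override public void onNext(String triple) {
        received.add(triple);       // data flow: one item delivered
        subscription.request(1);    // capacity freed, ask for one more
    }

    @Override public void onError(Throwable t) { done.countDown(); }

    @Override public void onComplete() { done.countDown(); }
}

final class PushPullSketch {
    static List<String> run(List<String> triples) {
        DemandDrivenConsumer consumer = new DemandDrivenConsumer(2);
        try (SubmissionPublisher<String> producer = new SubmissionPublisher<>()) {
            producer.subscribe(consumer);
            // submit() blocks when the subscriber's buffer is full,
            // so a slow consumer slows the producer down instead of being flooded.
            for (String t : triples) producer.submit(t);
        } // close() signals onComplete once buffered items have been delivered
        try {
            consumer.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(consumer.received);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("t1", "t2", "t3", "t4", "t5")));
    }
}
```

When the consumer keeps up, outstanding demand is always available and delivery behaves like push; when it falls behind, items are released only as it requests them, which behaves like pull.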
Evaluation: throughput
• Basic dynamic pull push
• On top of CQELS
• Limitations of Thread model
• Not yet fully async
Reactive RSP workflows
Engines connected through streams (s): MorphStreams, CSPARQL, Etalis, TrOWL, CQELS, Dynamite
• Minimal agreements: standards, serialization, interfaces
• Formal models for RSPs and reasoning
• Working prototypes/systems!
Reactive RSPs
• Event-driven message passing
• Async communication
• Immutable streams
• Transparent Remoting
• Parallel and distributed
• Supervised Failure Handling
• Responsive processing
RSPs and the Linked Data Principles
URIs as Names of Things
http://mysensorweb.me/mytemperature
• a stream?
• an observation?
• many observations?
Different concepts
http://mysensorweb.me/mytemperature/20151110Z10:00:00
http://mysensorweb.me/mytemperature/latest
Different granularities
http://mysensorweb.me/mytemperature/lastMinute
http://mysensorweb.me/mytemperature/lastMonth
Different uses
http://mysensorweb.me/mytemperature/avgLastMonth
HTTP URIs
http://mysensorweb.me/mytemperature/latest
Internet of Things
• How about XMPP, CoAP, MQTT?
• Websockets?
De-referenceable URIs
GET http://mysensorweb.me/mytemperature/latest
:Obs1 a my:TemperatureObservation;
  my:hasValue 33.5 ;
  my:hasUnit u:Celsius;
  my:atTime "20151110Z10:00:00".
GET http://mysensorweb.me/mytemperature
Get the whole stream?
GET http://mysensorweb.me/mytemperature/lastMonth
Get continuous updates?
Link to other URIs
• Broken links?
• Mix streaming and stored data
• Persist or not persist?
• Volatile links?
http://mysensorweb.me/mytemperature/20151110Z10:00:00
For Java/Scala developers
Semantic Web Devs
• Have fun
• Love challenges
• Need cool tools
RDF4J
Scala: Functions and Objects
• JVM language
• Both object and functional oriented
• Easy Java-interop
• Reuse Java libraries
• Growing community
RDF in Jena: in Scala
Java
String personURI = "http://somewhere/JohnSmith";
Model model = ModelFactory.createDefaultModel();
model.createResource(personURI).addProperty(VCARD.FN, "John Smith");
Scala
val personURI = "http://somewhere/JohnSmith"
val model = ModelFactory.createDefaultModel
model.createResource(personURI).addProperty(VCARD.FN, "John Smith")
• Type inference
• Not too useful ; and ()
• Terser & compact code
Scala (DSL)
val personURI = "http://somewhere/JohnSmith"
implicit val model = createDefaultModel
add(personURI, VCARD.FN->"John Smith")
• Type-safe DSL
• Compiler takes care
• boilerplate
• String converted to Resource
Resulting graph: sw:JohnSmith vcard:FN “John Smith”
Some more RDF
Java
String personURI = "http://somewhere/JohnSmith";
String givenName = "John";
String familyName = "Smith";
String fullName = givenName + " " + familyName;
Model model = ModelFactory.createDefaultModel();
model.createResource(personURI)
  .addProperty(VCARD.FN, fullName)
  .addProperty(VCARD.N, model.createResource()
    .addProperty(VCARD.Given, givenName)
    .addProperty(VCARD.Family, familyName));
Scala
val personURI = "http://somewhere/JohnSmith"
val givenName = "John"
val familyName = "Smith"
val fullName = s"$givenName $familyName"
implicit val model = createDefaultModel
add(personURI, VCARD.FN->fullName,
               VCARD.N ->add(bnode, VCARD.Given -> givenName,
                                    VCARD.Family->familyName))
Resulting graph:
sw:JohnSmith vcard:FN “John Smith”
sw:JohnSmith vcard:N _:n
_:n vcard:Given “John”
_:n vcard:Family “Smith”
• Blank node
• Scala DSLs customizable
• Predicate-objects are pairs
Some more RDF in Jena
Scala
implicit val m = createDefaultModel
val ex = "http://example.org/"
val alice = iri(ex+"alice")
val bob = iri(ex+"bob")
val charlie = iri(ex+"charlie")

alice + (RDF.`type`->FOAF.Person,
         FOAF.name->"Alice",
         FOAF.mbox->iri("mailto:[email protected]"),
         FOAF.knows->bob,
         FOAF.knows->charlie,
         FOAF.knows->bnode)
bob + (FOAF.name->"Bob",
       FOAF.knows->charlie)
charlie + (FOAF.name->"Charlie",
           FOAF.knows->alice)
• Still valid Jena RDF
• You can do it even nicer
Exploring an RDF Graph
Java
ArrayList<String> names = new ArrayList<String>();
NodeIterator iter = model.listObjectsOfProperty(VCARD.N);
while (iter.hasNext()) {
  RDFNode obj = iter.next();
  if (obj.isResource())
    names.add(obj.asResource()
      .getProperty(VCARD.Family).getObject().toString());
  else if (obj.isLiteral())
    names.add(obj.asLiteral().getString());
}
• Imperative iteration of collections
• Type-based conditional execution
• Type casting
Scala
val names = model.listObjectsOfProperty(VCARD.N).map {
  case r: Resource => r.getProperty(VCARD.Family).obj.toString
  case l: Literal => l.getString
}
• Case type
• Map applied to operators
Query with SPARQL
val queryStr = """select distinct ?Concept where {[] a ?Concept} LIMIT 10"""
val query = sparql(queryStr)

query.serviceSelect("http://dbpedia.org/sparql").foreach { implicit qs =>
  println(res("Concept").getURI)
}
• Remote SPARQL endpoint
• Simplified access to Query solutions

val f = Future(query.serviceSelect("http://es.dbpedia.org/sparql")).fallbackTo(
  Future(query.serviceSelect("http://dbpedia.org/sparql")))

f.recover { case e => println("Error "+e.getMessage) }
f.map(_.foreach { implicit qs =>
  println(res("Concept").getValue)
})
• Futures: async execution
• Non blocking code
• Fallback alternative execution
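For Java developers, a comparable fallback pattern can be sketched with `CompletableFuture`. The helper `withFallback` is a hypothetical name, and the suppliers stand in for the two endpoint calls; no SPARQL library is used here. One difference from the Scala snippet above is an assumption of this sketch: the fallback is started only after the primary fails, whereas in the Scala version both `Future(...)` expressions are created up front.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class FallbackSketch {
    // Runs 'primary' asynchronously; if it fails, runs 'fallback' asynchronously instead.
    static <T> CompletableFuture<T> withFallback(Supplier<T> primary, Supplier<T> fallback) {
        CompletableFuture<T> first = CompletableFuture.supplyAsync(primary);
        CompletableFuture<CompletableFuture<T>> chosen = first.handle((value, error) -> {
            if (error == null) {
                return CompletableFuture.completedFuture(value);
            }
            return CompletableFuture.supplyAsync(fallback);
        });
        // Flatten the nested future without blocking the caller
        return chosen.thenCompose(f -> f);
    }

    public static void main(String[] args) {
        // Stand-ins for two remote endpoints: the first one is "down".
        CompletableFuture<String> result = FallbackSketch.<String>withFallback(
            () -> { throw new IllegalStateException("primary endpoint unavailable"); },
            () -> "result from fallback endpoint");
        // Non-blocking consumers are attached first; join() is only used to keep the demo alive.
        result.thenAccept(System.out::println)
              .exceptionally(e -> { System.out.println("Error " + e.getMessage()); return null; })
              .join();
    }
}
```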
Muchas Gracias!
Tutorial on RDF Stream Processing 2016
M.I. Ali, J-P Calbimonte, D. Dell'Aglio, E. Della Valle, and A. Mauri
http://streamreasoning.org/events/rsp2016
RDF Stream Processing Implementations
Jean-Paul Calbimonte
[email protected] http://jeanpi.org @jpcik