BlueBrain Nexus
Building a knowledge graph for data-driven science
March 6, 2019
Blue Brain Project | 2019 1 / 45
Who we are
The Blue Brain Project
- Not the Human Brain Project
- Own entity under EPFL governance
- 120+ people
- 9 sections
- Supercomputer at the CSCS in Lugano
https://bluebrain.epfl.ch
https://portal.bluebrain.epfl.ch
Who we are
Two teams
- Neuroinformatics
  - Back-end
  - Front-end
  - UI/UX
- Data & knowledge engineering
  - Modeling (vocabularies, taxonomies, ontologies, schemas)
  - Literature curation
  - Image reconstruction & atlasing
We build Nexus.
Presentation and documentation: https://bluebrain.github.io/nexus
Public instance: https://nexus-sandbox.io/web
What do we do?
Why a knowledge graph?
RDF in a nutshell
Example from Wikipedia (RDF/XML):
<rdf:RDF
    xmlns:contact="http://www.w3.org/2000/10/swap/pim/contact#"
    xmlns:eric="http://www.w3.org/People/EM/contact#"
    xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#">
  <rdf:Description rdf:about="http://www.w3.org/People/EM/contact#me">
    <contact:fullName>Eric Miller</contact:fullName>
  </rdf:Description>
  <rdf:Description rdf:about="http://www.w3.org/People/EM/contact#me">
    <contact:mailbox rdf:resource="mailto:[email protected]"/>
  </rdf:Description>
  <rdf:Description rdf:about="http://www.w3.org/People/EM/contact#me">
    <contact:personalTitle>Dr.</contact:personalTitle>
  </rdf:Description>
  <rdf:Description rdf:about="http://www.w3.org/People/EM/contact#me">
    <rdf:type rdf:resource="http://www.w3.org/2000/10/swap/pim/contact#Person"/>
  </rdf:Description>
</rdf:RDF>
Other representation formats: N-Triples, Turtle, JSON-LD, etc.
N-Triples
<http://www.w3.org/People/EM/contact#me> <http://www.w3.org/2000/10/swap/pim/contac…
<http://www.w3.org/People/EM/contact#me> <http://www.w3.org/1999/02/22-rdf-syntax-n…
<http://www.w3.org/People/EM/contact#me> <http://www.w3.org/2000/10/swap/pim/contac…
<http://www.w3.org/People/EM/contact#me> <http://www.w3.org/2000/10/swap/pim/contac…
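A quick way to see the N-Triples format is to generate it. The Scala sketch below serializes three of the statements from the RDF/XML example (the mailbox is left out) as one `subject predicate object .` line each. The `Term`, `Iri`, `Literal` and `Triple` types are hypothetical, not the nexus-rdf API, and only the four characters a quoted N-Triples literal may not contain unescaped (double quote, backslash, line feed, carriage return) are escaped.

```scala
// Hypothetical minimal RDF model (not the nexus-rdf API): IRIs and plain string literals.
sealed trait Term
final case class Iri(value: String) extends Term
final case class Literal(value: String) extends Term

final case class Triple(s: Iri, p: Iri, o: Term)

// Escape backslash first, then double quote and line breaks.
def escape(s: String): String =
  s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", "\\r")

def render(t: Term): String = t match {
  case Iri(v)     => s"<$v>"
  case Literal(v) => "\"" + escape(v) + "\""
}

// One statement per line: subject, predicate, object, then " ."
def toNTriples(triples: Seq[Triple]): String =
  triples.map(t => s"${render(t.s)} ${render(t.p)} ${render(t.o)} .").mkString("\n")

// Three of the statements from the RDF/XML example.
val contact = "http://www.w3.org/2000/10/swap/pim/contact#"
val me      = Iri("http://www.w3.org/People/EM/contact#me")
val rdfType = Iri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type")

val triples = Seq(
  Triple(me, Iri(contact + "fullName"), Literal("Eric Miller")),
  Triple(me, Iri(contact + "personalTitle"), Literal("Dr.")),
  Triple(me, rdfType, Iri(contact + "Person"))
)

println(toNTriples(triples))
```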
JSON-LD
{
  "@context": {
    "contact": "http://www.w3.org/2000/10/swap/pim/contact#",
    "eric": "http://www.w3.org/People/EM/contact#",
    "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
    "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
    "xsd": "http://www.w3.org/2001/XMLSchema#"
  },
  "@id": "eric:me",
  "@type": "contact:Person",
  "contact:fullName": "Eric Miller",
  "contact:mailbox": {
    "@id": "mailto:[email protected]"
  },
  "contact:personalTitle": "Dr."
}
SPARQL
What are the names of five people who published a paper with Springer and work for EPFL in Geneva?
SPARQL
SELECT DISTINCT ?name
WHERE {
  ?person a :Person .
  ?person :name ?name .
  ?publication :author ?person .
  ?publication :publisher "Springer" .
  ?person :worksFor "EPFL" .
  ?person :workLocation "Geneva" .
}
LIMIT 5
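The query is a basic graph pattern: a result must satisfy every triple pattern, and variables shared between patterns (`?person`, `?publication`) act as joins. A naive Scala sketch of that evaluation follows, assuming plain-string terms (no prefixes, so `a` is written as a `type` predicate), a hypothetical toy dataset, and nested-loop joins instead of a real query planner.

```scala
// Pattern positions are variables or constants; terms are plain strings.
sealed trait P
final case class Var(name: String) extends P
final case class Const(value: String) extends P

type Triple   = (String, String, String)
type Bindings = Map[String, String]

// Extend the bindings so that one pattern position matches one term, if possible.
def bind(p: P, term: String, b: Bindings): Option[Bindings] = p match {
  case Const(v) => if (v == term) Some(b) else None
  case Var(n) =>
    b.get(n) match {
      case Some(bound) => if (bound == term) Some(b) else None
      case None        => Some(b + (n -> term))
    }
}

// Match patterns left to right; shared variables act as joins (nested loops).
def solve(patterns: List[(P, P, P)], data: Seq[Triple], b: Bindings = Map.empty): Seq[Bindings] =
  patterns match {
    case Nil => Seq(b)
    case (s, p, o) :: rest =>
      for {
        (sub, pred, obj) <- data
        b1               <- bind(s, sub, b).toList
        b2               <- bind(p, pred, b1).toList
        b3               <- bind(o, obj, b2).toList
        result           <- solve(rest, data, b3)
      } yield result
  }

// The WHERE clause of the query, with "a" written as "type".
val patterns: List[(P, P, P)] = List(
  (Var("person"), Const("type"), Const("Person")),
  (Var("person"), Const("name"), Var("name")),
  (Var("publication"), Const("author"), Var("person")),
  (Var("publication"), Const("publisher"), Const("Springer")),
  (Var("person"), Const("worksFor"), Const("EPFL")),
  (Var("person"), Const("workLocation"), Const("Geneva"))
)

// SELECT DISTINCT ?name ... LIMIT 5
def names(data: Seq[Triple]): Seq[String] =
  solve(patterns, data).map(_("name")).distinct.take(5)

// Hypothetical toy data: Alice has two Springer papers, Bob works in Lausanne.
val data: Seq[Triple] = Seq(
  ("alice", "type", "Person"), ("alice", "name", "Alice"),
  ("alice", "worksFor", "EPFL"), ("alice", "workLocation", "Geneva"),
  ("bob", "type", "Person"), ("bob", "name", "Bob"),
  ("bob", "worksFor", "EPFL"), ("bob", "workLocation", "Lausanne"),
  ("paper1", "author", "alice"), ("paper1", "publisher", "Springer"),
  ("paper2", "author", "alice"), ("paper2", "publisher", "Springer"),
  ("paper3", "author", "bob"), ("paper3", "publisher", "Springer")
)

println(names(data))
```

Alice is matched once per Springer paper, which is why the query needs `DISTINCT`; `LIMIT 5` becomes `take(5)` after de-duplication.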
SHACL
@prefix schema: <http://schema.org/> .
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

schema:PersonShape
    a sh:NodeShape ;
    sh:targetClass schema:Person ;
    sh:property [
        sh:path schema:givenName ;
        sh:datatype xsd:string ;
        sh:name "given name" ;
    ] ;
    sh:property [
        sh:path schema:birthDate ;
        sh:lessThan schema:deathDate ;
        sh:maxCount 1 ;
    ] ;
    sh:property [
        sh:path schema:gender ;
        sh:in ( "female" "male" ) ;
    ] ;
    sh:property [
        sh:path schema:address ;
        sh:node schema:AddressShape ;
    ] .
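A SHACL validator checks every node of the target class against each property constraint. The Scala sketch below models three constraints of `schema:PersonShape` (`sh:datatype xsd:string`, `sh:maxCount 1`, `sh:in`) over a hypothetical node represented as a map from property names to value lists; `sh:lessThan`, `sh:node` and real RDF datatypes are left out, so treat it as an illustration rather than a validator.

```scala
// Hypothetical simplified node: property name -> values (strings or other literals).
type Node = Map[String, List[Any]]

sealed trait Constraint
final case class DatatypeString(path: String) extends Constraint     // sh:datatype xsd:string
final case class MaxCount(path: String, max: Int) extends Constraint // sh:maxCount
final case class In(path: String, allowed: Set[Any]) extends Constraint // sh:in

// One message per violation; an empty list means the node conforms.
def validate(node: Node, constraints: List[Constraint]): List[String] =
  constraints.flatMap {
    case DatatypeString(p) =>
      node.getOrElse(p, Nil).collect { case v if !v.isInstanceOf[String] => s"$p: $v is not a string" }
    case MaxCount(p, max) =>
      val n = node.getOrElse(p, Nil).size
      if (n > max) List(s"$p: $n values, at most $max allowed") else Nil
    case In(p, allowed) =>
      node.getOrElse(p, Nil).filterNot(allowed.contains).map(v => s"$p: $v is not one of $allowed")
  }

// Three of the constraints of schema:PersonShape.
val personShape: List[Constraint] = List(
  DatatypeString("givenName"),
  MaxCount("birthDate", 1),
  In("gender", Set("female", "male"))
)

val example: Node = Map("givenName" -> List("Eric"), "gender" -> List("other"))
println(validate(example, personShape)) // one violation, for gender
```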
In practice
Nexus services
Overview
Resource hierarchy
Resource life-cycle
Example with resources:
POST /v1/resources/{org_label}/{project_label}/{schema_id}
PUT /v1/resources/{org_label}/{project_label}/{schema_id}/{resource_id}?rev={prev}
GET /v1/resources/{org_label}/{project_label}/{schema_id}/{resource_id}
GET /v1/resources/{org_label}/{project_label}/{schema_id}/{resource_id}?rev={rev}
DELETE /v1/resources/{org_label}/{project_label}/{schema_id}/{resource_id}?rev={prev}
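PUT and DELETE take the revision the client last saw (`?rev={prev}`), which gives optimistic concurrency: a write based on a stale revision is rejected instead of silently overwriting a newer one. A minimal in-memory Scala sketch of that rule follows. The `Store`, `Resource` and rejection names are hypothetical, and it assumes (the slide does not say so) that DELETE deprecates a resource rather than erasing it and that every revision stays readable through `?rev={rev}`.

```scala
// Hypothetical in-memory model of the resource life-cycle endpoints.
final case class Resource(id: String, rev: Long, deprecated: Boolean, source: String)

sealed trait Rejection
final case class AlreadyExists(id: String) extends Rejection
final case class NotFound(id: String) extends Rejection
final case class IncorrectRev(expected: Long, provided: Long) extends Rejection
final case class IsDeprecated(id: String) extends Rejection

final class Store {
  // Every revision is kept so that GET ?rev={rev} can return older versions.
  private var history = Map.empty[String, Vector[Resource]]

  // POST: the first revision is 1.
  def create(id: String, source: String): Either[Rejection, Resource] =
    if (history.contains(id)) Left(AlreadyExists(id))
    else {
      val r = Resource(id, 1L, deprecated = false, source = source)
      history += id -> Vector(r)
      Right(r)
    }

  // GET, optionally at a given revision.
  def get(id: String, rev: Option[Long] = None): Option[Resource] =
    history.get(id).flatMap(revs => rev.fold(revs.lastOption)(r => revs.find(_.rev == r)))

  // Shared by PUT and DELETE: the caller must send the current revision.
  private def write(id: String, prev: Long)(f: Resource => Resource): Either[Rejection, Resource] =
    get(id) match {
      case None                         => Left(NotFound(id))
      case Some(cur) if cur.rev != prev => Left(IncorrectRev(cur.rev, prev))
      case Some(cur) if cur.deprecated  => Left(IsDeprecated(id))
      case Some(cur) =>
        val updated = f(cur).copy(rev = cur.rev + 1)
        history += id -> (history(id) :+ updated)
        Right(updated)
    }

  // PUT ?rev={prev}
  def update(id: String, prev: Long, source: String): Either[Rejection, Resource] =
    write(id, prev)(_.copy(source = source))

  // DELETE ?rev={prev}: assumed to deprecate, not erase.
  def deprecate(id: String, prev: Long): Either[Rejection, Resource] =
    write(id, prev)(_.copy(deprecated = true))
}
```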
Identity & access management
- Realms: authentication providers (OpenID Connect)
- Identities
  - Users
  - Groups
  - Authenticated
  - Anonymous
- Permissions: arbitrary strings, e.g. projects/create or schemas/write
- ACLs: resource path -> (Identity, Permissions)
case class AccessControlLists(value: Map[Path, AccessControlList])
case class AccessControlList(value: Map[Identity, Set[Permission]])
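An authorization check has to combine the ACL of the requested path with the ACLs of its ancestors. The Scala sketch below assumes, beyond what the slide states, that permissions granted on a path are inherited by everything beneath it (so an entry on `/` applies everywhere) and that a caller holds several identities at once (user, groups, `Authenticated`, `Anonymous`). Paths are plain strings and the ACL types are aliases, not the case classes above; all names are hypothetical.

```scala
// Identities as listed on the slide (simplified).
sealed trait Identity
final case class User(subject: String, realm: String) extends Identity
final case class Group(group: String, realm: String) extends Identity
final case class Authenticated(realm: String) extends Identity
case object Anonymous extends Identity

type Permission         = String
type Path               = String
type AccessControlList  = Map[Identity, Set[Permission]]
type AccessControlLists = Map[Path, AccessControlList]

// "/org/proj" -> List("/", "/org", "/org/proj")
def ancestors(path: Path): List[Path] = {
  val segments = path.split('/').filter(_.nonEmpty).toList
  "/" :: segments.indices.map(i => segments.take(i + 1).mkString("/", "/", "")).toList
}

// Union of the permissions granted to any of the caller's identities
// on the path itself or on one of its ancestors (assumed inheritance).
def permissions(acls: AccessControlLists, path: Path, identities: Set[Identity]): Set[Permission] =
  ancestors(path)
    .flatMap(p => acls.getOrElse(p, Map.empty[Identity, Set[Permission]]))
    .collect { case (id, perms) if identities.contains(id) => perms }
    .flatten
    .toSet

def hasPermission(acls: AccessControlLists, path: Path, identities: Set[Identity], perm: Permission): Boolean =
  permissions(acls, path, identities).contains(perm)

// Hypothetical ACLs: anyone may read everything, alice may write in /org/proj.
val alice = User("alice", "bbp")
val acls: AccessControlLists = Map(
  "/"         -> Map[Identity, Set[Permission]](Anonymous -> Set("resources/read")),
  "/org/proj" -> Map[Identity, Set[Permission]](alice -> Set("resources/write"))
)
```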
Admin
- Organizations
- Projects
  - Base
  - Vocabulary
  - API mappings (prefixes)
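A project's API mappings behave like prefixes for compact identifiers, while base and vocabulary supply defaults for identifiers that have no prefix. The Scala sketch below assumes a JSON-LD-like rule (resource ids fall back to the base, property and type names to the vocabulary, and anything that already has a URI scheme is kept as-is); the `ProjectSettings` type and the example IRIs are hypothetical.

```scala
// Hypothetical project settings: base for ids, vocab for terms, prefix mappings.
final case class ProjectSettings(base: String, vocab: String, apiMappings: Map[String, String])

// A value "looks absolute" if it starts with a URI scheme followed by ':'.
val Scheme = "^[A-Za-z][A-Za-z0-9+.-]*:".r

def expand(value: String, fallback: String, p: ProjectSettings): String =
  value.split(":", 2) match {
    case Array(prefix, suffix) if p.apiMappings.contains(prefix) => p.apiMappings(prefix) + suffix
    case _ if Scheme.findPrefixOf(value).isDefined               => value
    case _                                                       => fallback + value
  }

// Resource ids resolve against the base; property and type names against the vocab.
def expandId(value: String, p: ProjectSettings): String   = expand(value, p.base, p)
def expandTerm(value: String, p: ProjectSettings): String = expand(value, p.vocab, p)

val proj = ProjectSettings(
  base = "https://example.org/data/",
  vocab = "https://example.org/vocab/",
  apiMappings = Map("schema" -> "http://schema.org/")
)
println(expandTerm("schema:Person", proj))
println(expandId("my-resource", proj))
```

The mapping lookup runs before the scheme check on purpose: a prefix such as `schema` is also a syntactically valid URI scheme, so `schema:Person` would otherwise be kept unexpanded.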
Knowledge graph
- Schemas
- Resolvers
- Views
  - Elasticsearch
  - SPARQL
- Files (binary attachments)
- Resources
Nexus components
Event sourcing & CQRS
https://github.com/BlueBrain/nexus-sourcing
Example: State
sealed trait State extends Product with Serializable

case object Initial extends State

final case class Current(
    id: Id[ProjectRef],
    rev: Long,
    types: Set[AbsoluteIri],
    deprecated: Boolean,
    tags: Map[String, Long],
    created: Instant,
    updated: Instant,
    createdBy: Subject,
    updatedBy: Subject,
    schema: Ref,
    source: Json
) extends State
Example: Command
sealed trait Command extends Product with Serializable {
  def id: Id[ProjectRef]
  def rev: Long
  def instant: Instant
  def subject: Subject
}

final case class Create(
    id: Id[ProjectRef],
    schema: Ref,
    types: Set[AbsoluteIri],
    source: Json,
    instant: Instant,
    subject: Subject
) extends Command {
  val rev: Long = 0L
}
Example: Event
sealed trait Event extends Product with Serializable {
  def id: Id[ProjectRef]
  def rev: Long
  def instant: Instant
  def subject: Subject
}

final case class Created(
    id: Id[ProjectRef],
    schema: Ref,
    types: Set[AbsoluteIri],
    source: Json,
    instant: Instant,
    subject: Subject
) extends Event {
  val rev: Long = 1L
}
Example: Evaluation
def eval(state: State, cmd: Command): Either[Rejection, Event] =
  cmd match {
    case cmd: Create => create(state, cmd)
    // Update, Deprecate, Tag, ...
  }

def create(state: State, c: Create): Either[Rejection, Created] =
  state match {
    case Initial => Right(Created(c.id, c.schema, c.types, c.source, c.instant, c.subject))
    case _       => Left(ResourceAlreadyExists(c.id.ref))
  }

class Repo[F[_]: Monad](agg: Agg[F], clock: Clock, toIdentifier: ResId => String) {

  def create(id: ResId, schema: Ref, types: Set[AbsoluteIri], source: Json,
             instant: Instant = clock.instant)(
      implicit subject: Subject): EitherT[F, Rejection, Resource] =
    evaluate(id, Create(id, schema, types, source, instant, subject))
}
Example: State machine
def next(state: State, ev: Event): State = (state, ev) match {
  case (Initial, e @ Created(id, schema, types, value, tm, ident)) =>
    Current(id, e.rev, types, false, Map.empty, tm, tm, ident, ident, schema, value)
  case (c: Current, Updated(_, rev, types, value, tm, ident)) =>
    c.copy(rev = rev, types = types, source = value, updated = tm, updatedBy = ident)
  // Deprecated, TagAdded, ...
}
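With the Nexus-specific types removed, the pattern on these slides is: `eval` validates a command against the current state and emits an event (or a rejection), `next` applies an event to a state, and the latest state is rebuilt by folding `next` over the event log. The self-contained Scala sketch below shows that loop for a hypothetical resource that only supports `Create` and `Update` with a revision check; it illustrates the technique and is not the Nexus implementation.

```scala
// Hypothetical, much smaller versions of the slides' types.
sealed trait State
case object Initial extends State
final case class Current(id: String, rev: Long, source: String) extends State

sealed trait Command
final case class Create(id: String, source: String) extends Command
final case class Update(id: String, rev: Long, source: String) extends Command

sealed trait Event { def rev: Long }
final case class Created(id: String, source: String) extends Event { val rev: Long = 1L }
final case class Updated(id: String, rev: Long, source: String) extends Event

sealed trait Rejection
case object AlreadyExists extends Rejection
case object NotFound extends Rejection
final case class IncorrectRev(expected: Long) extends Rejection

// Command side: decide, without side effects, which event (if any) to emit.
def eval(state: State, cmd: Command): Either[Rejection, Event] = (state, cmd) match {
  case (Initial, Create(id, src))                         => Right(Created(id, src))
  case (_: Current, _: Create)                            => Left(AlreadyExists)
  case (Initial, _: Update)                               => Left(NotFound)
  case (c: Current, Update(id, rev, src)) if rev == c.rev => Right(Updated(id, rev + 1, src))
  case (c: Current, _: Update)                            => Left(IncorrectRev(c.rev))
}

// State transition: apply an event that has already been accepted and persisted.
def next(state: State, ev: Event): State = (state, ev) match {
  case (Initial, Created(id, src))        => Current(id, 1L, src)
  case (c: Current, Updated(_, rev, src)) => c.copy(rev = rev, source = src)
  case (s, _)                             => s // events that do not apply are ignored
}

// Recovery: replaying the journal rebuilds the latest state.
def replay(events: Seq[Event]): State = events.foldLeft(Initial: State)(next)

println(replay(Seq(Created("r", "v1"), Updated("r", 2L, "v2"))))
```

Since `eval` and `next` are pure functions, they can be unit-tested without Akka; persistence, sharding and timeouts are layered on top, as the following slides show.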
Example: Aggregate
type Agg[F[_]] = Aggregate[F, String, Event, State, Command, Rejection]

def aggregate[F[_]: Effect: Timer](implicit as: ActorSystem,
                                   mt: ActorMaterializer,
                                   sourcing: SourcingConfig,
                                   F: Monad[F]): F[Agg[F]] =
  AkkaAggregate.sharded[F](
    name = "resources",
    Initial,
    next,
    (state, cmd) => F.pure(eval(state, cmd)),
    sourcing.passivationStrategy,
    Retry(sourcing.retry.retryStrategy),
    sourcing.akkaSourcingConfig,
    sourcing.shards
  )
Aggregate
trait Aggregate[F[_], Id, Event, State, Command, Rejection]
    extends StatefulEventLog[F, Id, Event, State] {

  def evaluate(id: Id, command: Command): F[Either[Rejection, (State, Event)]]
}
Using Akka cluster sharding, we construct it like this:
object AkkaAggregate {

  def shardedF[F[_]: Effect, Event, State, Command, Rejection](...) = {
    // the extractors need explicit types: a bare { case ... } block has no parameter type
    val shardExtractor: ShardRegion.ExtractShardId = {
      case msg: Msg => (math.abs(msg.id.hashCode) % shards).toString
    }
    val entityExtractor: ShardRegion.ExtractEntityId = {
      case msg: Msg => (msg.id, msg)
    }
    val props = AggregateActor.shardedProps(name, initialState, next, evaluate, passivationStrategy, config)
    val ref   = ClusterSharding(as).start(name, props, settings, entityExtractor, shardExtractor)
    // route all messages through the sharding coordinator
    val selection = ActorRefSelection.const(ref)
    new AkkaAggregate(name, selection, retry, config)
  }
}
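The shard extractor hashes the entity id into one of `shards` shard ids. One subtlety of `math.abs(hashCode) % shards`: `math.abs(Int.MinValue)` is still `Int.MinValue`, so an id whose hash is exactly that value gets a negative shard id (an extra shard) unless `shards` divides 2^31. A small Scala sketch comparing it with `java.lang.Math.floorMod`, which always returns a value in `[0, shards)`, is:

```scala
// Two ways of turning a hash into a shard id; both aim for 0 until shards.
def shardIdAbs(hash: Int, shards: Int): String      = (math.abs(hash) % shards).toString
def shardIdFloorMod(hash: Int, shards: Int): String = java.lang.Math.floorMod(hash, shards).toString

// Shard id for an entity id (e.g. a resource id), using the always-non-negative variant.
def shardOf(entityId: String, shards: Int): String = shardIdFloorMod(entityId.hashCode, shards)

// math.abs(Int.MinValue) == Int.MinValue, so the first variant can go negative.
println(shardIdAbs(Int.MinValue, 10))      // -8
println(shardIdFloorMod(Int.MinValue, 10)) // 2
```

The two formulas disagree for every negative hash (for -3 and 10 shards, `abs` gives 3 and `floorMod` gives 7), so switching between them changes where existing entities are placed; that matters if nodes using the old and the new formula run side by side.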
Clustered Akka aggregate
class AkkaAggregate[F[_]: Async, Event: ClassTag, State, Command, Rejection](
    override val name: String,
    selection: ActorRefSelection[F],
    retry: Retry[F, Throwable],
    config: AkkaSourcingConfig,
)(implicit as: ActorSystem, mat: ActorMaterializer)
    extends Aggregate[F, String, Event, State, Command, Rejection] {

  override def evaluate(id: String, command: Command): F[Either[Rejection, (State, Event)]] =
    send(id, Evaluate(id, command), r => r.value)

  private def send[Reply, A](id: String, msg: Msg, f: Reply => A)(
      implicit Reply: ClassTag[Reply]): F[A] =
    selection(name, id).flatMap { ref =>
      val future = IO(ref ? msg)
      val fa     = IO.fromFuture(future).to[F]
      fa.flatMap[A] {
        case Reply(value) => F.pure(f(value))
        case e            => F.raiseError(e)
      }.retry
    }
}
Persistent aggregate actor
class AggregateActor[F[_]: Effect, Event, State, Command, Rejection](
    name: String,
    initialState: State,
    next: (State, Event) => State,
    evaluate: (State, Command) => F[Either[Rejection, Event]],
    passivationStrategy: PassivationStrategy[State, Command],
    config: AkkaSourcingConfig,
) extends PersistentActor with Stash with ActorLogging {

  private def evaluateCommand(cmd: Command, test: Boolean = false): Unit = {
    val eval = for {
      _ <- IO.shift(config.commandEvaluationExecutionContext)
      r <- evaluate(state, cmd).toIO.timeout(config.commandEvaluationMaxDuration)
      _ <- IO.shift(context.dispatcher)
      _ <- IO(self ! r)
    } yield ()
    val io = eval.onError { ... } // error handling
    io.unsafeRunAsyncAndForget()
  }
}
Indexing
Indexing
case class IndexerConfig[F[_], Event, MappedEvt, Err, O <: OffsetStorage](
    tag: String,
    pluginId: String,
    name: String,
    mapping: Event => F[Option[MappedEvt]],
    index: List[MappedEvt] => F[Unit],
    init: F[Unit],
    batch: Int,
    batchTo: FiniteDuration,
    retry: Retry[F, Err],
    storage: O
)

trait StreamByTag[F[_], A] {

  def fetchInit: F[A]

  def source(init: A): Source[A, _]
}
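In `IndexerConfig`, `mapping` may drop an event (it returns an `Option`) and `index` receives mapped events in lists of at most `batch` elements. The synchronous Scala sketch below shows that mapping-and-batching step on an in-memory sequence, tagging each batch with the offset of its last event so progress can be stored afterwards. The `Envelope` type is hypothetical, effects are dropped (plain values instead of `F`), and the time-based flush that `batchTo` presumably controls is not modeled.

```scala
// Hypothetical journal entry: an offset plus the raw event.
final case class Envelope[E](offset: Long, event: E)

// Split envelopes into batches of at most `batch`, map each event (dropping Nones),
// and keep the last offset of every batch so it can be stored after indexing.
def batched[E, M](envelopes: Seq[Envelope[E]], batch: Int)(mapping: E => Option[M]): List[(Long, List[M])] = {
  require(batch > 0, "batch must be positive")
  envelopes
    .grouped(batch)
    .map(group => (group.last.offset, group.toList.flatMap(e => mapping(e.event))))
    .toList
}

val envs = (1L to 5L).map(i => Envelope(i, i.toInt))
println(batched(envs, 2)(n => if (n % 2 == 0) Some(s"doc-$n") else None))
```

Here the last batch (offset 5) is kept with an empty list even though its only event was dropped by `mapping`, so stored progress still moves past filtered-out events.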
Indexing
class VolatileStreamByTag[F[_]: Effect, Event, MappedEvt, Err](
    config: IndexerConfig[F, Event, MappedEvt, Err, Volatile]
) extends StreamByTag[F, Offset] {

  def batchedSource(initialOffset: Offset): Source[(Offset, List[IdentifiedEvent]), NotUsed] = {
    val eventsByTag = PersistenceQuery(as)
      .readJournalFor[EventsByTagQuery](config.pluginId)
      .eventsByTag(config.tag, initialOffset)
    // ... casting, mapping, batching
    eventsBatched
  }

  def fetchInit: F[Offset] =
    if (config.storage.restart) config.init.retry *> F.pure(NoOffset)
    else config.init.retry.flatMap(_ => projection.fetchLatestOffset.retry)
}
Indexing
class VolatileStreamByTag[F[_]: Effect, Event, MappedEvt, Err](
    config: IndexerConfig[F, Event, MappedEvt, Err, Volatile]
) extends StreamByTag[F, Offset] {

  def source(initialOffset: Offset): Source[Offset, NotUsed] = {

    val eventsIndexed = batchedSource(initialOffset).mapAsync(1) {
      case (offset, events) =>
        val index = config.index(events.map { case (_, _, mapped) => mapped })
          .retry
          .recoverWith(recoverIndex(offset, events))
        F.toIO(index.map(_ => offset)).unsafeToFuture()
    }

    val eventsStoredProgress = eventsIndexed.mapAsync(1) { offset =>
      F.toIO(projection.storeLatestOffset(offset).retry.map(_ => offset))
        .unsafeToFuture()
    }

    eventsStoredProgress
  }
}
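Because `source` stores the offset only after `index` succeeds, a failure between the two steps makes the last batch run again on restart: delivery is at-least-once, so `index` should be idempotent (for example, upserting documents by id). The Scala sketch below demonstrates this with hypothetical in-memory types and no Akka; `restart = true` mirrors the `fetchInit` branch that ignores the stored offset.

```scala
import scala.collection.mutable

// Hypothetical journal entry: offsets are strictly increasing.
final case class Stored(offset: Long, id: String, value: String)

final class Indexer(journal: Vector[Stored], batch: Int) {
  var storedOffset: Option[Long] = None          // progress, persisted separately from the index
  val index = mutable.Map.empty[String, String]  // upsert by id, so replaying is harmless

  // Mirrors fetchInit: restart ignores the stored offset and starts from the beginning.
  def initialOffset(restart: Boolean): Option[Long] = if (restart) None else storedOffset

  // Index every batch after the initial offset, then store its last offset.
  // crashAfterBatch simulates a failure between those two steps.
  def run(restart: Boolean, crashAfterBatch: Option[Int] = None): Unit = {
    val from    = initialOffset(restart)
    val pending = journal.filter(e => from.forall(e.offset > _))
    pending.grouped(batch).zipWithIndex.foreach { case (group, i) =>
      group.foreach(e => index.update(e.id, e.value))
      if (crashAfterBatch.contains(i)) throw new IllegalStateException(s"crash after indexing batch $i")
      storedOffset = Some(group.last.offset)
    }
  }
}

val ix = new Indexer(Vector(Stored(1, "a", "v1"), Stored(2, "b", "v1"), Stored(3, "a", "v2")), batch = 2)
scala.util.Try(ix.run(restart = false, crashAfterBatch = Some(0))) // batch indexed, offset not stored
ix.run(restart = false)                                            // replays that batch, then continues
println((ix.storedOffset, ix.index))
```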
Indexing: how it's used
- Define indexers
  - Elasticsearch
  - Triple store (Blazegraph)
  - Akka Distributed Data
- Wrap the Akka source in a coordinator actor
- Run the coordinator as an Akka Cluster Singleton or Cluster Sharded actor
Interservice communication
- We had Kafka
  - Easy to integrate with Alpakka Kafka
  - Drawbacks:
    - One more thing to operate (several nodes + ZooKeeper)
    - Exposing event streams to external users
- Solution: server-sent events (SSE)
  - Support included in Akka HTTP
  - Plain HTTP
  - Access restriction with Nexus ACLs
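On the wire, server-sent events are a `text/event-stream` HTTP response made of `field: value` lines, each event ending with a blank line; the `id` field is what a reconnecting client sends back in the `Last-Event-ID` header. A minimal Scala sketch of the encoding, using a hypothetical `Sse` case class rather than Akka HTTP's `ServerSentEvent`, is:

```scala
// Hypothetical event representation; Akka HTTP has its own ServerSentEvent type.
final case class Sse(data: String, eventType: Option[String] = None, id: Option[String] = None)

// Encode one event: one "field: value" per line, multi-line data as repeated
// "data:" lines (clients join them back with '\n'), and a blank line at the end.
def encode(e: Sse): String = {
  val fields =
    e.id.map(i => s"id: $i").toList ++
      e.eventType.map(t => s"event: $t").toList ++
      e.data.split("\n", -1).map(line => s"data: $line").toList
  fields.mkString("", "\n", "\n\n")
}

// A line starting with ':' is a comment that clients ignore; it can serve as a keep-alive.
val heartbeat: String = ":\n\n"

print(encode(Sse("""{"@type":"Created"}""", Some("Created"), Some("42"))))
```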
SSE: producer
val pq = PersistenceQuery(as).readJournalFor[EventsByTagQuery](queryJournalPlugin)

def source(tag: String, offset: Offset)(
    implicit enc: Encoder[Event]): Source[ServerSentEvent, NotUsed] =
  pq.eventsByTag(tag, offset)
    .flatMapConcat(eventEnvelope => Source(eventToSse(eventEnvelope).toList))
    .keepAlive(10.seconds, () => ServerSentEvent.heartbeat)

def lastEventId: Directive1[Offset] =
  optionalHeaderValueByName(`Last-Event-ID`.name)
    .map(_.map(id => `Last-Event-ID`(id)))
    .flatMap {
      case Some(header) =>
        Try[Offset](TimeBasedUUID(UUID.fromString(header.id)))
          .orElse(Try(Sequence(header.id.toLong))) match {
          case Success(value) => provide(value)
          case Failure(_)     => reject(validationRejection("Invalid header"))
        }
      case None => provide(NoOffset)
    }
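The `lastEventId` directive accepts two offset shapes, a time-based UUID or a numeric sequence number, and falls back to `NoOffset` when the header is absent. The same decision logic, without Akka HTTP or Akka Persistence, can be sketched in Scala with a hypothetical `Offset` ADT that only mirrors the names used on the slide:

```scala
import java.util.UUID
import scala.util.Try

// Hypothetical mirror of the offset kinds named on the slide.
sealed trait Offset
case object NoOffset extends Offset
final case class TimeBasedUUID(value: UUID) extends Offset
final case class Sequence(value: Long) extends Offset

// No header: start from the beginning. Otherwise try a UUID, then a Long,
// and reject anything else (the directive answers with a validation rejection).
def parseLastEventId(header: Option[String]): Either[String, Offset] =
  header match {
    case None => Right(NoOffset)
    case Some(v) =>
      Try[Offset](TimeBasedUUID(UUID.fromString(v)))
        .orElse(Try(Sequence(v.toLong)))
        .toOption
        .toRight(s"Invalid Last-Event-ID header: '$v'")
  }

println(parseLastEventId(Some("42")))
```

This sketch checks only that the value parses as a UUID, not that it is a time-based (version 1) UUID.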
SSE: consumer
class EventSource[A: Decoder] {

  def send(request: HttpRequest)(implicit cred: Option[AuthToken]): Future[HttpResponse] =
    http.singleRequest(addCredentials(request)).map { resp =>
      if (!resp.status.isSuccess())
        logger.warn(s"Error when performing SSE request: '${resp.status}'")
      resp
    }

  def apply(iri: AbsoluteIri, offset: Option[String])(
      implicit cred: Option[AuthToken]): Source[A, NotUsed] =
    SSESource(iri.toAkkaUri, send, offset, sseRetryDelay).flatMapConcat { sse =>
      decode[A](sse.data) match {
        case Right(ev) => Source.single(ev)
        case Left(err) =>
          logger.error(s"Failed to decode admin event '$sse'", err)
          Source.empty
      }
    }
}
Monitoring
- We use Kamon with two reporters:
  - Prometheus for metrics, plotted in Grafana
  - Jaeger for tracing
- Be careful not to generate too many metrics
  - The operationName directive creates a new histogram for every different request path
- Start with a reasonable sampling rate
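Because a histogram is created for each distinct operation name, using raw request paths as names makes the number of metrics grow with every resource id. One common remedy is to normalize paths to a route template first. A minimal Scala sketch, assuming a hypothetical rule that keeps the first two segments (e.g. `v1/resources`) and replaces the rest with placeholders, is:

```scala
// Hypothetical placeholders for the variable segments of /v1/resources/... paths.
val placeholders = Vector("{org}", "{project}", "{schema}", "{id}")

// Keep the version and API segments, template everything after them.
def operationName(method: String, path: String): String = {
  val segments          = path.split('/').filter(_.nonEmpty).toVector
  val (fixed, variable) = segments.splitAt(2)
  val templated         = variable.indices.map(i => placeholders.lift(i).getOrElse("{*}"))
  s"$method /" + (fixed ++ templated).mkString("/")
}

println(operationName("GET", "/v1/resources/bbp/core/_/my-id"))
```

The resulting name could then be passed to the `operationName` directive, so all requests for the same route share one histogram.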
Nexus RDF
https://github.com/BlueBrain/nexus-rdf
- IRI
  - Superset of URI that supports Unicode, used by RDF
  - Custom parser implemented with Parboiled2
  - Manipulation (scheme, host, port, path, segments, query, ...)
  - Conversion (absolute, relative, URL, URN, CURIE, ...)
- Graph
  - Construction
  - Manipulation
  - Nodes
  - Triples
  - Serialization
    - JSON-LD with Circe and Jena
    - N-Triples
    - DOT graph format
Application: Nexus Search
https://bbp.epfl.ch/nexus/search/
Questions?