Joining Infinity — Windowless Stream Processing with Flink
Sanjar Akhmedov, Software Engineer, ResearchGate
TRANSCRIPT
It started when two researchers discovered first-hand that collaborating with a friend or colleague on the other side of the world was no easy task.
ResearchGate is a social network for scientists.
Connect the world of science. Make research open to all.
Structured system
We have changed, and are continuing to change, how scientific knowledge is shared and discovered.
10,000,000 Members
102,000,000 Publications
30,000,000 Visitors
Feature: Research Timeline
Diverse data sources
Proxy
Frontend
Services
memcache, MongoDB, Solr, PostgreSQL
Infinispan, HBase, MongoDB, Solr
Big data pipeline
Change data capture
Import
Hadoop cluster
Export
Data Model
Account 1 —* Claim — Author
Publication 1 —* Authorship — Author
Hypothetical SQL

CREATE TABLE publications (
  id SERIAL PRIMARY KEY,
  author_ids INTEGER[]
);

CREATE TABLE accounts (
  id SERIAL PRIMARY KEY,
  claimed_author_ids INTEGER[]
);

CREATE MATERIALIZED VIEW account_publications
REFRESH FAST ON COMMIT
AS
SELECT
  accounts.id AS account_id,
  publications.id AS publication_id
FROM accounts
JOIN publications
  ON ANY (accounts.claimed_author_ids) = ANY (publications.author_ids);
Challenges
• Data sources are distributed across different DBs
• Dataset doesn’t fit in memory on a single machine
• Join process must be fault tolerant
• Deploy changes fast
• Up-to-date join result in near real-time
• Join result must be accurate
Change data capture (CDC)
User → Request → Microservice → Write → DB
The DB is synced into each derived store: Cache, Solr/ES, HBase/HDFS.
Change data capture (CDC)
Instead of syncing each store directly, extract the DB's write log as an ordered stream of key/value records, where Ø is a delete (tombstone):
K2 → 1
K1 → 4
K1 → Ø
KN → 42 …
The extracted log is then synced into the derived stores: Cache, Solr/ES, HBase/HDFS.
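The log in this diagram is just an ordered sequence of key/value records where Ø marks a delete. A minimal sketch (plain Java, no Kafka; class and method names are mine, not from the talk) of how a downstream store would replay such a log:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CdcReplay {
    // Replays an ordered CDC log into a materialized view.
    // A null value plays the role of the Ø tombstone and deletes the key.
    static Map<String, Integer> replay(List<? extends Map.Entry<String, Integer>> log) {
        Map<String, Integer> store = new HashMap<>();
        for (Map.Entry<String, Integer> record : log) {
            if (record.getValue() == null) {
                store.remove(record.getKey());                 // tombstone: delete
            } else {
                store.put(record.getKey(), record.getValue()); // upsert
            }
        }
        return store;
    }

    public static void main(String[] args) {
        // The log from the slide: K2=1, K1=4, K1=Ø, KN=42
        List<SimpleEntry<String, Integer>> log = List.of(
                new SimpleEntry<>("K2", 1),
                new SimpleEntry<>("K1", 4),
                new SimpleEntry<>("K1", (Integer) null),
                new SimpleEntry<>("KN", 42));
        System.out.println(replay(log)); // K1 is gone; K2 and KN survive
    }
}
```

Because later records for the same key overwrite earlier ones, any store that replays the log from the beginning converges to the same state as the source DB.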
Join two CDC streams into one
NoSQL1 → Kafka
SQL → Kafka
Both Kafka topics → Flink Streaming Join → Kafka → NoSQL2
Flink job topology
Accounts Stream → Join (CoFlatMap) → Account Publications
Publications Stream → Join (CoFlatMap) → Account Publications
The join is keyed by author: Author 1, Author 2, …, Author N.
Prototype implementation

DataStream<Account> accounts = kafkaTopic("accounts");
DataStream<Publication> publications = kafkaTopic("publications");
DataStream<AccountPublication> result = accounts.connect(publications)
    .keyBy("claimedAuthorId", "publicationAuthorId")
    .flatMap(new RichCoFlatMapFunction<Account, Publication, AccountPublication>() {

        transient ValueState<String> authorAccount;
        transient ValueState<String> authorPublication;

        public void open(Configuration parameters) throws Exception {
            authorAccount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("authorAccount", String.class, null));
            authorPublication = getRuntimeContext().getState(
                new ValueStateDescriptor<>("authorPublication", String.class, null));
        }

        public void flatMap1(Account account, Collector<AccountPublication> out) throws Exception {
            authorAccount.update(account.id);
            if (authorPublication.value() != null) {
                out.collect(new AccountPublication(authorAccount.value(), authorPublication.value()));
            }
        }

        public void flatMap2(Publication publication, Collector<AccountPublication> out) throws Exception {
            authorPublication.update(publication.id);
            if (authorAccount.value() != null) {
                out.collect(new AccountPublication(authorAccount.value(), authorPublication.value()));
            }
        }
    });
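The CoFlatMap above is hard to follow without running it. Below is a dependency-free simulation of the same keyed two-input join in plain Java (no Flink): a per-key state cell holds the latest account and publication seen for that author, and a result is emitted once both sides are present. Class and method names here are mine, not Flink's.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedJoinSim {
    // Per-author state, mirroring the two ValueStates in the Flink job.
    static class AuthorState { String account; String publication; }

    private final Map<String, AuthorState> stateByAuthor = new HashMap<>();
    final List<String> results = new ArrayList<>();

    private AuthorState state(String authorId) {
        return stateByAuthor.computeIfAbsent(authorId, k -> new AuthorState());
    }

    // Analog of flatMap1: an account claims authorId.
    void onAccount(String accountId, String authorId) {
        AuthorState s = state(authorId);
        s.account = accountId;
        if (s.publication != null) results.add("(" + s.account + ", " + s.publication + ")");
    }

    // Analog of flatMap2: a publication lists authorId.
    void onPublication(String publicationId, String authorId) {
        AuthorState s = state(authorId);
        s.publication = publicationId;
        if (s.account != null) results.add("(" + s.account + ", " + s.publication + ")");
    }

    public static void main(String[] args) {
        KeyedJoinSim join = new KeyedJoinSim();
        join.onAccount("Alice", "2");      // stored under Author 2, no match yet
        join.onAccount("Bob", "1");        // stored under Author 1, no match yet
        join.onPublication("Paper1", "1"); // meets Bob under Author 1
        System.out.println(join.results);  // [(Bob, Paper1)]
    }
}
```

This mirrors the happy path; as the following slides show, it has no answer for deletes and updates yet.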
Example dataflow
Accounts table: Alice → 2, Bob → 1. Publications table: Paper1 → 1.
Both streams are keyed by author id, so each record is routed to its author's slot (Author 1 … Author N) in the join:
• (Alice, 2) arrives → Alice is stored under Author 2.
• (Bob, 1) arrives → Bob is stored under Author 1.
• (Paper1, 1) arrives → Paper1 is stored under Author 1, where Bob is already waiting.
• The join emits K1 → (Bob, Paper1) to Account Publications.
Challenges
• ✔ Data sources are distributed across different DBs
• ✔ Dataset doesn’t fit in memory on a single machine
• ✔ Join process must be fault tolerant
• ✔ Deploy changes fast
• ✔ Up-to-date join result in near real-time
• ? Join result must be accurate
Paper1 gets deleted
The Publications stream receives a tombstone: Paper1 → Ø.
Problem: Ø carries no author id, so the tombstone cannot be routed to Author 1, and the previously emitted K1 → (Bob, Paper1) cannot be retracted. We need the previous value.
Fix: insert a "Diff with Previous State" operator before the join. It remembers that Paper1 was last attached to author 1, turns the tombstone into a delete routed to Author 1, and the join emits K1 → Ø to Account Publications.
Note: to emit K1 → Ø we need K1 here, e.g. K1 = 𝒇(Bob, Paper1).
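The "Diff with Previous State" step can be sketched independently of Flink: keep the last value seen per key, and translate each incoming record (including Ø tombstones) into explicit commands for downstream. This is my illustration of the idea, not the talk's code; null stands in for Ø.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DiffWithPreviousState {
    private final Map<String, String> previous = new HashMap<>();

    // Turns an upsert (or a tombstone, value == null) into commands that
    // also retract whatever was previously stored under the same key.
    List<String> apply(String key, String value) {
        List<String> commands = new ArrayList<>();
        String old = previous.get(key);
        if (old != null && !old.equals(value)) {
            commands.add("DELETE " + key + "=" + old); // retract previous value
        }
        if (value == null) {
            previous.remove(key);
        } else {
            previous.put(key, value);
            commands.add("UPSERT " + key + "=" + value);
        }
        return commands;
    }

    public static void main(String[] args) {
        DiffWithPreviousState diff = new DiffWithPreviousState();
        System.out.println(diff.apply("Paper1", "1"));  // [UPSERT Paper1=1]
        System.out.println(diff.apply("Paper1", null)); // [DELETE Paper1=1]
    }
}
```

The DELETE command carries the old value ("1", the author id), which is exactly what lets the tombstone be routed to Author 1 downstream.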
Paper1 gets updated
The Publications stream receives Paper1 → 2: the paper's author changes from 1 to 2.
Paper1 now joins Alice under Author 2, so (Alice, Paper1) is emitted, while the stale K1 → (Bob, Paper1) must also be retracted.
But which key does the new (Alice, Paper1) record get? The output table shows "??" — there is no obvious key for it yet.
Alice claims Paper1 via different author
Now Paper1 lists both authors, Paper1 → (1, 2), so Account Publications already holds K1 → (Bob, Paper1) and K2 → (Alice, Paper1).
On the Accounts stream, Bob unclaims author 1 (Bob → Ø) and Alice claims it instead (Alice → 1).
The diff retracts Bob from Author 1, so the join emits K1 → Ø; Alice then joins Paper1 under Author 1 and a second (Alice, Paper1) is produced.
If this second match reused K2, a later retraction of either claim would emit K2 → Ø and wrongly delete the other (Alice, Paper1) row.
Pick correct natural IDs, e.g. K3 = 𝒇(Alice, Author1, Paper1)
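The fix the slide names — K3 = 𝒇(Alice, Author1, Paper1) — means the join-result key must be derived from the full natural identity of the match (account, author, and publication), so retracting a match made via one author cannot clobber a match made via another. A hedged sketch of such a key function (the encoding is illustrative only; any stable, collision-aware scheme works):

```java
public class NaturalId {
    // Deterministic natural ID for one (account, author, publication) match.
    static String key(String accountId, String authorId, String publicationId) {
        return accountId + "|" + authorId + "|" + publicationId;
    }

    public static void main(String[] args) {
        // Alice matches Paper1 twice, via two different claimed authors:
        String viaAuthor2 = key("Alice", "Author2", "Paper1"); // plays the role of K2
        String viaAuthor1 = key("Alice", "Author1", "Paper1"); // plays the role of K3
        // The keys differ, so deleting one match leaves the other intact.
        System.out.println(!viaAuthor2.equals(viaAuthor1)); // true
    }
}
```

Because the key is a pure function of the match itself, the same K can be recomputed at retraction time without any extra lookup.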
How to solve deletes and updates
• Keep previous element state to update the previous join result
• Stream elements are not domain entities but commands such as delete or upsert
• Joined stream must have natural IDs to propagate deletes and updates
Generic join graph
Accounts Stream → Diff (keyed by entity: Alice, Bob, …) → Join (keyed by author: Author1 … AuthorM) → Account Publications
Publications Stream → Diff (keyed by entity: Paper1, …, PaperN) → Join
Operate on commands: in this graph, every element flowing between operators is a command (upsert or delete), not a domain entity.
Memory requirements
• Each Diff keeps a full copy of its input: the Accounts stream on one side, the Publications stream on the other.
• The Join keeps a full copy of the Accounts stream on its left side and a full copy of the Publications stream on its right side.
Network load
• Each stream is reshuffled over the network twice: once into its Diff (keyed by entity) and once into the Join (keyed by author).
Resource considerations
• In addition to handling Kafka traffic, we need to reshuffle all data twice over the network
• We need to keep two full copies of each joined stream in memory
Questions
We are hiring - www.researchgate.net/careers