system integration through queues
TRANSCRIPT
System integration through queues
● Paolo Laurenti @paololaurenti
● Gianluca Padovanicoders51 - @gpad619
● Gabriele SantomaggioErlang-Solutions - @gsantomaggio
What we will see... topics
● Integration with MQTT - AMQP
● Different languages (Elixir, Java, .Net, Ruby, Python, C++, .Net)
● Different technologies in action as Docker, Phoenix, RabbitMQ and more
● Unit tests / Integration Tests and Monitoring
● High availability and Horizontal scaling
What we will see... structure
● Introduction (15 mins)
● Coding (you) (25 mis)
● Recap and introduction next problem (10 mins)
● Coding (you) (20 mis)
● Recap Q&A Monitoring/QuickCheck!! (20 min)
What we will see...
.Net
JS
Java
Python
Ruby
C++
HAPr
oxy RabbitMQ - Cluster Elixir
ConsumerBackendOne
Phoenix
Web-
Sock
et
Beam
AMQP protocol, send receipts from many shops at minutes scale.
MQTT protocol, send internal, external temperatures and the number of people inside a shop at seconds scale.
What we will see...
.Net
JS
Java
Python
Ruby
C++
RabbitMQ - ClusterHAPr
oxy
Elixir ConsumerBackendOne
Phoenix
Web-
Sock
et
Beam
We work here!
BackendOne
What we will see...
DeviceConsumer FinancialConsumer
Accumulator
RabbitMQ
FinancialMessage
StatisticalMessage
DeviceMessage
Financial message
The financial messages are the receipts received from different sellers.
Message struct:
sellerId - identifies the seller
date - date of the receipt
totalAmount - total amount of the receipt
other fields
Look in FinancialConsumer to see how this message is handled
Device message
The device messages are sent from MQTT producers.
Message struct :
sellerId - identifies the seller
date
type - of device
value
The message payload is binary, look in DeviceConsumer to see how the binary-deserilization works
Statistical message
The statistical messages are sent from BackendOne and contain statistical data extracted from Device and Financial messages.
Message struct :
Seller id
payload:
Receipt
Date
Id
Amount
Internal average temperature
External average temperature
People count
Statistical message - continue
The statistical messages are calculated from Device and Financial messages.
When we receive a receipt in one minute and we receive internal temperature, external temperature and people count in the same minute of receipt and in the next minute.
Example:
We receive some temperatures and counters at 14:31 and 14:32. We calculate the average of internal and external temperature of this minute, and the sum of people counter. When we receive the receipt at 14:31 we send statistical message for 14:31.
Accumulator state
If I want create a map with key ‘k1’ as atom and value 42 I should write something like this:
%{ k1: 42 }
or
%{ :k1 => 42 }
Accumulator state - continue
The accumulator state is a map like this:
%{internal_avg_temperature: %{ … },external_avg_temperature: %{ … },people: %{ … },receipt: %{ … },
}
Accumulator state - continue
The value of internal_avg_temperature and external_avg_temperature keys are maps that have as key the datetime rounded at minute and another map as value.
%{external_avg_temperature: %{“2016-11-19 11:23:00” => %{
value: average,total: total_of_received_temperatures,count: numer_of_temperatures_received,
},“2016-11-19 11:24:00” => %{…
},}
}
Accumulator state - continue
The value of people key is a maps that have as key the datetime rounded at minute and the total of people counter.
%{people: %{“2016-11-19 11:23:00” => 12,“2016-11-19 11:24:00” => 10,}
}
Accumulator state - continue
The value of receipt key is a maps that have as key the datetime rounded at minute and the list of receipt received in that minute.
%{receipt: %{“2016-11-19 11:23:00” => [receipt_1,receipt_2, … receipt_N],“2016-11-19 11:24:00” => [receipt_1,receipt_2],… }
}
Accumulator state - continue
%{external_avg_temperature: %{“2016-11-19 11:21:00” => %{
value: average,total: total_of_received_temperatures,count: numer_of_temperatures_received,
},},people: %{“2016-11-19 11:23:00” => 12,},receipt: %{“2016-11-19 11:25:00” => [receipt_1,receipt_2, … receipt_N],}
}
Get the code
git clone https://github.com/ggp/backend_one .
git checkout CLUES_master
mix deps.get
mix compile
AMQP_HOST=<rabbit-ip> AMQP_USERNAME=<rabbit-user> AMQP_PASSWORD=<rabbit-pwd> mix test
Test failed
CODING - Test failed
Some hints:
The code probably doesn’t manage correctly some new fields
Take some time to study the code
Pay attention when accumulator crashes …
Make a simple solution that works for test (couldn’t work in general …)
Your round...
Fix the test in ~25 mins.
CODING - Test failed - recap
Did you understand the architecture of the backend_one?
How many ‘if … then … else’ did you see?
How do the process communicate?
What happen if:
The accumulator crashes? Who knows?
The accumulator is too slow?
Get the code
git clone https://github.com/ggp/backend_one .
git checkout TDD_new_aggregator
mix deps.get
mix compile
AMQP_HOST=<rabbit-ip> AMQP_USERNAME=<rabbit-user> AMQP_PASSWORD=<rabbit-pwd> mix test
Tests failed!!!
CODING - New tests ..
We replaced accumulator process with a GenServer
We use struct to manage data
More tests are red
You can execute only a subset of tests executing
CODING - New tests ..
Some hints:
Start with AccumulatorService
Execute single file test
Pay attention on how the state is managed in GenServer
In Aggregate pay attention to how a new message can declare a slot as “complete”
Your round… again.
Fix the test … 20 mins.
CODING - New tests .. - recap
How the state is managed in GenServer?
Is it better to manage data with structs?
How do you call functions in GenServer?
How do you identify it?
Is the caller blocked during execution?
Monitoring - Tests
● Monitor distribution application
● Common tools ( Netdata, Zabbix, Ganglia, tc)
● Observer
● Beam tools - WombatOM
● QuickCheck (http://www.quviq.com/products/erlang-quickcheck/)
Thank youAny questions?
● Paolo Laurenti @paololaurenti
● Gianluca Padovanicoders51 - @gpad619
● Gabriele SantomaggioErlang-Solutions - @gsantomaggio