System integration through queues

Paolo Laurenti @paololaurenti, Gianluca Padovani coders51 - @gpad619, Gabriele Santomaggio Erlang-Solutions - @gsantomaggio

Upload: gianluca-padovani

Post on 07-Jan-2017

Category:

Software


TRANSCRIPT

Page 1: System integration through queues

System integration through queues

● Paolo Laurenti @paololaurenti

● Gianluca Padovani, coders51 - @gpad619

● Gabriele Santomaggio, Erlang-Solutions - @gsantomaggio

Page 2: System integration through queues

What we will see... topics

● Integration with MQTT - AMQP

● Different languages (Elixir, Java, .Net, Ruby, Python, C++)

● Different technologies in action, such as Docker, Phoenix, RabbitMQ and more

● Unit tests / Integration Tests and Monitoring

● High availability and Horizontal scaling

Page 3: System integration through queues

What we will see... structure

● Introduction (15 mins)

● Coding (you) (25 mins)

● Recap and introduction to the next problem (10 mins)

● Coding (you) (20 mins)

● Recap, Q&A, Monitoring/QuickCheck!! (20 mins)

Page 4: System integration through queues

What we will see...

[Architecture diagram. Labels: .Net, JS, Java, Python, Ruby, C++, HAProxy, RabbitMQ - Cluster, Elixir Consumer BackendOne, Phoenix, WebSocket, Beam]

Page 5: System integration through queues

AMQP protocol: sends receipts from many shops, on a scale of minutes.

MQTT protocol: sends internal and external temperatures and the number of people inside a shop, on a scale of seconds.

What we will see...

[Architecture diagram. Labels: .Net, JS, Java, Python, Ruby, C++, HAProxy, RabbitMQ - Cluster, Elixir Consumer BackendOne, Phoenix, WebSocket, Beam]

We work here!

Page 6: System integration through queues

What we will see... BackendOne

[Diagram of BackendOne. Labels: RabbitMQ, DeviceConsumer, FinancialConsumer, Accumulator, DeviceMessage, FinancialMessage, StatisticalMessage]

Page 7: System integration through queues

Financial message

The financial messages are the receipts received from different sellers.

Message struct:

sellerId - identifies the seller

date - date of the receipt

totalAmount - total amount of the receipt

other fields

Look in FinancialConsumer to see how this message is handled
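As a rough illustration of the fields listed above, a financial message could be held in an Elixir struct. This is only a sketch: the module name FinancialMessageSketch, the from_map/1 helper and the string-keyed input map are assumptions, and the real handling lives in FinancialConsumer.

```elixir
defmodule FinancialMessageSketch do
  # Field names follow the slide; the "other fields" are left out.
  # The module name and from_map/1 are hypothetical, not taken from backend_one.
  @enforce_keys [:sellerId, :date, :totalAmount]
  defstruct [:sellerId, :date, :totalAmount]

  # Builds the struct from an already decoded map with string keys.
  # A map without one of these keys raises a FunctionClauseError.
  def from_map(%{"sellerId" => seller_id, "date" => date, "totalAmount" => total}) do
    %__MODULE__{sellerId: seller_id, date: date, totalAmount: total}
  end
end

receipt =
  FinancialMessageSketch.from_map(%{
    "sellerId" => 42,
    "date" => "2016-11-19 11:23:10",
    "totalAmount" => 19.9
  })

IO.inspect(receipt.totalAmount)
```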

Page 8: System integration through queues

Device message

The device messages are sent from MQTT producers.

Message struct:

sellerId - identifies the seller

date

type - of device

value

The message payload is binary; look in DeviceConsumer to see how the binary deserialization works.
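Binary payloads are usually decoded in Elixir with binary pattern matching. The slides do not give the wire layout, so the sketch below assumes a purely hypothetical one (32-bit seller id, 32-bit unix timestamp, 8-bit type code, 32-bit float value); the module name and the type codes are also assumptions. The real format is the one in DeviceConsumer.

```elixir
defmodule DeviceDecodeSketch do
  # HYPOTHETICAL layout, not the real DeviceConsumer format:
  # seller id (32-bit integer), unix time in seconds (32-bit integer),
  # device type code (8-bit integer), value (32-bit float), big-endian.
  def decode(
        <<seller_id::integer-size(32), unix::integer-size(32), type::integer-size(8),
          value::float-size(32)>>
      ) do
    {:ok, %{sellerId: seller_id, date: unix, type: type_name(type), value: value}}
  end

  # Any payload that does not match the layout is rejected instead of crashing.
  def decode(_other), do: {:error, :invalid_payload}

  # Hypothetical type codes.
  defp type_name(0), do: :internal_temperature
  defp type_name(1), do: :external_temperature
  defp type_name(2), do: :people_counter
  defp type_name(_), do: :unknown
end

payload = <<7::size(32), 1_479_554_580::size(32), 0::size(8), 21.5::float-size(32)>>
{:ok, message} = DeviceDecodeSketch.decode(payload)
IO.inspect(message)
```

Returning a tagged tuple for unknown payloads keeps the consumer alive when a producer sends something unexpected.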

Page 9: System integration through queues

Statistical message

The statistical messages are sent from BackendOne and contain statistical data extracted from Device and Financial messages.

Message struct:

Seller id

payload:

    Receipt (Date, Id, Amount)

    Internal average temperature

    External average temperature

    People count

Page 10: System integration through queues

Statistical message - continue

The statistical messages are calculated from Device and Financial messages.

We send a statistical message for a given minute when we have received a receipt in that minute and have received internal temperature, external temperature and people count both in that minute and in the next one.

Example:

We receive some temperatures and counters at 14:31 and 14:32. For each minute we calculate the average of the internal and external temperatures and the sum of the people counters. When we receive a receipt for 14:31, we send the statistical message for 14:31.

Page 11: System integration through queues

Accumulator state

To create a map with the atom k1 as key and 42 as value, I can write something like this:

%{ k1: 42 }

or

%{ :k1 => 42 }

Page 12: System integration through queues

Accumulator state - continue

The accumulator state is a map like this:

%{
  internal_avg_temperature: %{ … },
  external_avg_temperature: %{ … },
  people: %{ … },
  receipt: %{ … }
}

Page 13: System integration through queues

Accumulator state - continue

The values of the internal_avg_temperature and external_avg_temperature keys are maps whose keys are datetimes rounded to the minute and whose values are further maps.

%{
  external_avg_temperature: %{
    "2016-11-19 11:23:00" => %{
      value: average,
      total: total_of_received_temperatures,
      count: number_of_temperatures_received
    },
    "2016-11-19 11:24:00" => %{ … }
  }
}
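Keeping total and count next to the average means a new reading can be folded in without storing every past value. A minimal sketch of that update follows; the module and function names are hypothetical and this is not the code in backend_one.

```elixir
defmodule TemperatureSlotSketch do
  # `temperatures` is the inner map, e.g. the value stored under
  # :external_avg_temperature. The module name is hypothetical.
  # Adds one reading to the slot for `minute_key` and recomputes the average.
  def add_reading(temperatures, minute_key, reading) do
    slot = Map.get(temperatures, minute_key, %{value: nil, total: 0, count: 0})
    total = slot.total + reading
    count = slot.count + 1
    Map.put(temperatures, minute_key, %{value: total / count, total: total, count: count})
  end
end

temperatures =
  %{}
  |> TemperatureSlotSketch.add_reading("2016-11-19 11:23:00", 20.0)
  |> TemperatureSlotSketch.add_reading("2016-11-19 11:23:00", 22.0)

IO.inspect(temperatures)
```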

Page 14: System integration through queues

Accumulator state - continue

The value of the people key is a map whose keys are datetimes rounded to the minute and whose values are the total of the people counters for that minute.

%{
  people: %{
    "2016-11-19 11:23:00" => 12,
    "2016-11-19 11:24:00" => 10
  }
}
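Summing counters per minute maps naturally onto Map.update/4. The following is a small sketch with a hypothetical module name, not the project's implementation.

```elixir
defmodule PeopleCounterSketch do
  # Adds `count` to the running total for `minute_key`.
  # A minute that is not in the map yet starts at `count`.
  def add(people, minute_key, count) do
    Map.update(people, minute_key, count, fn total -> total + count end)
  end
end

people =
  %{"2016-11-19 11:23:00" => 12}
  |> PeopleCounterSketch.add("2016-11-19 11:23:00", 3)
  |> PeopleCounterSketch.add("2016-11-19 11:24:00", 10)

IO.inspect(people)
```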

Page 15: System integration through queues

Accumulator state - continue

The value of the receipt key is a map whose keys are datetimes rounded to the minute and whose values are the lists of receipts received in that minute.

%{
  receipt: %{
    "2016-11-19 11:23:00" => [receipt_1, receipt_2, … receipt_N],
    "2016-11-19 11:24:00" => [receipt_1, receipt_2],
    …
  }
}
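One way to build such a map is to round each receipt's date down to the minute and use the result as the key. The sketch below assumes receipts are maps with a NaiveDateTime under :date; that shape and the module name are assumptions made for illustration.

```elixir
defmodule ReceiptSlotSketch do
  # Rounds a NaiveDateTime down to the minute and renders it as the map key,
  # for example "2016-11-19 11:23:00".
  def minute_key(%NaiveDateTime{} = date) do
    NaiveDateTime.to_string(%{date | second: 0, microsecond: {0, 0}})
  end

  # Prepends the receipt to the list for its minute, so lists are newest first.
  def add(receipts, %{date: date} = receipt) do
    Map.update(receipts, minute_key(date), [receipt], fn list -> [receipt | list] end)
  end
end

r1 = %{sellerId: 1, date: ~N[2016-11-19 11:23:10], totalAmount: 9.9}
r2 = %{sellerId: 1, date: ~N[2016-11-19 11:23:50], totalAmount: 5.0}
receipts = %{} |> ReceiptSlotSketch.add(r1) |> ReceiptSlotSketch.add(r2)

IO.inspect(Map.keys(receipts))
```

Prepending is cheap on Elixir lists; if arrival order matters, the list can be reversed when it is read.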

Page 16: System integration through queues

Accumulator state - continue

%{
  external_avg_temperature: %{
    "2016-11-19 11:21:00" => %{
      value: average,
      total: total_of_received_temperatures,
      count: number_of_temperatures_received
    }
  },
  people: %{
    "2016-11-19 11:23:00" => 12
  },
  receipt: %{
    "2016-11-19 11:25:00" => [receipt_1, receipt_2, … receipt_N]
  }
}
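Given a state of this shape, deciding whether a statistical message can be sent for a minute comes down to a lookup in each inner map. The sketch below encodes one reading of the rule described earlier for statistical messages (a receipt in the minute, plus device data for that minute and the next one). The module name and the exact rule are assumptions, not the project's Aggregate logic.

```elixir
defmodule SlotCheckSketch do
  @device_keys [:internal_avg_temperature, :external_avg_temperature, :people]

  # True when `minute` has at least one receipt and every device series has
  # an entry for both `minute` and `next_minute`.
  def complete?(state, minute, next_minute) do
    has_receipt = Map.get(state.receipt, minute, []) != []

    has_device_data =
      Enum.all?(@device_keys, fn key ->
        series = Map.fetch!(state, key)
        Map.has_key?(series, minute) and Map.has_key?(series, next_minute)
      end)

    has_receipt and has_device_data
  end
end

slot = %{value: 21.0, total: 42.0, count: 2}

state = %{
  internal_avg_temperature: %{"2016-11-19 14:31:00" => slot, "2016-11-19 14:32:00" => slot},
  external_avg_temperature: %{"2016-11-19 14:31:00" => slot, "2016-11-19 14:32:00" => slot},
  people: %{"2016-11-19 14:31:00" => 12, "2016-11-19 14:32:00" => 10},
  receipt: %{"2016-11-19 14:31:00" => [:receipt_1]}
}

ready_1431 = SlotCheckSketch.complete?(state, "2016-11-19 14:31:00", "2016-11-19 14:32:00")
ready_1432 = SlotCheckSketch.complete?(state, "2016-11-19 14:32:00", "2016-11-19 14:33:00")
IO.inspect({ready_1431, ready_1432})
```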

Page 17: System integration through queues

Get the code

git clone https://github.com/ggp/backend_one .

git checkout CLUES_master

mix deps.get

mix compile

AMQP_HOST=<rabbit-ip> AMQP_USERNAME=<rabbit-user> AMQP_PASSWORD=<rabbit-pwd> mix test

Page 18: System integration through queues

Test failed

Page 19: System integration through queues

CODING - Test failed

Some hints:

The code probably doesn’t handle some new fields correctly

Take some time to study the code

Pay attention to what happens when the accumulator crashes …

Write a simple solution that works for the test (it might not work in general …)

Page 20: System integration through queues

Your round...

Fix the test in ~25 mins.

Page 21: System integration through queues

CODING - Test failed - recap

Did you understand the architecture of backend_one?

How many ‘if … then … else’ did you see?

How do the processes communicate?

What happens if:

The accumulator crashes? Who knows?

The accumulator is too slow?

Page 22: System integration through queues

Get the code

git clone https://github.com/ggp/backend_one .

git checkout TDD_new_aggregator

mix deps.get

mix compile

AMQP_HOST=<rabbit-ip> AMQP_USERNAME=<rabbit-user> AMQP_PASSWORD=<rabbit-pwd> mix test

Page 23: System integration through queues

Tests failed!!!

Page 24: System integration through queues

CODING - New tests ..

We replaced the accumulator process with a GenServer

We use structs to manage data

More tests are red

You can execute only a subset of tests executing

Page 25: System integration through queues

CODING - New tests ..

Some hints:

Start with AccumulatorService

Run the tests of a single file

Pay attention to how the state is managed in the GenServer

In Aggregate pay attention to how a new message can declare a slot as “complete”

Page 26: System integration through queues

Your round… again.

Fix the test … 20 mins.

Page 27: System integration through queues

CODING - New tests .. - recap

How is the state managed in a GenServer?

Is it better to manage data with structs?

How do you call functions in a GenServer?

How do you identify it?

Is the caller blocked during execution?
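The recap questions can be explored with a tiny GenServer. This is a generic sketch, not the AccumulatorService from the repository; the module name CounterServiceSketch and its functions are made up for illustration.

```elixir
defmodule CounterServiceSketch do
  use GenServer

  # Client API. The process is identified by a registered name (this module).
  def start_link(initial \\ 0) do
    GenServer.start_link(__MODULE__, initial, name: __MODULE__)
  end

  # cast: asynchronous, the caller does not wait for a reply.
  def add(amount), do: GenServer.cast(__MODULE__, {:add, amount})

  # call: synchronous, the caller blocks until the server replies or times out.
  def total, do: GenServer.call(__MODULE__, :total)

  # Server callbacks. The new state is the last element of each returned tuple.
  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_cast({:add, amount}, state), do: {:noreply, state + amount}

  @impl true
  def handle_call(:total, _from, state), do: {:reply, state, state}
end

{:ok, _pid} = CounterServiceSketch.start_link()
CounterServiceSketch.add(12)
CounterServiceSketch.add(3)
IO.inspect(CounterServiceSketch.total())
```

Messages sent from one process to another are delivered in order, so the call to total/0 is handled after both casts.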

Page 28: System integration through queues

Monitoring - Tests

● Monitoring distributed applications

● Common tools (Netdata, Zabbix, Ganglia, tc)

● Observer

● Beam tools - WombatOM

● QuickCheck (http://www.quviq.com/products/erlang-quickcheck/)

Page 29: System integration through queues

Thank you

Any questions?

● Paolo Laurenti @paololaurenti

● Gianluca Padovani, coders51 - @gpad619

● Gabriele Santomaggio, Erlang-Solutions - @gsantomaggio