TRANSCRIPT
A Bioinformatics Introduction to Cluster Computing, Part I
By Andrew D. Boyd, MD, Research Fellow
Michigan Center for Biological Information, Department of Psychiatry,
University of Michigan Health System
and
Abhijit Bose, PhD, Associate Director
Michigan Grid Research and Infrastructure Development, and Department of Electrical Engineering and Computer Science
University of Michigan
Introduction
• What is parallel computing?
• Why go parallel?
• What are some limits of parallel computing?
• Types of parallel computing
– Shared memory
– Distributed memory
What is parallel computing?
• Parallel computing: the use of multiple computers or processors working together on a common task.
– Each processor works on its section of the problem
– Processors are allowed to exchange information with other processors
[Figure: the grid of the problem to be solved (x–y plane) is divided into four areas; CPUs #1–#4 each work on their own area and exchange data along the shared boundaries.]
Why do parallel computing?
• Limits of serial computing
– Available memory
– Performance
• Parallel computing allows us to:
– Solve problems that don't fit on a single CPU
– Solve problems that can't be solved in a reasonable time
• We can run…
– Larger problems
– Faster
– More cases
Types of parallelism
• Data parallel
– Each processor performs the same task on different data
– Example: grid problems
• Task parallel
– Each processor performs a different task
– Example: signal processing
• Most applications fall somewhere on the continuum between these two extremes
Basics of Data Parallel Programming
• The same code will run on 2 CPUs
• The program has an array of data to be operated on by 2 CPUs, so the array is split into two parts.

program.f:
  ...
  if CPU=a then
    low_limit=1
    upper_limit=50
  elseif CPU=b then
    low_limit=51
    upper_limit=100
  end if
  do I = low_limit, upper_limit
    work on A(I)
  end do
  ...
end program

CPU A runs:
program.f:
  ...
  low_limit=1
  upper_limit=50
  do I = low_limit, upper_limit
    work on A(I)
  end do
  ...
end program

CPU B runs:
program.f:
  ...
  low_limit=51
  upper_limit=100
  do I = low_limit, upper_limit
    work on A(I)
  end do
  ...
end program
Typical Task Parallel Application
• Signal processing
• Use one processor for each task
• Can use more processors if one is overloaded

[Pipeline: DATA → Normalize Task → FFT Task → Multiply Task → Inverse FFT Task]
Basics of Task Parallel Programming
• The program has 2 tasks (a and b) to be done by 2 CPUs

program.f:
  ...
  initialize
  ...
  if CPU=a then
    do task a
  elseif CPU=b then
    do task b
  end if
  ...
end program

CPU A runs:
program.f:
  ...
  initialize
  ...
  do task a
  ...
end program

CPU B runs:
program.f:
  ...
  initialize
  ...
  do task b
  ...
end program
Limits of Parallel Computing
• Theoretical upper limits
– Amdahl's Law
• Practical limits
• Other considerations
– Time to re-write code
Theoretical upper limits
• All parallel programs contain:
– Parallel sections
– Serial sections
• Serial sections limit the parallel effectiveness
• Amdahl's Law states this formally
Amdahl's Law
• Amdahl's Law places a strict limit on the speedup that can be realized by using multiple processors.
– Effect of multiple processors on run time:
  t_N = (f_p / N + f_s) * t_1
– Effect of multiple processors on speedup:
  S = 1 / (f_s + f_p / N)
– Where
  • f_s = serial fraction of code
  • f_p = parallel fraction of code
  • N = number of processors

It takes only a small fraction of serial content in a code to degrade the parallel performance. It is essential to determine the scaling behaviour of your code before doing production runs using large numbers of processors.
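For example: with f_p = 0.99 and N = 100 processors, S = 1 / (0.01 + 0.99/100) ≈ 50, so even 1% serial code roughly halves the ideal 100x speedup.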
[Figure: Illustration of Amdahl's Law — speedup versus number of processors (0–250) for f_p = 1.000, 0.999, 0.990, and 0.900.]
Amdahl's Law provides a theoretical upper limit on parallel speedup assuming that there are no costs for communications. In reality, communications will result in a further degradation of performance.
Amdahl's Law vs. Reality
[Figure: speedup versus number of processors (0–250) for f_p = 0.99, comparing the Amdahl's Law prediction with measured reality; the real curve falls below the theoretical one.]
Some other considerations
• Writing an effective parallel application is difficult
– Load balance is important
– Communication can limit parallel efficiency
– Serial time can dominate
• Is it worth your time to rewrite your application?
– Do the CPU requirements justify parallelization?
– Will the code be used just once?
• Super-linear speedup?
– Cache effects as the number of processors increases
– Randomised algorithms
Shared and Distributed Memory
• Shared memory – single address space. All processors have access to a pool of shared memory. (Example: CRAY T90)
– Methods of memory access: bus, crossbar
[Diagram: several processors (P) connected by a Bus to a single shared Memory.]
• Distributed memory – each processor has its own local memory. Must do message passing to exchange data between processors. (Examples: CRAY T3E, IBM SP)
[Diagram: several processors (P), each with its own local memory (M), connected by a Network.]
Pure Shared Memory Machines
• T90, C90, YMP, XMP, SV1
• SGI O2000 (sort of)
• HP-Exemplar (sort of)
• Vax 780
• Various Suns
• Various Wintel boxes
• BBN GP 1000 Butterfly
Programming methodologies
• Standard Fortran or C and let the compiler do it for you
• Directives can give hints to the compiler (OpenMP)
• Libraries
• Thread-like methods
– Explicitly start multiple tasks
– Each is given its own section of memory
– Use shared variables for communication
• Message passing can also be used but is not common
Example of Shared-Memory Programming
Program: Calculate the value of pi

      program calc_pi
      implicit none
      integer n,i
      double precision w,x,sum,pi,f,a
      double precision start, finish, timef
      f(a) = 4.0 / (1.0 + a*a)
      n=100000000
      start=timef()
      w=1.0/n
      sum=0.0

Shared-memory portion:

!$OMP PARALLEL PRIVATE(x,i), SHARED(w,n), &
!$OMP REDUCTION(+:sum)
!$OMP DO
      do i=1,n
        x = w * (i - 0.5)
        sum = sum + f(x)
      end do
!$OMP END DO
!$OMP END PARALLEL
      pi = w * sum
      finish=timef()
      print*,"value of pi, time taken:"
      print*,pi,finish-start
      end
Distributed Shared Memory (NUMA)
• Consists of N processors and a global address space
– All processors can see all memory
– Each processor has some amount of local memory
– Access to the memory of other processors is slower
• Non-Uniform Memory Access
[Diagram: two shared-memory nodes, each with processors (P) on a Bus attached to a local Memory, linked together.]
Programming methodologies
• Same as shared memory
• Standard Fortran or C and let the compiler do it for you
• Directives can give hints to the compiler (OpenMP)
• Libraries
• Thread-like methods
– Explicitly start multiple tasks
– Each is given its own section of memory
– Use shared variables for communication
• Message passing can also be used
DSM NUMA Machines
• SGI O2000
• HP-Exemplar
Distributed Memory
• Each of N processors has its own memory
• Memory is not shared
• Communication occurs using messages
[Diagram: processors (P), each with private memory (M), connected by a Network.]
Communication networks
• Custom
– Many manufacturers offer custom interconnects
– Myrinet 2000
• Off the shelf
– Ethernet (Force 10, Extreme Networks)
– ATM
– HIPPI
– Fibre Channel
– FDDI
– InfiniBand
Programming methodology
• Mostly message passing using MPI
• Data distribution languages
– Simulate a global name space
– Examples:
  • High Performance Fortran
  • Split-C
  • Co-array Fortran
Hybrid machines
• SMP nodes (clumps) with an interconnect between clumps
• Machines
– Origin 2000
– Exemplar
– SV1
– IBM Nighthawk
• Programming
– SMP methods on clumps, or message passing
– Message passing between all processors
AN INTRODUCTION TO MESSAGE PASSING INTERFACE FOR CLUSTERS
A Brief Intro to MPI
• Background on MPI
• Documentation
• Hello world in MPI
• Basic communications
• Simple send and receive program

You can use MPI in both Cluster and Grid environments.
Background on MPI
• MPI – Message Passing Interface
– Library standard defined by a committee of vendors, implementers, and parallel programmers
– Used to create parallel SPMD programs based on message passing
• Available on almost all parallel machines in C and Fortran
• Over 100 advanced routines, but only 6 basic ones
Documentation
• MPI home page: http://www.mcs.anl.gov/mpi (contains the library standard)
• Books: http://www.epm.ornl.gov/~walker/mpi/books.html
– "MPI: The Complete Reference" by Snir, Otto, Huss-Lederman, Walker, and Dongarra, MIT Press (also in PostScript and HTML)
– "Using MPI" by Gropp, Lusk and Skjellum, MIT Press
MPI Implementations
• Most parallel machine vendors have optimized versions
• Others: http://www-unix.mcs.anl.gov/mpi/mpich/indexold.html
• GLOBUS: http://www.globus.org
• MPICH-G2: http://www3.niu.edu/mpi/
Key Concepts of MPI
• Used to create parallel SPMD programs based on message passing
– Normally the same program is running on several different nodes
– Nodes communicate using message passing
• Typical methodology:

  start job on n processors
  do i = 1 to j
    each processor does some calculation
    pass messages between processors
  end do
  end job
Include files
• The MPI include file
– C: mpi.h
– Fortran: mpif.h (an f90 module is a good place for this)
• Defines many constants used within MPI programs
• In C, defines the interfaces for the functions
• Compilers know where to find the include files
Communicators
• Communicators
– A parameter for most MPI calls
– A collection of processors working on some part of a parallel job
– MPI_COMM_WORLD is defined in the MPI include file as all of the processors in your job
– Can create subsets of MPI_COMM_WORLD
– Processors within a communicator are assigned numbers 0 to n-1
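One common way to create a subset of MPI_COMM_WORLD is MPI_Comm_split. A minimal sketch (an illustrative example, not taken from the original slides) that splits the job into two halves:

/* Processes with the same "color" end up in the same new
   communicator and are renumbered 0..m-1 within it. */
#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[])
{
    int world_rank, world_size, color, sub_rank, sub_size;
    MPI_Comm sub_comm;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    color = (world_rank < world_size / 2) ? 0 : 1;   /* first half vs. second half */
    MPI_Comm_split(MPI_COMM_WORLD, color, world_rank, &sub_comm);

    MPI_Comm_rank(sub_comm, &sub_rank);
    MPI_Comm_size(sub_comm, &sub_size);
    printf("world rank %d -> group %d, rank %d of %d\n",
           world_rank, color, sub_rank, sub_size);

    MPI_Comm_free(&sub_comm);
    MPI_Finalize();
    return 0;
}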
Data types
• Data types
– When sending a message, it is given a data type
– Predefined types correspond to "normal" types
  • MPI_REAL, MPI_FLOAT – Fortran and C real
  • MPI_DOUBLE_PRECISION, MPI_DOUBLE – Fortran and C double
  • MPI_INTEGER and MPI_INT – Fortran and C integer
– Can create user-defined types
Minimal MPI Program
• Every MPI program needs these…
– C version

#include <mpi.h>  /* the mpi include file */
/* Initialize MPI */
ierr = MPI_Init(&argc, &argv);
/* How many total PEs are there */
ierr = MPI_Comm_size(MPI_COMM_WORLD, &nPEs);
/* What node am I (what is my rank)? */
ierr = MPI_Comm_rank(MPI_COMM_WORLD, &iam);
...
ierr = MPI_Finalize();

• In C, MPI routines are functions and return an error value
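Putting these calls together, a complete "hello world" might look like the following minimal sketch (compile with mpicc, run with something like mpirun -np 4 ./a.out):

#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[])
{
    int nPEs, iam;

    MPI_Init(&argc, &argv);                    /* start MPI */
    MPI_Comm_size(MPI_COMM_WORLD, &nPEs);      /* total number of processes */
    MPI_Comm_rank(MPI_COMM_WORLD, &iam);       /* my rank: 0 .. nPEs-1 */

    printf("Hello from process %d of %d\n", iam, nPEs);

    MPI_Finalize();                            /* shut down MPI */
    return 0;
}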
Minimal MPI Program
• Every MPI program needs these…
– Fortran version

      include 'mpif.h'   ! MPI include file
c     Initialize MPI
      call MPI_Init(ierr)
c     Find total number of PEs
      call MPI_Comm_size(MPI_COMM_WORLD, nPEs, ierr)
c     Find the rank of this node
      call MPI_Comm_rank(MPI_COMM_WORLD, iam, ierr)
      ...
      call MPI_Finalize(ierr)

• In Fortran, MPI routines are subroutines, and the last parameter is an error value
Basic Communication
• Data values are transferred from one processor to another
– One process sends the data
– Another receives the data
• Synchronous
– Call does not return until the message is sent or received
• Asynchronous
– Call indicates the start of a send or receive, and another call is made to determine if it has finished
Synchronous Send
• C
– MPI_Send(&buffer, count, datatype, destination, tag, communicator);
• Fortran
– Call MPI_Send(buffer, count, datatype, destination, tag, communicator, ierr)
• Call blocks until the message is on the way
Synchronous Send
• MPI_Send: sends data to another processor
• Use MPI_Recv to "get" the data
• C
  MPI_Send(&buffer, count, datatype, destination, tag, communicator);
• Fortran
  Call MPI_Send(buffer, count, datatype, destination, tag, communicator, ierr)
Call MPI_Send(buffer, count, datatype, destination, tag, communicator, ierr)
• Buffer: the data
• Count: length of source array (in elements, 1 for scalars)
• Datatype: type of data, for example MPI_DOUBLE_PRECISION, MPI_INT, etc.
• Destination: processor number of the destination processor in the communicator
• Tag: message type (arbitrary integer)
• Communicator: your set of processors
• Ierr: error return (Fortran only)
Synchronous Receive
• C
– MPI_Recv(&buffer, count, datatype, source, tag, communicator, &status);
• Fortran
– Call MPI_Recv(buffer, count, datatype, source, tag, communicator, status, ierr)
• Call blocks until the message is in the buffer
• Status – contains information about the incoming message
– C
  • MPI_Status status;
– Fortran
  • Integer status(MPI_STATUS_SIZE)
Call MPI_Recv(buffer, count, datatype, source, tag, communicator, status, ierr)
• Buffer: the data
• Count: length of source array (in elements, 1 for scalars)
• Datatype: type of data, for example MPI_DOUBLE_PRECISION, MPI_INT, etc.
• Source: processor number of the source processor in the communicator
• Tag: message type (arbitrary integer)
• Communicator: your set of processors
• Status: information about the message
• Ierr: error return (Fortran only)
Six basic MPI calls
• MPI_INIT – Initialize MPI
• MPI_COMM_RANK – Get the processor rank
• MPI_COMM_SIZE – Get the number of processors
• MPI_Send – Send data to another processor
• MPI_Recv – Get data from another processor
• MPI_FINALIZE – Finish MPI
Send and Receive Program (Fortran)

program send_receive
include "mpif.h"
integer myid,ierr,numprocs,tag,source,destination,count
integer buffer
integer status(MPI_STATUS_SIZE)
call MPI_INIT( ierr )
call MPI_COMM_RANK( MPI_COMM_WORLD, myid, ierr )
call MPI_COMM_SIZE( MPI_COMM_WORLD, numprocs, ierr )
tag=1234; source=0; destination=1; count=1
if(myid .eq. source)then
   buffer=5678
   call MPI_Send(buffer, count, MPI_INTEGER, destination, &
        tag, MPI_COMM_WORLD, ierr)
   write(*,*)"processor ",myid," sent ",buffer
endif
if(myid .eq. destination)then
   call MPI_Recv(buffer, count, MPI_INTEGER, source, &
        tag, MPI_COMM_WORLD, status, ierr)
   write(*,*)"processor ",myid," got ",buffer
endif
call MPI_FINALIZE(ierr)
stop
end
Send and Receive Program (C)

#include <stdio.h>
#include "mpi.h"
int main(int argc, char *argv[])
{
    int myid, numprocs, tag, source, destination, count, buffer;
    MPI_Status status;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    tag=1234; source=0; destination=1; count=1;
    if(myid == source){
        buffer=5678;
        MPI_Send(&buffer, count, MPI_INT, destination, tag, MPI_COMM_WORLD);
        printf("processor %d sent %d\n", myid, buffer);
    }
    if(myid == destination){
        MPI_Recv(&buffer, count, MPI_INT, source, tag, MPI_COMM_WORLD, &status);
        printf("processor %d got %d\n", myid, buffer);
    }
    MPI_Finalize();
}
MPI Types
• MPI has many different predefined data types
• Can be used in any communication operation
Predefined types in C
C MPI Types
MPI_CHAR signed char
MPI_SHORT signed short int
MPI_INT signed int
MPI_LONG signed long int
MPI_UNSIGNED_CHAR unsigned char
MPI_UNSIGNED_SHORT unsigned short int
MPI_UNSIGNED unsigned int
MPI_UNSIGNED_LONG unsigned long int
MPI_FLOAT float
MPI_DOUBLE double
MPI_LONG_DOUBLE long double
MPI_BYTE -
MPI_PACKED -
Predefined types in Fortran
Fortran MPI Types
MPI_INTEGER INTEGER
MPI_REAL REAL
MPI_DOUBLE_PRECISION DOUBLE PRECISION
MPI_COMPLEX COMPLEX
MPI_LOGICAL LOGICAL
MPI_CHARACTER CHARACTER(1)
MPI_BYTE -
MPI_PACKED -
Wildcards
• Allow you to not necessarily specify a tag or source
• MPI_ANY_SOURCE and MPI_ANY_TAG are wildcards
• The status structure is used to get the wildcard values
• Example:

MPI_Status status;
int buffer[5];
int error;
error = MPI_Recv(&buffer, 5, MPI_INT, MPI_ANY_SOURCE,
                 MPI_ANY_TAG, MPI_COMM_WORLD, &status);
Status
• The status parameter returns additional information for some MPI routines
– Additional error status information
– Additional information with wildcard parameters
• C declaration: a predefined struct
– MPI_Status status;
• Fortran declaration: an array is used instead
– INTEGER STATUS(MPI_STATUS_SIZE)
Accessing status information
• The tag of a received message
– C: status.MPI_TAG
– Fortran: STATUS(MPI_TAG)
• The source of a received message
– C: status.MPI_SOURCE
– Fortran: STATUS(MPI_SOURCE)
• The error code of the MPI call
– C: status.MPI_ERROR
– Fortran: STATUS(MPI_ERROR)
• Other uses...
MPI_Probe
• MPI_Probe allows incoming messages to be checked without actually receiving them.
– The user can then decide how to receive the data.
– Useful when different action needs to be taken depending on the "who, what, and how much" information of the message.
MPI_Probe
• C
– int MPI_Probe(source, tag, comm, &status)
• Fortran
– MPI_PROBE(SOURCE, TAG, COMM, STATUS, IERROR)
• Parameters
– Source: source rank, or MPI_ANY_SOURCE
– Tag: tag value, or MPI_ANY_TAG
– Comm: communicator
– Status: status object
MPI_Probe example (part 1)

! How to use probe and get_count
! to find the size of an incoming message
program probe_it
include 'mpif.h'
integer myid,numprocs
integer status(MPI_STATUS_SIZE)
integer mytag,icount,ierr,iray(10)
call MPI_INIT( ierr )
call MPI_COMM_RANK( MPI_COMM_WORLD, myid, ierr )
call MPI_COMM_SIZE( MPI_COMM_WORLD, numprocs, ierr )
mytag=123; iray=0; icount=0
if(myid .eq. 0)then
! Process 0 sends a message of size 5
   icount=5
   iray(1:icount)=1
   call MPI_SEND(iray,icount,MPI_INTEGER, &
        1,mytag,MPI_COMM_WORLD,ierr)
endif
MPI_Probe example (part 2)

if(myid .eq. 1)then
! process 1 uses probe and get_count to find the size
   call mpi_probe(0,mytag,MPI_COMM_WORLD,status,ierr)
   call mpi_get_count(status,MPI_INTEGER,icount,ierr)
   write(*,*)"getting ", icount," values"
   call mpi_recv(iray,icount,MPI_INTEGER,0, &
        mytag,MPI_COMM_WORLD,status,ierr)
endif
write(*,*)iray
call mpi_finalize(ierr)
stop
end
MPI_BARRIER
• Blocks the caller until all members in the communicator have called it.
• Used as a synchronization tool.
• C
– MPI_Barrier(comm)
• Fortran
– Call MPI_BARRIER(COMM, IERROR)
• Parameter
– Comm: communicator (e.g., MPI_COMM_WORLD)
Asynchronous Communication
• Asynchronous send: the send call returns immediately; the send actually occurs later
• Asynchronous receive: the receive call returns immediately. When the received data is needed, call a wait subroutine
• Asynchronous communication is used in an attempt to overlap communication with computation
• Can help prevent deadlock (though relying on it for that is not advised)
Asynchronous Send with MPI_Isend
• C
– MPI_Request request;
– int MPI_Isend(&buffer, count, datatype, dest, tag, comm, &request)
• Fortran
– Integer REQUEST
– MPI_ISEND(BUFFER, COUNT, DATATYPE, DEST, TAG, COMM, REQUEST, IERROR)
• Request is a new output parameter
• Don't change the data until the communication is complete
Asynchronous Receive with MPI_Irecv
• C
– MPI_Request request;
– int MPI_Irecv(&buf, count, datatype, source, tag, comm, &request)
• Fortran
– Integer request
– MPI_IRECV(BUF, COUNT, DATATYPE, SOURCE, TAG, COMM, REQUEST, IERROR)
• Parameter changes
– Request: communication request
– Status parameter is missing
• Don't use the data until the communication is complete
MPI_Wait used to complete communication
• The request from Isend or Irecv is input
• The completion of a send operation indicates that the sender is now free to update the data in the send buffer
• The completion of a receive operation indicates that the receive buffer contains the received message
• MPI_Wait blocks until the message specified by "request" completes
MPI_Wait used to complete communication
• C
– MPI_Request request;
– MPI_Status status;
– MPI_Wait(&request, &status)
• Fortran
– Integer request
– Integer status(MPI_STATUS_SIZE)
– MPI_WAIT(REQUEST, STATUS, IERROR)
• MPI_Wait blocks until the message specified by "request" completes
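Putting MPI_Isend, MPI_Irecv and MPI_Wait together, a minimal illustrative C sketch (an assumed example, not from the original slides) that exchanges a value between ranks 0 and 1:

#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[])
{
    int rank, sendval, recvval;
    MPI_Request send_req, recv_req;
    MPI_Status  status;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank < 2) {                       /* only ranks 0 and 1 participate */
        int other = 1 - rank;
        sendval = rank + 100;

        /* post the receive and the send; both calls return immediately */
        MPI_Irecv(&recvval, 1, MPI_INT, other, 0, MPI_COMM_WORLD, &recv_req);
        MPI_Isend(&sendval, 1, MPI_INT, other, 0, MPI_COMM_WORLD, &send_req);

        /* ... other work could overlap with the communication here ... */

        MPI_Wait(&send_req, &status);     /* safe to reuse sendval now */
        MPI_Wait(&recv_req, &status);     /* recvval now holds the message */
        printf("rank %d received %d\n", rank, recvval);
    }

    MPI_Finalize();
    return 0;
}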
MPI_Test
• Similar to MPI_Wait, but does not block
• The value of flag signifies whether a message has been delivered
• C
– int flag;
– int MPI_Test(&request, &flag, &status)
• Fortran
– LOGICAL FLAG
– MPI_TEST(REQUEST, FLAG, STATUS, IER)
Non-blocking send example

      call MPI_Isend (buffer,count,datatype,dest, &
                      tag,comm, request, ierr)
10    continue
      ! Do other work ...
      call MPI_Test (request, flag, status, ierr)
      if (.not. flag) goto 10
MPI Broadcast call: MPI_Bcast
• All nodes call MPI_Bcast
• One node (the root) sends the message; all others receive it
• C
– MPI_Bcast(&buffer, count, datatype, root, communicator);
• Fortran
– call MPI_Bcast(buffer, count, datatype, root, communicator, ierr)
• Root is the node that sends the message
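A minimal C usage sketch (an assumed example, not from the original slides): rank 0 broadcasts an integer to all other ranks.

#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[])
{
    int rank, value = 0;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0)
        value = 42;                        /* only the root starts with the data */

    /* every rank calls MPI_Bcast; root = 0 sends, all others receive */
    MPI_Bcast(&value, 1, MPI_INT, 0, MPI_COMM_WORLD);

    printf("rank %d now has value %d\n", rank, value);
    MPI_Finalize();
    return 0;
}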
Scatter Operation using MPI_Scatter
• Similar to Broadcast, but sends a section of an array to each processor

Data in an array on the root node:   A(0)  A(1)  A(2)  ...  A(N-1)
Goes to processors:                  P0    P1    P2    ...  Pn-1
MPI_Scatter
• C
– int MPI_Scatter(&sendbuf, sendcnts, sendtype, &recvbuf, recvcnts, recvtype, root, comm);
• Fortran
– MPI_Scatter(sendbuf, sendcnts, sendtype, recvbuf, recvcnts, recvtype, root, comm, ierror)
• Parameters
– Sendbuf is an array of size (number of processors * sendcnts)
– Sendcnts: number of elements sent to each processor
– Recvcnts: number of elements obtained from the root processor
– Recvbuf: elements obtained from the root processor; may be an array
Scatter Operation using MPI_Scatter
• Scatter with Sendcnts = 2

Data in an array on the root node:  A(0) A(1)   A(2) A(3)   A(4) A(5)   ...   A(2N-2) A(2N-1)
Goes to processors:                 P0          P1          P2          ...   Pn-1
Received into:                      B(0) B(1)   B(0) B(1)   B(0) B(1)   ...   B(0) B(1)
Gather Operation using MPI_Gather
• Used to collect data from all processors to the root; the inverse of scatter
• Data is collected into an array on the root processor

Data (A) from the various processors:  P0    P1    P2    ...  Pn-1
Goes to an array on the root node:     A(0)  A(1)  A(2)  ...  A(N-1)
MPI_Gather
• C
– int MPI_Gather(&sendbuf, sendcnts, sendtype, &recvbuf, recvcnts, recvtype, root, comm);
• Fortran
– MPI_Gather(sendbuf, sendcnts, sendtype, recvbuf, recvcnts, recvtype, root, comm, ierror)
• Parameters
– Sendcnts: number of elements sent from each processor
– Sendbuf is an array of size sendcnts
– Recvcnts: number of elements obtained from each processor
– Recvbuf is of size Recvcnts * number of processors
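A minimal C sketch combining scatter and gather (an assumed example, not from the original slides): the root scatters one integer to each rank, each rank doubles it, and the root gathers the results back.

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

int main(int argc, char *argv[])
{
    int rank, size, myval, i;
    int *sendbuf = NULL, *recvbuf = NULL;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (rank == 0) {                              /* root builds the full array */
        sendbuf = malloc(size * sizeof(int));
        recvbuf = malloc(size * sizeof(int));
        for (i = 0; i < size; i++) sendbuf[i] = i;
    }

    /* each rank receives one element of sendbuf */
    MPI_Scatter(sendbuf, 1, MPI_INT, &myval, 1, MPI_INT, 0, MPI_COMM_WORLD);

    myval *= 2;                                   /* local work */

    /* collect one element from each rank back into recvbuf on the root */
    MPI_Gather(&myval, 1, MPI_INT, recvbuf, 1, MPI_INT, 0, MPI_COMM_WORLD);

    if (rank == 0) {
        for (i = 0; i < size; i++) printf("recvbuf[%d] = %d\n", i, recvbuf[i]);
        free(sendbuf); free(recvbuf);
    }
    MPI_Finalize();
    return 0;
}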
Reduction Operations
• Used to combine partial results from all processors
• Result returned to the root processor
• Several types of operations available
• Works on 1D or 2D arrays
The MPI routine is MPI_Reduce
• C
– int MPI_Reduce(&sendbuf, &recvbuf, count, datatype, operation, root, communicator)
• Fortran
– call MPI_Reduce(sendbuf, recvbuf, count, datatype, operation, root, communicator, ierr)
• Parameters
– Like MPI_Bcast, a root is specified.
– Operation is a type of mathematical operation
Operations for MPI_Reduce
• MPI_MAX – Maximum
• MPI_MIN – Minimum
• MPI_PROD – Product
• MPI_SUM – Sum
• MPI_LAND – Logical and
• MPI_LOR – Logical or
• MPI_LXOR – Logical exclusive or
• MPI_BAND – Bitwise and
• MPI_BOR – Bitwise or
• MPI_BXOR – Bitwise exclusive or
• MPI_MAXLOC – Maximum value and location
• MPI_MINLOC – Minimum value and location
Global Sum with MPI_Reduce

C:
  double sum_partial, sum_global;
  sum_partial = ...;
  ierr = MPI_Reduce(&sum_partial, &sum_global, 1,
                    MPI_DOUBLE, MPI_SUM,
                    root, MPI_COMM_WORLD);

Fortran:
  double precision sum_partial, sum_global
  sum_partial = ...
  call MPI_Reduce(sum_partial, sum_global, 1, &
       MPI_DOUBLE_PRECISION, MPI_SUM, &
       root, MPI_COMM_WORLD, ierr)
Global Sum with MPI_Reduce and a 2D array

Before (each node holds X(0), X(1), X(2)):
  NODE 0:  A0  B0  C0
  NODE 1:  A1  B1  C1
  NODE 2:  A2  B2  C2

After the reduce (result on the root node):
  X(0) = A0+A1+A2,  X(1) = B0+B1+B2,  X(2) = C0+C1+C2
All Gather and All Reduce
• Gather and Reduce come in an "ALL" variation
• Results are returned to all processors
• The root parameter is missing from the call
• Similar to a gather or reduce followed by a broadcast
Global Sum with MPI_Allreduce and a 2D array

Before (each node holds X(0), X(1), X(2)):
  NODE 0:  A0  B0  C0
  NODE 1:  A1  B1  C1
  NODE 2:  A2  B2  C2

After the all-reduce (result on every node):
  NODE 0:  A0+A1+A2  B0+B1+B2  C0+C1+C2
  NODE 1:  A0+A1+A2  B0+B1+B2  C0+C1+C2
  NODE 2:  A0+A1+A2  B0+B1+B2  C0+C1+C2
All to All communication with MPI_Alltoall
• Each processor sends and receives data to/from all others
• C
– int MPI_Alltoall(&sendbuf, sendcnts, sendtype, &recvbuf, recvcnts, recvtype, comm);
• Fortran
– MPI_Alltoall(sendbuf, sendcnts, sendtype, recvbuf, recvcnts, recvtype, comm, ierror)
• Note: as with the other "ALL" routines, there is no root parameter
All to All with MPI_Alltoall
• Parameters
– Sendcnts: number of elements sent to each processor
– Sendbuf is an array of size Sendcnts * number of processors
– Recvcnts: number of elements obtained from each processor
– Recvbuf is of size Recvcnts * number of processors
• Note that both the send buffer and the receive buffer must be arrays whose size scales with the number of processors (see the sketch below)
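A minimal C sketch (an assumed example, not from the original slides): with N ranks, each rank sends one integer to every other rank, so both buffers hold N elements.

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

int main(int argc, char *argv[])
{
    int rank, size, i;
    int *sendbuf, *recvbuf;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    sendbuf = malloc(size * sizeof(int));
    recvbuf = malloc(size * sizeof(int));
    for (i = 0; i < size; i++)
        sendbuf[i] = rank * 100 + i;   /* element i is destined for rank i */

    MPI_Alltoall(sendbuf, 1, MPI_INT, recvbuf, 1, MPI_INT, MPI_COMM_WORLD);

    /* recvbuf[i] now holds the element that rank i sent to this rank */
    for (i = 0; i < size; i++)
        printf("rank %d got %d from rank %d\n", rank, recvbuf[i], i);

    free(sendbuf); free(recvbuf);
    MPI_Finalize();
    return 0;
}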
The dreaded "V" (variable) operators
• A collection of very powerful but difficult-to-set-up global communication routines
• MPI_Gatherv: gather different amounts of data from each processor to the root processor
• MPI_Alltoallv: send and receive different amounts of data from all processors
• MPI_Allgatherv: gather different amounts of data from each processor and send all data to each
• MPI_Scatterv: send different amounts of data to each processor from the root processor
Summary
• MPI is used to create parallel programs based on message passing
• Usually the same program is run on multiple processors
• The 6 basic calls in MPI are:
– MPI_INIT( ierr )
– MPI_COMM_RANK( MPI_COMM_WORLD, myid, ierr )
– MPI_COMM_SIZE( MPI_COMM_WORLD, numprocs, ierr )
– MPI_Send(buffer, count, MPI_INTEGER, destination, tag, MPI_COMM_WORLD, ierr)
– MPI_Recv(buffer, count, MPI_INTEGER, source, tag, MPI_COMM_WORLD, status, ierr)
– call MPI_FINALIZE(ierr)
Job Management in HPC Clusters
• Interactive and Batch Jobs
• Common Resource Management Systems: PBS, LSF, Condor
• Queues and Job Specification
• Sample PBS and LSF scripts
Job Management in HPC Clusters
• Interactive and Batch Jobs
– Interactive mode is useful for debugging, performance tuning and profiling of applications
– Batch mode is for production runs
• Most clusters are configured with a number of queues for submitting and running batch jobs
• Batch jobs are submitted via scripts specific to the resource management system deployed
Example of Batch Queues

abose@nyx:~> qstat -q
server: nyx.engin.umich.edu

Queue    Memory  CPU Time  Walltime  Node  Run  Que  Lm  State
short    --      --        24:00:00  --      0  122  --  E R
vs       --      --        --        --      0    0  --  E R
staff    --      --        --        --      0    0  --  E R
atlas    --      --        30:00:00   4      0    0  --  E R
powell   --      --        --        --      3    0  --  E R
long     --      --        336:00:0  --     47   29  --  E R
landau   --      --        --        --     14    6  --  E R
debug    --      --        --        --      0    0  --  E R
route    --      --        --        --      0    0  --  E R
coe1     --      --        --        --     18    2  --  E R

• Can assign a maximum number of CPUs, maximum wall time, etc. to specific queues
An Example PBS Script

#PBS -N pbstrial                   # job name
#PBS -l nodes=2,walltime=24:00:00  # number of CPUs, walltime
#PBS -S /bin/sh
#PBS -q npaci                      # queue name
#PBS -M [email protected]
#PBS -m abe                        # notification level request
#PBS -j oe
#
export GMPICONF=/home/abose/.gmpi/$PBS_JOBID
echo "I ran on `hostname`"
#
# cd to your execution directory first
cd ~
scp [email protected]:/home/abose/emin.cpp em
#
# use mpirun to run my MPI binary with 4 CPUs for 1 hour
mpirun -np 4 ~/a.out               # run executable
An Example LSF Script

#!/bin/ksh
#
# LSF batch script to run the test MPI code
#
#BSUB -P 03152005          # Project Number
#BSUB -a mpich_gm          # select the mpich-gm elim
#BSUB -x                   # exclusive use of node (not_shared)
#BSUB -n 2                 # number of total tasks
#BSUB -R "span[ptile=1]"   # run 1 task per node
#BSUB -J mpilsf.test       # job name
#BSUB -o mpilsf.out        # output filename
#BSUB -e mpilsf.err        # error filename
#BSUB -q regular           # queue

# Fortran example
mpif90 -o mpi_samp_f mpisamp.f
mpirun.lsf ./mpi_samp_f

# C example
mpicc -o mpi_samp_c mpisamp.c
mpirun.lsf ./mpi_samp_c

# C++ example
mpicxx -o mpi_samp_cc mpisamp.cc
mpirun.lsf ./mpi_samp_cc   # run executable
Application Checkpointing
• Checkpointing: the ability of a code to restart from the point of last execution or interruption.
• Very useful for timeshared systems such as clusters and grids.
• High-end systems provide checkpointing in the OS and hardware, but on clusters/desktops we have to checkpoint on our own.
• You should checkpoint when:
(1) you have long-running jobs and you usually run out of allocated queue time;
(2) you can restart the job from the last saved global state (not always possible; it is quite common for time-dependent simulations);
(3) you want to protect yourself from hardware faults/lost CPU cycles.
• How often should one checkpoint? That depends entirely on the application and the user.
Application Checkpointing
• Find out how much time is left in a queue: use ctime/etime/dtime-type timer routines in the code periodically to check for elapsed time. If close to the queue limit, write data to disk from all processes.
• An efficient way of writing checkpointed data: keep two different files, A and B. File A is the checkpointed data from the last write. First, write to file B. If the write is successful, then replace A with B. (See the sketch after this slide.)
• If you use the same file to write over and over again, a system crash during a write will lose all checkpointed data so far. You can also combine files from multiple processes into a single file; in that case, name process files with the process id attached (simplifies management). We will talk about this in the MPI presentation.
• How do you decide on a common checkpointed state among all processes?
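A minimal C sketch of the two-file scheme described above (the file names checkpoint_A.dat and checkpoint_B.dat are hypothetical, and the state is assumed to be a flat array of doubles):

#include <stdio.h>

/* Write the new state to the "B" file first; only if that write
   succeeds does it replace the "A" file, so a crash mid-write
   never destroys the last good checkpoint. */
int checkpoint(const double *state, size_t n)
{
    FILE *fp = fopen("checkpoint_B.dat", "wb");
    if (fp == NULL)
        return -1;

    size_t written = fwrite(state, sizeof(double), n, fp);
    if (fclose(fp) != 0 || written != n)
        return -1;                       /* B is incomplete; A is still intact */

    /* promote B to be the current checkpoint A */
    if (rename("checkpoint_B.dat", "checkpoint_A.dat") != 0)
        return -1;
    return 0;
}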
Application Checkpointing
• Distributed checkpointing:
– In the distributed/message-passing model, issue a broadcast to all processes to checkpoint the data at specific points in the code.
– Or, use a distributed checkpointing algorithm when the program logic does not lend itself to the above.
– e.g., read the paper "A Survey of Checkpointing Algorithms for Parallel and Distributed Computers" by Kalaiselvi and Rajaraman (Google it).
– Simpler "quick-n-dirty" methods based on token passing also work, although they may not be very robust (they work most of the time).
• Contact me at [email protected] if you wish to know more.
A Bioinformatics Introduction to Cluster Computing, Part II
By Andrew D. Boyd, MD, Research Fellow
Michigan Center for Biological Information, Department of Psychiatry,
University of Michigan Health System
and
Abhijit Bose, PhD, Associate Director
Michigan Grid Research and Infrastructure Development
Computational Challenges
• Most scientific computations have a parallel and a serial aspect
• Clusters tend to speed up the parallel aspect the most
• Serial computations decrease the ability to speed up the computation on a cluster
• One can take the data set and break it up into smaller pieces
• Example: SETI@HOME
– Gave everyone a small piece of the radio frequency data to compute on a home computer
Parallelize the computation of a large data set

Serial Computations
• Less cost-efficient with large interrelated problems that are hard to uncouple
• Less cost-efficient when the cost of communicating between the nodes exceeds the savings from distributing the computation load
• Example: Molecular Dynamics
– CHARMM and XPLOR
  • Software programs developed for serial machines
– NAMD
  • Designed to run efficiently on parallel machines for simulating large molecules
  • Used Charm++, a parallel C++ library, to parallelize the code
Large Genome vs. Genome Searches
• BLAT, BLAST, FASTA, SSAHA
– All perform sequence matching
– All have different performance speeds and sensitivities
– Some have higher memory requirements than others
• However, they are relatively easy to parallelize
– Divide up the list of sequences you are searching into smaller pieces.
A Simple Bioinformatics Example
• Re-examine the labels of the Affymetrix probe set
• Desire to know other matches with other genes within the organism
• Especially as UniGene changes every few months
– 500,000 probes to search against the database
Time Savings of a Cluster
• On a single processor, at 40 seconds per sequence it would take 231 days to process all 500,000 probes, or 57 days at 10 seconds per sequence
• On biosys1 (an old cluster) the job took approximately 2 weeks
• On morpheus (a new cluster) the job took approximately 4 days
One Cluster Computing Method
• Take the 0.5 million Affy probe sequences and divide them into 90 files
• Submit multiple single BLAST executions, each with 5,500 sequences, to the queue
• Allow the scheduler to dynamically allocate the jobs to the unused nodes
• Easy to code in Perl
– Will walk through the code later
Another Cluster Computing Method for Sequences: mpiBLAST
• One concern is the memory of the computers compared to the size of the database
• One could take the database and break it up into smaller pieces, and have the 0.5 million Affy probes search against the smaller pieces of the database on nodes of a cluster
mpiBLAST
• Uses the MPI libraries to pass messages between the nodes
• One master node and many slave nodes
• Each slave node is assigned a part of the database
• If there are more database parts than nodes, then the master node dynamically allocates the database pieces
• At the end, it recalculates the statistics for the complete database
mpiBLAST
• After initial testing, the program did not scale beyond 10 nodes
• Poor reliability in completion of BLAST runs
• Early versions had very little error checking
– So if one node had an error, the complete run was lost
– If a message between nodes was lost, the complete run was lost
– Poor error exiting: if an error came up, mpiBLAST would never finish and had to be manually killed
Scalability of mpiBLAST
[Figure: run time in seconds (0–25,000) versus number of processors (0–30).]
Another Parallel Bioinformatics Example
• InterProScan
– Developed by the European Bioinformatics Institute (EBI)
– Performs 11 protein domain comparisons for an amino acid sequence and creates a single report
– The code is all developed in Perl
– Like BLAST, you can submit sequences via the EBI web site, but for larger numbers of sequences the application software can be downloaded
InterProScan
• Their approach to the solution was to take the input sequence and divide it into chunks.
• Then each of the 11 programs has a script configured to run on a specific chunk
• A separate script tracks how many processes have been submitted to the cluster, and as one finishes another is submitted
InterProScan
• Assumptions built into the model
– The number of nodes you will be running on will stay the same and be dedicated to you for the whole run
– InterProScan takes over the job of the cluster scheduler
Sample Code to Divide an Input File and Execute Multiple Instances of BLAST
• 3 Perl scripts
– Hgdivide.pl
– demomaker.pl
– multiqsub.pl
• Could use only one script if truly desired
• Could build more error checking into the program as well
Hgdivide.pl

[adboyd@head input]$ cat Hgdivide.pl
#!/usr/bin/perl -w
# divide the affy probe set into files of 1000 probes per file
open(IN, "<demoinput") || die("can't open input file");   # open input file
$j = 1;                                # index for file number
open(OUT, ">demoin".$j."") || die("can't open out file");
$k = 101;                              # number of lines in each input file
$i = 1;                                # line number
while ($line = <IN>) {                 # read a line of the input file
  if ($i < $k) {print OUT $line}
  # print the line to the current file if not yet at the end of the new input file
  if ($i >= $k) {$i = 1; ++$j; open(OUT, ">demoin".$j.""); print OUT $line}
  ++$i;
  # if at the end of the new file, reset the index, add one to the file number,
  # and open a new file
}
close(OUT);
close(IN);
demomaker.pl (part 1)

@num = (1..3);                          # number of scripts you want to make
foreach $chrom (@num) {
  open(OUT, ">run".$chrom.".sh") || die "can't open file";
  # script file name
  print OUT ("#!/bin/tcsh\n");
  # choose the shell to run
  print OUT ("cd affy\n");
  # change directory to the working directory
  $input_file = "~/affy/input/demoin".$chrom."";
  # input file and location
  $input = "demoin".$chrom."";
  # name of input file
  $output_file = "~/affy/output/demo".$chrom.".out";
  # output file for results of BLAST
  $output2_file = "~/affy/output/demo".$chrom.".out2";
  # maintenance (log) output file
  $scratch_dir = "/scratch";
  # name of scratch directory on the cluster
  print OUT ("date > ".$output2_file."\n");
  # print date to maintenance file
  print OUT ("hostname >> ".$output2_file."\n");
  # print cluster node name to maintenance file
  print OUT ("/bin/cp -f Hs.seq.all.* ".$scratch_dir."\n");
  # copy database to the local node
  print OUT ("echo 'db copied into scratch' >> ".$output2_file."\n");
  # print status of copy to maintenance file

demomaker.pl (part 2)

  print OUT ("/bin/cp -f ".$input_file." ".$scratch_dir."\n");
  # copy input file to the local node
  print OUT ("echo 'input copied into scratch' >> ".$output2_file."\n");
  # print status of copy to maintenance file
  print OUT ("date >> ".$output2_file."\n");
  # print time copying is done to maintenance file
  print OUT ("cd ".$scratch_dir."\n");
  # move into the node scratch directory
  $cmd_line = "~/affy/blastall -i ".$input." -o ".$output_file." -p blastn -d Hs.seq.all -e 0.0001 -a 2 -U T ";
  # command line to execute
  print OUT ("echo '$cmd_line' >> ".$output2_file."\n");
  # print the command line to the maintenance file
  print OUT ($cmd_line."\n");
  # run the command line
  print OUT ("echo Blastall finished >> ".$output2_file."\n");
  # print status of the BLAST run to maintenance file
  print OUT ("/bin/rm -f Hs.seq.all.*\n");
  # remove the database from the node
  print OUT ("echo 'db removed from scratch' >> ".$output2_file."\n");
  # print status of database removal to maintenance file
  print OUT ("/bin/rm -f ".$input."\n");
  # remove the input file from the node
  print OUT ("echo 'input removed from scratch' >> ".$output2_file."\n");
  # print status of input file removal to maintenance file
  print OUT ("date >> ".$output2_file."\n");
  # print time to maintenance file
}
close(OUT);
multiqsub.pl

#!/usr/bin/perl
# system call to submit each script to the cluster
$i = 1;                        # index
while ($i < 4) {
  system("qsub run".$i.".sh"); # system call for each script
  ++$i;
}
The Joy of Cluster Computing
• This is the stone wall you may encounter while working on a cluster
• This is a "feature", not necessarily a bug
How Software Is Maintained on a Cluster
• Many clusters will allow you to install any software you desire
• Some clusters pre-install common software like BLAST or InterProScan, and even databases
– If multiple people are using the same database, it is a waste of memory to have it installed multiple times
Bring Your Own Software Model
• Find out what libraries and compilers are supported on the cluster
– Not everyone uses gcc
– The latest version of the compiler you need may not be installed
• Find out the name of the scratch space on the nodes
– Scratch space is the area of the node you can copy files to
– Usually labeled /scratch
Software Previously Installed
• Find out what directory the software is located in
• Find out what version they are running
• Find out if the software is on a drive that can be mounted on the compute nodes
– Warning!!!! Not all drives and directories visible from the head node are mountable on the compute nodes
The Importance of Benchmarking
• Before submitting jobs to a cluster
– First run a fraction of the job on the head node and try to figure out how long the complete job will run
– The program will probably not scale linearly for each node added
– However, if the individual executions will take longer than 24 hours, you will need to write a module to checkpoint your program
• Assume a direct relationship between how long the program will run and the likelihood of the cluster failing during the run
The Importance of Troubleshooting
• Some steps to take if the program does not work on a cluster
– First, run a small job on the head node
– Second, if allowed, consider running in interactive mode to see if errors are generated
  • Interactive mode allows you to type command-line instructions on the individual node the program is executed on
  • One can also read the error outputs directly
  • Just because a program runs on the head node does not mean it will run on a compute node
  • If it is not performing normally, contact your friendly system administrator
The Importance of Troubleshooting, Part II
– Third, submit one job to the scheduler
  • The scheduler should not modify the code, but if the commands to the scheduler are not correct, the job will not execute properly
  • Just because a program runs on the head node and a compute node does not mean it will run after being assigned by the scheduler
  • If it is not performing normally, contact your friendly system administrator
– Fourth, submit multiple jobs to the scheduler
  • Just because a program runs on the head node, a compute node, and through the scheduler does not mean it will run on every single node of a cluster
  • While every node is supposed to be identical, sometimes they are different
Information to Bring to a System Administrator When Problems Arise
• What program/script were you trying to use?
• What command-line parameters?
• When did this happen?
• What node/computer did this happen on?
• What was the error message you received, if any?
• If you have thoughts or clues about the problem, pass them on to the sysadmin.
Errors That I Have Seen on Clusters
• Software not properly installed by the system administrators on drives that can be mounted on the nodes
– Result: no output from the program
– Troubleshooting: called the system administrator; the error was traced to a node which was inaccessible to the user
• Scratch space permissions were changed by a previous user; the node would not function
– Result: since that node finished first, all subsequent jobs were sent to that node and errored out; found out Monday morning that only half of the output was generated
– Troubleshooting: tracked all of the uncompleted jobs to a single node from the error output file
Errors That I Have Seen on Clusters, Part II
• Compiler version supported on the cluster was not the version the software was written for
– Result: the program failed to compile
– Troubleshooting: emailed the author of the program, tried to install the appropriate compiler, ended up using binaries for the operating system
• Home storage disk space full; bioinformatics applications tend to take up more disk space than other applications
– Result: output from the last twenty jobs was missing
– Troubleshooting: looked to see if the failed job runs were from a single node, looked at disk space allocation; other users were complaining as well
• Programs errored out
– Result: output from jobs stopped midway through the run
– Troubleshooting: looked to see how long the program had been running (5 days); looked at the input file and did a brief calculation: it would have taken 30 days to finish a single job; no benchmarking had been done; re-evaluated the experiment
Additional Information
• vi short command reference:
– http://linux.dbw.org/vi_short_ref.html
• BLAST
– http://www.ncbi.nlm.nih.gov/BLAST/
• Sun Grid Engine sites
– http://pbpl.physics.ucla.edu/Computing/Beowulf_Cluster/Sun_Grid_Engine/
Acknowledgements
• Brian Athey, Associate Professor
• Abhijit Bose, Associate Director MGRID
• Georgi Kostov, System Administrator
• Chris Bliton, MCBI project manager
• Fan Meng, Assistant Research Scientist
• Joe Landman, Scalable Informatics
• Jeff Ogden, IT project manager
• Paul Trombley, graphic artist
• Tom Hacker, Associate Director, Indiana Univ.
• NPACI and SDSC