Badanie wydajności przesyłania komunikatów pomiędzy procesami

    -- Sebastian Pawlak, 2000.



Kod źródłowy pliku "serial.c":

/* Program bada wydajnosc przesylania message`ow pomiedzy procesami.
 * s1222, 2000.03.25 
 * Oto schemat ilustrujacy w jaki sposob przesylane sa message
 *
 *    +-----+ ----> +-----+ ----> +-----+ ----> +-----+
 *    |  A  +       |  B  |       |  B  |       |  C  |
 *    +-----+ <---- +-----+ <---- +-----+ <---- +-----+
 *
 * Program przyjmuje parametry:
 *  [1] - liczba iteracji (1 iteracja to cykl: A,B,..,B,C,B,..,B,A 
 *  [2] - liczba bajtow message`a
 */
#include <stdio.h>
#include "mpi.h"

/*#define debug
 */
int showWhatProcessDid(char *text, int x, int y);

void doSth(void)
{
    int i, j;
    for(i = 0; i < 100000 ; i++)
        for(j = 0; j < 500 ; j++)
	    ;
}

int main(int argc, char **argv)
{
    int rank, size, messageLength, liczbaIteracji, i;
    int tag = 55;
    char *bufor;
    MPI_Status status;
    double startTime, time;
    
    void *sendBufor;     /* bufor dla MPI_Bsend */
    int sendBuforSize;
    
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if(size < 2) {
        printf("ten program musi miec przydzielone co najmniej 2 procesy !\n");
	MPI_Abort(MPI_COMM_WORLD, -1);
	MPI_Finalize();
	return -1;
    }
        
    if(argc < 3 && rank == 0) {
        printf("nalezy podac parametry:   [liczbaIteracji] [dlugoscMessage`a]\n");
	MPI_Abort(MPI_COMM_WORLD, -1);
	MPI_Finalize();
	return -1;
    }

    liczbaIteracji = atoi(argv[1]);
    if((bufor = (char *)malloc(messageLength = atoi(argv[2]))) == NULL) {
        printf("za malo dostepnej pamieci aby zainicjalizowac bufor message`a !\n");
	MPI_Abort(MPI_COMM_WORLD, -1);
	MPI_Finalize();
	return -1;
    }
    
    /* inicjalizowanie bufora dla MPI_Bsend */
    MPI_Pack_size(messageLength, MPI_CHAR, MPI_COMM_WORLD, &i);
    sendBuforSize = i + MPI_BSEND_OVERHEAD;
    sendBufor = (void *)malloc(sendBuforSize);
    MPI_Buffer_attach(sendBufor, sendBuforSize);
    
    MPI_Barrier(MPI_COMM_WORLD);   /* synchronizacja procesow */
    
    if(rank == 0) {                    /**** A ****/

        startTime = MPI_Wtime();
        for(i = 0; i < liczbaIteracji ; i++) {
	    #ifdef debug
	     showWhatDidProcessDo("* cykl nr %d\n", i, 0);
            #endif
	    doSth();
	    MPI_Bsend(bufor, messageLength, MPI_CHAR, 1, tag, MPI_COMM_WORLD);
	    #ifdef debug
	     showWhatDidProcessDo("A: nr %d wyslal do %d\n", 0, 1);
            #endif
	    MPI_Recv(bufor, messageLength, MPI_CHAR, 1, tag, MPI_COMM_WORLD, &status);
	    #ifdef debug
	     showWhatDidProcessDo("A: nr %d odebral od %d\n", 0, 1);
	    #endif
	}
	time = MPI_Wtime() - startTime;
	printf("%f\n", time);
	
    } else if(rank == size - 1)        /**** C ****/
    
        for(i = 0; i < liczbaIteracji ; i++) {
            MPI_Recv(bufor, messageLength, MPI_CHAR, size - 2, tag, MPI_COMM_WORLD, &status);
            #ifdef debug
	     showWhatDidProcessDo("C: nr %d odebral od %d\n", rank, size - 2);
            #endif
	    doSth();
	    MPI_Bsend(bufor, messageLength, MPI_CHAR, size - 2, tag, MPI_COMM_WORLD);
            #ifdef debug
	     showWhatDidProcessDo("C: nr %d wyslal do %d\n", rank, size - 2);
            #endif
	}
	
    else                             /**** B ****/
    
        for(i = 0; i < liczbaIteracji ; i++) {
            MPI_Recv(bufor, messageLength, MPI_CHAR, rank - 1, tag, MPI_COMM_WORLD, &status);
            #ifdef debug
	     showWhatDidProcessDo("B: nr %d odebral od %d\n", rank, rank - 1);
	    #endif
	    doSth();
	    MPI_Bsend(bufor, messageLength, MPI_CHAR, rank + 1, tag, MPI_COMM_WORLD);
	    #ifdef debug
	     showWhatDidProcessDo("B: nr %d wyslal do %d\n", rank, rank + 1);
	    #endif 

	    MPI_Recv(bufor, messageLength, MPI_CHAR, rank + 1, tag, MPI_COMM_WORLD, &status);
	    #ifdef debug
	     showWhatDidProcessDo("B: nr %d odebral od %d\n", rank, rank + 1);
	    #endif
	    doSth();
	    MPI_Bsend(bufor, messageLength, MPI_CHAR, rank - 1, tag, MPI_COMM_WORLD);
	    #ifdef debug
	     showWhatDidProcessDo("B: nr %d wyslal do %d\n", rank, rank - 1);
	    #endif
	}
    
    free(bufor);
    
    MPI_Finalize();
    
    return 0;
}

int showWhatProcessDid(char *text, int x, int y)
{
    printf(text, x, y);
    return 0;
}

Kod źródłowy pliku "parallel.c":

/* Program przesyla message`e pomiedzy poszczegolnymi procesami.
 * Dla 3 procesow przeslanie pojedynczego message: 0 -> 1, 1 -> 2,
 * 2 -> 1, 1 -> 0. Przesylanie odbywa sie potokowo - jednoczesnie,
 * przesylane jest kilka message`ow.
 * s1222, 2000.04.05 
 * Oto schemat ilustrujacy w jaki sposob przesylane sa message
 * (3 procesy i 2 message - przypadek ten nie ilustruje zlozonosci
 * algorytmu, ktora wystepuje dla wiekszej liczby message`ow)
 *
 *     0          1          2
 *     |          |          |      #  - message nr 1
 *     #          |          |
 *     # ---_     |          |      %  - message nr 2
 *     %     ---> #          |
 *     % ---_     # ---_     |    |
 *     |     ---> %     ---> #    |  C
 *     |          % ---__--- #    |  z
 *     |          # <--  --> %    |  a
 *     |     _--- #     _--- %    |  s
 *   x # <---     % <---     |   \|/
 *     #     _--- %          |    
 *   x % <---     |          |
 *     %          |          |
 *     |          |          |
 *
 * Program przyjmuje parametry:
 *  [1] - liczba message`ow do przeslania
 *  [2] - liczba bajtow message`a
 */
#include <stdio.h>
#include "mpi.h"

/*#define debug
 */

/* funkcja symuluje obciazenie procesu */
void doSth(void)
{
    int i, j;
    for(i = 0; i < 100000 ; i++)
        for(j = 0; j < 500 ; j++)
	    ;
}

int main(int argc, char **argv)
{
    int rank, size, messageLength, liczbaMessageow, i;
    int tag = 55;
    char *bufor;
    MPI_Status status;
    double startTime, time;
    char strona;       /* flaga - z ktorej strony odebrac message */

    void *sendBufor;   /* bufor dla MPI_Bsend */
    int sendBuforSize;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if(size < 2) {
        printf("ten program musi miec przydzielone co najmniej 2 procesy !\n");
	MPI_Abort(MPI_COMM_WORLD, -1);
	MPI_Finalize();
	return -1;
    }
        
    if(argc < 3 && rank == 0) {
        printf("nalezy podac parametry:   [liczbaMessage`ow] [dlugoscMessage`a]\n");
	MPI_Abort(MPI_COMM_WORLD, -1);
	MPI_Finalize();
	return -1;
    }

    liczbaMessageow = atoi(argv[1]);
    if((bufor = (char *)malloc(messageLength = atoi(argv[2]))) == NULL) {
        printf("za malo dostepnej pamieci aby zainicjalizowac bufor message`a !\n");
	MPI_Abort(MPI_COMM_WORLD, -1);
	MPI_Finalize();
	return -1;
    }
    
    /* inicjalizowanie bufora dla MPI_BSend */
    MPI_Pack_size(messageLength, MPI_CHAR, MPI_COMM_WORLD, &i);
    sendBuforSize = size * i + size * MPI_BSEND_OVERHEAD;
    sendBufor = (void *)malloc(sendBuforSize);
    MPI_Buffer_attach(sendBufor, sendBuforSize);
	  
    MPI_Barrier(MPI_COMM_WORLD);   /* synchronizacja procesow */
    
    if(rank == 0) {                    /**** A ****/

        startTime = MPI_Wtime();

        for(i = 0; i < (liczbaMessageow < size ? liczbaMessageow : size) ; i++) {
            doSth(); 
	    MPI_Bsend(bufor, messageLength, MPI_CHAR, 1, tag, MPI_COMM_WORLD);
	    #ifdef debug
	     printf("A: nr %d wyslal do %d\n", 0, 1);
            #endif
	}
	
        for(i = 0; i < liczbaMessageow ; i++) {
	    MPI_Recv(bufor, messageLength, MPI_CHAR, 1, tag, MPI_COMM_WORLD, &status);
	    #ifdef debug
	     printf("A: nr %d odebral od %d\n", 0, 1);
	    #endif
	    if(i < liczbaMessageow - size) { /* czy mam jeszcze wysylac paczki */
	        doSth();
	        MPI_Bsend(bufor, messageLength, MPI_CHAR, 1, tag, MPI_COMM_WORLD);
	        #ifdef debug
		 printf("A: nr %d wyslal do %d\n", 0, 1);
                #endif
	    }
	}

	time = MPI_Wtime() - startTime;

	printf("%f\n", time);

    } else if(rank == size - 1)        /**** C ****/
    
        for(i = 0; i < liczbaMessageow ; i++) {
            MPI_Recv(bufor, messageLength, MPI_CHAR, size - 2, tag, MPI_COMM_WORLD, &status);
            #ifdef debug
	     printf("C: nr %d odebral od %d\n", rank, size - 2);
            #endif
	    doSth();
	    MPI_Bsend(bufor, messageLength, MPI_CHAR, size - 2, tag, MPI_COMM_WORLD);
            #ifdef debug
	     printf("C: nr %d wyslal do %d\n", rank, size - 2);
            #endif
	}
	
    else {                            /**** B ****/

        for(i = 0; i < (liczbaMessageow < size ? liczbaMessageow : size - rank) ; i++) {
	    MPI_Recv(bufor, messageLength, MPI_CHAR, rank - 1, tag, MPI_COMM_WORLD, &status);
            #ifdef debug
	     printf("B: nr %d odebral od %d\n", rank, rank - 1);
            #endif
	    doSth();
	    MPI_Send(bufor, messageLength, MPI_CHAR, rank + 1, tag, MPI_COMM_WORLD);
	    #ifdef debug
	     printf("B: nr %d wyslal do %d\n", rank, rank + 1);
	    #endif 
	}
    
        strona = 1;  /* 1 prawa,  -1 lewa */
        for(i = 0; i < liczbaMessageow * 2 - (liczbaMessageow <
                   size ? liczbaMessageow : size - rank); i++) {
	    if(i >= liczbaMessageow * 2 - (liczbaMessageow <
               size ? 2 * liczbaMessageow : 2*(size - rank)))
	        strona = 1;
            MPI_Recv(bufor, messageLength, MPI_CHAR, rank + strona, tag, MPI_COMM_WORLD, &status);
            #ifdef debug
	     showWhatDidProcessDo("B: nr %d odebral od %d\n", rank, rank + strona);
	    #endif
	    doSth();
	    MPI_Send(bufor, messageLength, MPI_CHAR, rank + (strona == 1 ? -1 : 1),
                     tag, MPI_COMM_WORLD);
	    #ifdef debug
	     showWhatDidProcessDo("B: nr %d wyslal do %d\n", rank,
                                  rank + (strona == 1 ? -1 : 1));
	    #endif 
	    strona == 1 ? (strona = -1) : (strona = 1);
	}
    }
    
    MPI_Buffer_detach(&sendBufor, &sendBuforSize);
    free(sendBufor);
    free(bufor);
    MPI_Finalize();
    
    return 0;
}

Kod źródłowy pliku "show.c":

/* Program demonstruje otwarcie okienka i kreslenie w nim.
 * s1222, 2000.03.28
 */
#include <stdio.h>
#include <stdlib.h>
#include "Xlib.h"
#include "Xutil.h"

struct wezel {
    int nrNode;
    double serialTime;
    double parallelTime;
};

int main(int argc, char **argv)
{
    Display *displayHandle;
    Window win;
    GC gc;
    int i;
    FILE *plik;
    char text[200], text2[20];
    int liczbaPomiarow;
    int nrNode;
    double serialTime, parallelTime;
    struct wezel pomiary[100];
    double maxSpeedup;
    int xOld, yOld, x, y;
    const char napis[] = "SPEEDUP (czas sekwencyjny / czas rownolegly)";
    
    if((plik = fopen("wynik.txt", "r")) == NULL ) {
        printf("brak pliku wynik.txt !\n");
	return -1;
    }
    
    fscanf(plik, "%s", text);
    strcpy(text2, &text[6]);
    liczbaPomiarow = atoi(text2);
    text[6] = '\0';
    if(strcmp(text, "wyniki")) {
        printf("plik wynik.txt ma zly format !\n");
	return -1;
    }    
    
    /* laczy sie z domyslnym serwerem X */
    displayHandle = XOpenDisplay((char *)NULL);
    if(displayHandle == NULL) {
        printf("blad otwarcia display`a\n");
        exit(-1);
    }
    
    /* tworzy okienko na display`u */
    win = XCreateSimpleWindow(displayHandle, RootWindow(displayHandle, 0),
                              0, 0, 800, 630, 2, BlackPixel(displayHandle, 0),
			      WhitePixel(displayHandle, 0));	
    gc = XCreateGC(displayHandle, win, 0, 0);
    XMapWindow(displayHandle, win);
    XFlush(displayHandle);
    sleep(1);
    
    XSetForeground(displayHandle, gc, BlackPixel(displayHandle, 0));
    
    XDrawLine(displayHandle, win, gc, 50, 20, 50, 570);
    XDrawLine(displayHandle, win, gc, 50, 570, 770, 570);
    XDrawString(displayHandle, win, gc, 400 - 7 * 7, 610, "Liczba procesow", 15);
    for(i = 0; i < strlen(napis) ; i++)
        XDrawString(displayHandle, win, gc, 4,
                    315 - strlen(napis) * 11 / 2 + i * 11, &napis[i], 1);
    
    XFlush(displayHandle);
    
    for(i = 0; i < liczbaPomiarow ; i++) {
        fscanf(plik, "%s", text);
        strcpy(text, &text[1]);
	nrNode = atoi(text);

        fscanf(plik, "%s", text);
        strcpy(text, &text[1]);
	serialTime = atof(text);

        fscanf(plik, "%s", text);
        strcpy(text, &text[1]);
	parallelTime = atof(text);
    	
        pomiary[i].nrNode = nrNode;
	pomiary[i].serialTime = serialTime;
	pomiary[i].parallelTime = parallelTime;
	
	if(serialTime / parallelTime > maxSpeedup)
	    maxSpeedup = serialTime / parallelTime;
	        
	printf("node: %d, serialTime: %f, parallelTime: %f\n",
	       nrNode, serialTime, parallelTime); 
    } 
    
    xOld = 50 + 720 / (liczbaPomiarow - 1) * 0;
    yOld = 570 - 550 / maxSpeedup * (pomiary[0].serialTime / pomiary[0].parallelTime);
    
    for(i = 0; i < liczbaPomiarow ; i++) {
        XDrawLine(displayHandle, win, gc, xOld, yOld,
	          x = 50 + 720 / (liczbaPomiarow - 1) * i,
		  y = 570 - 550 / maxSpeedup * 
	          (pomiary[i].serialTime / pomiary[i].parallelTime));
	xOld = x;
	yOld = y;
	
        XDrawLine(displayHandle, win, gc, 48, 570 - 550 / maxSpeedup * 
	          (pomiary[i].serialTime / pomiary[i].parallelTime),
		  52, 570 - 550 / maxSpeedup * 
	          (pomiary[i].serialTime / pomiary[i].parallelTime));
	sprintf(text, "%.1f", pomiary[i].serialTime / pomiary[i].parallelTime);
        XDrawString(displayHandle, win, gc, 44 - strlen(text) * 7,
	            570 - 550 / maxSpeedup * 
	            (pomiary[i].serialTime / pomiary[i].parallelTime) + 4,
		    text, strlen(text));
	
        XDrawLine(displayHandle, win, gc, 50 + 720 / (liczbaPomiarow - 1) * i, 568,
	          50 + 720 / (liczbaPomiarow - 1) * i, 572);
        sprintf(text, "%d", pomiary[i].nrNode);
	XDrawString(displayHandle, win, gc,
	            50 + 720 / (liczbaPomiarow - 1) * i - strlen(text) * 7 / 2,
		    586, text, strlen(text));

    }
    
    XFlush(displayHandle);
    getchar();    

    XCloseDisplay(displayHandle);
    fclose(plik);
    
    return 0;
}
w3cw3c
automatyka przemysłowa