1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
3 * (C) 2014 by Argonne National Laboratory.
4 * See COPYRIGHT in top-level directory.
7 /* Test case from John Bent (ROMIO req #835)
8 * Aggregation code was not handling certain access patterns when collective
11 /* Uses nonblocking collective I/O.*/
20 #define OBJ_SIZE 1048576
23 extern int optind, opterr, optopt;
29 static void Usage(int line)
32 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
35 "Usage (line %d): %s [-d] [-h] -f filename\n"
36 "\t-d for debugging\n"
37 "\t-h to turn on the hints to force collective aggregation\n", line, prog);
42 static void fatal_error(int mpi_ret, MPI_Status * mpi_stat, const char *msg)
44 fprintf(stderr, "Fatal error %s: %d\n", msg, mpi_ret);
45 MPI_Abort(MPI_COMM_WORLD, -1);
48 static void print_hints(int rank, MPI_File * mfh)
56 MPI_Barrier(MPI_COMM_WORLD);
58 MPI_File_get_info(*mfh, &info);
59 MPI_Info_get_nkeys(info, &nkeys);
62 for (i = 0; i < nkeys; i++) {
63 MPI_Info_get_nthkey(info, i, key);
64 printf("%35s -> ", key);
65 MPI_Info_get(info, key, 1024, value, &dummy_int);
66 printf("%s\n", value);
70 MPI_Barrier(MPI_COMM_WORLD);
73 static void fill_buffer(char *buffer, int bufsize, int rank, MPI_Offset offset)
75 memset((void *) buffer, 0, bufsize);
76 snprintf(buffer, bufsize, "Hello from %d at %lld\n", rank, offset);
79 static MPI_Offset get_offset(int rank, int num_objs, int obj_size, int which_obj)
82 offset = (MPI_Offset) rank *num_objs * obj_size + which_obj * obj_size;
86 static void write_file(char *target, int rank, MPI_Info * info)
95 request = (MPI_Request *) malloc(NUM_OBJS * sizeof(MPI_Request));
96 mpi_stat = (MPI_Status *) malloc(NUM_OBJS * sizeof(MPI_Status));
97 buffer = (char **) malloc(NUM_OBJS * sizeof(char *));
100 printf("%d writing file %s\n", rank, target);
102 if ((mpi_ret = MPI_File_open(MPI_COMM_WORLD, target,
103 MPI_MODE_WRONLY | MPI_MODE_CREATE, *info, &wfh))
105 fatal_error(mpi_ret, NULL, "open for write");
108 /* nonblocking collective write */
109 for (i = 0; i < NUM_OBJS; i++) {
110 MPI_Offset offset = get_offset(rank, NUM_OBJS, OBJ_SIZE, i);
111 buffer[i] = (char *) malloc(OBJ_SIZE);
112 fill_buffer(buffer[i], OBJ_SIZE, rank, offset);
114 printf("%s", buffer[i]);
115 if ((mpi_ret = MPI_File_iwrite_at_all(wfh, offset, buffer[i], OBJ_SIZE,
116 MPI_CHAR, &request[i]))
118 fatal_error(mpi_ret, NULL, "write");
123 print_hints(rank, &wfh);
125 MPI_Waitall(NUM_OBJS, request, mpi_stat);
127 if ((mpi_ret = MPI_File_close(&wfh)) != MPI_SUCCESS) {
128 fatal_error(mpi_ret, NULL, "close for write");
131 printf("%d wrote file %s\n", rank, target);
133 for (i = 0; i < NUM_OBJS; i++)
140 static int reduce_corruptions(int corrupt_blocks)
144 if ((mpi_ret = MPI_Reduce(&corrupt_blocks, &sum, 1, MPI_INT, MPI_SUM, 0,
145 MPI_COMM_WORLD)) != MPI_SUCCESS) {
146 fatal_error(mpi_ret, NULL, "MPI_Reduce");
151 static void read_file(char *target, int rank, MPI_Info * info, int *corrupt_blocks)
155 MPI_Request *request;
156 MPI_Status *mpi_stat;
160 char **verify_buf = NULL;
162 offset = (MPI_Offset *) malloc(NUM_OBJS * sizeof(MPI_Offset));
163 request = (MPI_Request *) malloc(NUM_OBJS * sizeof(MPI_Request));
164 mpi_stat = (MPI_Status *) malloc(NUM_OBJS * sizeof(MPI_Status));
165 buffer = (char **) malloc(NUM_OBJS * sizeof(char *));
166 verify_buf = (char **) malloc(NUM_OBJS * sizeof(char *));
169 printf("%d reading file %s\n", rank, target);
171 if ((mpi_ret = MPI_File_open(MPI_COMM_WORLD, target, MPI_MODE_RDONLY,
172 *info, &rfh)) != MPI_SUCCESS) {
173 fatal_error(mpi_ret, NULL, "open for read");
176 /* nonblocking collective read */
177 for (i = 0; i < NUM_OBJS; i++) {
178 offset[i] = get_offset(rank, NUM_OBJS, OBJ_SIZE, i);
179 buffer[i] = (char *) malloc(OBJ_SIZE);
180 verify_buf[i] = (char *) malloc(OBJ_SIZE);
181 fill_buffer(verify_buf[i], OBJ_SIZE, rank, offset[i]);
183 printf("Expecting %s", verify_buf[i]);
184 if ((mpi_ret = MPI_File_iread_at_all(rfh, offset[i], buffer[i],
185 OBJ_SIZE, MPI_CHAR, &request[i]))
187 fatal_error(mpi_ret, NULL, "read");
191 MPI_Waitall(NUM_OBJS, request, mpi_stat);
194 for (i = 0; i < NUM_OBJS; i++) {
195 if (memcmp(verify_buf[i], buffer[i], OBJ_SIZE) != 0) {
197 printf("Corruption at %lld\n", offset[i]);
199 printf("\tExpecting %s\n" "\tRecieved %s\n", verify_buf[i], buffer[i]);
204 if ((mpi_ret = MPI_File_close(&rfh)) != MPI_SUCCESS) {
205 fatal_error(mpi_ret, NULL, "close for read");
208 for (i = 0; i < NUM_OBJS; i++) {
219 static void set_hints(MPI_Info * info)
221 MPI_Info_set(*info, "romio_cb_write", "enable");
222 MPI_Info_set(*info, "romio_no_indep_rw", "1");
223 MPI_Info_set(*info, "cb_nodes", "1");
224 MPI_Info_set(*info, "cb_buffer_size", "4194304");
229 set_hints(MPI_Info *info, char *hints) {
230 char *delimiter = " ";
231 char *hints_cp = strdup(hints);
232 char *key = strtok(hints_cp, delimiter);
235 val = strtok(NULL, delimiter);
236 if (debug) printf("HINT: %s = %s\n", key, val);
240 MPI_Info_set(*info, key, val);
241 key = strtok(NULL, delimiter);
247 int main(int argc, char *argv[])
249 int nproc = 1, rank = 0;
254 int corrupt_blocks = 0;
256 MPI_Init(&argc, &argv);
257 MPI_Comm_size(MPI_COMM_WORLD, &nproc);
258 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
260 if ((mpi_ret = MPI_Info_create(&info)) != MPI_SUCCESS) {
262 fatal_error(mpi_ret, NULL, "MPI_info_create.\n");
265 prog = strdup(argv[0]);
268 while ((c = getopt(argc, argv, "df:h")) != EOF) {
274 target = strdup(optarg);
288 target = (char*)"testfile";
292 write_file(target, rank, &info);
293 read_file(target, rank, &info, &corrupt_blocks);
295 corrupt_blocks = reduce_corruptions(corrupt_blocks);
297 if (corrupt_blocks == 0) {
298 fprintf(stdout, " No Errors\n");
301 fprintf(stdout, "%d/%d blocks corrupt\n", corrupt_blocks, nproc * NUM_OBJS);
304 MPI_Info_free(&info);