00001 #include "fifo.h"
00002 #include <stdio.h>
00003 #include <unistd.h>
00004 #include <stdlib.h>
00005 #include <string.h>
00006
00007
00008
00009
00010
00011
00012 #define TRY(__try__,__catch__) if ((status = (__try__))) { \
00013 fprintf(stderr,"Error at %s, line %d: '"#__try__"' failed with status = %d\n", \
00014 __FILE__, __LINE__,status); \
00015 fprintf(stderr, "%s\n", strerror(status));\
00016 __catch__; \
00017 }
00018
00019
00020 queue_t *queueInit(int qsize)
00021 {
00022 int status = 0;
00023 queue_t *q;
00024
00025 q = (queue_t *)malloc (sizeof (queue_t));
00026 if (q == NULL) return (NULL);
00027 q->qsize = qsize;
00028 q->buf = (char **) malloc (qsize*sizeof(char *));
00029 q->empty = 1;
00030 q->full = 0;
00031 q->flush = 0;
00032 q->head = 0;
00033 q->tail = 0;
00034 q->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));
00035 TRY( pthread_mutex_init (q->mut, NULL), return NULL);
00036 q->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
00037 TRY( pthread_cond_init (q->notFull, NULL), return NULL);;
00038 q->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
00039 TRY( pthread_cond_init (q->notEmpty, NULL), return NULL);;
00040
00041 return (q);
00042 }
00043
00044
00045 int queueDelete(queue_t *q)
00046 {
00047 int status = 0;
00048 TRY( pthread_mutex_destroy(q->mut), return status);
00049 free(q->mut);
00050 TRY( pthread_cond_destroy(q->notFull), return status);
00051 free(q->notFull);
00052 TRY( pthread_cond_destroy(q->notEmpty), return status);
00053 free(q->notEmpty);
00054 free(q->buf);
00055 free(q);
00056 return 0;
00057 }
00058
00059
00060
00061
00062 int queueCork(queue_t *q)
00063 {
00064 int empty;
00065 int status;
00066 TRY( pthread_mutex_lock (q->mut), return status);
00067 if (q->empty)
00068 empty = 1;
00069 else
00070 {
00071 q->flush = 1;
00072 q->full = 1;
00073 empty = 0;
00074 }
00075 TRY( pthread_mutex_unlock (q->mut), return status);
00076 return empty;
00077 }
00078
00079
00080
00081 int queueAdd(queue_t *q, char *in)
00082 {
00083 int status = 0;
00084
00085 TRY( pthread_mutex_lock (q->mut), return status);
00086 while (q->full)
00087 TRY( pthread_cond_wait (q->notFull, q->mut), return status) ;
00088
00089
00090 q->buf[q->tail] = in;
00091 q->tail++;
00092 if (q->tail == q->qsize)
00093 q->tail = 0;
00094 if (q->tail == q->head)
00095 q->full = 1;
00096 q->empty = 0;
00097
00098
00099 TRY( pthread_mutex_unlock (q->mut), return status);
00100 TRY( pthread_cond_signal (q->notEmpty), return status);
00101 return 0;
00102 }
00103
00104
00105 int queueDel(queue_t *q, char **out)
00106 {
00107 int status = 0;
00108
00109 TRY( pthread_mutex_lock (q->mut), return status);
00110 while (q->empty)
00111 TRY( pthread_cond_wait (q->notEmpty, q->mut), return status);
00112
00113
00114 *out = q->buf[q->head];
00115 q->head++;
00116 if (q->head == q->qsize)
00117 q->head = 0;
00118 if (q->head == q->tail)
00119 q->empty = 1;
00120
00121 if (!q->flush)
00122 {
00123 q->full = 0;
00124
00125 TRY( pthread_cond_signal (q->notFull), return status);
00126 }
00127 else
00128 {
00129 if (q->empty)
00130 {
00131 q->flush = 0;
00132 q->full = 0;
00133
00134 TRY( pthread_mutex_unlock (q->mut), return status);
00135 TRY( pthread_cond_signal (q->notFull), return status);
00136 return 1;
00137 }
00138 }
00139 TRY( pthread_mutex_unlock (q->mut), return status);
00140 return 0;
00141 }
00142
00143
00144
00145
00146
00147
00148
00149
00150 static void *producer(void *q)
00151 {
00152 queue_t *fifo;
00153 int (*pfunc)(char **buf);
00154 int stop;
00155 char *buf;
00156
00157 fifo = (queue_t *)((char **)q)[0];
00158 pfunc = (int (*)(char **)) ((char **)q)[1];
00159 stop = 0;
00160 while (!stop)
00161 {
00162
00163 stop = (*pfunc)(&buf);
00164
00165 queueAdd (fifo, buf);
00166 }
00167 return (NULL);
00168 }
00169
00170
00171 static void *consumer(void *q)
00172 {
00173 queue_t *fifo;
00174 int (*cfunc)(char *buf);
00175 int stop;
00176 char *buf;
00177 int count = 0, status;
00178
00179 fifo = (queue_t *)((char **)q)[0];
00180 cfunc = (int (*)(char *)) ((char **)q)[1];
00181 stop = 0;
00182 while (!stop && count++<3)
00183 {
00184
00185 status = queueDel (fifo, &buf);
00186 printf("consumer status = %d\n",status);
00187
00188 stop = (*cfunc)(buf);
00189 }
00190 queueCork (fifo);
00191 printf("Corked queue.\n");
00192 while (!stop)
00193 {
00194
00195 status = queueDel (fifo, &buf);
00196 printf("consumer status = %d\n",status);
00197
00198 stop = (*cfunc)(buf);
00199 }
00200
00201 return (NULL);
00202 }
00203
00204
00205
00206
00207
00208
00209
00210
00211 int fifomain(int qsize, int (*pfunc)(char **buf), int (*cfunc)(char *buf))
00212 {
00213 queue_t *fifo;
00214 pthread_t pro, con;
00215 void *proarg[2], *conarg[2];
00216
00217 fifo = queueInit (qsize);
00218 if (fifo == NULL) {
00219 fprintf (stderr, "fifomain: Queue Init failed.\n");
00220 return 1;
00221 }
00222 proarg[0] = fifo;
00223 proarg[1] = pfunc;
00224 pthread_create (&pro, NULL, producer, proarg);
00225 conarg[0] = fifo;
00226 conarg[1] = cfunc;
00227 pthread_create (&con, NULL, consumer, conarg);
00228 pthread_join (pro, NULL);
00229 pthread_join (con, NULL);
00230 queueDelete (fifo);
00231
00232 return 0;
00233 }