00001 #include <stdio.h>
00002 #include <stdlib.h>
00003 #include "tagfifo.h"
00004 #include "fifo.h"
00005
00006 typedef struct {
00007 long tag;
00008 char *buf;
00009 } tagitem_t;
00010
00011 #define TRY(__try__,__catch__) if ((status = (__try__))) { \
00012 fprintf(stderr,"Error at %s, line %d: '"#__try__"' failed with status = %d\n", \
00013 __FILE__, __LINE__,status); \
00014 __catch__; \
00015 }
00016
00017 tqueue_t *tqueueInit(int qsize)
00018 {
00019 return (tqueue_t *) queueInit(qsize);
00020 }
00021
00022 int tqueueCork(tqueue_t *q)
00023 {
00024 return queueCork((queue_t *) q);
00025 }
00026
00027 int tqueueDelete(tqueue_t *q)
00028 {
00029 return queueDelete(q);
00030 }
00031
00032 int tqueueAdd(tqueue_t *q, long tag, char *in)
00033 {
00034 int status = 0;
00035 tagitem_t *item;
00036 item = malloc(sizeof(tagitem_t));
00037 item->tag = tag;
00038 item->buf = in;
00039
00040
00041 TRY( pthread_mutex_lock (q->mut), return status);
00042 while (q->full)
00043 TRY( pthread_cond_wait (q->notFull, q->mut), return status);
00044
00045
00046 q->buf[q->tail] = (char *)item;
00047 q->tail++;
00048 if (q->tail == q->qsize)
00049 q->tail = 0;
00050 if (q->tail == q->head)
00051 q->full = 1;
00052 q->empty = 0;
00053
00054
00055 TRY( pthread_mutex_unlock (q->mut), return status);
00056 status = pthread_cond_broadcast (q->notEmpty);
00057 return status;
00058 }
00059
00060 int tqueueDel(tqueue_t *q, long tag, char **out)
00061 {
00062 int status = 0;
00063 int stop;
00064 tagitem_t *item;
00065
00066 stop = 0;
00067 while (!stop)
00068 {
00069
00070 TRY( pthread_mutex_lock (q->mut), return status);
00071 while (q->empty)
00072 TRY( pthread_cond_wait (q->notEmpty, q->mut), return status);
00073
00074 item = (tagitem_t *) q->buf[q->head];
00075 if (item->tag == tag)
00076 {
00077
00078
00079 stop = 1;
00080 *out = item->buf;
00081 free(item);
00082 q->head++;
00083 if (q->head == q->qsize)
00084 q->head = 0;
00085 if (q->head == q->tail)
00086 q->empty = 1;
00087 q->full = 0;
00088
00089 TRY( pthread_mutex_unlock (q->mut), return status);
00090 TRY( pthread_cond_signal (q->notFull), return status);
00091 }
00092 else
00093 {
00094
00095
00096
00097 TRY( pthread_mutex_unlock (q->mut), return status);
00098 sched_yield();
00099 }
00100 }
00101 return status;
00102 }
00103
00104 int tqueueDelAny(tqueue_t *q, long *tag, char **out)
00105 {
00106 int status = 0;
00107 char *ptr;
00108 tagitem_t *item;
00109 status = queueDel((queue_t *) q, &ptr);
00110 item = (tagitem_t *) ptr;
00111 *tag = item->tag;
00112 *out = item->buf;
00113 free(item);
00114 return status;
00115 }
00116
00117