]> AND Private Git Repository - loba.git/blob - sync_queue.h
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Cosmetics: rename message types, INFO -> CTRL, and LOAD -> DATA.
[loba.git] / sync_queue.h
1 #ifndef SYNC_QUEUE_H
2 #define SYNC_QUEUE_H
3
4 #if __GNUC__ == 4 && __GNUC_MINOR__ == 4
5 #  include <cstdatomic>         // <atomic> is named <cstdatomic> in gcc 4.4
6
7 template<typename _Tp>          // fix missing definition in gcc 4.4
8 void
9 atomic<_Tp*>::store(_Tp* __v, memory_order __m) volatile
10 { atomic_address::store(__v, __m); }
11
12 #else
13 #  include <atomic>
14 #endif
15
16 #define SYNC_QUEUE_BUFSIZE 16
17
18 template <typename T>
19 class sync_queue {
20 public:
21     sync_queue()
22     {
23         head_node = tail_node = new node();
24         head.store(head_node->values);
25         tail.store(tail_node->values);
26     }
27
28     ~sync_queue()
29     {
30         node* n = head_node;
31         while (n != NULL) {
32             node* prev = n;
33             n = n->next;
34             delete prev;
35         }
36     }
37
38     bool empty() const
39     {
40         return head.load() == tail.load();
41     }
42
43     // size() is not not thread-safe
44     size_t size() const
45     {
46         size_t count = 0;
47         if (head_node == tail_node) {
48             count = tail.load() - head.load();
49         } else {
50             count =
51                 (head_node->values + (SYNC_QUEUE_BUFSIZE - 1)) - head.load();
52             for (node* n = head_node->next; n != tail_node; n = n->next)
53                 count += SYNC_QUEUE_BUFSIZE;
54             count += tail.load() - tail_node->values;
55         }
56         return count;
57     }
58
59     bool push(const T& val)
60     {
61         T* old_tail = tail.load();
62         T* new_tail;
63         if (old_tail == tail_node->values + (SYNC_QUEUE_BUFSIZE - 1)) {
64             tail_node->next = new node();
65             tail_node = tail_node->next;
66             new_tail = tail_node->values;
67         } else {
68             new_tail = old_tail + 1;
69         }
70         *new_tail = val;
71         tail.store(new_tail);
72         return (old_tail == head.load());
73     }
74
75     bool try_pop(T& res)
76     {
77         T* old_head = head.load();
78         if (old_head == tail.load()) // empty?
79             return false;
80
81         T* new_head;
82         if (old_head == head_node->values + (SYNC_QUEUE_BUFSIZE - 1)) {
83             node* old_head_node = head_node;
84             head_node = head_node->next;
85             delete old_head_node;
86             new_head = head_node->values;
87         } else {
88             new_head = old_head + 1;
89         }
90         res = *new_head;
91         head.store(new_head);
92         return true;
93     }
94
95 private:
96     struct node {
97         node(): next(NULL) { }
98         T values[SYNC_QUEUE_BUFSIZE];
99         node* next;
100     };
101
102     node* head_node;
103     node* tail_node;
104     std::atomic<T*> head;
105     std::atomic<T*> tail;
106 };
107
108 #endif // !SYNC_QUEUE_H
109
110 // Local variables:
111 // mode: c++
112 // End: