]> AND Private Git Repository - loba.git/blob - sync_queue.h
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Prepare next version.
[loba.git] / sync_queue.h
1 #ifndef SYNC_QUEUE_H
2 #define SYNC_QUEUE_H
3
4 #include "atomic_compat.h"
5
6 #define SYNC_QUEUE_BUFSIZE 16
7
8 template <typename T>
9 class sync_queue {
10 public:
11     sync_queue()
12     {
13         head_node = tail_node = new node();
14         head.store(head_node->values);
15         tail.store(tail_node->values);
16     }
17
18     ~sync_queue()
19     {
20         node* n = head_node;
21         while (n != NULL) {
22             node* prev = n;
23             n = n->next;
24             delete prev;
25         }
26     }
27
28     bool empty() const
29     {
30         return head.load() == tail.load();
31     }
32
33     // size() is not not thread-safe
34     size_t size() const
35     {
36         size_t count = 0;
37         if (head_node == tail_node) {
38             count = tail.load() - head.load();
39         } else {
40             count =
41                 (head_node->values + (SYNC_QUEUE_BUFSIZE - 1)) - head.load();
42             for (node* n = head_node->next; n != tail_node; n = n->next)
43                 count += SYNC_QUEUE_BUFSIZE;
44             count += tail.load() - tail_node->values;
45         }
46         return count;
47     }
48
49     bool push(const T& val)
50     {
51         T* old_tail = tail.load();
52         T* new_tail;
53         if (old_tail == tail_node->values + (SYNC_QUEUE_BUFSIZE - 1)) {
54             tail_node->next = new node();
55             tail_node = tail_node->next;
56             new_tail = tail_node->values;
57         } else {
58             new_tail = old_tail + 1;
59         }
60         *new_tail = val;
61         tail.store(new_tail);
62         return (old_tail == head.load());
63     }
64
65     bool try_pop(T& res)
66     {
67         T* old_head = head.load();
68         if (old_head == tail.load()) // empty?
69             return false;
70
71         T* new_head;
72         if (old_head == head_node->values + (SYNC_QUEUE_BUFSIZE - 1)) {
73             node* old_head_node = head_node;
74             head_node = head_node->next;
75             delete old_head_node;
76             new_head = head_node->values;
77         } else {
78             new_head = old_head + 1;
79         }
80         res = *new_head;
81         head.store(new_head);
82         return true;
83     }
84
85 private:
86     struct node {
87         node(): next(NULL) { }
88         T values[SYNC_QUEUE_BUFSIZE];
89         node* next;
90     };
91
92     node* head_node;
93     node* tail_node;
94     std::atomic<T*> head;
95     std::atomic<T*> tail;
96 };
97
98 #endif // !SYNC_QUEUE_H
99
100 // Local variables:
101 // mode: c++
102 // End: