AND Private Git Repository - loba.git/blobdiff - sync_queue.h
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Document a bug with parallel executions.
[loba.git] / sync_queue.h
index 357a72d8379a1c127b1499646fb4009aa18bf5a6..c3f29e566f0b849a32c21a92c4e140a6f6c46d44 100644 (file)
@@ -13,19 +13,21 @@ atomic<_Tp*>::store(_Tp* __v, memory_order __m) volatile
 #  include <atomic>
 #endif
 
+#define SYNC_QUEUE_BUFSIZE 16
+
 template <typename T>
 class sync_queue {
 public:
     sync_queue()
     {
-        node* n = new node(NULL);
-        head.store(n);
-        tail.store(n);
+        head_node = tail_node = new node();
+        head.store(head_node->values);
+        tail.store(tail_node->values);
     }
 
     ~sync_queue()
     {
-        node* n = head.load();
+        node* n = head_node;
         while (n != NULL) {
             node* prev = n;
             n = n->next;
@@ -42,43 +44,65 @@ public:
     size_t size() const
     {
         size_t count = 0;
-        for (node* n = head.load()->next; n != NULL; n = n->next)
-            ++count;
+        if (head_node == tail_node) {
+            count = tail.load() - head.load();
+        } else {
+            count =
+                (head_node->values + (SYNC_QUEUE_BUFSIZE - 1)) - head.load();
+            for (node* n = head_node->next; n != tail_node; n = n->next)
+                count += SYNC_QUEUE_BUFSIZE;
+            count += tail.load() - tail_node->values;
+        }
         return count;
     }
 
     bool push(const T& val)
     {
-        node* old_tail = tail.load();
-        node* n = new node(val);
-        old_tail->next = n;
-        tail.store(n);
+        T* old_tail = tail.load();
+        T* new_tail;
+        if (old_tail == tail_node->values + (SYNC_QUEUE_BUFSIZE - 1)) {
+            tail_node->next = new node();
+            tail_node = tail_node->next;
+            new_tail = tail_node->values;
+        } else {
+            new_tail = old_tail + 1;
+        }
+        *new_tail = val;
+        tail.store(new_tail);
         return (old_tail == head.load());
     }
 
     bool try_pop(T& res)
     {
-        node* old_head = head.load();
-        if (old_head == tail.load()) // empty?
+        T* old_head = head.load();
+        if (old_head == tail.load()) // empty?
             return false;
+
+        T* new_head;
+        if (old_head == head_node->values + (SYNC_QUEUE_BUFSIZE - 1)) {
+            node* old_head_node = head_node;
+            head_node = head_node->next;
+            delete old_head_node;
+            new_head = head_node->values;
         } else {
-            node* new_head = old_head->next;
-            head.store(new_head);
-            delete old_head;
-            res = new_head->value;
-            return true;
+            new_head = old_head + 1;
         }
+        res = *new_head;
+        head.store(new_head);
+        return true;
     }
 
 private:
     struct node {
-        node(const T& v): value(v), next(NULL) { }
-        T value;
+        node(): next(NULL) { }
+        T values[SYNC_QUEUE_BUFSIZE];
         node* next;
     };
 
-    std::atomic<node*> head;
-    std::atomic<node*> tail;
+    node* head_node;
+    node* tail_node;
+    std::atomic<T*> head;
+    std::atomic<T*> tail;
 };
 
 #endif // !SYNC_QUEUE_H