AND Private Git Repository - loba.git/blobdiff - sync_queue.h
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
xbt_cond_timedwait: mutex is not held on timeout.
[loba.git] / sync_queue.h
index 888854871e408697737566eb3d7d16ce04e5de43..fc35339af8c5e2007ab7203a692ab905c44f5d97 100644 (file)
@@ -1,25 +1,23 @@
 #ifndef SYNC_QUEUE_H
 #define SYNC_QUEUE_H
 
-#if __GNUC__ == 4 && __GNUC_MINOR__ == 4
-#  include <cstdatomic>         // <atomic> is named <cstdatomic> in gcc 4.4
-#else
-#  include <atomic>
-#endif
+#include "atomic_compat.h"
+
+#define SYNC_QUEUE_BUFSIZE 16
 
 template <typename T>
 class sync_queue {
 public:
     sync_queue()
     {
-        node* n = new node(NULL, NULL);
-        std::atomic_store(&head, n); // head.store(n);
-        std::atomic_store(&tail, n); // tail.store(n);
+        head_node = tail_node = new node();
+        head.store(head_node->values);
+        tail.store(tail_node->values);
     }
 
     ~sync_queue()
     {
-        node* n = head.load();
+        node* n = head_node;
         while (n != NULL) {
             node* prev = n;
             n = n->next;
@@ -36,43 +34,65 @@ public:
     size_t size() const
     {
         size_t count = 0;
-        for (node* n = head.load()->next; n != NULL; n = n->next)
-            ++count;
+        if (head_node == tail_node) {
+            count = tail.load() - head.load();
+        } else {
+            count =
+                (head_node->values + (SYNC_QUEUE_BUFSIZE - 1)) - head.load();
+            for (node* n = head_node->next; n != tail_node; n = n->next)
+                count += SYNC_QUEUE_BUFSIZE;
+            count += tail.load() - tail_node->values;
+        }
         return count;
     }
 
     bool push(const T& val)
     {
-        node* old_tail = tail.load();
-        node* n = new node(val, NULL);
-        old_tail->next = n;
-        std::atomic_store(&tail, n); // tail.store(n);
+        T* old_tail = tail.load();
+        T* new_tail;
+        if (old_tail == tail_node->values + (SYNC_QUEUE_BUFSIZE - 1)) {
+            tail_node->next = new node();
+            tail_node = tail_node->next;
+            new_tail = tail_node->values;
+        } else {
+            new_tail = old_tail + 1;
+        }
+        *new_tail = val;
+        tail.store(new_tail);
         return (old_tail == head.load());
     }
 
     bool try_pop(T& res)
     {
-        node* old_head = head.load();
-        if (old_head == tail.load()) // empty?
+        T* old_head = head.load();
+        if (old_head == tail.load()) // empty?
             return false;
+
+        T* new_head;
+        if (old_head == head_node->values + (SYNC_QUEUE_BUFSIZE - 1)) {
+            node* old_head_node = head_node;
+            head_node = head_node->next;
+            delete old_head_node;
+            new_head = head_node->values;
         } else {
-            node* new_head = old_head->next;
-            std::atomic_store(&head, new_head); // head.store(new_head);
-            delete old_head;
-            res = new_head->value;
-            return true;
+            new_head = old_head + 1;
         }
+        res = *new_head;
+        head.store(new_head);
+        return true;
     }
 
 private:
     struct node {
-        node(const T& v, node* n): value(v), next(n) { }
-        T value;
+        node(): next(NULL) { }
+        T values[SYNC_QUEUE_BUFSIZE];
         node* next;
     };
 
-    std::atomic<node*> head;
-    std::atomic<node*> tail;
+    node* head_node;
+    node* tail_node;
+    std::atomic<T*> head;
+    std::atomic<T*> tail;
 };
 
 #endif // !SYNC_QUEUE_H