+========================================================================
+========================================================================
+##### RESOLVED BUGS COME AFTER THIS ####################################
+========================================================================
+Il faut réviser l'utilisation du mutex entre le thread d'équilibrage
+et le thread de calcul. Il semble être gardé beaucoup trop longtemps.
+
+Bon, une partie du problème est rectifiée par le commit
+48de954 Stop locking the mutex on data_receive.
+
+Pour le reste, je pense maintenant que ça ne gêne pas, au moins dans
+le simulateur. Pour faire bien, il faudrait plus séparer les deux
+threads d'équilibrage et de calcul, et faire en sorte que chacun garde
+un cache des données globales partagées. Il suffirait alors de
+synchroniser ces caches à chaque itération.
+
+Les données partagées sont essentiellement les données des voisins :
+load, to_send et debt.
+
========================================================================
Comment expliquer ces différences entre SG 3.5 et SG svn ?
#SIMGRID_INSTALL_DIR ?= $(PWD)/simgrid-stable
SIMGRID_INSTALL_DIR ?= $(PWD)/simgrid-dev
-OPTIM_FLAGS += -O3
+OPTIM_FLAGS += -pipe -O3
DEBUG_FLAGS += -g
#DEBUG_FLAGS += -pg
CHECK_FLAGS += -Wall -Wextra
#CFLAGS += -fgnu89-inline # workaround simgrid bug
CFLAGS += $(OPTIM_FLAGS) $(DEBUG_FLAGS)
-#CXXFLAGS += -std=c++0x
+CXXFLAGS += -std=c++0x
CXXFLAGS += $(OPTIM_FLAGS) $(DEBUG_FLAGS)
LDFLAGS += -L $(SIMGRID_INSTALL_DIR)/lib
LDLIBS := -lsimgrid
MAKEDEPEND.FLAGS = $(CPPFLAGS) -MM -MG -MF $@ $<
-MAKEDEPEND.CXX = $(CXX) $(MAKEDEPEND.FLAGS)
+MAKEDEPEND.C = $(CC) $(CFLAGS) $(MAKEDEPEND.FLAGS)
+MAKEDEPEND.CXX = $(CXX) $(CXXFLAGS) $(MAKEDEPEND.FLAGS)
SRC.loba := main.cpp \
communicator.cpp \
$(RM) $(XML_DEV_FILES)
$(RM) *~
+.%.d: %.c ; $(MAKEDEPEND.C)
+
.%.d: %.cpp ; $(MAKEDEPEND.CXX)
$(FLAVOURED_LOBA):
minus pending sends.
* With bookkeeping, it corresponds to the
"virtual load".
+
+process::received_load Real load received from neighbors.
+ Used when receiving data messages, and then
+ added to real_load.
main.cpp le programme principal
+ message.h file de messages reçus
+ message.cpp
+
misc.h divers trucs inclassables
misc.cpp
+ msg_thread.h creation de threads SG/MSG
+ msg_thread.cpp
+
named_object_list.h gestion d'une table de constructeurs
avec des noms et des descriptions
simgrid_features.h macros pour détecter la version de SimGrid
- simple_async.cpp un simple programme de test
+ statistics.h pour calculer moyenne, variance, etc.
+
+ synchro.h mutex, condition, etc.
+
+ sync_queue.h lock-free single-producer/single-consumer queue
timer.h gestion de timer
+ tracing.h définitions liées au traçage
+
version.h gestion de la version du programme
version.cpp
extract.pl outil d'extraction des données à partir des
traces, pour tracer des courbes
+ new_loba.sh pour créer le squelette d'un nouvel algo
+ d'équilibrage loba_*
+
setlocalversion calcule un numéro de version à partir du hash
du dernier commit (git)
* autres fichiers
.gitignore liste des fichiers ignorés par git
- valgrind_suppressions_3.5 liste de quelques suppressions pour valgrind
+ valgrind_suppressions liste de quelques suppressions pour valgrind
avec SimGrid 3.5
#include <algorithm>
-#include <tr1/functional>
+#include <functional>
#include <msg/msg.h>
#include <xbt/log.h>
+// Binds this communicator to the data of the current host, then creates
+// and starts a "receiver" thread running communicator::receiver() on this
+// object.  NOTE(review): the thread keeps a pointer to `this`; presumably
+// the destructor stops/joins it before the object goes away — confirm.
communicator::communicator()
: host(static_cast<hostdata*>(MSG_host_get_data(MSG_host_self())))
{
- using std::tr1::bind;
receiver_thread = new_msg_thread("receiver",
- bind(&communicator::receiver, this));
+ std::bind(&communicator::receiver, this));
receiver_thread->start();
}
XBT_DEBUG("send %s to %s", msg->to_string().c_str(), dest);
m_task_t task = MSG_task_create("message", 0.0, msg->get_size(), msg);
TRACE_msg_set_task_category(task,
- msg->get_type() == message::LOAD ?
+ msg->get_type() == message::DATA ?
TRACE_CAT_DATA : TRACE_CAT_CTRL);
return MSG_task_isend(task, dest);
}
#include <algorithm>
-#include <tr1/functional>
+#include <functional>
#include <numeric>
#include <iterator>
#include <sstream>
+// Evaluates the cost polynomial at `amount` using Horner's scheme:
+// the accumulator starts at factors.front(), and each remaining factor f
+// updates it as acc = amount * acc + f.  factors.front() is therefore the
+// coefficient of the highest power.
+// NOTE(review): factors must not be empty (front() is called unconditionally).
double cost_func::operator()(double amount) const
{
- using std::tr1::bind;
- using std::tr1::placeholders::_1;
- using std::tr1::placeholders::_2;
+ using std::placeholders::_1;
+ using std::placeholders::_2;
return std::accumulate(++factors.begin(), factors.end(), factors.front(),
- bind(std::plus<double>(),
- bind(std::multiplies<double>(), amount, _1),
- _2));
+ std::bind(std::plus<double>(),
+ std::bind(std::multiplies<double>(),
+ amount, _1), _2));
}
std::string cost_func::to_string()
#include <algorithm>
#include <cstdlib>
-#include <tr1/functional>
+#include <functional>
#include <iomanip>
#include <numeric>
#include <sstream>
void deployment_generator::distribute_load()
{
- using std::tr1::bind;
- using std::tr1::placeholders::_1;
+ using std::placeholders::_1;
if (!opt::auto_depl::random_distribution) {
set_load(0, opt::auto_depl::load);
double factor = opt::auto_depl::load /
std::accumulate(loads.begin(), loads.end(), 0.0);
std::transform(loads.begin(), loads.end(), loads.begin(),
- bind(std::multiplies<double>(), _1, factor));
+ std::bind(std::multiplies<double>(), _1, factor));
for (unsigned i = 0 ; i < hosts.size() ; ++i)
set_load(i, loads[i]);
}
#include <algorithm>
#include <cstring>
-#include <tr1/functional>
+#include <functional>
#include <stdexcept>
#include <xbt/log.h>
#include <xbt/sysdep.h>
void hostdata::create()
{
- using std::tr1::bind;
- using std::tr1::placeholders::_1;
- using std::tr1::placeholders::_2;
+ using std::placeholders::_1;
+ using std::placeholders::_2;
int nhosts = MSG_get_host_number();
m_host_t* host_list = MSG_get_host_table();
// only sort hosts for automatically created deployment
if (opt::auto_depl::enabled)
std::sort(host_list, host_list + nhosts,
- bind(std::less<int>(), bind(strcmp,
- bind(MSG_host_get_name, _1),
- bind(MSG_host_get_name, _2)), 0));
+ std::bind(std::less<int>(),
+ std::bind(strcmp,
+ std::bind(MSG_host_get_name, _1),
+ std::bind(MSG_host_get_name, _2)), 0));
hosts.assign(host_list, host_list + nhosts);
xbt_free(host_list);
#include <cerrno>
+#include <csignal>
#include <cstring> // strchr
#include <iostream>
-#include <signal.h>
#include <stdexcept>
#include <msg/msg.h>
#include <xbt/log.h>
EXIT_FAILURE_INIT = 0x02, // failed to initialize simulator
EXIT_FAILURE_SIMU = 0x04, // simulation failed
EXIT_FAILURE_CLEAN = 0x08, // error at cleanup
- EXIT_FAILURE_OTHER = 0x10, // other error
+ EXIT_FAILURE_INTR = 0x10, // interrupted by user
+ EXIT_FAILURE_LOAD = 0x20, // lost load on exit
+ EXIT_FAILURE_OTHER = 0x40, // other error
};
// Cannot be globally initialized...
return result;
}
+// Checks load conservation at exit.  It compares the total load at
+// initialization with the total load at exit, and checks that the load
+// still counted as running is (close to) zero.  Each deviation is expressed
+// as a percentage of the initial load.  When it exceeds
+// opt::load_ratio_threshold in either direction, an error is logged and
+// false will be returned.  Returns true when both checks pass.
+// NOTE(review): with an initial total load of 0.0 the ratios are NaN or
+// infinite; a NaN ratio passes both checks silently.
-static void check_for_lost_load()
+static bool check_for_lost_load()
{
+ bool res = true;
double total_init = process::get_total_load_init();
double total_exit = process::get_total_load_exit();
double lost = total_init - total_exit;
double lost_ratio = 100.0 * lost / total_init;
- if (lost_ratio < -opt::load_ratio_threshold)
+ if (lost_ratio < -opt::load_ratio_threshold) {
XBT_ERROR("Gained load at exit! %g (%g%%) <============",
-lost, -lost_ratio);
- else if (lost_ratio > opt::load_ratio_threshold)
+ res = false;
+ } else if (lost_ratio > opt::load_ratio_threshold) {
XBT_ERROR("Lost load at exit! %g (%g%%) <============",
lost, lost_ratio);
- else
+ res = false;
+ } else
XBT_VERB("Total load at exit looks good: %g (%g%%)", lost, lost_ratio);
double total_running = process::get_total_load_running();
double running_ratio = 100.0 * total_running / total_init;
- if (running_ratio < -opt::load_ratio_threshold)
+ if (running_ratio < -opt::load_ratio_threshold) {
XBT_ERROR("Negative running load at exit! %g (%g%%) <============",
total_running, running_ratio);
- else if (running_ratio > opt::load_ratio_threshold)
+ res = false;
+ } else if (running_ratio > opt::load_ratio_threshold) {
XBT_ERROR("Remaining running load at exit! %g (%g%%) <============",
total_running, running_ratio);
- else
+ res = false;
+ } else
XBT_VERB("Running load at exit looks good: %g (%g%%)",
total_running, running_ratio);
+ return res;
}
static void signal_handler(int /*sig*/)
XBT_CRITICAL(">>>>>>>>>>"
" caught CTRL-C: global exit requested "
"<<<<<<<<<<");
- opt::exit_request = true;
+ opt::exit_request = 1;
+ } else {
+ XBT_CRITICAL(">>>>>>>>>>"
+ " caught CTRL-C for the 2nd time: exit immediately "
+ "<<<<<<<<<<");
+ exit(EXIT_FAILURE_INTR);
}
}
action.sa_handler = signal_handler;
sigemptyset(&action.sa_mask);
action.sa_flags = SA_RESTART;
- if (sigaction (SIGINT, &action, NULL) == -1) {
+ if (sigaction(SIGINT, &action, NULL) == -1) {
std::cerr << "sigaction: " << strerror(errno) << "\n";
exit(EXIT_FAILURE_OTHER);
}
if (simulated_time >= 0.0) {
simulation_time.stop();
elapsed_time.stop();
- check_for_lost_load();
+ if (!check_for_lost_load())
+ exit_status |= EXIT_FAILURE_LOAD;
XBT_INFO(",----[ Results ]");
PR_STATS("Load", loads);
PR_STATS("Computation", comps);
#include "messages.h"
+// Builds a message and computes its simulated size once.
+// The size model is arbitrary: 8 bytes for the type, 8 bytes for each
+// double carried, plus the simulated data payload for DATA messages.
+message::message(message_type t, double a, double c)
+    : type(t), amount(a), credit(c)
+{
+    if (type == DATA) {
+        // type + amount + data payload
+        size = 16 + opt::comm_cost(amount);
+    } else if (type == CTRL) {
+        // type + amount, plus credit when bookkeeping is enabled
+        size = opt::bookkeeping ? 24 : 16;
+    } else {
+        // CTRL_CLOSE, DATA_CLOSE: type only
+        size = 8;
+    }
+}
+
+// Returns a short description of the form "TYPE: amount" (the credit is
+// not shown).  The name table below is indexed by message_type, so it
+// must list the names in the same order as the enumeration.
std::string message::to_string()
{
- static const char* str[] = { "INFO", "CREDIT", "LOAD",
- "CTRL_CLOSE", "DATA_CLOSE" };
+ static const char* str[DATA_CLOSE + 1] = { "CTRL", "DATA",
+ "CTRL_CLOSE", "DATA_CLOSE" };
std::ostringstream oss;
oss << str[type] << ": " << amount;
return oss.str();
}
-double message::get_size() const
-{
- // arbitrary: 8 for type, and 8 for amount
- double size = 16;
- if (type == LOAD)
- size += opt::comm_cost(amount);
- return size;
-}
-
+// Enqueues a received task.  The condition is signaled (under the mutex)
+// only when sync_queue::push() reports that the queue was empty, because
+// pop() waits on the condition only after failing to pop an element.
void message_queue::push(m_task_t task)
{
- mutex.acquire();
- queue.push(task);
- cond.signal();
- mutex.release();
+ if (queue.push(task)) {
+ // list was empty, the push must be signaled
+ mutex.acquire();
+ cond.signal();
+ mutex.release();
+ }
}
bool message_queue::pop(message*& msg, m_host_t& from, double timeout)
{
- if (timeout != 0) {
- volatile double deadline =
- timeout > 0 ? MSG_get_clock() + timeout : 0.0;
+ m_task_t task;
+ if (!queue.try_pop(task)) {
+ if (timeout == 0.0)
+ return false;
+
mutex.acquire();
- while (queue.empty() && (!deadline || deadline > MSG_get_clock())) {
+ if (!queue.try_pop(task)) {
xbt_ex_t e;
XBT_DEBUG("waiting for a message to come");
TRY {
- if (deadline)
- cond.timedwait(mutex, deadline - MSG_get_clock());
+ if (timeout > 0)
+ cond.timedwait(mutex, timeout);
else
cond.wait(mutex);
}
+ TRY_CLEANUP {
+ mutex.release();
+ }
CATCH (e) {
if (e.category != timeout_error)
RETHROW;
xbt_ex_free(e);
+ return false; // got a timeout
}
+ bool pop_was_successful = queue.try_pop(task);
+ xbt_assert(pop_was_successful);
+ } else {
+ mutex.release();
}
- mutex.release();
}
-
- if (queue.empty())
- return false;
-
- m_task_t task = queue.front();
- queue.pop();
msg = static_cast<message*>(MSG_task_get_data(task));
from = MSG_task_get_source(task);
MSG_task_destroy(task);
#include <string>
#include <msg/msg.h>
#include "synchro.h"
+#include "sync_queue.h"
class message {
public:
- enum message_type { INFO, CREDIT, LOAD, CTRL_CLOSE, DATA_CLOSE };
+ // CTRL: load information, with an optional credit (bookkeeping);
+ // DATA: a quantity of real load;
+ // CTRL_CLOSE / DATA_CLOSE: presumably end-of-channel notifications for
+ // the control and data channels.
+ enum message_type { CTRL, DATA, CTRL_CLOSE, DATA_CLOSE };
- message(message_type t, double a): type(t), amount(a) { }
+ // t: message type; a: amount (load information for CTRL, load quantity
+ // for DATA); c: credit carried by CTRL messages (0.0 without bookkeeping).
+ message(message_type t, double a, double c = 0.0);
message_type get_type() const { return type; }
double get_amount() const { return amount; }
- double get_size() const;
+ double get_credit() const { return credit; }
+ // Simulated message size, computed once by the constructor.
+ double get_size() const { return size; }
std::string to_string();
private:
message_type type;
double amount;
+ double credit;
+ double size;
};
class message_queue {
private:
mutex_t mutex;
condition_t cond;
- std::queue<m_task_t> queue;
+ sync_queue<m_task_t> queue;
};
#endif // !MESSAGES_H
// Simulation parameters
int log_rate = 1;
- bool exit_request = false;
+ volatile std::sig_atomic_t exit_request = 0;
// Platform and deployment
std::string platform_file;
#ifndef OPTIONS_H
#define OPTIONS_H
+#include <csignal> // std::sig_atomic_t
#include <string>
#include "cost_func.h"
#include "named_object_list.h"
// Simulation parameters
extern int log_rate;
- extern bool exit_request;
+ extern volatile std::sig_atomic_t exit_request;
// Platform and deployment
extern std::string platform_file;
#include <algorithm>
-#include <tr1/functional>
+#include <functional>
#include <iterator>
#include <numeric>
#include <stdexcept>
expected_load = real_load;
total_load_running += real_load;
total_load_init += real_load;
+ received_load = 0.0;
ctrl_close_pending = data_close_pending = neigh.size();
close_received = false;
comp_iter = lb_iter = 0;
lb_thread = new_msg_thread("loba",
- std::tr1::bind(&process::load_balance_loop,
- this));
+ std::bind(&process::load_balance_loop, this));
e_xbt_log_priority_t logp = xbt_log_priority_verbose;
if (!LOG_ISENABLED(logp))
oss << ESSE(neigh.size()) << ": ";
std::transform(neigh.begin(), neigh.end() - 1,
std::ostream_iterator<const char*>(oss, ", "),
- std::tr1::mem_fn(&neighbor::get_name));
+ std::mem_fn(&neighbor::get_name));
oss << neigh.back().get_name();
}
XBT_LOG(logp, "Got %s.", oss.str().c_str());
{
delete lb_thread;
total_load_exit += real_load;
+ xbt_assert(received_load == 0.0,
+ "received_load is %g, but should be 0.0 !", received_load);
if (opt::log_rate < 0)
return;
XBT_INFO("Final load after %d:%d iterations: %g",
void process::load_balance_loop()
{
- using std::tr1::bind;
- using std::tr1::placeholders::_1;
+ using std::placeholders::_1;
double next_iter_after_date = MSG_get_clock() + opt::min_lb_iter_duration;
while (still_running()) {
// send
comm.ctrl_flush(false);
std::for_each(neigh.begin(), neigh.end(),
- bind(&process::ctrl_send, this, _1));
+ std::bind(&process::ctrl_send, this, _1));
prev_load_broadcast = expected_load;
mutex.release();
XBT_DEBUG("send CTRL_CLOSE to %zu neighbor%s",
neigh.size(), ESSE(neigh.size()));
std::for_each(neigh.begin(), neigh.end(),
- bind(&process::ctrl_close, this, _1));
+ std::bind(&process::ctrl_close, this, _1));
while (ctrl_close_pending) {
comm.ctrl_flush(false);
- XBT_DEBUG("waiting for %d CTRL CLOSE", ctrl_close_pending);
+ XBT_DEBUG("waiting for %d CTRL_CLOSE", ctrl_close_pending);
ctrl_receive(-1.0);
}
comm.ctrl_flush(true);
void process::compute_loop()
{
- using std::tr1::bind;
- using std::tr1::placeholders::_1;
+ using std::placeholders::_1;
double next_iter_after_date = MSG_get_clock() + opt::min_comp_iter_duration;
while (still_running()) {
- // receive
- mutex.acquire();
- if (real_load > 0.0)
- data_receive(0.0);
- else
- data_receive(opt::min_comp_iter_duration);
- mutex.release();
+ // receive (do not block if there is something to compute)
+ data_receive(real_load > 0.0 ? 0.0 : opt::min_comp_iter_duration);
// send
comm.data_flush(false);
mutex.acquire();
+ real_load += received_load;
+ received_load = 0.0;
std::for_each(neigh.begin(), neigh.end(),
- bind(&process::data_send, this, _1));
+ std::bind(&process::data_send, this, _1));
mutex.release();
if (real_load == 0.0)
}
XBT_VERB("Going to finalize for %s...", __func__);
- // last send, for not losing load scheduled to be sent
- std::for_each(neigh.begin(), neigh.end(),
- bind(&process::data_send, this, _1));
finalizing = true;
- total_load_running -= real_load;
XBT_DEBUG("send DATA_CLOSE to %zu neighbor%s",
neigh.size(), ESSE(neigh.size()));
std::for_each(neigh.begin(), neigh.end(),
- bind(&process::data_close, this, _1));
+ std::bind(&process::data_close, this, _1));
while (data_close_pending) {
comm.data_flush(false);
- XBT_DEBUG("waiting for %d DATA CLOSE", data_close_pending);
+ XBT_DEBUG("waiting for %d DATA_CLOSE", data_close_pending);
data_receive(-1.0);
}
+ real_load += received_load;
+ received_load = 0.0;
+ total_load_running -= real_load;
comm.data_flush(true);
}
+// Returns the sum of neighbor::get_to_send() over all neighbors in neigh.
double process::get_sum_of_to_send() const
{
- using std::tr1::bind;
- using std::tr1::placeholders::_1;
- using std::tr1::placeholders::_2;
+ using std::placeholders::_1;
+ using std::placeholders::_2;
return std::accumulate(neigh.begin(), neigh.end(), 0.0,
- bind(std::plus<double>(),
- _1, bind(&neighbor::get_to_send, _2)));
+ std::bind(std::plus<double>(), _1,
+ std::bind(&neighbor::get_to_send, _2)));
}
void process::load_balance()
+// Sends at most one CTRL message to neighbor nb.  The message carries the
+// current expected load and, in bookkeeping mode, the pending to_send
+// amount as a credit; that amount is moved from the neighbor's to_send
+// into its debt.  Nothing is sent when the expected load did not change
+// since the last broadcast and there is no positive credit to send.
+// NOTE(review): if nb.get_to_send() can ever be negative, that value is
+// sent as credit without being recorded as debt; confirm it is >= 0.
void process::ctrl_send(neighbor& nb)
{
double info_to_send = expected_load;
- if (info_to_send != prev_load_broadcast) {
- message* msg = new message(message::INFO, info_to_send);
- add_ctrl_send_mesg(msg->get_size());
- comm.ctrl_send(nb.get_ctrl_mbox(), msg);
- }
- if (opt::bookkeeping) {
- double debt_to_send = nb.get_to_send();
+ double debt_to_send;
+ if (opt::bookkeeping) { // bookkeeping
+ debt_to_send = nb.get_to_send();
if (debt_to_send > 0.0) {
nb.set_to_send(0.0);
nb.set_debt(nb.get_debt() + debt_to_send);
- message* msg = new message(message::CREDIT, debt_to_send);
- add_ctrl_send_mesg(msg->get_size());
- comm.ctrl_send(nb.get_ctrl_mbox(), msg);
}
+ } else { // !bookkeeping
+ debt_to_send = 0.0;
+ }
+ if (info_to_send != prev_load_broadcast || debt_to_send > 0.0) {
+ message* msg = new message(message::CTRL, info_to_send, debt_to_send);
+ add_ctrl_send_mesg(msg->get_size());
+ comm.ctrl_send(nb.get_ctrl_mbox(), msg);
}
}
amount = std::min(load_to_send, opt::max_transfer_amount);
else
amount = load_to_send;
- message* msg = new message(message::LOAD, amount);
+ message* msg = new message(message::DATA, amount);
add_data_send_mesg(msg->get_size());
comm.data_send(nb.get_data_mbox(), msg);
load_to_send -= amount;
void process::handle_message(message* msg, m_host_t from)
{
switch (msg->get_type()) {
- case message::INFO: {
+ case message::CTRL: {
neighbor* n = rev_neigh[from];
n->set_load(msg->get_amount() + n->get_to_send());
+ expected_load += msg->get_credit(); // may be 0.0 if !opt::bookkeeping
break;
}
- case message::CREDIT:
- expected_load += msg->get_amount();
- break;
- case message::LOAD: {
+ case message::DATA: {
double ld = msg->get_amount();
- real_load += ld;
- if (finalizing)
- total_load_running -= ld;
+ received_load += ld;
break;
}
case message::CTRL_CLOSE:
#define print_loads_generic(vec, verbose, logp, cat) \
if (_XBT_LOG_ISENABLEDV((*cat), logp)) { \
- using std::tr1::bind; \
- using std::tr1::placeholders::_1; \
+ using std::placeholders::_1; \
XBT_XCLOG(cat, logp, "My load: %g (real); %g (expected). " \
"Neighbor loads:", real_load, expected_load); \
std::for_each(vec.begin(), vec.end(), \
- bind(&neighbor::print, _1, verbose, logp, cat)); \
+ std::bind(&neighbor::print, _1, verbose, logp, cat)); \
} else ((void)0)
void process::print_loads(bool verbose,
//#undef USE_UNORDERED_MAP
#include <algorithm>
-#include <tr1/functional>
+#include <functional>
#ifdef USE_UNORDERED_MAP
-# include <tr1/unordered_map>
-# define MAP_TEMPLATE std::tr1::unordered_map
+# include <unordered_map>
+# define MAP_TEMPLATE std::unordered_map
#else
# include <map>
# define MAP_TEMPLATE std::map
// a same information messages
double real_load; // current load
double expected_load; // expected load in bookkeeping mode
+ double received_load; // load received from neighbors
mutex_t mutex; // synchronization between threads
condition_t cond;
+// Sorts pneigh by neighbor load: two elements are ordered by applying
+// comp to their neighbor::get_load() values (a std::less-like comparator
+// would therefore give an ascending order).
template <typename Compare>
void process::pneigh_sort_by_load(const Compare& comp)
{
- using std::tr1::bind;
- using std::tr1::placeholders::_1;
- using std::tr1::placeholders::_2;
+ using std::placeholders::_1;
+ using std::placeholders::_2;
std::sort(pneigh.begin(), pneigh.end(),
- bind(comp,
- bind(&neighbor::get_load, _1),
- bind(&neighbor::get_load, _2)));
+ std::bind(comp,
+ std::bind(&neighbor::get_load, _1),
+ std::bind(&neighbor::get_load, _2)));
}
#endif // !PROCESS_H
--- /dev/null
+#ifndef SYNC_QUEUE_H
+#define SYNC_QUEUE_H
+
+#if __GNUC__ == 4 && __GNUC_MINOR__ == 4
+# include <cstdatomic> // <atomic> is named <cstdatomic> in gcc 4.4
+
+template<typename _Tp> // fix missing definition in gcc 4.4
+void
+atomic<_Tp*>::store(_Tp* __v, memory_order __m) volatile
+{ atomic_address::store(__v, __m); }
+
+#else
+# include <atomic>
+#endif
+
+#define SYNC_QUEUE_BUFSIZE 16
+
+// Unbounded FIFO queue built from a linked list of fixed-size buffers.
+//
+// Thread-safety: push() only modifies the tail side (tail_node, tail) and
+// try_pop() only the head side (head_node, head); the two sides share state
+// only through the atomic head/tail pointers.  It is therefore safe for ONE
+// producer calling push() concurrently with ONE consumer calling try_pop().
+// Several producers (or several consumers) must be serialized by the caller.
+//
+// Layout: `head` points to the last consumed slot and `tail` to the last
+// filled slot, so the queue is empty when head == tail.  Slot 0 of the very
+// first node is never used; every node allocated later is filled from slot 0.
+template <typename T>
+class sync_queue {
+public:
+    sync_queue()
+    {
+        head_node = tail_node = new node();
+        head.store(head_node->values);
+        tail.store(tail_node->values);
+    }
+
+    ~sync_queue()
+    {
+        node* n = head_node;
+        while (n != NULL) {
+            node* prev = n;
+            n = n->next;
+            delete prev;
+        }
+    }
+
+    // Not copyable: the queue owns its nodes.
+    sync_queue(const sync_queue&) = delete;
+    sync_queue& operator=(const sync_queue&) = delete;
+
+    // Returns true if there is no element to pop.
+    bool empty() const
+    {
+        return head.load() == tail.load();
+    }
+
+    // Returns the number of queued elements.
+    // size() is not thread-safe: it must not run concurrently with push()
+    // or try_pop().
+    size_t size() const
+    {
+        if (head_node == tail_node)
+            return tail.load() - head.load();
+
+        // elements after the head slot, up to the end of the first node
+        size_t count =
+            (head_node->values + (SYNC_QUEUE_BUFSIZE - 1)) - head.load();
+        // every intermediate node is full
+        for (node* n = head_node->next; n != tail_node; n = n->next)
+            count += SYNC_QUEUE_BUFSIZE;
+        // the last node is filled from slot 0 up to the tail slot, inclusive
+        count += (tail.load() - tail_node->values) + 1;
+        return count;
+    }
+
+    // Appends val.  Returns true if the queue looked empty just before the
+    // push, i.e. if a consumer may be waiting for an element.
+    // Must be called by a single producer at a time.
+    bool push(const T& val)
+    {
+        T* old_tail = tail.load();
+        T* new_tail;
+        if (old_tail == tail_node->values + (SYNC_QUEUE_BUFSIZE - 1)) {
+            // current node is full: chain a new one and use its first slot
+            tail_node->next = new node();
+            tail_node = tail_node->next;
+            new_tail = tail_node->values;
+        } else {
+            new_tail = old_tail + 1;
+        }
+        // write the value before publishing it through `tail`
+        *new_tail = val;
+        tail.store(new_tail);
+        return (old_tail == head.load());
+    }
+
+    // Removes the oldest element and stores it into res.  Returns false,
+    // leaving res untouched, if the queue is empty.
+    // Must be called by a single consumer at a time.
+    bool try_pop(T& res)
+    {
+        T* old_head = head.load();
+        if (old_head == tail.load()) // empty?
+            return false;
+
+        T* new_head;
+        if (old_head == head_node->values + (SYNC_QUEUE_BUFSIZE - 1)) {
+            // head node fully consumed: free it and move to the next one
+            node* old_head_node = head_node;
+            head_node = head_node->next;
+            delete old_head_node;
+            new_head = head_node->values;
+        } else {
+            new_head = old_head + 1;
+        }
+        res = *new_head;
+        // publish the consumed slot only after the value has been read
+        head.store(new_head);
+        return true;
+    }
+
+private:
+    struct node {
+        node(): next(NULL) { }
+        T values[SYNC_QUEUE_BUFSIZE];
+        node* next;
+    };
+
+    node* head_node;            // node containing `head` (consumer side)
+    node* tail_node;            // node containing `tail` (producer side)
+    std::atomic<T*> head;       // last consumed slot
+    std::atomic<T*> tail;       // last filled slot
+};
+
+#endif // !SYNC_QUEUE_H
+
+// Local variables:
+// mode: c++
+// End: