1 -- A SimGrid Lua implementation of the Kademlia protocol.
7 RANDOM_LOOKUP_INTERVAL = 100,
12 FIND_NODE_TIMEOUT = 10,
13 FIND_NODE_GLOBAL_TIMEOUT = 50,
16 JOIN_BUCKETS_QUERIES = 5
20 require("routing_table")
27 find_node_succedded = 0,
35 if #args ~= 2 and #args ~= 3 then
36 simgrid.info("Wrong argument count: " .. #args)
39 data.id = tonumber(args[1])
40 routing_table_init(data.id)
41 data.mailbox = tostring(data.id)
43 data.deadline = tonumber(args[3])
44 simgrid.info("Hi, I'm going to join the network with the id " .. data.id .. " !")
45 if join_network(tonumber(args[2])) then
48 simgrid.info("Couldn't join the network")
51 data.deadline = tonumber(args[2])
52 routing_table_update(data.id)
53 data.comm = simgrid.task.irecv(data.mailbox)
56 simgrid.info(data.find_node_succedded .. "/" .. (data.find_node_succedded + data.find_node_failed) .. " FIND_NODE have succedded");
57 simgrid.process.sleep(10000)
60 local next_lookup_time = simgrid.get_clock() + common.RANDOM_LOOKUP_INTERVAL
61 local now = simgrid.get_clock()
62 while now < data.deadline do
63 task,err = data.comm:test()
66 data.comm = simgrid.task.irecv(data.mailbox)
68 data.comm = simgrid.task.irecv(data.mailbox)
70 if now >= next_lookup_time then
72 next_lookup_time = next_lookup_time + common.RANDOM_LOOKUP_INTERVAL
73 now = simgrid.get_clock()
75 simgrid.process.sleep(1)
76 now = simgrid.get_clock()
81 function random_lookup()
84 function join_network(id_known)
85 local answer_got = false
86 local time_begin = simgrid.get_clock()
88 simgrid.debug("Joining the network knowing " .. id_known)
90 routing_table_update(data.id)
91 routing_table_update(id_known)
93 -- Send a FIND_NODE to the node we know
94 send_find_node(id_known,data.id)
95 -- Wait for the answer
98 data.comm = simgrid.task.irecv(data.mailbox)
101 task,err = data.comm:test()
103 if task.type == "FIND_NODE_ANSWER" then
105 local answer = task.answer
106 -- Add the nodes we received to our routing table
107 for i,v in pairs(answer.nodes) do
108 routing_table_update(v.id)
113 data.comm = simgrid.task.irecv(data.mailbox)
115 data.comm = simgrid.task.irecv(data.mailbox)
117 simgrid.process.sleep(1)
119 until answer_got or trials >= common.MAX_JOIN_TRIALS
120 -- Second step: Send a FIND_NODE in a node in each bucket
121 local bucket_id = find_bucket(id_known).id
123 while ((bucket_id - i) > 0 or (bucket_id + i) <= common.IDENTIFIER_SIZE) and i < common.JOIN_BUCKETS_QUERIES do
124 if bucket_id - i > 0 then
125 local id_in_bucket = get_id_in_prefix(data.id,bucket_id - i)
126 find_node(id_in_bucket,false)
128 if bucket_id + i <= common.IDENTIFIER_SIZE then
129 local id_in_bucket = get_id_in_prefix(data.id,bucket_id + i)
130 find_node(id_in_bucket,false)
136 -- Send a request to find a node in the node's routing table.
137 function find_node(destination, counts)
138 local queries, answers
139 local total_answers = 0
141 local nodes_added = 0
142 local destination_found = false
145 local global_timeout = simgrid.get_clock() + common.FIND_NODE_GLOBAL_TIMEOUT
146 -- Build a list of the closest nodes we already know.
147 local node_list = find_closest(destination)
149 simgrid.debug("Doing a FIND_NODE on " .. destination)
152 queries = send_find_node_to_best(node_list)
154 timeout = simgrid.get_clock() + common.FIND_NODE_TIMEOUT
157 task, err = data.comm:test()
159 if task.type == "FIND_NODE_ANSWER" and task.answer.destination == destination then
160 routing_table_update(task.sender_id)
161 for i,v in pairs(task.answer.nodes) do
162 routing_table_update(v.id)
164 nodes_added = merge_answer(node_list,task.answer)
168 data.comm = simgrid.task.irecv(data.mailbox)
170 data.comm = simgrid.task.irecv(data.mailbox)
172 simgrid.process.sleep(1)
175 until answers >= queries or simgrid.get_clock() >= timeout
176 if (#node_list.nodes > 0) then
177 destination_found = (node_list.nodes[1].distance == 0)
179 destination_found = false
181 until destination_found or (nodes_added > 0 and answers == 0) or simgrid.get_clock() >= global_timeout or steps >= common.MAX_STEPS
182 if destination_found then
183 simgrid.debug("Find node on " .. destination .. " succedded")
185 data.find_node_succedded = data.find_node_succedded + 1
188 simgrid.debug("Find node on " .. destination .. " failed")
190 data.find_node_failed = data.find_node_failed + 1
194 return destination_found
196 -- Sends a "FIND_NODE" request (task) to a node we know.
197 function send_find_node(id, destination)
198 simgrid.debug("Sending a FIND_NODE to " .. id .. " to find " .. destination);
200 local task = simgrid.task.new("",0, common.COMM_SIZE)
201 task.type = "FIND_NODE"
202 task.sender_id = data.id
203 task.destination = destination
205 task:dsend(tostring(id))
208 -- Sends a "FIND_NODE" request to the best "alpha" nodes in a node
210 function send_find_node_to_best(node_list)
211 destination = node_list.destination
213 while i <= common.ALPHA and i < #node_list.nodes do
214 if data.id ~= node_list.nodes[i].id then
215 send_find_node(node_list.nodes[i].id,destination)
221 -- Handles an incomming task
222 function handle_task(task)
223 routing_table_update(task.sender_id)
224 if task.type == "FIND_NODE" then
225 handle_find_node(task)
226 elseif task.type == "PING" then
230 function handle_find_node(task)
231 simgrid.debug("Received a FIND_NODE from " .. task.sender_id)
232 local answer = find_closest(task.destination)
233 local task_answer = simgrid.task.new("",0, common.COMM_SIZE)
234 task_answer.type = "FIND_NODE_ANSWER"
235 task_answer.sender_id = data.id
236 task_answer.destination = task.destination
237 task_answer.answer = answer
238 task_answer:dsend(tostring(task.sender_id))
240 function handle_ping(task)
241 simgrid.info("Received a PING from " .. task.sender_id)
242 local task_answer = simgrid.task.new("",0, common.COMM_SIZE)
243 task_answer.type = "PING_ANSWER"
244 task_answer.sender_id = data.id
245 task_answer.destination = task.destination
246 task_answer:dsend(tostring(task.sender_id))
248 function merge_answer(m1, m2)
250 for i,v in pairs(m2.nodes) do
251 table.insert(m1.nodes,v)
252 nb_added = nb_added + 1
256 simgrid.platform(arg[1] or "../../msg/msg_platform.xml")
257 simgrid.application(arg[2] or "kademlia.xml")