#include <vector>
#define PIECE_SIZE 65536
#define MESSAGE_BUILD_CHAIN_SIZE 40
#define MESSAGE_SEND_DATA_HEADER_SIZE 1
class ChainMessage {
public:
unsigned int num_pieces = 0;
: prev_(prev), next_(next), num_pieces(num_pieces)
{
}
~ChainMessage() = default;
};
class FilePiece {
public:
FilePiece() = default;
~FilePiece() = default;
};
class Peer {
public:
std::vector<simgrid::s4u::CommPtr> pending_recvs;
std::vector<simgrid::s4u::CommPtr> pending_sends;
unsigned long long received_bytes = 0;
unsigned int received_pieces = 0;
unsigned int total_pieces = 0;
~Peer() = default;
void joinChain()
{
ChainMessage* msg = static_cast<ChainMessage*>(me->get());
prev = msg->prev_;
next = msg->next_;
total_pieces = msg->num_pieces;
XBT_DEBUG(
"Peer %s got a 'BUILD_CHAIN' message (prev: %s / next: %s)", me->getCname(),
prev ? prev->getCname() : nullptr, next ? next->getCname() : nullptr);
delete msg;
}
void forwardFile()
{
void* received;
while (not done) {
pending_recvs.push_back(comm);
if (idx != -1) {
comm = pending_recvs.at(idx);
XBT_DEBUG(
"Peer %s got a 'SEND_DATA' message", me->getCname());
pending_recvs.erase(pending_recvs.begin() + idx);
if (next != nullptr) {
XBT_DEBUG(
"Sending (asynchronously) from %s to %s", me->getCname(), next->getCname());
pending_sends.push_back(send);
} else
delete static_cast<FilePiece*>(received);
received_pieces++;
received_bytes += PIECE_SIZE;
XBT_DEBUG(
"%u pieces received, %llu bytes received", received_pieces, received_bytes);
if (received_pieces >= total_pieces) {
done = true;
}
}
}
}
};
class Broadcaster {
public:
std::vector<simgrid::s4u::MailboxPtr>
mailboxes;
unsigned int piece_count;
void buildChain()
{
auto cur = mailboxes.begin();
if (cur != mailboxes.end()) {
first = next;
do {
++cur;
prev = last;
if (cur != mailboxes.end())
next = *cur;
else
next = nullptr;
XBT_DEBUG(
"Building chain--broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"",
prev ? prev->getCname() : nullptr, next ? next->getCname() : nullptr);
current_mailbox->put(new ChainMessage(prev, next, piece_count), MESSAGE_BUILD_CHAIN_SIZE);
last = current_mailbox;
} while (cur != mailboxes.end());
}
}
void sendFile()
{
std::vector<simgrid::s4u::CommPtr> pending_sends;
for (unsigned int current_piece = 0; current_piece < piece_count; current_piece++) {
XBT_DEBUG(
"Sending (send) piece %u from %s into mailbox %s", current_piece,
simgrid::s4u::CommPtr comm = first->put_async(
new FilePiece(), MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
pending_sends.push_back(comm);
}
}
Broadcaster(int hostcount, unsigned int piece_count) : piece_count(piece_count)
{
for (int i = 1; i <= hostcount; i++) {
}
}
~Broadcaster() = default;
};
static void peer()
{
Peer* p = new Peer();
p->joinChain();
p->forwardFile();
XBT_INFO(
"### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p->received_bytes,
p->received_bytes / 1024.0 / 1024.0 / (end_time - start_time));
delete p;
}
static void broadcaster(int hostcount, unsigned int piece_count)
{
Broadcaster* bc = new Broadcaster(hostcount, piece_count);
bc->buildChain();
bc->sendFile();
delete bc;
}
{
e.loadPlatform(argv[1]);
e.run();
return 0;
}