#include "CSE.h" #include "declarative.h" #include <algorithm> bool prefix(const char *pre, const char *str) { return strncmp(pre, str, strlen(pre)) == 0; } void CSE::initialize() { this->Uri = getId(); // this is the omnet id which is given when creating the module in the NED file (sequential numbering ) EV << "URI II " << Uri << "\n"; this->NotificationDepth = par("notification_depth"); this->multicastAlpha = par("alpha"); this->multicastBeta = par("beta"); this->multicastGamma = par("gamma"); this->multicastDelta = par("delta"); this->queryBufferTTL = par("queryBufferTTL"); this->maxHops = par("maxHops"); number_of_packets = 0; totalpacketsSignal = registerSignal("packet_size"); delay = par("delayTime"); number_of_hops= 0; latency= registerSignal("hop_count"); number_of_messages= 0; flood= registerSignal("flood"); success= 0; success_rate= registerSignal("success"); } /* * routeQuery * Used to perform semantic routing * Function returns the list of CSEs to redirect query to. * It returns the list of URIs of the same relationship type, * e.g. Customer, Sibling, Peer, Provider. * * */ std::vector<URI> CSE::routeQuery(discoveryMessage *msg) { std::string feature_type = msg->getFeature_type(); std::vector<URI> URI_Found; auto it = this->SemanticRoutingTable.find(feature_type); if (it == this->SemanticRoutingTable.end()) { EV << "feature Type not exist" << "\n"; return URI_Found; } if (it->second.CSECustomer.size() > 0) { for (auto cit = it->second.CSECustomer.begin(); cit != it->second.CSECustomer.end(); cit++) { URI_Found.push_back(cit->first); } return URI_Found; } if (it->second.CSESibling.size() > 0) { for (auto sit = it->second.CSESibling.begin(); sit != it->second.CSESibling.end(); sit++) { URI_Found.push_back(sit->first); } return URI_Found; } if (it->second.CSEPeer.size() > 0) { for (auto sit = it->second.CSEPeer.begin(); sit != it->second.CSEPeer.end(); sit++) { URI_Found.push_back(sit->first); } return URI_Found; } if (it->second.CSEProvider.size() > 0) { for (auto pit = it->second.CSEProvider.begin(); pit != it->second.CSEProvider.end(); pit++) { URI_Found.push_back(pit->first); } return URI_Found; } return URI_Found; } /* * processQuery is used to route query if local DB lookup failed. * It tries to perform semantic routing, and if no records satisfying query were found, * if uses a so-called fallback routing to multicast query to the best match neighbors. */ void CSE::processQuery(discoveryMessage *msg) { EV << "The Message is a query \n"; EV << "DB Lookup not Successful" << "\n"; if (msg->getHopCount() <= 0) { bubble("TTL: expired"); //Respond to the URI_init that the discovery ends // TODO: DBLookup part to be added here msg->setOp_code(RESPONSE); // TODO: set the message op_codes according to result from DBLookup //You extract from the top of the list the gate that has to be used EV << "Hop count is 0 so we generate a self response message \n"; number_of_messages++; generateResponseMessage(msg, ResultCode::NOT_FOUND); return; } // decrease the hop count EV << "we are in the else : hop count is currently " << msg->getHopCount() << "\n"; msg->setHopCount(msg->getHopCount() - 1); number_of_hops++; emit(latency, number_of_hops); // TODO: signal for hop count EV << "New HopCount=" << msg->getHopCount() << "\n"; auto res = routeQuery(msg); if (res.size() > 0) { for (auto it : res) { auto gateit = this->Gates[it]; int gateindex = gateit.second; std::string gateName = gateit.first + "$o"; bubble("Semantic record found"); number_of_messages++; sendDelayed(msg->dup(), delay, gateName.c_str(), gateindex); } return; } fallbackRouteQuery(msg); } /* * fallbackRouteQuery is used when semantic routing fails * (i.e. semantic routing table lookup returns no results) * It multicasts query with coefficients. * It routes query in valley-free manner. */ void CSE::fallbackRouteQuery(discoveryMessage *msg) { int D = msg->getDirection(); bool successful = false; /* * We need to send response only if all of the broadcasts have failed * * Thus, we are performing logical AND between all invocations of broadcast * * If all of them fail - we will send response * */ switch (D) { case DOWN: { successful = multicast("customer", msg, this->multicastAlpha); number_of_messages++; successful = !successful ? multicast("sibling", msg, this->multicastGamma) : true; break; } case SIDE_SIBLING: { successful = multicast("sibling", msg, this->multicastGamma); number_of_messages++; successful &= multicast("customer", msg, this->multicastAlpha); break; } case SIDE_PEER: { number_of_messages++; break; } case UP: { successful = multicast("provider", msg, this->multicastBeta); number_of_messages++; successful = !successful ? multicast("sibling", msg, this->multicastGamma) : true; number_of_messages++; successful &= multicast("customer", msg, this->multicastDelta); number_of_messages++; break; } default: break; } if (!successful) { bubble("No result"); number_of_messages++; generateResponseMessage(msg, ResultCode::NOT_FOUND); } } /* * seenQuery is used to check whether the query being processed was previously processed. * It checks the local query buffer for the query ID. * Also, performs cleanup of stale buffer records. */ bool CSE::seenQuery(discoveryMessage *msg) { std::map<queryKey, int64_t> newProcessed(this->processedQueries); for (auto record : newProcessed) { if (record.second < simTime().inUnit(SimTimeUnit::SIMTIME_S)) { this->processedQueries.erase(record.first); } } queryKey key; key.second = msg->getQueryID(); key.first = msg->getURI_init(); if (this->processedQueries.find(key) != this->processedQueries.end()) { return true; } return false; } /* * handleQuery is used to handle message of type QUERY. * * It memorizes distinct queries and omits duplicate ones. */ void CSE::handleQuery(discoveryMessage *msg) { auto cse = msg->getURI_route(); std::string inputGate = msg->getArrivalGate()->getBaseName(); this->Gates[cse] = std::make_pair(inputGate, msg->getArrivalGate()->getIndex()); if (seenQuery(msg)) { bubble("Dropping seen query"); return; } int64_t ttl = SimTime(this->queryBufferTTL).inUnit(SimTimeUnit::SIMTIME_S); ttl = ttl + msg->getArrivalTime().inUnit(SimTimeUnit::SIMTIME_S); queryKey key; key.first = msg->getURI_init(); key.second = msg->getQueryID(); this->processedQueries[key] = ttl; auto res = DBLookup(msg); // If we find the index "NOT_FOUND" in the map, it means that // the feature is not present in the database if (res == NOT_FOUND) { processQuery(msg); return; } DBresult dbres = std::map<int, int>(); dbres[res] = 0; EV << "DB Lookup Successful" << "\n"; msg->setDbResult(dbres); number_of_messages++; generateResponseMessage(msg); } /* * handleDiscoveryMessage is used to handle `discoveryMessage`. */ void CSE::handleDiscoveryMessage(cMessage *msg) { EV << "entering the CSE part " << "\n"; // if the message comes from another resource that an AE discoveryMessage *discoveryMsg = check_and_cast<discoveryMessage*>(msg); EV << "The Message is of type : " << discoveryMsg->getOp_code() << "\n"; if (msg->isSelfMessage()) { //the discovery message comes from the AE and should be forwarded EV << "It is a self Message " << "\n"; if (discoveryMsg->getOp_code() == QUERY) { processQuery(discoveryMsg); delete discoveryMsg; return; } } EV << "It is not a self Message "; if (discoveryMsg->getOp_code() == QUERY) { EV << "of type query\n"; std::vector<cGate*> tempGateVector; // You put on top of the list the name of the gate to be used in the return path (getOtherHalf) tempGateVector = discoveryMsg->getGateVector(); tempGateVector.push_back(msg->getArrivalGate()->getOtherHalf()); discoveryMsg->setGateVector(tempGateVector); EV << "A new gate is added = " << tempGateVector.back()->getFullName() << "\n"; } else { EV << "of type response so no new gate added\n"; } // end if self-message // switch on 2 possible opcodes between CSEs : QUERY or RESPONSE int op_code = discoveryMsg->getOp_code(); EV << "Switch OPCODE \n"; switch (op_code) { case NOTIFY: handleNotify(discoveryMsg); break; case QUERY: handleQuery(discoveryMsg); break; case RESPONSE: { returnResponse(discoveryMsg); break; } } delete discoveryMsg; } /* * returnResponse is used to return response in predefined manner, i.e. * unfolding path step by step and sending messages back. */ void CSE::returnResponse(discoveryMessage *msg) { EV << "The Message is a response \n"; int i = msg->getGateVector().size(); if (i <= 0) { EV << "We are in the last gate Message Delivered" << "\n"; return; } EV << "Size of Gate vector is " << i << "\n"; std::vector<cGate*> tempGateVector; // You put on top of the list the name of the gate to be used in the return path (getOtherHalf) tempGateVector = msg->getGateVector(); const char *returnGate = tempGateVector.back()->getName(); int returnIndex = tempGateVector.back()->getIndex(); tempGateVector.pop_back(); msg->setGateVector(tempGateVector); EV << "gate removed = " << returnGate << "of index " << returnIndex << "\n"; i = msg->getGateVector().size(); EV << "New Size of Gate vector is " << i << "\n"; EV << "<Module Name" << msg->getName() << "gate name" << returnGate << "\n" << "gateIndex" << returnIndex << "\n"; sendDelayed(msg->dup(), delay, returnGate, returnIndex); } /* * handleAEMessage is used to process message from Application Entities (AEs) * Messages include registration, cancellation and queries. */ void CSE::handleAEMessage(cMessage *msg) { EV << "entering the AE part of the IF " << "\n"; AEMessage *aeMsg = check_and_cast<AEMessage*>(msg); // Create message object and set source and destination field. int op_code = aeMsg->getOp_code(); // op_code contains the type of message switch (op_code) { case REGISTRATION: { handleAERegistration(aeMsg); break; } case CANCELLATION: { handleAECancellation(aeMsg); break; } case QUERY: { // if it is a query msg we create a discovery msg and we start ASDR number_of_messages++; generateDiscoveryMessage(aeMsg); break; } default: break; } delete aeMsg; } /* * saveAEData is used to save AE data into CSE local database */ void CSE::saveAEData(std::string feature_type, URI uri, int data) { // we create an internal map std::map<URI, int> internalMap; // we create an Iterator on the database std::map<std::string, std::map<URI, int>>::iterator it; // we search for the feature_type in the database it = database.find(feature_type); // if we don't find it if (it == database.end()) { // putting data in the internal map as a new entry internalMap[uri] = data; } // if we find the feature_type else { internalMap = database[feature_type]; // we put the internal map inside the DataBase map next to the feature_type internalMap[uri] = data; } database[feature_type] = internalMap; EV << "feature type added in Database" << feature_type << "\n"; } /* * handleAERegistration is used to perform Application Entity (AE) registration at parent CSE. * Also, it invokes CSE neighbors notification as a result of new entity registration. */ void CSE::handleAERegistration(AEMessage *msg) { // we extract the feature_type; URI_route; data from the AEmessage std::string feature_type = msg->getFeature_type(); int URI_route = msg->getURI(); int data = msg->getData(); bubble(feature_type.c_str()); registerAE(feature_type, URI_route); saveAEData(feature_type, URI_route, data); notifyCSE(feature_type, 1); } /* * handleAECancellation is used to perform Application Entity (AE) deregistration at parent CSE. * Also, it invokes CSE neighbors notification as a result of new entity deregistration (cancellation). */ void CSE::handleAECancellation(AEMessage *msg) { std::string feature_type = msg->getFeature_type(); int URI_route = msg->getURI(); deregisterAE(feature_type, URI_route); notifyCSE(feature_type, -1); } /* * registerAE is used to update semantic routing table to accustom for AE registration. */ void CSE::registerAE(std::string feature_type, URI uri) { auto entry = getOrCreateRoutingEntry(feature_type); entry.database.insert(std::pair<URI, int>(uri, 1)); this->SemanticRoutingTable[feature_type] = entry; } /* * deregisterAE is used to update semantic routing table to accustom for AE cancellation. */ void CSE::deregisterAE(std::string feature_type, URI uri) { auto entry = mustGetRoutingEntry(feature_type); auto it = entry.database.find(uri); if (it == entry.database.end()) { EV_FATAL << "Expected routing entry to exist\n"; } if (it->second < 1) { EV_FATAL << "Expected to have at least one AE registered\n"; } entry.database.erase(it); this->SemanticRoutingTable[feature_type] = entry; } /* * handleMessage is and entry point for message handling. */ void CSE::handleMessage(cMessage *msg) { // SWITCH ON THE 5 operational codes number_of_packets++; // assigning the values to the signal emit(totalpacketsSignal, number_of_packets); EV << "URI " << msg->getSenderModuleId() << "\n"; // if the message comes from the AE if (prefix("AE", msg->getSenderModule()->getName())) { handleAEMessage(msg); } else { handleDiscoveryMessage(msg); emit(flood, number_of_messages); } } // end of handle message /* * generateResponseMessage is used to generate query response message (scheduling self-message) * to be redirected to the source of the query */ void CSE::generateResponseMessage(discoveryMessage *msg, ResultCode result) { EV << "inside generateResponseMessage Procedure" << "\n"; auto responseMsg = generateMessage(RESPONSE); //These data may change during the routing of the query // we set the direction to NODIR responseMsg->setDirection(NODIR); responseMsg->setFeature_type(msg->getFeature_type()); responseMsg->setGateVector(msg->getGateVector()); responseMsg->setReturnCode(result); responseMsg->setURI_init(this->Uri); cancelEvent(responseMsg); scheduleAt(simTime(), responseMsg); } // this method forward the initial query to CSE // void CSE::parseRouting(AEMessage *msg) { // this function is transforming a query message to a discovery message void CSE::generateDiscoveryMessage(AEMessage *msg) { // this function transforms a query message to a discovery message // these data should not change during the routing between CSEs // TODO lets consider if the URI parameter is useful ?? // we created a discovery message discoveryMessage *queryMsg = new discoveryMessage("QUERY"); // we extract the URI from the AE URI_init of the message queryMsg->setURI_init(msg->getURI()); // we extract the msg feature_type from AEmessage and we set it in the discovery Message queryMsg->setFeature_type(msg->getFeature_type()); // we set op_code to QUERY queryMsg->setOp_code(QUERY); queryMsg->setQueryID(msg->getQueryID()); //These data may change during the routing of the query // set the hop count queryMsg->setHopCount(msg->getMaxHop()); // we set the direction UP queryMsg->setDirection(UP); // create a omnet vector of type cGate* named gateVector std::vector<cGate*> gateVector = queryMsg->getGateVector(); //You update the discoveryMessage with this object queryMsg->setGateVector(gateVector); // You put on top of the list the name of the gate to be used in the return path (getOtherHalf) gateVector.push_back(msg->getArrivalGate()->getOtherHalf()); EV << "back cse event7 " << gateVector.back()->getFullName(); EV << "front " << gateVector.front()->getFullName(); // We update the query msg with this vector queryMsg->setGateVector(gateVector); EV << "back cse event7 " << queryMsg->getGateVector().back()->getFullName(); EV << "front " << queryMsg->getGateVector().front()->getFullName(); // we schedule this query message to be sent asap in the simulation schedule number_of_messages++; scheduleAt(simTime(), queryMsg); // delete the AE message } /* * multicast is used to send messages in a multicast manner through the specified gate, * optionally restricting maximal number of messages. */ bool CSE::multicast(std::string gateName, discoveryMessage *discoveryMsg, int maxMessages) { auto dir = gateToDirection[gateName]; std::string outGate = gateName + "$o"; // checking the size of gate int t = gateSize(gateName.c_str()); //if it is greater than zero means if we have customer if (t <= 0) { return false; } // it detects the size of the customer gates int Uri = gate(outGate.c_str(), 0)->getId(); EV << "uri of destination " << Uri << "\n"; int vectSize = gate(outGate.c_str(), 0)->getVectorSize(); // it register in the scheduler map the UR of the CSE and the parameters of the gate // we will forward through the vectSize of customer gate which have all the customer int sent = 0; for (int i = 0; i < vectSize; i++) { if (sent >= maxMessages) { break; } auto gateVector = discoveryMsg->getGateVector(); bool visited = false; cGate *gateToSend = gate(outGate.c_str(), i); for (auto g : gateVector) { auto gID = g->getConnectionId(); auto sID = gateToSend->getConnectionId(); if (gID == sID) { visited = true; break; } } if (visited) { continue; } auto msg = discoveryMsg->dup(); msg->setDirection(dir); sendDelayed(msg, delay, outGate.c_str(), i); sent++; } // delete discoveryMsg; return sent > 0; } /*std::vector<URI> CSE::UpdateBucket(discoveryMessage *msg) { std::string feature_type = msg->getFeature_type(); auto entry = getOrCreateRoutingEntry(feature_type); int uri = msg->getSenderModuleId(); auto f = std::find(entry.CSEBucket.begin(), entry.CSEBucket.end(), uri); // if the response is positive, we check the URI in the if (f != entry.CSEBucket.end()) { entry.CSEBucket.insert(entry.CSEBucket.begin(), msg->getSenderModuleId()); return entry.CSEBucket.second; } //otherwise if (entry.CSEBucket.size() <= 100) { entry.CSEBucket.insert(entry.CSEBucket.begin(), msg->getSenderModuleId()); return entry.CSEBucket; } // otherwise int i = entry.CSEBucket auto it = this->Gates[i]; int gateIndex = it.second; std::string gateName = it.first + "$o"; pingMessage *pingMsg = new pingMessage("ping"); pingMsg->setURI(uri); pingMsg->setFeature_type(feature_type); pingMsg->setFlag(PING); // ping message send(pingMsg, gateName.c_str(), gateIndex); // after receiving ping if() entry.CSEBucket.pop_back(); entry.CSEBucket.insert(entry.CSEBucket.begin(), msg->getSenderModuleId()); return entry.CSEBucket; // save the data in Routing Table this->SemanticRoutingTable[feature_type] = entry; }*/ /* * getOrCreateRoutingEntry is a primitive to avoid cumbersome map access and entry creation * if map element with such key is missing * */ RoutingEntry CSE::getOrCreateRoutingEntry(std::string feature_type) { auto it = this->SemanticRoutingTable.find(feature_type); if (it == this->SemanticRoutingTable.end()) { return RoutingEntry { }; } return it->second; } /* * mustGetRoutingEntry is used to always get non-empty routing entry by key. * If it fails, fatal error will be thrown. */ RoutingEntry CSE::mustGetRoutingEntry(std::string feature_type) { auto it = this->SemanticRoutingTable.find(feature_type); if (it == this->SemanticRoutingTable.end()) { EV_INFO<< "Expected routing entry to exist\n"; return RoutingEntry { }; } return it->second; } /* * handleNotify is used to process `discoveryMessage` of type NOTIFY. * It updates sematic routing table for specific relationship type * (e.g. Customer, Peer, Sibling, Provider) */ void CSE::handleNotify(discoveryMessage *msg) { std::string feature_type = msg->getFeature_type(); URI cse = msg->getURI_route(); int delta = msg->getDelta(); int direction = msg->getDirection(); auto entry = getOrCreateRoutingEntry(feature_type); std::string inputGate = msg->getArrivalGate()->getBaseName(); this->Gates[cse] = std::make_pair(inputGate, msg->getArrivalGate()->getIndex()); switch (direction) { case UP: entry.CSECustomer[cse] += delta; break; case DOWN: entry.CSEProvider[cse] += delta; break; case SIDE_SIBLING: entry.CSESibling[cse] += delta; break; case SIDE_PEER: entry.CSEPeer[cse] += delta; break; } this->SemanticRoutingTable[feature_type] = entry; // notification depth reached if (msg->getHopCount() >= this->NotificationDepth) { return; } EV << "Redirecting notify\n"; // notify msg->setHopCount(msg->getHopCount() + 1); msg->setURI_route(this->Uri); notifyNeighbors(msg->dup()); } /* * notifyCSE is used to create and broadcast notification message to the neighbors. */ void CSE::notifyCSE(std::string feature_type, int delta) { EV << "inside notify\n"; //assemble message auto msg = generateMessage(NOTIFY); msg->setFeature_type(feature_type.c_str()); msg->setDelta(delta); // send to CSEs notifyNeighbors(msg); } /* * notifyNeighbors is used to broadcast notification to all neighbors, * excluding the neighbor that sent the message to the current CSE. * Also, populates gate vector of the message with the arrival gate. */ // TODO: change the name of the notify message void CSE::notifyNeighbors(discoveryMessage *msg) { std::vector<cGate*> gateVector = msg->getGateVector(); //You update the discoveryMessage with this object msg->setGateVector(gateVector); if (msg->getArrivalGate() != nullptr) { gateVector.push_back(msg->getArrivalGate()->getOtherHalf()); msg->setGateVector(gateVector); } EV << "sending messages to downstream\n"; multicast("customer", msg); EV << "sending messages to sidestream\n"; multicast("peer", msg); multicast("sibling", msg); EV << "sending messages to upsteam\n"; multicast("provider", msg); delete msg; } /* * DBLookup is used to perform lookup in the semantic routing table for * AEs children of the current CSE. */ URI CSE::DBLookup(discoveryMessage *msg) { auto feature_type = msg->getFeature_type(); // extracting the feature_type auto it = this->SemanticRoutingTable.find(feature_type); // if we find the data correspond to the feature_type if (it == this->SemanticRoutingTable.end()) { return NOT_FOUND; } if (it->second.database.size() == 0) { return NOT_FOUND; } bubble("Success"); return it->second.database.begin()->first; } /* * generateMessage is used to generate message of specified type. */ discoveryMessage* CSE::generateMessage(int op_code) { switch (op_code) { case QUERY: { // Produce source and destination addresses. int URI_route = getId(); char msgname[20]; sprintf(msgname, "Q"); // Create message object and set source and destination field. discoveryMessage *msg = new discoveryMessage(msgname); msg->setDirection(DOWN); msg->setOp_code(QUERY); msg->setURI_route(URI_route); return msg; break; } case RESPONSE: { int URI_route = getId(); char msgname[20]; sprintf(msgname, "Rsp"); // Create message object and set source and destination field. discoveryMessage *msg = new discoveryMessage(msgname); //msg->setPayload("thermometer"); msg->setDirection(DOWN); msg->setOp_code(RESPONSE); msg->setURI_route(URI_route); return msg; break; } case NOTIFY: { int URI_route = getId(); char msgname[20]; sprintf(msgname, "N"); // Create message object and set source and destination field. discoveryMessage *msg = new discoveryMessage(msgname); //msg->setPayload("thermometer"); msg->setDirection(DOWN); msg->setOp_code(NOTIFY); msg->setURI_route(URI_route); msg->setURI_init(URI_route); return msg; break; } case REGISTRATION: { int URI_route = getId(); char msgname[20]; sprintf(msgname, "Rg"); // Create message object and set source and destination field. discoveryMessage *msg = new discoveryMessage(msgname); //msg->setPayload("thermometer"); msg->setDirection(DOWN); msg->setOp_code(REGISTRATION); msg->setURI_route(URI_route); return msg; break; } case CANCELLATION: { int URI_route = getId(); char msgname[20]; sprintf(msgname, "C"); // Create message object and set source and destination field. discoveryMessage *msg = new discoveryMessage(msgname); //msg->setPayload("thermometer"); msg->setDirection(DOWN); msg->setOp_code(REGISTRATION); msg->setURI_route(URI_route); return msg; break; } default: break; } return nullptr; } void CSE::orderingMap(std::map<int, int>) { return; }