#define MD_CORE #include "cliever-md.h" #include "masterDaemon.h" #include "Listener.cpp" #include "EventSender2.cpp" void attention(); void arCallback(const boost::system::error_code& error); void mdWQ(); void masterDaemon::dispatch(mdWQitem *next) { bool success; boost::system::error_code ec; const char *failure; int sentBytes,step=1; mdResponse *ackOrNak = (mdResponse *)next->what; switch(next->kind) { case DV_MDQUERY: failure = (const char *)&ackOrNak->reply.dg.payLoad[0]; success = ackOrNak->reply.dg.hdr.dgType.value == 1; goto commonReply; case MD_NEWBORN: ackOrNak->reply.dg.hdr.msgType = MDDG_NEWBORN; failure = "Stillbirth"; success = ackOrNak->mdStdDevIdx >= 0; commonReply: assert(cb.find(ackOrNak->mdStdDevIdx) != cb.end()); ackOrNak->reply.dg.hdr.clientType = MDDEV_MD; if (success) { if (!cb[ackOrNak->mdStdDevIdx]->connection.open) { ackOrNak->bus->connect_to(ackOrNak->ip,ec,step,ackOrNak->mdStdDevIdx); if (cb[ackOrNak->mdStdDevIdx]->connection.open) { if ((sentBytes=ackOrNak->bus->send(ackOrNak->reply,ec)) != sizeof(mdDGReply)) theseLogs->logNdebug(1,2,"incomplete blocking send: %d: %s",sentBytes,ec.message().c_str()); } else theseLogs->logNdebug(1,2,"Couldn't get back channel to client: %s (in step %d).",ec.message().c_str(),step); } else if ((sentBytes=ackOrNak->bus->send(ackOrNak->reply,ec)) != sizeof(mdDGReply)) theseLogs->logNdebug(1,2,"incomplete blocking send: %d: %s",sentBytes,ec.message().c_str()); } else { theseLogs->logNdebug(1,0,failure); } delete ackOrNak; break; } delete next; } /* * Prenatal heartbeats and commands are not queued. * Everything else is. * */ void masterDaemon::dispatch(const mdIncoming &what) { bool isObservation; const char *name,*xStr; int about=what.dg.hdr.handle; md_device thisKind; mdPeer *d1; mdMachine *d2; map::iterator iter = thisConfig->allClients.find(about); switch(what.dg.hdr.msgType) { case MDDG_CDRESET: theseLogs->logN(0,"Shutdown request received from a Cliever"); thisConfig->halt = true; break; case MDDG_MDQUERY: if( iter == thisConfig->allClients.end() ) theseLogs->logN(1,"Query for device whose handle (%d) has disappeared, ignored.", about ); else { thisKind = thisConfig->allClients[about]->devType; name = &what.dg.payLoad[0]; xStr = &what.dg.payLoad[what.dg.hdr.primeOffset]; switch(what.dg.hdr.dgSubType) { case MDDG_REGSCPI: theseLogs->logNdebug(NORMAL_DEBUG,4,"Src SCPI: '%s' from type: %d ('%s'), handle %d.", name, what.dg.hdr.clientType,xStr,about); if (thisKind == MACHINE) theMachine->registerCmd(name,what); else thisConfig->allEPPPeers[about]->registerCmd(name,what); break; // case MDDG_REGODE: // theseLogs->logNdebug(NORMAL_DEBUG,4,"Src ODE: '%s' from type: %d ('%s'), handle %d.", name, what.dg.hdr.clientType,xStr,about); // goto regName; // case MDDG_REGOBS: // theseLogs->logNdebug(NORMAL_DEBUG,4,"Src Obs: '%s' from type: %d ('%s'), handle %d.", name, what.dg.hdr.clientType,xStr,about); // regName: // if (thisKind == MACHINE) theMachine->state.registerData(name,what); // else thisConfig->eppPeers[about]->state.registerData(name,what); // break; }} break; case MDDG_HEARTBEAT: if (!what.dg.hdr.sourceHandle) { if (what.dg.hdr.clientType < MDDEV_CD || what.dg.hdr.clientType > MDDEV_CLIENT) { theseLogs->logN(1,"Heartbeat from unknown client type: %d, ignored.", what.dg.hdr.clientType); break; } theseLogs->logNdebug(NORMAL_DEBUG*4,1,"Heartbeat from new %s ...",clientTypes[what.dg.hdr.clientType]); if (what.dg.hdr.primeOffset >= 5) { mdClientBirth itsAWhat; name = (char *)(&what.dg.payLoad[what.dg.hdr.primeOffset]); theseLogs->logNdebug(NORMAL_DEBUG*4,1," ... its telemetry port is %s.",what.dg.payLoad); theseLogs->logNdebug(NORMAL_DEBUG*4,1," ... '%s' will be its _deviceName.",name); itsAWhat.dg = what.dg; itsAWhat.ip = what.ip; itsAWhat.ip.port((unsigned short)atoi(what.dg.payLoad)); itsAWhat.dgDetermined = true; itsAWhat.send(); } else theseLogs->logN(1,"Heartbeat didn't appear to say what port to use, ignored."); } else { theseLogs->logNdebug(MAX_DEBUG,2,"Heartbeat from client with handle: %d.",what.dg.hdr.sourceHandle); } break; } } int masterDaemon::initAusRegTK(void) { #if ARTKENABLED > 0 int rc=OK; const std::string cfg(thisConfig->cfg_path.empty() ? "/home/jdaugherty/etc/toolkit2.conf" : thisConfig->cfg_path ); try { theseLogs->logN(2,"Attaching AusRegistry EPP Toolkit %d at tkScenario %d.", ARTKENABLED, thisConfig->tkScenario); artk = new ausRegEPPTK(); if (!artk->doNothing(cfg)) { theseLogs->logN(1,"Toolkit init failed."); rc = NOT_OK; } else theseLogs->logN(0,"Attached AusRegistry EPP Toolkit."); } catch (std::exception& e) { theseLogs->logN(1,"Fatal error binding toolkit: %s .",e.what()); } catch(...) { theseLogs->logN(1,"General exception attaching toolkit."); rc = NOT_OK; } return rc; #endif } void masterDaemon::listen() { EventSender::add(*this); assert(EventSender::getNumListeners() == 1); EventSender::add(*this); assert(EventSender::getNumListeners() == 1); EventSender::add(*this); assert(EventSender::getNumListeners() == 1); EventSender::add(*this); assert(EventSender::getNumListeners() == 1); EventSender::add(*this); assert(EventSender::getNumListeners() == 1); EventSender::add(*this); assert(EventSender::getNumListeners() == 1); EventSender::add(*this); assert(EventSender::getNumListeners() == 1); EventSender::add(*this); assert(EventSender::getNumListeners() == 1); boost::thread mdAr(attention); } void masterDaemon::processEvent( const mdAttention &thisAR ) { assert(EventSender::isSending()); } void masterDaemon::processEvent( const mdCDPulse &thisPulse ) { assert(EventSender::isSending()); } void masterDaemon::processEvent( const mdClientBirth &thisWhat ) { assert(EventSender::isSending()); if (thisWhat.dgDetermined) { deviceFactory->newFromHeartbeat(thisWhat); } else { deviceFactory->newFromAPI( thisWhat.clientType,thisWhat.signature); } } void masterDaemon::processEvent( const mdClientDeath &thisWas ) { assert(EventSender::isSending()); } void masterDaemon::processEvent( const mdHostCommand &thisCmd ) { assert(EventSender::isSending()); } void masterDaemon::processEvent( const mdIncoming &thisDatagram ) { assert(EventSender::isSending()); thisService->dispatch(thisDatagram); } void masterDaemon::processEvent( const mdResponse &thisReply ) { const void *queued = &thisReply; assert(EventSender::isSending()); queue(new mdWQitem( queued, thisReply.dCat, 0 )); } void masterDaemon::processEvent( const mdAPIFrame &thisFrame ) { assert(EventSender::isSending()); } void oteA() { #if (ARTKENABLED > 0) int nthDay; for (nthDay=0;nthDay<30;nthDay++) { theseLogs->logNdebug(MAX_DEBUG,1,"OTE Connectivity day %d",nthDay); thisService->artk->doOTEA(); theseLogs->logNdebug(MAX_DEBUG,1,"OTE Connectivity day %d",nthDay); thisService->artk->daysRunning = nthDay; } #endif }void oteB() { #if (ARTKENABLED > 0) theseLogs->logNdebug(MAX_DEBUG,0,"OTE Basic Access BOJ"); thisService->artk->doOTEB(); theseLogs->logNdebug(MAX_DEBUG,0,"OTE Basic Access end of Batch"); #endif } void oteC() { #if (ARTKENABLED > 0) int nthDay; theseLogs->logNdebug(MAX_DEBUG,0,"OTE Full Access BOJ"); thisService->artk->doOTEB(); theseLogs->logNdebug(MAX_DEBUG,0,"OTE Full Access end of Batch"); #endif } void oteP() { #if (ARTKENABLED > 0) int nthDay; for (nthDay=0;nthDay<30;nthDay++) { theseLogs->logNdebug(MAX_DEBUG,1,"AC Production SoD %d",nthDay); thisService->artk->doOTEP(); theseLogs->logNdebug(MAX_DEBUG,1,"AC Production EoD %d",nthDay); thisService->artk->daysRunning = nthDay; } #endif } void masterDaemon::run() { deviceFactory = new mdHostFabrik(); fg = new mdDGChannel( thisService->io_, 0 ); #if (ARTKENABLED > 0) if (initAusRegTK()) return; theseLogs->logNdebug(MAX_DEBUG,1,"Aus Registry Toolkit Scenario %d.", thisConfig->tkScenario); if (artk->tkScenario == ACTK_OTEA) { boost::thread oteArun(oteA); assert(oteArun.joinable()); oteArun.join(); theseLogs->logNdebug(MAX_DEBUG,0,"OTE Connectivity Test EOJ."); } if (artk->tkScenario == ACTK_OTEB) { boost::thread oteBrun(oteB); assert(oteBrun.joinable()); oteBrun.join(); theseLogs->logNdebug(MAX_DEBUG,0,"OTE Basic Access Test EOJ."); } if (artk->tkScenario == ACTK_OTEC) { boost::thread oteCrun(oteC); assert(oteCrun.joinable()); oteCrun.join(); theseLogs->logNdebug(MAX_DEBUG,0,"OTE Full Access EOJ."); } if (artk->tkScenario == ACTK_PROD) { boost::thread otePrun(oteC); assert(otePrun.joinable()); otePrun.join(); theseLogs->logNdebug(MAX_DEBUG,0,"AC Production EOJ."); } #endif listen(); boost::thread work(mdWQ); assert(work.joinable()); theseLogs->logNdebug(MAX_DEBUG,0,"Batches done, MD worker start, joins worker."); io_.run(); work.join(); } void mdDGChannel::handle_receive_from(const boost::system::error_code& error, size_t bytes_recvd) { const char *c1; if (!error && bytes_recvd > 0) { mdIncoming incoming(thisService->bg); incoming.ip = thisService->bg->p_endpoint_; c1 = thisService->bg->p_endpoint_.address().to_string().c_str(); if (incoming.dg.hdr.clientType > 0 && incoming.dg.hdr.clientType < N_MDDEV_TYPES) { theseLogs->logNdebug(MAX_DEBUG,3,"msgtype %d received from %s (a '%s').",incoming.dg.hdr.msgType,c1,clientTypes[incoming.dg.hdr.clientType]); incoming.send(); } else theseLogs->logNdebug(1,2,"msgtype %d received from unknown MD client type at %s, ignored.",incoming.dg.hdr.msgType,c1); } passive_.async_receive_from( boost::asio::buffer(data_, MD_MAX_DATAGRAM), p_endpoint_, boost::bind(&mdDGChannel::handle_receive_from, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } // Callbacks, thread runners, and other ordinary functions. void attention() { bool announced = false; // The housekeeping routine is the only cyclic event in MD boost::asio::deadline_timer t0(thisService->io_, boost::posix_time::seconds(MD_HAUSHALT)); while (!thisService->shuttingDown) {t0.async_wait(arCallback); sleep(2*MD_HAUSHALT); if (!announced) {announced = true; theseLogs->logNdebug(MAX_DEBUG,0,"First invocation of housekeeping.");} } } void arCallback(const boost::system::error_code& error) { mdAttention housekeep( thisService->arCycles++ ); housekeep.send(); } void runMasterDaemon() { cb[0] = new mdCB(); thisService = new masterDaemon( thisConfig ); thisService->run(); } void mdWQ() { while(!thisConfig->shutdown) { if (!thisConfig->halt && thisService->q.size()) { thisService->dispatch(thisService->q.top()); thisService->q.pop(); } else boost::this_thread::yield(); } } void runAPILayer() { boost::asio::io_service io_; theseLogs->logN(1,"Background dgram service thread starting on port %d.",thisConfig->clientPort); try { thisService->bg = new mdDGChannel(io_, thisConfig->clientPort); io_.run(); } catch (std::exception& e) { theseLogs->logN(1,"Fatal error on API dgram layer: %s .",e.what()); } catch (...) { theseLogs->logN(0,"Unknown failure in background service layer."); } theseLogs->logNdebug(1,0,"mainbus background thread exited."); }