f6cc7c2a | 2015-09-11 00:10:48 | Timothy Pearson |
Update ProtoTerminal part to use threading and message queues |
||
M clients/tde/src/part/prototerminal/part.cpp M clients/tde/src/part/prototerminal/part.h |
||
diff --git a/clients/tde/src/part/prototerminal/part.cpp b/clients/tde/src/part/prototerminal/part.cpp index 7818988..b5e5a1d 100644 --- a/clients/tde/src/part/prototerminal/part.cpp +++ b/clients/tde/src/part/prototerminal/part.cpp @@ -15,9 +15,18 @@ * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. * - * (c) 2014 Timothy Pearson + * (c) 2014 - 2015 Timothy Pearson * Raptor Engineering * http://www.raptorengineeringinc.com + */ + +/* This part illustrates the correct method of transmitting and receiving + * data in a dedicated thread, using two separate message queues to enable + * fully non-blocking, event-driven execution. + * + * NOTE + * inboundQueue is filled by the GUI thread with data inbound to the worker thread + * outboundQueue is filled by the worker thread with data outbound to the GUI thread */ #include "define.h" @@ -60,6 +69,146 @@ #define CLIENT_LIBRARY "libremotelab_prototerminal" K_EXPORT_COMPONENT_FACTORY( libremotelab_prototerminal, RemoteLab::Factory ) +ProtoTerminalWorker::ProtoTerminalWorker() : TQObject() { + m_networkDataMutex = new TQMutex(false); + m_outboundQueueMutex = new TQMutex(false); + m_inboundQueueMutex = new TQMutex(false); + m_newData = false; +} + +ProtoTerminalWorker::~ProtoTerminalWorker() { + delete m_networkDataMutex; + m_networkDataMutex = NULL; + delete m_inboundQueueMutex; + m_inboundQueueMutex = NULL; + delete m_outboundQueueMutex; + m_outboundQueueMutex = NULL; +} + +void ProtoTerminalWorker::run() { + TQEventLoop* eventLoop = TQApplication::eventLoop(); + if (!eventLoop) { + return; + } + + while (1) { + m_instrumentMutex->lock(); + + // Handle inbound queue + m_inboundQueueMutex->lock(); + if (m_inboundQueue.count() > 0) { + TQDataStream ds(m_socket); + ds.setPrintableData(true); + + ProtoTerminalEventQueue::iterator it; + for (it = m_inboundQueue.begin(); it != m_inboundQueue.end(); ++it) { + if ((*it).first == TxRxSyncPoint) { + break; + } + if ((*it).first == ConsoleTextSend) { + ds << (*it).second.toString(); + m_socket->writeEndOfFrame(); + } + it = m_inboundQueue.erase(it); + } + m_socket->flush(); + } + m_inboundQueueMutex->unlock(); + + // Handle outbound queue + if (m_newData) { + bool queue_modified = false; + m_networkDataMutex->lock(); + m_newData = false; + + // Receive data + if (m_socket->canReadFrame()) { + TQDataStream ds(m_socket); + ds.setPrintableData(true); + + // Get command status + TQString input; + while (!ds.atEnd()) { + ds >> input; + m_outboundQueueMutex->lock(); + m_outboundQueue.push_back(ProtoTerminalEvent(ConsoleTextReceive, TQVariant(input))); + m_outboundQueueMutex->unlock(); + queue_modified = true; + } + m_socket->clearFrameTail(); + } + m_networkDataMutex->unlock(); + + if (queue_modified) { + m_inboundQueueMutex->lock(); + ProtoTerminalEventQueue::iterator it = m_inboundQueue.begin(); + if ((it) && (it != m_inboundQueue.end())) { + // Remove sync point + if ((*it).first == TxRxSyncPoint) { + it = m_inboundQueue.erase(it); + } + } + m_inboundQueueMutex->unlock(); + emit(outboundQueueUpdated()); + } + } + + m_instrumentMutex->unlock(); + + // Wait for queue status change or new network activity + if (!eventLoop->processEvents(TQEventLoop::ExcludeUserInput)) { + eventLoop->processEvents(TQEventLoop::ExcludeUserInput | TQEventLoop::WaitForMore); + } + } + + eventLoop->exit(0); +} + +void ProtoTerminalWorker::appendItemToInboundQueue(ProtoTerminalEvent item, bool syncPoint) { + m_inboundQueueMutex->lock(); + m_inboundQueue.push_back(item); + if (syncPoint) { + m_inboundQueue.push_back(ProtoTerminalEvent(TxRxSyncPoint, TQVariant())); + } + m_inboundQueueMutex->unlock(); +} + +bool ProtoTerminalWorker::syncPointActive() { + bool active = false; + + m_inboundQueueMutex->lock(); + ProtoTerminalEventQueue::iterator it = m_inboundQueue.begin(); + if ((it) && (it != m_inboundQueue.end())) { + if ((*it).first == TxRxSyncPoint) { + active = true; + } + } + m_inboundQueueMutex->unlock(); + + return active; +} + +void ProtoTerminalWorker::wake() { + // Do nothing -- the main event loop will wake when this is called +} + +void ProtoTerminalWorker::dataReceived() { + m_networkDataMutex->lock(); + m_newData = true; + m_networkDataMutex->unlock(); +} + +void ProtoTerminalWorker::lockOutboundQueue() { + m_outboundQueueMutex->lock(); +} + +void ProtoTerminalWorker::unlockOutboundQueue() { + m_outboundQueueMutex->unlock(); +} + +ProtoTerminalEventQueue* ProtoTerminalWorker::outboundQueue() { + return &m_outboundQueue; +} ProtoTerminalPart::ProtoTerminalPart( TQWidget *parentWidget, const char *widgetName, TQObject *parent, const char *name, const TQStringList& ) : RemoteInstrumentPart( parent, name ), m_commHandlerState(-1), m_commHandlerMode(0), m_commHandlerCommandState(0), m_connectionActiveAndValid(false), m_base(0) @@ -74,11 +223,16 @@ setInstance(Factory::instance()); setWidget(new TQVBox(parentWidget, widgetName)); + // Set up worker + m_worker = new ProtoTerminalWorker(); + m_workerThread = new TQEventLoopThread(); + m_worker->moveToThread(m_workerThread); + TQObject::connect(this, TQT_SIGNAL(wakeWorkerThread()), m_worker, TQT_SLOT(wake())); + TQObject::connect(m_worker, TQT_SIGNAL(outboundQueueUpdated()), this, TQT_SLOT(processOutboundQueue())); + // Create timers - m_forcedUpdateTimer = new TQTimer(this); - connect(m_forcedUpdateTimer, SIGNAL(timeout()), this, SLOT(mainEventLoop())); m_updateTimeoutTimer = new TQTimer(this); - connect(m_updateTimeoutTimer, SIGNAL(timeout()), this, SLOT(mainEventLoop())); + connect(m_updateTimeoutTimer, SIGNAL(timeout()), this, SLOT(networkTimeout())); // Create widgets m_base = new ProtoTerminalBase(widget()); @@ -99,6 +253,15 @@ disconnectFromServer(); delete m_instrumentMutex; + + if (m_workerThread) { + m_workerThread->terminate(); + m_workerThread->wait(); + delete m_workerThread; + m_workerThread = NULL; + delete m_worker; + m_worker = NULL; + } } void ProtoTerminalPart::postInit() { @@ -129,22 +292,31 @@ } void ProtoTerminalPart::disconnectFromServerCallback() { - m_forcedUpdateTimer->stop(); m_updateTimeoutTimer->stop(); m_connectionActiveAndValid = false; } void ProtoTerminalPart::connectionFinishedCallback() { + // Finish worker setup + m_worker->m_socket = m_socket; + m_worker->m_instrumentMutex = m_instrumentMutex; + m_socket->moveToThread(m_workerThread); + connect(m_socket, SIGNAL(readyRead()), m_socket, SLOT(processPendingData())); m_socket->processPendingData(); - connect(m_socket, SIGNAL(newDataReceived()), this, SLOT(mainEventLoop())); + connect(m_socket, SIGNAL(newDataReceived()), m_worker, SLOT(dataReceived())); m_tickerState = 0; m_commHandlerState = 0; m_commHandlerMode = 0; m_socket->setDataTimeout(NETWORK_COMM_TIMEOUT_MS); m_updateTimeoutTimer->start(NETWORK_COMM_TIMEOUT_MS, TRUE); + + // Start worker + m_workerThread->start(); + TQTimer::singleShot(0, m_worker, SLOT(run())); + processLockouts(); - mainEventLoop(); + networkTick(); return; } @@ -176,119 +348,60 @@ } } -#define UPDATEDISPLAY_TIMEOUT m_connectionActiveAndValid = false; \ - m_tickerState = 0; \ - m_commHandlerState = 2; \ - m_commHandlerMode = 0; \ ** Diff limit reached (max: 250 lines) ** |