Branch: master

f6cc7c2a 2015-09-11 00:10:48 Timothy Pearson
Update ProtoTerminal part to use threading and message queues
M clients/tde/src/part/prototerminal/part.cpp
M clients/tde/src/part/prototerminal/part.h
diff --git a/clients/tde/src/part/prototerminal/part.cpp b/clients/tde/src/part/prototerminal/part.cpp
index 7818988..b5e5a1d 100644
--- a/clients/tde/src/part/prototerminal/part.cpp
+++ b/clients/tde/src/part/prototerminal/part.cpp
@@ -15,9 +15,18 @@
  * with this program; if not, write to the Free Software Foundation, Inc.,
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  *
- * (c) 2014 Timothy Pearson
+ * (c) 2014 - 2015 Timothy Pearson
  * Raptor Engineering
  * http://www.raptorengineeringinc.com
+ */
+
+/* This part illustrates the correct method of transmitting and receiving
+ * data in a dedicated thread, using two separate message queues to enable
+ * fully non-blocking, event-driven execution.
+ *
+ * NOTE
+ * inboundQueue is filled by the GUI thread with data inbound to the worker thread
+ * outboundQueue is filled by the worker thread with data outbound to the GUI thread
  */
 
 #include "define.h"
@@ -60,6 +69,146 @@
 #define CLIENT_LIBRARY "libremotelab_prototerminal"
 K_EXPORT_COMPONENT_FACTORY( libremotelab_prototerminal, RemoteLab::Factory )
 
+ProtoTerminalWorker::ProtoTerminalWorker() : TQObject() { // m_socket and m_instrumentMutex are not set here; ProtoTerminalPart::connectionFinishedCallback() assigns them
+	m_networkDataMutex = new TQMutex(false); // protects m_newData
+	m_outboundQueueMutex = new TQMutex(false); // protects m_outboundQueue
+	m_inboundQueueMutex = new TQMutex(false); // protects m_inboundQueue
+	m_newData = false; // no received data pending
+}
+
+ProtoTerminalWorker::~ProtoTerminalWorker() {
+	// Free the three mutexes allocated by the constructor, then clear
+	// all three pointers in one step.
+	delete m_networkDataMutex;
+	delete m_inboundQueueMutex;
+	delete m_outboundQueueMutex;
+	m_networkDataMutex = m_inboundQueueMutex = m_outboundQueueMutex = NULL;
+}
+
+void ProtoTerminalWorker::run() { // Worker main loop: send queued GUI requests, queue received data for the GUI, then wait for events
+	TQEventLoop* eventLoop = TQApplication::eventLoop();
+	if (!eventLoop) {
+		return;
+	}
+
+	while (1) {
+		m_instrumentMutex->lock();
+
+		// Handle inbound queue: transmit entries from the front, stopping at a sync point or at the end of the queue
+		m_inboundQueueMutex->lock();
+		if (m_inboundQueue.count() > 0) {
+			TQDataStream ds(m_socket);
+			ds.setPrintableData(true);
+			// erase() hands back the position after the removed entry, so the iterator must not be advanced a second time
+			ProtoTerminalEventQueue::iterator it = m_inboundQueue.begin();
+			while (it != m_inboundQueue.end()) {
+				if ((*it).first == TxRxSyncPoint) {
+					break;
+				}
+				if ((*it).first == ConsoleTextSend) {
+					ds << (*it).second.toString();
+					m_socket->writeEndOfFrame();
+				}
+				it = m_inboundQueue.erase(it);
+			}
+			m_socket->flush();
+		}
+		m_inboundQueueMutex->unlock();
+
+		// Handle outbound queue
+		if (m_newData) { // NOTE(review): flag is tested before m_networkDataMutex is taken -- confirm this is intended
+			bool queue_modified = false;
+			m_networkDataMutex->lock();
+			m_newData = false;
+
+			// Receive data
+			if (m_socket->canReadFrame()) {
+				TQDataStream ds(m_socket);
+				ds.setPrintableData(true);
+
+				// Get command status
+				TQString input;
+				while (!ds.atEnd()) {
+					ds >> input;
+					m_outboundQueueMutex->lock();
+					m_outboundQueue.push_back(ProtoTerminalEvent(ConsoleTextReceive, TQVariant(input)));
+					m_outboundQueueMutex->unlock();
+					queue_modified = true;
+				}
+				m_socket->clearFrameTail();
+			}
+			m_networkDataMutex->unlock();
+
+			if (queue_modified) {
+				m_inboundQueueMutex->lock();
+				ProtoTerminalEventQueue::iterator it = m_inboundQueue.begin();
+				if ((it) && (it != m_inboundQueue.end())) {
+					// Remove sync point so that entries queued behind it are transmitted on a later pass
+					if ((*it).first == TxRxSyncPoint) {
+						it = m_inboundQueue.erase(it);
+					}
+				}
+				m_inboundQueueMutex->unlock();
+				emit(outboundQueueUpdated());
+			}
+		}
+
+		m_instrumentMutex->unlock();
+
+		// Wait for queue status change or new network activity
+		if (!eventLoop->processEvents(TQEventLoop::ExcludeUserInput)) {
+			eventLoop->processEvents(TQEventLoop::ExcludeUserInput | TQEventLoop::WaitForMore);
+		}
+	}
+	// NOTE: not reached, because the loop above has no exit path
+	eventLoop->exit(0);
+}
+
+void ProtoTerminalWorker::appendItemToInboundQueue(ProtoTerminalEvent item, bool syncPoint) { // Appends item to the inbound queue, optionally followed by a TxRxSyncPoint marker
+	m_inboundQueueMutex->lock();
+	m_inboundQueue.push_back(item);
+	if (syncPoint) {
+		m_inboundQueue.push_back(ProtoTerminalEvent(TxRxSyncPoint, TQVariant())); // run() stops transmitting at this marker until it has been removed
+	}
+	m_inboundQueueMutex->unlock();
+}
+
+bool ProtoTerminalWorker::syncPointActive() {
+	// Reports whether the entry at the head of the inbound queue is a
+	// TxRxSyncPoint marker. An empty queue reports false.
+	//
+	// The queue is inspected while m_inboundQueueMutex is held, and the
+	// mutex is released again before the result is returned.
+	m_inboundQueueMutex->lock();
+	ProtoTerminalEventQueue::iterator head = m_inboundQueue.begin();
+	const bool headIsSyncPoint = (head)
+		&& (head != m_inboundQueue.end())
+		&& ((*head).first == TxRxSyncPoint);
+	m_inboundQueueMutex->unlock();
+	return headIsSyncPoint;
+}
+
+void ProtoTerminalWorker::wake() { // Slot connected to ProtoTerminalPart's wakeWorkerThread() signal
+	// Do nothing -- the main event loop will wake when this is called, after which run() checks its queues again
+}
+
+void ProtoTerminalWorker::dataReceived() { // Slot connected to the socket's newDataReceived() signal
+	m_networkDataMutex->lock();
+	m_newData = true; // tested and cleared again by run()
+	m_networkDataMutex->unlock();
+}
+
+void ProtoTerminalWorker::lockOutboundQueue() { // Acquires the mutex that run() holds while appending to m_outboundQueue
+	m_outboundQueueMutex->lock();
+}
+
+void ProtoTerminalWorker::unlockOutboundQueue() { // Releases the mutex taken by lockOutboundQueue()
+	m_outboundQueueMutex->unlock();
+}
+
+ProtoTerminalEventQueue* ProtoTerminalWorker::outboundQueue() { // Returns the address of the worker's own queue (not a copy); no locking is performed here
+	return &m_outboundQueue;
+}
 
 ProtoTerminalPart::ProtoTerminalPart( TQWidget *parentWidget, const char *widgetName, TQObject *parent, const char *name, const TQStringList& )
 	: RemoteInstrumentPart( parent, name ), m_commHandlerState(-1), m_commHandlerMode(0), m_commHandlerCommandState(0), m_connectionActiveAndValid(false), m_base(0)
@@ -74,11 +223,16 @@
 	setInstance(Factory::instance());
 	setWidget(new TQVBox(parentWidget, widgetName));
 
+	// Set up worker
+	m_worker = new ProtoTerminalWorker();
+	m_workerThread = new TQEventLoopThread();
+	m_worker->moveToThread(m_workerThread);
+	TQObject::connect(this, TQT_SIGNAL(wakeWorkerThread()), m_worker, TQT_SLOT(wake()));
+	TQObject::connect(m_worker, TQT_SIGNAL(outboundQueueUpdated()), this, TQT_SLOT(processOutboundQueue()));
+
 	// Create timers
-	m_forcedUpdateTimer = new TQTimer(this);
-	connect(m_forcedUpdateTimer, SIGNAL(timeout()), this, SLOT(mainEventLoop()));
 	m_updateTimeoutTimer = new TQTimer(this);
-	connect(m_updateTimeoutTimer, SIGNAL(timeout()), this, SLOT(mainEventLoop()));
+	connect(m_updateTimeoutTimer, SIGNAL(timeout()), this, SLOT(networkTimeout()));
 
 	// Create widgets
 	m_base = new ProtoTerminalBase(widget());
@@ -99,6 +253,15 @@
 
 	disconnectFromServer();
 	delete m_instrumentMutex;
+
+	if (m_workerThread) {
+		m_workerThread->terminate();
+		m_workerThread->wait();
+		delete m_workerThread;
+		m_workerThread = NULL;
+		delete m_worker;
+		m_worker = NULL;
+	}
 }
 
 void ProtoTerminalPart::postInit() {
@@ -129,22 +292,31 @@
 }
 
 void ProtoTerminalPart::disconnectFromServerCallback() {
-	m_forcedUpdateTimer->stop();
 	m_updateTimeoutTimer->stop();
 	m_connectionActiveAndValid = false;
 }
 
 void ProtoTerminalPart::connectionFinishedCallback() {
+	// Finish worker setup: share the socket and instrument mutex with the worker, then move the socket to the worker thread
+	m_worker->m_socket = m_socket;
+	m_worker->m_instrumentMutex = m_instrumentMutex;
+	m_socket->moveToThread(m_workerThread);
+
 	connect(m_socket, SIGNAL(readyRead()), m_socket, SLOT(processPendingData()));
 	m_socket->processPendingData();
-	connect(m_socket, SIGNAL(newDataReceived()), this, SLOT(mainEventLoop()));
+	connect(m_socket, SIGNAL(newDataReceived()), m_worker, SLOT(dataReceived())); // dataReceived() only raises the worker's new-data flag
 	m_tickerState = 0;
 	m_commHandlerState = 0;
 	m_commHandlerMode = 0;
 	m_socket->setDataTimeout(NETWORK_COMM_TIMEOUT_MS);
 	m_updateTimeoutTimer->start(NETWORK_COMM_TIMEOUT_MS, TRUE);
+
+	// Start worker thread, then schedule run() on the worker object with a zero-delay single-shot timer
+	m_workerThread->start();
+	TQTimer::singleShot(0, m_worker, SLOT(run()));
+
 	processLockouts();
-	mainEventLoop();
+	networkTick();
 	return;
 }
 
@@ -176,119 +348,60 @@
 	}
 }
 
-#define UPDATEDISPLAY_TIMEOUT		m_connectionActiveAndValid = false;														\
-					m_tickerState = 0;																\
-					m_commHandlerState = 2;																\
-					m_commHandlerMode = 0;																\
 ** Diff limit reached (max: 250 lines) **