/*
Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CamCOPS.
CamCOPS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CamCOPS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
*/
// #define WHISKERWORKER_DEBUG_SOCKETS
#include "whiskerworker.h"
#include <QDebug>
#include <QTcpSocket>
#include <QTextStream>
#include "lib/datetime.h"
#include "whisker/whiskermanager.h"
using namespace whiskerconstants;
// ============================================================================
// Helper functions
// ============================================================================
// Disable the Nagle algorithm for a TCP socket, putting the socket into
// "no-delay" mode.
void disableNagle(QTcpSocket* socket)
{
Q_ASSERT(socket);
socket->setSocketOption(QAbstractSocket::LowDelayOption, 1);
}
// ============================================================================
// WhiskerWorker
// ============================================================================
WhiskerWorker::WhiskerWorker() :
QObject(nullptr),
// ... no QObject parent; see docs for QObject::moveToThread()
m_imm_port(0),
m_main_socket(new QTcpSocket(this)), // will be autodeleted by QObject
m_immediate_socket(new QTcpSocket(this)) // will be autodeleted by QObject
{
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << Q_FUNC_INFO;
#endif
disableNagle(m_main_socket);
disableNagle(m_immediate_socket);
connect(
m_main_socket,
&QTcpSocket::connected,
this,
&WhiskerWorker::onMainSocketConnected
);
connect(
m_main_socket,
&QTcpSocket::readyRead,
this,
&WhiskerWorker::onDataReadyFromMainSocket
);
connect(
m_main_socket,
&QTcpSocket::disconnected,
this,
&WhiskerWorker::onAnySocketDisconnected
);
connect(
m_main_socket,
&QTcpSocket::errorOccurred,
this,
&WhiskerWorker::onMainSocketError
);
connect(
m_immediate_socket,
&QTcpSocket::connected,
this,
&WhiskerWorker::onImmSocketConnected
);
connect(
m_immediate_socket,
&QTcpSocket::readyRead,
this,
&WhiskerWorker::onDataReadyFromImmediateSocket
);
connect(
m_immediate_socket,
&QTcpSocket::disconnected,
this,
&WhiskerWorker::onAnySocketDisconnected
);
connect(
m_immediate_socket,
&QTcpSocket::errorOccurred,
this,
&WhiskerWorker::onImmSocketError
);
setConnectionState(WhiskerConnectionState::A_Disconnected);
}
void WhiskerWorker::connectToServer(const QString& host, quint16 main_port)
{
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << Q_FUNC_INFO;
#endif
qInfo().nospace() << "Connecting to Whisker server: host " << host
<< ", main port " << main_port;
if (m_connection_state != WhiskerConnectionState::A_Disconnected) {
disconnectFromServer();
}
m_host = host;
m_main_port = main_port;
m_main_socket->connectToHost(host, main_port);
setConnectionState(WhiskerConnectionState::B_RequestingMain);
}
void WhiskerWorker::disconnectFromServer()
{
// This function may be called directly and triggered by sockets closing,
// including as a result of what we do here, so make sure it's happy with
// recursive/multiple calls.
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << Q_FUNC_INFO;
#endif
if (m_immediate_socket->state() != QTcpSocket::UnconnectedState) {
m_immediate_socket->disconnectFromHost();
}
if (m_main_socket->state() != QTcpSocket::UnconnectedState) {
m_main_socket->disconnectFromHost();
}
if (m_connection_state != WhiskerConnectionState::A_Disconnected) {
qInfo() << "Disconnecting from Whisker server";
}
setConnectionState(WhiskerConnectionState::A_Disconnected);
}
void WhiskerWorker::sendToServer(const WhiskerOutboundCommand& cmd)
{
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << Q_FUNC_INFO << cmd;
#endif
if (cmd.m_immediate_socket) {
if (!isImmediateConnected()) {
qWarning() << Q_FUNC_INFO
<< "Attempt to write to closed immediate socket";
return;
}
// WHETHER OR NOT we want the reply, we push the command.
// (We throw away the result later if we didn't want it.)
m_mutex_imm.lock();
m_imm_commands_awaiting_reply.push_back(cmd);
m_mutex_imm.unlock();
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << Q_FUNC_INFO
<< "Writing to immediate socket:" << cmd.bytes();
#endif
m_immediate_socket->write(cmd.bytes());
} else {
if (!isMainConnected()) {
qWarning() << Q_FUNC_INFO
<< "Attempt to write to closed main socket";
return;
}
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << Q_FUNC_INFO << "Writing to main socket:" << cmd.bytes();
#endif
m_main_socket->write(cmd.bytes());
}
}
void WhiskerWorker::setConnectionState(WhiskerConnectionState state)
{
if (state == m_connection_state) {
return;
}
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << "New Whisker connection state:"
<< whiskerConnectionStateDescription(state);
#endif
m_connection_state = state;
emit connectionStateChanged(state);
if (state == WhiskerConnectionState::G_FullyConnected) {
emit onFullyConnected();
}
}
bool WhiskerWorker::isMainConnected() const
{
return m_connection_state != WhiskerConnectionState::A_Disconnected
&& m_connection_state != WhiskerConnectionState::B_RequestingMain;
}
bool WhiskerWorker::isImmediateConnected() const
{
return m_connection_state
== WhiskerConnectionState::F_BothConnectedAwaitingLink
|| m_connection_state == WhiskerConnectionState::G_FullyConnected;
}
bool WhiskerWorker::isFullyConnected() const
{
return m_connection_state == WhiskerConnectionState::G_FullyConnected;
}
bool WhiskerWorker::isFullyDisconnected() const
{
return m_connection_state == WhiskerConnectionState::A_Disconnected;
}
void WhiskerWorker::onMainSocketConnected()
{
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << Q_FUNC_INFO;
#endif
setConnectionState(WhiskerConnectionState::C_MainConnectedAwaitingImmPort);
}
void WhiskerWorker::onImmSocketConnected()
{
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << Q_FUNC_INFO;
#endif
setConnectionState(WhiskerConnectionState::F_BothConnectedAwaitingLink);
// Special command follows! See pushImmediateReply()
WhiskerOutboundCommand cmd({CMD_LINK, m_code}, true, true);
sendToServer(cmd); // will send only when we quit back to the event loop
}
void WhiskerWorker::onAnySocketDisconnected()
{
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << Q_FUNC_INFO;
#endif
disconnectFromServer();
}
void WhiskerWorker::onMainSocketError(QAbstractSocket::SocketError error)
{
QString msg;
QTextStream s(&msg);
s << "Whisker main socket error:" << error;
qWarning() << msg;
emit socketError(msg);
disconnectFromServer();
}
void WhiskerWorker::onImmSocketError(QAbstractSocket::SocketError error)
{
QString msg;
QTextStream s(&msg);
s << "Whisker immediate socket error:" << error;
qWarning() << msg;
emit socketError(msg);
disconnectFromServer();
}
void WhiskerWorker::onDataReadyFromMainSocket()
{
// We get here from a QTcpSocket event.
const QVector<WhiskerInboundMessage> messages
= getIncomingMessagesFromSocket(false);
for (const WhiskerInboundMessage& msg : messages) {
processMainSocketMessage(msg);
}
}
void WhiskerWorker::onDataReadyFromImmediateSocket()
{
// We get here from a QTcpSocket event.
QVector<WhiskerInboundMessage> messages
= getIncomingMessagesFromSocket(true);
for (WhiskerInboundMessage& msg : messages) {
pushImmediateReply(msg);
}
}
void WhiskerWorker::processMainSocketMessage(const WhiskerInboundMessage& msg)
{
// Handle the low-level connection messages, and pass anything else on
// via our signals.
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << Q_FUNC_INFO << msg;
#endif
const QString& line = msg.message();
const QRegularExpressionMatch immport_match = IMMPORT_REGEX.match(line);
if (immport_match.hasMatch()) {
if (m_connection_state
!= WhiskerConnectionState::C_MainConnectedAwaitingImmPort) {
qWarning() << "ImmPort message received at wrong stage";
disconnectFromServer();
return;
}
m_imm_port = static_cast<quint16>(immport_match.captured(1).toUInt());
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << "Whisker server offers immediate port" << m_imm_port;
#endif
setConnectionState(WhiskerConnectionState::D_MainConnectedAwaitingCode
);
return;
}
const QRegularExpressionMatch code_match = CODE_REGEX.match(line);
if (code_match.hasMatch()) {
if (m_connection_state
!= WhiskerConnectionState::D_MainConnectedAwaitingCode) {
qWarning() << "Code message received at wrong stage";
disconnectFromServer();
return;
}
m_code = code_match.captured(1);
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << "Whisker server has provided code for immediate port";
#endif
qInfo().nospace(
) << "Connecting immediate socket to Whisker server: host "
<< m_host << ", immediate port " << m_imm_port;
m_immediate_socket->connectToHost(m_host, m_imm_port);
setConnectionState(
WhiskerConnectionState::E_MainConnectedRequestingImmediate
);
return;
}
if (line == PING) {
WhiskerOutboundCommand cmd(PING_ACK, false);
sendToServer(cmd);
return;
}
emit receivedFromServerMainSocket(msg);
}
void WhiskerWorker::pushImmediateReply(WhiskerInboundMessage& msg)
{
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << Q_FUNC_INFO;
#endif
bool wake = false;
m_mutex_imm.lock();
Q_ASSERT(!m_imm_commands_awaiting_reply.isEmpty());
const WhiskerOutboundCommand& cmd = m_imm_commands_awaiting_reply.front();
if (!cmd.m_immediate_ignore_reply) {
msg.setCausalCommand(cmd.m_command);
m_imm_replies_awaiting_collection.push_back(msg);
wake = true;
}
m_imm_commands_awaiting_reply.pop_front();
m_mutex_imm.unlock();
if (m_connection_state
== WhiskerConnectionState::F_BothConnectedAwaitingLink) {
// Special!
if (msg.immediateReplySucceeded()) {
qInfo().nospace() << "Fully connected to Whisker server: host "
<< m_host << ", main port " << m_main_port
<< ", immediate port " << m_imm_port;
setConnectionState(WhiskerConnectionState::G_FullyConnected);
} else {
qWarning() << "Failed to execute Link command; reply was"
<< msg.message();
disconnectFromServer();
}
return;
}
if (wake) {
m_immediate_reply_arrived.wakeAll(); // wakes: waitForImmediateReply()
}
}
WhiskerInboundMessage WhiskerWorker::getPendingImmediateReply()
{
// CALLED FROM A DIFFERENT THREAD
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << Q_FUNC_INFO;
#endif
m_mutex_imm.lock();
if (m_imm_replies_awaiting_collection.isEmpty()) {
// ... must hold mutex to read this
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << Q_FUNC_INFO << "waiting for a reply...";
#endif
m_immediate_reply_arrived.wait(&m_mutex_imm);
// ... woken by: pushImmediateReply()
// ... this mutex is UNLOCKED as we go to sleep, and LOCKED
// as we wake: https://doc.qt.io/qt-6.5/qwaitcondition.html#wait
Q_ASSERT(!m_imm_replies_awaiting_collection.isEmpty());
#ifdef WHISKERWORKER_DEBUG_SOCKETS
qDebug() << Q_FUNC_INFO << "... reply ready";
#endif
}
WhiskerInboundMessage msg = m_imm_replies_awaiting_collection.front();
m_imm_replies_awaiting_collection.pop_front();
m_mutex_imm.unlock();
return msg;
}
QVector<WhiskerInboundMessage>
WhiskerWorker::getIncomingMessagesFromSocket(bool via_immediate_socket)
{
const QDateTime timestamp = datetime::now();
QTcpSocket* socket
= via_immediate_socket ? m_immediate_socket : m_main_socket;
QString& buffer
= via_immediate_socket ? m_inbound_buffer_imm : m_inbound_buffer_main;
const QByteArray bytes = socket->readAll();
const QString& string = QString::fromLatin1(bytes);
buffer += string;
return getIncomingMessagesFromBuffer(
buffer, via_immediate_socket, timestamp
);
}
QVector<WhiskerInboundMessage> WhiskerWorker::getIncomingMessagesFromBuffer(
QString& buffer, bool via_immediate_socket, const QDateTime& timestamp
)
{
QStringList strings = buffer.split(EOL);
// If the buffer contains complete responses, the last string will be
// empty. In all cases, the last string is the residual.
const int length = strings.length();
Q_ASSERT(length > 0);
buffer = strings.at(length - 1); // the residual
QVector<WhiskerInboundMessage> messages;
for (int i = 0; i < length - 1; ++i) { // the messages
const QString& content = strings.at(i);
WhiskerInboundMessage msg(content, via_immediate_socket, timestamp);
messages.append(msg);
}
return messages;
}