WebSocketサーバーの終了1(2012/10/02-1)

現在、WebSocketサーバープログラムは起動すると指定されたポート番号で接続の待機処理を行う。接続要求が到達すると、別スレッドを立ててクライアントとの通信を開始するが、メインスレッドでは再びポートを待ち受け状態にし、別のクライアントが接続してくるのを待つようにしている。
通常のサーバーの場合、何らかの外部アクションに対して終了したり設定を再読み込みしたり、接続しているクライアントの情報を返したりなどという、管理用の処理も行う必要があるが、現状、本WebSocketサーバーではこの様な処理を行わせていない。
そのため、サーバーの終了のためにはプロセス自体を強制終了させる必要がある。

今回の改造では、外部からのアクションに応じてサーバーを終了させる処理を実装する。将来的にマネージャープログラムとの通信による管理処理を行うことを念頭に置きつつその処理を実装して行こうと思う。

サーバーの終了処理を行うためには、サーバーのメインスレッドの待機ループを終わらせる必要があるが、それだけでは不十分である。
現在接続されているクライアントがある場合には、そのクライアントを安全に終了させるかあるいは、サーバー用のスレッドを安全に終了させる必要がある。各スレッドが安全に終了したことを確認して、メインスレッドを終了させてやらなければ、サーバープログラムはクラッシュして終了することになるからだ。
そのためには、以下の手順が必要となると思われる。 この処理を実装するために、まずはメインスレッドで各サーバースレッドの情報を管理する必要が出てくる。
そのためのクラス、CThreadManagerを以下のように定義した

ThreadManager.h
#pragma once

#include "SPWebSocketServerBase.h"

class CThreadElement
{
    friend class CThreadManager;

protected:
    CSPWebSocketServerBase * m_pServer;
    CThreadElement * m_pNextElement;

public:
    CThreadElement();
    ~CThreadElement();

    void SetServerThread( CSPWebSocketServerBase * pServer );
    CSPWebSocketServerBase * GetServerThread();
    HANDLE GetThreadHandle();

};

class CThreadManager
{
protected:
    CThreadElement * m_pFirstElement;
    CThreadElement * m_pLastElement;

    void ReleaseThreadElements();
    HANDLE m_hMutex;
    int m_iThreadCounter;

    static bool ms_bStopServer;

public:
    CThreadManager();
    ~CThreadManager();

    void AddServerThread( CSPWebSocketServerBase * pServer );
    void DeleteServerThread( CSPWebSocketServerBase * pServer );
    void WaitAllThread();

    static bool ServerStopped();
    static void StopServer();

};
CThreadElementクラスは起動したスレッドのメインルーチンであるCSPWebSocketServerBaseのインスタンスへのポインタを保持する。CThreadManagerはそのCThreadElementのインスタンスを複数保持する。CThreadManagerはスレッドセーフのクラスとするため、ミューテックスをひとつ使用する。
CSPWebSocketServerBase内には、メインスレッドから送られてくるパラメータCServerParameterのインスタンスへのポインタが含まれている。
このCServerParametrer内のパラメータにメインスレッドでサーバースレッドを作成時に取得したスレッドへのハンドルを保持させるようにする。
以下はそのために行ったCServerParameterおよびメインルーチンの改造箇所となる。

SPWebSocketServerBase.h(CServerParaameterの改造箇所)
//
//    サーバーパラメータ
//
class CServerParameter
{
private:
    sockaddr_in m_sockAddr;
    SOCKET m_sockClient;
    HANDLE m_hThreadHandle;

public:
    void SetSockAddr( sockaddr_in sockAddr )
    {
        m_sockAddr = sockAddr;
    }
    sockaddr_in * GetSockAddr()
    {
        return &m_sockAddr;
    }

    void SetSocketClient( SOCKET sockClient )
    {
        m_sockClient = sockClient;
    }
    SOCKET GetSockClient()
    {
        return m_sockClient;
    }

    void SetThreadHandle( HANDLE hThreadHandle )
    {
        m_hThreadHandle = hThreadHandle;
    }
    HANDLE GetThreadHandle()
    {
        return m_hThreadHandle;
    }

};

SPWSSMain.cpp(アクセプト時のスレッドハンドルのセット)
                if( events.lNetworkEvents & FD_ACCEPT )
                {
                    sockaddr_in sockAddr;
                    int iAddrLen = sizeof(sockaddr_in);
                    SOCKET sockClient = accept( sockListen , (sockaddr*)&sockAddr , &iAddrLen );

                    //接続実行
                    if( sockClient != SOCKET_ERROR )
                    {
                        DWORD dwThreadID;
                        CServerParameter *pParameter = new CServerParameter();
                        pParameter->SetSocketClient( sockClient );
                        pParameter->SetSockAddr( sockAddr );

                        pParameter->SetThreadHandle( CreateThread( NULL , 0 , CSPWebSocketServerBase::ServerThread , pParameter, 0, &dwThreadID ) );
                    }
                }

ThreadManager.cpp
#include "ThreadManager.h"
#include "Setting.h"

#include "memoryChecker.h"

#pragma once

//
//    スレッド情報を保持するCThreadElementのコンストラクタ
//
CThreadElement::CThreadElement()
{
    m_pNextElement = NULL;
    m_pServer = NULL;
}

CThreadElement::~CThreadElement()
{
}

//
//    スレッド毎に作成される各サーバープログラムのインスタンスへのポインタ(セッタ)
//
void CThreadElement::SetServerThread( CSPWebSocketServerBase * pServer )
{
    m_pServer = pServer;
}
//
//    スレッド毎に作成される各サーバープログラムのインスタンスへのポインタ(ゲッタ)
//
CSPWebSocketServerBase * CThreadElement::GetServerThread()
{
    return m_pServer;
}

//
//    スレッドのハンドルの取得
//        各サーバープログラムのサーバーパラメータより取得する
//
HANDLE CThreadElement::GetThreadHandle()
{
    HANDLE hThreadHandle = INVALID_HANDLE_VALUE;
    if( m_pServer && m_pServer->GetServerParameter() )
    {
        hThreadHandle = m_pServer->GetServerParameter()->GetThreadHandle();
    }

    return hThreadHandle;
}

///////////////////////////////////////////////////////////////////////////////

//
//    サーバーが終了処理に入っているかどうか
//
bool CThreadManager::ms_bStopServer = false;

//
//    サーバーマネージャーのコンストラクタ
//        同期用のミューテックスを作成する
//
CThreadManager::CThreadManager()
{
    m_hMutex = CreateMutex( NULL, FALSE, NULL );
    m_pFirstElement = m_pLastElement = NULL;
    m_iThreadCounter = 0;
}

//
//    サーバーマネージャーのデストラクタ
//        スレッドが登録されている場合には全てを破棄する
//        メモリダンプによりメモリリークを検出する
//
CThreadManager::~CThreadManager()
{
    ReleaseThreadElements();

    CloseHandle( m_hMutex );

    //メモリダンプ
    _CrtDumpMemoryLeaks();
}

//
//    登録されている全スレッド情報を破棄
//
void CThreadManager::ReleaseThreadElements()
{
    WaitForSingleObject( m_hMutex, INFINITE );

    while( m_pFirstElement )
    {
        CThreadElement * pDelete = m_pFirstElement;
        m_pFirstElement = m_pFirstElement->m_pNextElement;
        delete pDelete;
    }

    m_pFirstElement = m_pLastElement = NULL;

    ReleaseMutex( m_hMutex );
}

//
//    全スレッドの終了を待機する
//
void CThreadManager::WaitAllThread()
{
    WaitForSingleObject( m_hMutex, INFINITE );

    HANDLE * phThreads = new HANDLE[ m_iThreadCounter ];

    int iIndex = 0;
    for( CThreadElement * pElement = m_pFirstElement
        ; pElement && iIndex < m_iThreadCounter
        ; pElement = pElement->m_pNextElement, iIndex++ )
    {
        phThreads[iIndex] = pElement->GetThreadHandle();
    }

    WaitForMultipleObjects( m_iThreadCounter, phThreads, TRUE, CSetting::Inst()->GetServerThreadWait() );

    delete[] phThreads;

    ReleaseMutex( m_hMutex );
}

//
//    マネージャークラスにサーバースレッドを登録しする
//
void CThreadManager::AddServerThread( CSPWebSocketServerBase * pServer )
{
    CThreadElement * pElement = new CThreadElement();
    pElement->SetServerThread( pServer );

    WaitForSingleObject( m_hMutex, INFINITE );

    if( m_pFirstElement )
    {
        m_pLastElement->m_pNextElement = pElement;
        m_pLastElement = pElement;

        m_iThreadCounter++;
    }
    else
    {
        m_pFirstElement = m_pLastElement = pElement;
        m_iThreadCounter = 1;
    }

    ReleaseMutex( m_hMutex );
}

//
//    マネージャークラスからサーバースレッドを削除する
//
void CThreadManager::DeleteServerThread( CSPWebSocketServerBase * pServer )
{
    CThreadElement * pPrevElement = NULL;

    WaitForSingleObject( m_hMutex, INFINITE );

    for( CThreadElement * pElement = m_pFirstElement ; pElement ; pElement = pElement->m_pNextElement )
    {
        if( pElement->GetServerThread() == pServer )
        {
            if( pPrevElement )
            {
                pPrevElement->m_pNextElement = pElement->m_pNextElement;
                if( pElement->m_pNextElement == NULL )
                {
                    m_pLastElement = pPrevElement;
                }
            }
            else
            {
                m_pFirstElement = pElement->m_pNextElement;
                if( m_pFirstElement == NULL )
                {
                    m_pLastElement = NULL;
                }
                else if( m_pFirstElement->m_pNextElement == NULL )
                {
                    m_pLastElement = m_pFirstElement;
                }
            }

            delete pElement;
            m_iThreadCounter--;
            break;
        }
    }

    ReleaseMutex( m_hMutex );
}

//
//    サーバーが停止処理中かどうか
//
bool CThreadManager::ServerStopped()
{
    return ms_bStopServer;
}

//
//    サーバーを停止する
//
void CThreadManager::StopServer()
{
    ms_bStopServer = true;
}