• proactor windows下的线程模型问题 - [ACE]

    2008-01-09

    版权声明:转载时请以超链接形式标明文章原始出处和作者信息及本声明
    http://egeho123.blogbus.com/logs/13564359.html

    我将echoserver的代码贴出供大家研究测试

    首先是 ServerHandler.h

    #ifndef _SERVER_HANDER_H_
    #define _SERVER_HANDER_H_

    #pragma once


    #include "ace/Proactor.h"
    #include "ace/Asynch_IO.h"
    #include "ace/message_block.h"

    class ServerHander :public ACE_Service_Handler
    {
    public:
            ServerHander(void);
            virtual ~ServerHander(void);
            static void SetSleepTime(const DWORD t)
            {
                    m_sleepTime = t;
            };

            virtual void open(ACE_HANDLE h, ACE_Message_Block& _mb);

    protected:
            virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);

            virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);

            virtual void  handle_time_out(const ACE_Time_Value &tv, const void *p);

            void DisplayInfo(ACE_HANDLE h, char* str) const;


    private:
            ACE_Asynch_Read_Stream        m_reader;
            ACE_Asynch_Write_Stream m_writer;

            static DWORD                        m_sleepTime;
            time_t                                        m_lastTime;
    };


    #endif

    -----------------------------------------------------------------------------------------------------------------------------------
    ServerHander.cpp

    #include "StdAfx.h"
    #include "ServerHander.h"
    #include "ace/OS_NS_sys_socket.h"
    #include "ace/INET_Addr.h"
    #include "ace/SOCK_SEQPACK_Association.h"
    #include "ace/OS.h"

    #define TIME_OUT 10

    DWORD ServerHander::m_sleepTime = 0;

    ServerHander::ServerHander(void):m_lastTime(0)
    {
    }

    ServerHander::~ServerHander(void)
    {
            //关闭
            if (this->handle() != ace_invalid_handle)               
            {
                    //显示客户端连接地址和端口
                    displayinfo(this->handle(), " disconnected.");

                    ace_proactor::instance()->cancel_timer(*this,1);
                    ace_os::shutdown(this->handle(), sd_both);
                    ace_os::closesocket( this->handle() );
                    this->handle(ace_invalid_handle);
            }
    }

    //客户端连接
    void ServerHander::open(ACE_HANDLE h,
                                                    ACE_Message_Block& _mb)
    {
            this->handle(h);

            //记录时间
            m_lastTime = ACE_OS::time(NULL);

            //ACE_Proactor::instance()->schedule_timer(*this, 0, ACE_Time_Value(0), ACE_Time_Value(TIME_OUT));

            //构造I/O流
            if( this->m_reader.open(*this) != 0 || this->m_writer.open(*this) != 0 )
            {
                    cout<<"m_reader or m_writer open failed..."<<endl;
                    delete this;
                    return;
            }

            //显示客户端连接地址和端口        
            DisplayInfo(this->handle(), " connected.");

            ACE_Message_Block* mb = NULL;
            ACE_NEW_NORETURN(mb, ACE_Message_Block(1024));        

            //发起读操作
            if( this->m_reader.read( *mb, mb->space() ) != 0 )
            {
                    cout<<"m_reader read failed..."<<endl;
                    mb->release();
                    delete this;
            }
    }

    //读操作完成
    void ServerHander::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
    {
            //记录时间
            m_lastTime = ACE_OS::time(NULL);

            ACE_Message_Block &mb = result.message_block();

            //传输不成功
            if ( (!result.success()) || (result.bytes_transferred() == 0) )
            {
                    cout<<"Read failed..."<<endl;

                    mb.release();
                    delete this;  
            }
            else        //接收完成
            {
                    //等待 模拟过载导致的响应速度变慢
                    Sleep( m_sleepTime );

                    //写回
                    //mb.wr_ptr(0);
                    //mb.wr_ptr()[-2] = 0x03;
                    if (this->m_writer.write( mb, mb.length() ) == -1)
                    {
                            cout<<"Server write failed..."<<endl;
                            mb.release();
                    }

                   
                    else        //写回成功,再继续读下一组数据
                    {
                            ACE_Message_Block *new_mb = NULL;
                            ACE_NEW_NORETURN(new_mb, ACE_Message_Block(1024));

                            this->m_reader.read(*new_mb, new_mb->space());

                            cout<<"Read again."<<endl;
                    }
            }
    }

    //写操作完成
    void ServerHander::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
    {
            cout<<"Write completed."<<endl;
            //释放消息
            result.message_block().release();
    }

    //超时
    void ServerHander::handle_time_out(const ACE_Time_Value &tv,
                                                                            const void *p)
    {
            time_t curTime = ACE_OS::time(NULL);

            if( curTime - m_lastTime > TIME_OUT )
            {
                    cout<<"TimeOut"<<endl;
                    delete this;
            }
    }

    //显示信息
    void ServerHander::DisplayInfo(ACE_HANDLE h,
                                                               char* str) const
    {
            //获取客户端连接地址和端口
            ACE_INET_Addr addr;
            ACE_SOCK_SEQPACK_Association ass = ACE_SOCK_SEQPACK_Association(h);
            size_t addr_size=1;
            ass.get_remote_addr(addr);

            cout<< addr.get_host_addr() <<":"<< addr.get_port_number() <<str<<endl;
    }

    [ 本帖最后由 winston 于 2007-12-19 12:20 编辑 ]

    顶部
    rosebush
    新手上路
    Rank: 1



    UID 32
    精华 0
    积分 0
    帖子 12
    阅读权限 10
    注册 2007-12-13
    状态 离线
    发表于 2007-12-19 11:30  资料  个人空间  短消息  加为好友 
    我将echoserver的代码贴出供大家研究测试2

    多线程的proactor部分

    proactorTask.h
    #ifndef _CPROACTOR_TASK_H_
    #define _CPROACTOR_TASK_H_


    #pragma once

    #include "ace\Task_T.h"
    #include "ace\Thread_Semaphore.h"
    #include "ace\Proactor.h"
    #include "ace\WIN32_Proactor.h"


    class CProactorTask :public ACE_Task<ACE_MT_SYNCH>
    {
    public:
            CProactorTask(void);
            virtual ~CProactorTask(void);

            int Start(const int nMax);
            int Stop(void);
            int Create(void);
            int Release(void);
            virtual int svc(void);

    protected:
            ACE_Thread_Semaphore        m_sem;                                //信号量
            ACE_Proactor*                        m_pProactor;                //完成端口对象指针
    };


    #endif
    ---------------------------------------------------------------------------------------------------
    cpp部分

    #include "StdAfx.h"
    #include "ProactorTask.h"

    CProactorTask::CProactorTask(void)
    {
    }

    CProactorTask::~CProactorTask(void)
    {
    }


    //
    //创建完成端口对象
    //
    int CProactorTask::Create(void)
    {
            ACE_WIN32_Proactor *proactor_impl = 0;

            //新建
            ACE_NEW_RETURN(proactor_impl, ACE_WIN32_Proactor, -1);

            //关联
            ACE_NEW_RETURN(this->m_pProactor, ACE_Proactor(proactor_impl, 1 ), -1);

            //保存
            ACE_Proactor::instance(this->m_pProactor, 1);

            return 0;
    }


    //
    //启动线程池
    //
    int CProactorTask::Start(const int nMax)        //线程数量
    {
            //创建完成端口对象
            Create();

            //创建线程
            this->activate(THR_NEW_LWP, nMax);

            int i;
            //保证所有线程已启动
            for(i = nMax; i>0; i--)
            {
                    m_sem.acquire();        //Block the thread until the semaphore count becomes greater than 0, then decrement it.
            }

            cout<<"Start."<<endl;
            return 0;
    }


    //
    //删除线程池
    //
    int CProactorTask::Stop(void)
    {
            ACE_Proactor::end_event_loop();
            this->wait();
            return 0;
    }


    //
    //每个线程调用
    //
    int CProactorTask::svc(void)
    {
            ACE_DEBUG((LM_INFO,ACE_TEXT("svc函数调用!\n")));

            //Increment the semaphore by 1
            m_sem.release(1);
            ACE_Proactor::run_event_loop();
            return 0;
    }


    //
    //释放
    //
    int CProactorTask::Release(void)
    {
            ACE_Proactor::close_singleton();
            m_pProactor = 0;

            cout<<"Release."<<endl;
            return 0;
    }
    --------------------------------------------------------------------------------------------------------

    main.cpp部分

    // EchoServer.cpp : Defines the entry point for the console application.
    //
    // 2007.12.6
    // Echo Server Proactor模式


    #include "Winbase.h"
    #include "stdafx.h"

    #include "ace/INET_Addr.h"
    #include "ace/Asynch_Acceptor.h"
    #include "ServerHander.h"
    #include "ProactorTask.h"

    int _tmain(int argc, _TCHAR* argv[])
    {
            cout<<"******* Echo Server *******"<<endl<<endl;

            //获得CPU数量
            SYSTEM_INFO sysInfo;
            GetSystemInfo(&sysInfo);
            int threadNum = sysInfo.dwNumberOfProcessors<<1;        // CPU * 2

            //开启线程
            CProactorTask task;
            task.Start( threadNum );

            ACE_Asynch_Acceptor<ServerHander> MyAcceptor;

            ACE_INET_Addr addr(5050);

            if(MyAcceptor.open(addr) == -1)
            {
                    cout<<"acceptor open failed..."<<endl;
                    return 1;
            }

            cout<<"Listening on "<< addr.get_port_number() <<"..."<<endl;

            DWORD sleepTime = 0;

            while(1)
            {
                    cin>>sleepTime;
                    ServerHander::SetSleepTime(sleepTime);
                    cout<<"********** Set sleep time to "<<sleepTime<<" ************"<<endl;
            }

            return 0;
    }

    收藏到:Del.icio.us




    评论

  • 为什么不能执行显示出错啊