• ACE中使用完成端口 - [ACE]

    2008-01-09

    版权声明:转载时请以超链接形式标明文章原始出处和作者信息及本声明
    http://egeho123.blogbus.com/logs/13566792.html

    导读:
    使用ACE中的Proactor的话,会要比我们使用我们直接写的要来得简单。
    在说Proactor之前我们需要了解Windows里的完成端口的工作原理。
    完成端口是WinNT内核里的一个框架。我们可以为我们的一些异步的操作
    新建一个完成端口,然后这个完成端口会有几个工作线程来处理。我们
    可以将socket,或是一个文件读写,或是一个串口的收发数据的句柄,
    梆定到这个完成端口之上,当一个读或是写的事件完成之后,完成端口
    机制将会自动将一个完成消息放到完成队列中,完成端口的工作线程池
    将会被触发调用。回调的时候我们梆定时将一些基础的信息也梆在其中,
    当工作线程也会通过一种叫做完成项的指针返回给你。就是说,你可能
    梆定了多个socket或是文件都是没有问题的。按微软人写的文档里说的
    可以面对百万个这样的异步对象。
    这里我就不再使用WinAPI写完成端口了。
    现在是使用ACE框架来写一个。
    使用他来做一个完成端口步骤也是一样的。
    开始的时候需要一个完成端口,还有完成端口的工作线程池。在ACE框架
    里提供了一种叫ACE_Task的线程池模块类
    和一般的线程类一样,它的工作时调用的函数是
    virtual int svc (void);
    只是如何使用呢。无非是开启线程与关闭线程两个操作。
    在此类中定义一个ACE_Thread_Semaphore sem_;变量
    然后开户n个线程的过程就是这样的:
    int Proactor_Task::star(int nMax)
    {
            ...
            this->activate (THR_NEW_LWP, nMax);
            for (;nMax>0;nMax--)
            {
            sem_.acquire();
            }
            return 0;
    }
    一个是创建,二个是一个一个的触发。让这一些线程都工作.
    当然工作线程都要释放自己:
    int Proactor_Task::svc()
    {
            ACE_DEBUG((LM_INFO,ACE_TEXT("svc函数调用!\n")));
            sem_.release(1);
            ...
            return 0;
    }
    好了。这个线程池开始工作了。接下来,我们要做将完成端口对象给创建出来:
    在这个线程池里定义一个完成端口对象指针:
    ACE_Proactor * proactor_;
    创建的过程是这样的。
    //是在Win32下,就使用这个Proactor的实现
    ACE_WIN32_Proactor *proactor_impl = new ACE_WIN32_Proactor(); //新建proactor的实现
    proactor_=new ACE_Proactor(proactor_impl,1); //与proactor关联
    ACE_Proactor::instance (this->proactor_, 1); //将新建出来的proactor保存在静态框架里
    如何删除呢。
    ACE_Proactor::end_event_loop();
    this->wait();
    之后来写线程池里的函数
            ACE_Proactor::run_event_loop();
    只要写一句就OK了。
    这就完成了一个完成端口对象的创建过程。我们只要做一下封装就OK了。
    给它一个工作线程的大小。之后它就会自动的新建一个完成端口在ACE_Proactor::instance里。
    接下来我们要做Acceptor与recv。
    实计上ACE里已经为我们写好了。它们就是:
    ACE_Asynch_Acceptor类ACE_Service_Handler类
    class Accepte : public ACE_Asynch_Acceptor
    class Receive : public ACE_Service_Handler
    这样一继承,工作就已经完成了。
    如果我们想得到这一些网络事件的话,可以做一些继承就OK了。
    他们内部调用的过程是这样的:
    当有一个新的用户连接上来之后。
    Accepte会有一个函数回调。
    virtual HANDLER *make_handler (void);
    这个函数里,我们必需写一个new Receive对象。
    new完成之后Receive的open函数将会回调
    open函数调用的时候,此接收对象的socket句柄就得到了。我们就在这个时候需要将一个
    读、写的流梆定在其中。还有就是做一步异步的接收数据。这个如果你写过重叠方式的话就会
    比较的了解,这也叫做异步的I/O的投递。等这个recv完成之后就会回调。
    好了。这就算完成了。现在把代码贴出来。
    我不知道如何做一个下载点。不好意思只有大家自己复制下来
    我使用的VC6.0,ACE5.4.1的编程环境
    // Accepte.h: interface for the Accepte class.
    //
    //////////////////////////////////////////////////////////////////////
    #if !defined(AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_)
    #define AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_
    #if _MSC_VER >1000
    #pragma once
    #endif // _MSC_VER >1000
    #include
    #include "Receive.h"
    class Accepte : public ACE_Asynch_Acceptor
    {
    public:
            Receive* make_handler (void);
            Accepte();
            virtual ~Accepte();
    };
    #endif // !defined(AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_)
    // Accepte.cpp: implementation of the Accepte class.
    //
    //////////////////////////////////////////////////////////////////////
    #include "Accepte.h"
    //////////////////////////////////////////////////////////////////////
    // Construction/Destruction
    //////////////////////////////////////////////////////////////////////
    Accepte::Accepte()
    {
    }
    Accepte::~Accepte()
    {
    }
    Receive* Accepte::make_handler(void)
    {
            return new Receive();
    }
    // Proactor_Task.h: interface for the Proactor_Task class.
    //
    //////////////////////////////////////////////////////////////////////
    #if !defined(AFX_PROACTOR_TASK_H__12F37C95_9872_4923_89A2_5A59AE7AC1FD__INCLUDED_)
    #define AFX_PROACTOR_TASK_H__12F37C95_9872_4923_89A2_5A59AE7AC1FD__INCLUDED_
    #if _MSC_VER >1000
    #pragma once
    #endif // _MSC_VER >1000
    #include "ace\Task_T.h"
    #include "ace\Thread_Semaphore.h"
    #include "ace\Proactor.h"
    #include "ace\WIN32_Proactor.h"
    class Proactor_Task : public ACE_Task
    {
    public:
            Proactor_Task();
            virtual ~Proactor_Task();
            int star(int nMax);
            int stop();
            virtual int svc (void);
            int create_proactor();
            int release_proactor();
            ACE_Thread_Semaphore sem_;
            ACE_Proactor * proactor_;
    };
    #endif // !defined(AFX_PROACTOR_TASK_H__12F37C95_9872_4923_89A2_5A59AE7AC1FD__INCLUDED_)
    // Proactor_Task.cpp: implementation of the Proactor_Task class.
    //
    //////////////////////////////////////////////////////////////////////
    #include "Proactor_Task.h"
    //////////////////////////////////////////////////////////////////////
    // Construction/Destruction
    //////////////////////////////////////////////////////////////////////
    Proactor_Task::Proactor_Task()
    {
    }
    Proactor_Task::~Proactor_Task()
    {
    }
    int Proactor_Task::star(int nMax)
    {
            create_proactor();
            this->activate (THR_NEW_LWP, nMax);
            for (;nMax>0;nMax--)
            {
            sem_.acquire();
            }
            return 0;
    }
    int Proactor_Task::stop()
    {
            ACE_Proactor::end_event_loop();
            this->wait();
            return 0;
    }
    int Proactor_Task::release_proactor()
    {
            ACE_Proactor::close_singleton ();
            proactor_ = 0;
            return 0;
    }
    int Proactor_Task::create_proactor()
    {
            ACE_WIN32_Proactor *proactor_impl = 0;
            ACE_NEW_RETURN (proactor_impl,
            ACE_WIN32_Proactor,
            -1);
            // always delete implementation 1 , not !(proactor_impl == 0)
            ACE_NEW_RETURN (this->proactor_,
            ACE_Proactor (proactor_impl, 1 ),
            -1);
            // Set new singleton and delete it in close_singleton()
            ACE_Proactor::instance (this->proactor_, 1);
            return 0;
    }
    int Proactor_Task::svc()
    {
            ACE_DEBUG((LM_INFO,ACE_TEXT("svc函数调用!\n")));
            sem_.release(1);
            ACE_Proactor::run_event_loop();
            return 0;
    }
    // Receive.h: interface for the Receive class.
    //
    //////////////////////////////////////////////////////////////////////
    #if !defined(AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_)
    #define AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_
    #if _MSC_VER >1000
    #pragma once
    #endif // _MSC_VER >1000
    #include
    #include
    #include
    #include
    class Receive : public ACE_Service_Handler
    {
    public:
            Receive();
            virtual ~Receive()
            {
            if (this->handle() != ACE_INVALID_HANDLE )
            {
            closesocket(SOCKET(this->handle()));
            }
            }
            virtual void open(ACE_HANDLE h,ACE_Message_Block& );
            virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
            virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
    private:
            ACE_Asynch_Write_Stream write_;
            ACE_Asynch_Read_Stream reader_;
    };
    #endif // !defined(AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_)
    // Receive.cpp: implementation of the Receive class.
    //
    //////////////////////////////////////////////////////////////////////
    #include "Receive.h"
    //////////////////////////////////////////////////////////////////////
    // Construction/Destruction
    //////////////////////////////////////////////////////////////////////
    Receive::Receive()
    {
    }
    void Receive::open(ACE_HANDLE h,ACE_Message_Block& )
    {
            this->handle(h);
            if (this->write_.open(*this)!=0 ||
            this->reader_.open(*this) != 0 )
            {
            delete this;
            return ;
            }
            ACE_Message_Block *mb;
            ACE_NEW_NORETURN(mb,ACE_Message_Block(1024));
            if ( this->reader_.read(*mb,mb->space()) != 0)
            {
            ACE_ERROR((LM_ERROR,ACE_TEXT(" (%t) error information %p.")));
            mb->release();
            delete this;
            return;
            }
    }
    void Receive::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
    {
            ACE_Message_Block &mb = result.message_block();
            if ( !result.success() || result.bytes_transferred() == 0)
            {
            mb.release();
            delete this;
            }
            else
            {
            ACE_Message_Block* new_mb;
            ACE_NEW_NORETURN(new_mb,ACE_Message_Block(1024));
            this->reader_.read(*new_mb,new_mb->space());
            }
            return ;
    }
    void Receive::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
    {
            result.message_block().release();
            return ;
    }
    ////////////////////////////////////////////////////////////////////////////////////////////////////
    //main.cpp
    #ifdef _DEBUG
            #pragma comment(lib,"aced")
    #else
            #pragma comment(lib,"ace")
    #endif
    #include
    #include "Accepte.h"
    #include "Proactor_Task.h"
    int ACE_TMAIN(int ,char*[])
    {
            Proactor_Task task;
            task.star(3);
            Accepte accepte;
            accepte.open(ACE_INET_Addr (2222), 0, 1,ACE_DEFAULT_BACKLOG,1,ACE_Proactor::instance());
            int nExit=0;
            while (nExit==0)
            scanf("%d",&nExit);
            return 0;
    }

    收藏到:Del.icio.us




    评论

  • 我在使用 proactor 时,经常会收到无效的数据,如我要预计收 1024个字节的数据,结果经常收到 0 个字节的数据,这个怎么回事呢?请大师分析下,是否和字符串的'\0' 有关系呢?那我如何避免这个问题?