IO 模型_netty reactor模型

IO 模型_netty reactor模型//IOCP2.cpp:Definestheentrypointfortheconsoleapplication.//#include”stdafx.h”#include<WinSock2.h>#include<MSWSock.h>#include<Windows.h>#include&lt…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

// IOCP2.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include <WinSock2.h>
#include <MSWSock.h>
#include <Windows.h>
#include <process.h>
#pragma comment(lib, "WS2_32.lib")

#define MAX_BUFFER    256
#define MAX_TIMEOUT 1000
#define MAX_SOCKET  1024
#define MAX_THREAD    64
#define MAX_ACCEPT  5

typedef enum _OPERATION_INFO_
{
    OP_NULL,
    OP_ACCEPT,
    OP_READ,
    OP_WRITE
}OPERATIONINFO;

typedef struct _PER_HANDLE_DATA_
{
public:
    _PER_HANDLE_DATA_()
    {
        clean();
    }
    ~_PER_HANDLE_DATA_()
    {
        clean();
    }
protected:
    void clean()
    {
        sock = INVALID_SOCKET;
        memset(&addr, 0, sizeof(addr));
        addr.sin_addr.S_un.S_addr = INADDR_ANY;
        addr.sin_port = htons(0);
        addr.sin_family = AF_INET;
    }
public:    
    SOCKET sock;
    SOCKADDR_IN addr;
}PERHANDLEDATA, *PPERHANDLEDATA;

typedef struct _PER_IO_DTATA_
{
public: 
    _PER_IO_DTATA_()
    {
        clean();
    }
    ~_PER_IO_DTATA_()
    {
        clean();
    }
    void clean()
    {
        ZeroMemory(&ol, sizeof(ol));
        memset(buf, 0, sizeof(buf));
        sAccept = INVALID_SOCKET;
        sListen = INVALID_SOCKET;
        wsaBuf.buf = buf;
        wsaBuf.len = MAX_BUFFER;
        opType =  OP_NULL;
    }
public:
    WSAOVERLAPPED ol;
    SOCKET sAccept; // Only valid with AcceptEx
    SOCKET sListen; // Only valid with AcceptEx
    WSABUF wsaBuf;
    char buf[MAX_BUFFER];
    OPERATIONINFO opType;
}PERIODATA, *PPERIODATA;

HANDLE hThread[MAX_THREAD] = {
   
   0};
PERIODATA* pAcceptData[MAX_ACCEPT] = {
   
   0};
int g_nThread = 0;
BOOL g_bExitThread = FALSE;
LPFN_ACCEPTEX lpfnAcceptEx = NULL;
LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockAddrs = NULL;
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;

unsigned __stdcall ThreadProc(LPVOID lParam);
BOOL PostAccept(PERIODATA* pIoData);

int _tmain(int argc, _TCHAR* argv[])
{
    WORD wVersionRequested = MAKEWORD(2, 2);
    WSADATA wsaData;
    if(0 != WSAStartup(wVersionRequested, &wsaData))
    {
        printf("WSAStartup failed with error code: %d/n", GetLastError());
        return EXIT_FAILURE;
    }

    if(2 != HIBYTE(wsaData.wVersion) || 2 != LOBYTE(wsaData.wVersion))
    {
        printf("Socket version not supported./n");
        WSACleanup();
        return EXIT_FAILURE;
    }

    // Create IOCP
    HANDLE hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 0);
    if(NULL == hIOCP)
    {
        printf("CreateIoCompletionPort failed with error code: %d/n", WSAGetLastError());
        WSACleanup();
        return EXIT_FAILURE;
    }

    // Create worker thread
    SYSTEM_INFO si = {
   
   0};
    GetSystemInfo(&si);
    for(int i = 0; i < (int)si.dwNumberOfProcessors+2; i++)
    {
        hThread[g_nThread] = (HANDLE)_beginthreadex(NULL, 0, ThreadProc, (LPVOID)hIOCP, 0, NULL);
        if(NULL == hThread[g_nThread])
        {
            printf("_beginthreadex failed with error code: %d/n", GetLastError());
            continue;
        }
        ++g_nThread;

        if(g_nThread > MAX_THREAD)
        {
            break;
        }
    }

    // Create listen SOCKET
    SOCKET sListen = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if(INVALID_SOCKET == sListen)
    {
        printf("WSASocket failed with error code: %d/n", WSAGetLastError());
        goto EXIT_CODE;
    }

    // Associate SOCKET with IOCP
    if(NULL == CreateIoCompletionPort((HANDLE)sListen, hIOCP, NULL, 0))
    {
        printf("CreateIoCompletionPort failed with error code: %d/n", WSAGetLastError());
        if(INVALID_SOCKET != sListen)
        {
            closesocket(sListen);
            sListen = INVALID_SOCKET;
        }
        goto EXIT_CODE;
    }

    // Bind SOCKET
    SOCKADDR_IN addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");
    addr.sin_port = htons(5050);
    if(SOCKET_ERROR == bind(sListen, (LPSOCKADDR)&addr, sizeof(addr)))
    {
        printf("bind failed with error code: %d/n", WSAGetLastError());
        if(INVALID_SOCKET != sListen)
        {
            closesocket(sListen);
            sListen = INVALID_SOCKET;
        }
        goto EXIT_CODE;
    }

    // Start Listen
    if(SOCKET_ERROR == listen(sListen, 200))
    {
        printf("listen failed with error code: %d/n", WSAGetLastError());
        if(INVALID_SOCKET != sListen)
        {
            closesocket(sListen);
            sListen = INVALID_SOCKET;
        }
        goto EXIT_CODE;
    }

    printf("Server start, wait for client to connect .../n");

    DWORD dwBytes = 0;
    if(SOCKET_ERROR == WSAIoctl(sListen, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof(GuidAcceptEx), &lpfnAcceptEx,
        sizeof(lpfnAcceptEx), &dwBytes, NULL, NULL))
    {
        printf("WSAIoctl failed with error code: %d/n", WSAGetLastError());
        if(INVALID_SOCKET != sListen)
        {
            closesocket(sListen);
            sListen = INVALID_SOCKET;
        }
        goto EXIT_CODE;
    }

    if(SOCKET_ERROR == WSAIoctl(sListen, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidGetAcceptExSockAddrs, 
        sizeof(GuidGetAcceptExSockAddrs), &lpfnGetAcceptExSockAddrs, sizeof(lpfnGetAcceptExSockAddrs), 
        &dwBytes, NULL, NULL))
    {
        printf("WSAIoctl failed with error code: %d/n", WSAGetLastError());
        if(INVALID_SOCKET != sListen)
        {
            closesocket(sListen);
            sListen = INVALID_SOCKET;
        }
        goto EXIT_CODE;
    }

    // Post MAX_ACCEPT accept
    for(int i=0; i<MAX_ACCEPT; i++)
    {
        pAcceptData[i] = new PERIODATA;
        pAcceptData[i]->sListen = sListen;
        PostAccept(pAcceptData[i]);
    }
    // After 1 hour later, Server shutdown.
    Sleep(1000 * 60 *60);

EXIT_CODE:
    g_bExitThread = TRUE;

    PostQueuedCompletionStatus(hIOCP, 0, NULL, NULL);
    WaitForMultipleObjects(g_nThread, hThread, TRUE, INFINITE);
    for(int i = 0; i < g_nThread; i++)
    {
        CloseHandle(hThread[g_nThread]);
    }

    for(int i=0; i<MAX_ACCEPT; i++)
    {
        if(pAcceptData[i])
        {
            delete pAcceptData[i];
            pAcceptData[i] = NULL;
        }
    }

    if(INVALID_SOCKET != sListen)
    {
        closesocket(sListen);
        sListen = INVALID_SOCKET;
    }
    CloseHandle(hIOCP); // Close IOCP

    WSACleanup();
    return 0;
}

BOOL PostAccept(PERIODATA* pIoData)
{
    if(INVALID_SOCKET == pIoData->sListen)
    {
        return FALSE;
    }

    DWORD dwBytes = 0;
    pIoData->opType = OP_ACCEPT;
    pIoData->sAccept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if(INVALID_SOCKET == pIoData->sAccept)
    {
        printf("WSASocket failed with error code: %d/n", WSAGetLastError());
        return FALSE;
    }

    if(FALSE == lpfnAcceptEx(pIoData->sListen, pIoData->sAccept, pIoData->wsaBuf.buf, pIoData->wsaBuf.len - ((sizeof(SOCKADDR_IN)+16)*2), 
        sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, &dwBytes, &(pIoData->ol)))
    {
        if(WSA_IO_PENDING != WSAGetLastError())
        {
            printf("lpfnAcceptEx failed with error code: %d/n", WSAGetLastError());

            return FALSE;
        }
    }
    return TRUE;
}

unsigned __stdcall ThreadProc(LPVOID lParam)
{
    HANDLE hIOCP = (HANDLE)lParam;

    PERHANDLEDATA* pPerHandleData = NULL;
    PERIODATA* pPerIoData = NULL;
    WSAOVERLAPPED* lpOverlapped = NULL;
    DWORD dwTrans = 0;
    DWORD dwFlags = 0;
    while(!g_bExitThread)
    {
        BOOL bRet = GetQueuedCompletionStatus(hIOCP, &dwTrans, (PULONG_PTR)&pPerHandleData, &lpOverlapped, MAX_TIMEOUT);
        if(!bRet)
        {
            // Timeout and exit thread
            if(WAIT_TIMEOUT == GetLastError())
            {
                continue;
            }
            // Error
            printf("GetQueuedCompletionStatus failed with error: %d/n", GetLastError());
            continue;
        }
        else
        {
            pPerIoData = CONTAINING_RECORD(lpOverlapped, PERIODATA, ol);
            if(NULL == pPerIoData)
            {
                // Exit thread
                break;
            }

            if((0 == dwTrans) && (OP_READ == pPerIoData->opType || OP_WRITE == pPerIoData->opType))
            {
                // Client leave.
                printf("Client: <%s : %d> leave./n", inet_ntoa(pPerHandleData->addr.sin_addr), ntohs(pPerHandleData->addr.sin_port));
                closesocket(pPerHandleData->sock);
                delete pPerHandleData;
                delete pPerIoData;
                continue;
            }
            else
            {
                switch(pPerIoData->opType)
                {
                case OP_ACCEPT: // Accept
                    {    
                        SOCKADDR_IN* remote = NULL;
                        SOCKADDR_IN* local = NULL;
                        int remoteLen = sizeof(SOCKADDR_IN);
                        int localLen = sizeof(SOCKADDR_IN);
                        lpfnGetAcceptExSockAddrs(pPerIoData->wsaBuf.buf, pPerIoData->wsaBuf.len - ((sizeof(SOCKADDR_IN)+16)*2),
                            sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, (LPSOCKADDR*)&local, &localLen, (LPSOCKADDR*)&remote, &remoteLen);
                        printf("Client <%s : %d> come in./n", inet_ntoa(remote->sin_addr), ntohs(remote->sin_port));
                        printf("Recv Data: <%s : %d> %s./n", inet_ntoa(remote->sin_addr), ntohs(remote->sin_port), pPerIoData->wsaBuf.buf);

                        if(NULL != pPerHandleData)
                        {
                            delete pPerHandleData;
                            pPerHandleData = NULL;
                        }
                        pPerHandleData = new PERHANDLEDATA;
                        pPerHandleData->sock = pPerIoData->sAccept;

                        PERHANDLEDATA* pPerHandle = new PERHANDLEDATA;
                        pPerHandle->sock = pPerIoData->sAccept;
                        PERIODATA* pPerIo = new PERIODATA;
                        pPerIo->opType = OP_WRITE;
                        strcpy_s(pPerIo->buf, MAX_BUFFER, pPerIoData->buf);
                        DWORD dwTrans = strlen(pPerIo->buf);
                        memcpy(&(pPerHandleData->addr), remote, sizeof(SOCKADDR_IN));
                        // Associate with IOCP
                        if(NULL == CreateIoCompletionPort((HANDLE)(pPerHandleData->sock), hIOCP, (ULONG_PTR)pPerHandleData, 0))
                        {
                            printf("CreateIoCompletionPort failed with error code: %d/n", GetLastError());
                            closesocket(pPerHandleData->sock);
                            delete pPerHandleData;
                            continue;
                        }

                        // Post Accept
                        memset(&(pPerIoData->ol), 0, sizeof(pPerIoData->ol));
                        PostAccept(pPerIoData);

                        // Post Receive                        
                        DWORD dwFlags = 0;
                        if(SOCKET_ERROR == WSASend(pPerHandle->sock, &(pPerIo->wsaBuf), 1, 
                            &dwTrans, dwFlags, &(pPerIo->ol), NULL))
                        {
                            if(WSA_IO_PENDING != WSAGetLastError())
                            {
                                printf("WSASend failed with error code: %d/n", WSAGetLastError());
                                closesocket(pPerHandle->sock);
                                delete pPerHandle;
                                delete pPerIo;
                                continue;
                            }
                        }
                    }
                    break;

                case OP_READ: // Read
                    printf("recv client <%s : %d> data: %s/n", inet_ntoa(pPerHandleData->addr.sin_addr), ntohs(pPerHandleData->addr.sin_port), pPerIoData->buf);
                    pPerIoData->opType = OP_WRITE;
                    memset(&(pPerIoData->ol), 0, sizeof(pPerIoData->ol));
                    if(SOCKET_ERROR == WSASend(pPerHandleData->sock, &(pPerIoData->wsaBuf), 1, &dwTrans, dwFlags, &(pPerIoData->ol), NULL))
                    {
                        if(WSA_IO_PENDING != WSAGetLastError())
                        {
                            printf("WSASend failed with error code: %d./n", WSAGetLastError());
                            continue;
                        }
                    }
                    break;

                case OP_WRITE: // Write
                    {
                        pPerIoData->opType = OP_READ;
                        dwFlags = 0;
                        memset(&(pPerIoData->ol), 0, sizeof(pPerIoData->ol));
                        memset(pPerIoData->buf, 0, sizeof(pPerIoData->buf));
                        pPerIoData->wsaBuf.buf = pPerIoData->buf;
                        dwTrans = pPerIoData->wsaBuf.len = MAX_BUFFER;
                        if(SOCKET_ERROR == WSARecv(pPerHandleData->sock, &(pPerIoData->wsaBuf), 1, &dwTrans, &dwFlags, &(pPerIoData->ol), NULL))
                        {
                            if(WSA_IO_PENDING != WSAGetLastError())
                            {
                                printf("WSARecv failed with error code: %d./n", WSAGetLastError());
                                continue;
                            }
                        }
                    }
                    break;

                default:
                    break;
                }
            }
        }
    }
    return 0;
}

 

转载于:https://www.cnblogs.com/thbCode/p/4434848.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/188840.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • TCPIP协议

    TCPIP协议TCP/IP协议1.链路层:数据链路层或网络接口层(网络接口层和硬件层),通常包括操作系统中的设备驱动程序和计算机中对应的网络接口卡。处理与电缆(或其他任何传输媒介)的物理接口细节。转换IP层和网络接口层使用的地址。2.网络层:处理分组在网络中的活动,例如分组的选路。IP是一种网络层协议,提供的是一种不可靠的服务,它只是尽可能快地把分组从源结点送到目的结点,但是并不提供任何可靠性保证。…

    2022年6月25日
    43
  • ASP.NET Session详解[通俗易懂]

    ASP.NET Session详解[通俗易懂] (一)描述当用户在Web应用程序中导航ASP.NET页时,ASP.NET会话状态使您能够存储和检索用户的值。HTTP是一种无状态协议。这意味着Web服务器会将针对页面的每个HTTP请求作为独立的请求进行处理。服务器不会保留以前的请求过程中所使用的变量值的任何信息。ASP.NET会话状态将来自限定时间范围内的同一浏览器的请求标识为一个会话,当每个用户首次与这台W

    2022年7月15日
    12
  • 中文情感词汇本体库_数据语言

    中文情感词汇本体库_数据语言基于情感词典的情感分析应该是最简单传统的情感分析方法。

    2022年8月23日
    4
  • Java爬虫系列四:使用selenium-java爬取js异步请求的数据[通俗易懂]

    在之前的系列文章中介绍了如何使用httpclient抓取页面html以及如何用jsoup分析html源文件内容得到我们想要的数据,但是有时候通过这两种方式不能正常抓取到我们想要的数据,比如看如下例子。

    2022年2月16日
    65
  • NodeJS环境下使用axios上传文件

    NodeJS环境下使用axios上传文件最近有个需求,需要在nodejs后端上传图片到云存储服务器,刚好对axios这个库比较熟悉,因此便开始在网上查资料,但是网上大多的都是用axios在前端上传文件的代码,即是基于浏览器环境的。后来找到了基于Nodejs环境的axios上传代码,一番copy后便开始了测试,本以为会一帆风顺,没想到服务器那边却总是返回如下错误,也就是说我们的请求并没有以multipart/form-data的形式封装好…

    2022年6月16日
    106
  • html5 sexteen,Prosecutors: Berlusconi had sex with teen 13 times

    html5 sexteen,Prosecutors: Berlusconi had sex with teen 13 timesPremierSilvioBerlusconipaidforsexwithanunder-ageMoroccanteen13timesathisvillanearMilan,prosecutorssaidinadocumentfiledTuesdayseekingindictmentsagainstthreeaidesforallege…

    2022年5月24日
    36

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号