IO 模型_netty reactor模型

IO 模型_netty reactor模型//IOCP2.cpp:Definestheentrypointfortheconsoleapplication.//#include”stdafx.h”#include<WinSock2.h>#include<MSWSock.h>#include<Windows.h>#include&lt…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

// IOCP2.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include <WinSock2.h>
#include <MSWSock.h>
#include <Windows.h>
#include <process.h>
#pragma comment(lib, "WS2_32.lib")

#define MAX_BUFFER    256
#define MAX_TIMEOUT 1000
#define MAX_SOCKET  1024
#define MAX_THREAD    64
#define MAX_ACCEPT  5

typedef enum _OPERATION_INFO_
{
    OP_NULL,
    OP_ACCEPT,
    OP_READ,
    OP_WRITE
}OPERATIONINFO;

typedef struct _PER_HANDLE_DATA_
{
public:
    _PER_HANDLE_DATA_()
    {
        clean();
    }
    ~_PER_HANDLE_DATA_()
    {
        clean();
    }
protected:
    void clean()
    {
        sock = INVALID_SOCKET;
        memset(&addr, 0, sizeof(addr));
        addr.sin_addr.S_un.S_addr = INADDR_ANY;
        addr.sin_port = htons(0);
        addr.sin_family = AF_INET;
    }
public:    
    SOCKET sock;
    SOCKADDR_IN addr;
}PERHANDLEDATA, *PPERHANDLEDATA;

typedef struct _PER_IO_DTATA_
{
public: 
    _PER_IO_DTATA_()
    {
        clean();
    }
    ~_PER_IO_DTATA_()
    {
        clean();
    }
    void clean()
    {
        ZeroMemory(&ol, sizeof(ol));
        memset(buf, 0, sizeof(buf));
        sAccept = INVALID_SOCKET;
        sListen = INVALID_SOCKET;
        wsaBuf.buf = buf;
        wsaBuf.len = MAX_BUFFER;
        opType =  OP_NULL;
    }
public:
    WSAOVERLAPPED ol;
    SOCKET sAccept; // Only valid with AcceptEx
    SOCKET sListen; // Only valid with AcceptEx
    WSABUF wsaBuf;
    char buf[MAX_BUFFER];
    OPERATIONINFO opType;
}PERIODATA, *PPERIODATA;

HANDLE hThread[MAX_THREAD] = {
   
   0};
PERIODATA* pAcceptData[MAX_ACCEPT] = {
   
   0};
int g_nThread = 0;
BOOL g_bExitThread = FALSE;
LPFN_ACCEPTEX lpfnAcceptEx = NULL;
LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockAddrs = NULL;
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;

unsigned __stdcall ThreadProc(LPVOID lParam);
BOOL PostAccept(PERIODATA* pIoData);

int _tmain(int argc, _TCHAR* argv[])
{
    WORD wVersionRequested = MAKEWORD(2, 2);
    WSADATA wsaData;
    if(0 != WSAStartup(wVersionRequested, &wsaData))
    {
        printf("WSAStartup failed with error code: %d/n", GetLastError());
        return EXIT_FAILURE;
    }

    if(2 != HIBYTE(wsaData.wVersion) || 2 != LOBYTE(wsaData.wVersion))
    {
        printf("Socket version not supported./n");
        WSACleanup();
        return EXIT_FAILURE;
    }

    // Create IOCP
    HANDLE hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 0);
    if(NULL == hIOCP)
    {
        printf("CreateIoCompletionPort failed with error code: %d/n", WSAGetLastError());
        WSACleanup();
        return EXIT_FAILURE;
    }

    // Create worker thread
    SYSTEM_INFO si = {
   
   0};
    GetSystemInfo(&si);
    for(int i = 0; i < (int)si.dwNumberOfProcessors+2; i++)
    {
        hThread[g_nThread] = (HANDLE)_beginthreadex(NULL, 0, ThreadProc, (LPVOID)hIOCP, 0, NULL);
        if(NULL == hThread[g_nThread])
        {
            printf("_beginthreadex failed with error code: %d/n", GetLastError());
            continue;
        }
        ++g_nThread;

        if(g_nThread > MAX_THREAD)
        {
            break;
        }
    }

    // Create listen SOCKET
    SOCKET sListen = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if(INVALID_SOCKET == sListen)
    {
        printf("WSASocket failed with error code: %d/n", WSAGetLastError());
        goto EXIT_CODE;
    }

    // Associate SOCKET with IOCP
    if(NULL == CreateIoCompletionPort((HANDLE)sListen, hIOCP, NULL, 0))
    {
        printf("CreateIoCompletionPort failed with error code: %d/n", WSAGetLastError());
        if(INVALID_SOCKET != sListen)
        {
            closesocket(sListen);
            sListen = INVALID_SOCKET;
        }
        goto EXIT_CODE;
    }

    // Bind SOCKET
    SOCKADDR_IN addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");
    addr.sin_port = htons(5050);
    if(SOCKET_ERROR == bind(sListen, (LPSOCKADDR)&addr, sizeof(addr)))
    {
        printf("bind failed with error code: %d/n", WSAGetLastError());
        if(INVALID_SOCKET != sListen)
        {
            closesocket(sListen);
            sListen = INVALID_SOCKET;
        }
        goto EXIT_CODE;
    }

    // Start Listen
    if(SOCKET_ERROR == listen(sListen, 200))
    {
        printf("listen failed with error code: %d/n", WSAGetLastError());
        if(INVALID_SOCKET != sListen)
        {
            closesocket(sListen);
            sListen = INVALID_SOCKET;
        }
        goto EXIT_CODE;
    }

    printf("Server start, wait for client to connect .../n");

    DWORD dwBytes = 0;
    if(SOCKET_ERROR == WSAIoctl(sListen, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof(GuidAcceptEx), &lpfnAcceptEx,
        sizeof(lpfnAcceptEx), &dwBytes, NULL, NULL))
    {
        printf("WSAIoctl failed with error code: %d/n", WSAGetLastError());
        if(INVALID_SOCKET != sListen)
        {
            closesocket(sListen);
            sListen = INVALID_SOCKET;
        }
        goto EXIT_CODE;
    }

    if(SOCKET_ERROR == WSAIoctl(sListen, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidGetAcceptExSockAddrs, 
        sizeof(GuidGetAcceptExSockAddrs), &lpfnGetAcceptExSockAddrs, sizeof(lpfnGetAcceptExSockAddrs), 
        &dwBytes, NULL, NULL))
    {
        printf("WSAIoctl failed with error code: %d/n", WSAGetLastError());
        if(INVALID_SOCKET != sListen)
        {
            closesocket(sListen);
            sListen = INVALID_SOCKET;
        }
        goto EXIT_CODE;
    }

    // Post MAX_ACCEPT accept
    for(int i=0; i<MAX_ACCEPT; i++)
    {
        pAcceptData[i] = new PERIODATA;
        pAcceptData[i]->sListen = sListen;
        PostAccept(pAcceptData[i]);
    }
    // After 1 hour later, Server shutdown.
    Sleep(1000 * 60 *60);

EXIT_CODE:
    g_bExitThread = TRUE;

    PostQueuedCompletionStatus(hIOCP, 0, NULL, NULL);
    WaitForMultipleObjects(g_nThread, hThread, TRUE, INFINITE);
    for(int i = 0; i < g_nThread; i++)
    {
        CloseHandle(hThread[g_nThread]);
    }

    for(int i=0; i<MAX_ACCEPT; i++)
    {
        if(pAcceptData[i])
        {
            delete pAcceptData[i];
            pAcceptData[i] = NULL;
        }
    }

    if(INVALID_SOCKET != sListen)
    {
        closesocket(sListen);
        sListen = INVALID_SOCKET;
    }
    CloseHandle(hIOCP); // Close IOCP

    WSACleanup();
    return 0;
}

BOOL PostAccept(PERIODATA* pIoData)
{
    if(INVALID_SOCKET == pIoData->sListen)
    {
        return FALSE;
    }

    DWORD dwBytes = 0;
    pIoData->opType = OP_ACCEPT;
    pIoData->sAccept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if(INVALID_SOCKET == pIoData->sAccept)
    {
        printf("WSASocket failed with error code: %d/n", WSAGetLastError());
        return FALSE;
    }

    if(FALSE == lpfnAcceptEx(pIoData->sListen, pIoData->sAccept, pIoData->wsaBuf.buf, pIoData->wsaBuf.len - ((sizeof(SOCKADDR_IN)+16)*2), 
        sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, &dwBytes, &(pIoData->ol)))
    {
        if(WSA_IO_PENDING != WSAGetLastError())
        {
            printf("lpfnAcceptEx failed with error code: %d/n", WSAGetLastError());

            return FALSE;
        }
    }
    return TRUE;
}

unsigned __stdcall ThreadProc(LPVOID lParam)
{
    HANDLE hIOCP = (HANDLE)lParam;

    PERHANDLEDATA* pPerHandleData = NULL;
    PERIODATA* pPerIoData = NULL;
    WSAOVERLAPPED* lpOverlapped = NULL;
    DWORD dwTrans = 0;
    DWORD dwFlags = 0;
    while(!g_bExitThread)
    {
        BOOL bRet = GetQueuedCompletionStatus(hIOCP, &dwTrans, (PULONG_PTR)&pPerHandleData, &lpOverlapped, MAX_TIMEOUT);
        if(!bRet)
        {
            // Timeout and exit thread
            if(WAIT_TIMEOUT == GetLastError())
            {
                continue;
            }
            // Error
            printf("GetQueuedCompletionStatus failed with error: %d/n", GetLastError());
            continue;
        }
        else
        {
            pPerIoData = CONTAINING_RECORD(lpOverlapped, PERIODATA, ol);
            if(NULL == pPerIoData)
            {
                // Exit thread
                break;
            }

            if((0 == dwTrans) && (OP_READ == pPerIoData->opType || OP_WRITE == pPerIoData->opType))
            {
                // Client leave.
                printf("Client: <%s : %d> leave./n", inet_ntoa(pPerHandleData->addr.sin_addr), ntohs(pPerHandleData->addr.sin_port));
                closesocket(pPerHandleData->sock);
                delete pPerHandleData;
                delete pPerIoData;
                continue;
            }
            else
            {
                switch(pPerIoData->opType)
                {
                case OP_ACCEPT: // Accept
                    {    
                        SOCKADDR_IN* remote = NULL;
                        SOCKADDR_IN* local = NULL;
                        int remoteLen = sizeof(SOCKADDR_IN);
                        int localLen = sizeof(SOCKADDR_IN);
                        lpfnGetAcceptExSockAddrs(pPerIoData->wsaBuf.buf, pPerIoData->wsaBuf.len - ((sizeof(SOCKADDR_IN)+16)*2),
                            sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, (LPSOCKADDR*)&local, &localLen, (LPSOCKADDR*)&remote, &remoteLen);
                        printf("Client <%s : %d> come in./n", inet_ntoa(remote->sin_addr), ntohs(remote->sin_port));
                        printf("Recv Data: <%s : %d> %s./n", inet_ntoa(remote->sin_addr), ntohs(remote->sin_port), pPerIoData->wsaBuf.buf);

                        if(NULL != pPerHandleData)
                        {
                            delete pPerHandleData;
                            pPerHandleData = NULL;
                        }
                        pPerHandleData = new PERHANDLEDATA;
                        pPerHandleData->sock = pPerIoData->sAccept;

                        PERHANDLEDATA* pPerHandle = new PERHANDLEDATA;
                        pPerHandle->sock = pPerIoData->sAccept;
                        PERIODATA* pPerIo = new PERIODATA;
                        pPerIo->opType = OP_WRITE;
                        strcpy_s(pPerIo->buf, MAX_BUFFER, pPerIoData->buf);
                        DWORD dwTrans = strlen(pPerIo->buf);
                        memcpy(&(pPerHandleData->addr), remote, sizeof(SOCKADDR_IN));
                        // Associate with IOCP
                        if(NULL == CreateIoCompletionPort((HANDLE)(pPerHandleData->sock), hIOCP, (ULONG_PTR)pPerHandleData, 0))
                        {
                            printf("CreateIoCompletionPort failed with error code: %d/n", GetLastError());
                            closesocket(pPerHandleData->sock);
                            delete pPerHandleData;
                            continue;
                        }

                        // Post Accept
                        memset(&(pPerIoData->ol), 0, sizeof(pPerIoData->ol));
                        PostAccept(pPerIoData);

                        // Post Receive                        
                        DWORD dwFlags = 0;
                        if(SOCKET_ERROR == WSASend(pPerHandle->sock, &(pPerIo->wsaBuf), 1, 
                            &dwTrans, dwFlags, &(pPerIo->ol), NULL))
                        {
                            if(WSA_IO_PENDING != WSAGetLastError())
                            {
                                printf("WSASend failed with error code: %d/n", WSAGetLastError());
                                closesocket(pPerHandle->sock);
                                delete pPerHandle;
                                delete pPerIo;
                                continue;
                            }
                        }
                    }
                    break;

                case OP_READ: // Read
                    printf("recv client <%s : %d> data: %s/n", inet_ntoa(pPerHandleData->addr.sin_addr), ntohs(pPerHandleData->addr.sin_port), pPerIoData->buf);
                    pPerIoData->opType = OP_WRITE;
                    memset(&(pPerIoData->ol), 0, sizeof(pPerIoData->ol));
                    if(SOCKET_ERROR == WSASend(pPerHandleData->sock, &(pPerIoData->wsaBuf), 1, &dwTrans, dwFlags, &(pPerIoData->ol), NULL))
                    {
                        if(WSA_IO_PENDING != WSAGetLastError())
                        {
                            printf("WSASend failed with error code: %d./n", WSAGetLastError());
                            continue;
                        }
                    }
                    break;

                case OP_WRITE: // Write
                    {
                        pPerIoData->opType = OP_READ;
                        dwFlags = 0;
                        memset(&(pPerIoData->ol), 0, sizeof(pPerIoData->ol));
                        memset(pPerIoData->buf, 0, sizeof(pPerIoData->buf));
                        pPerIoData->wsaBuf.buf = pPerIoData->buf;
                        dwTrans = pPerIoData->wsaBuf.len = MAX_BUFFER;
                        if(SOCKET_ERROR == WSARecv(pPerHandleData->sock, &(pPerIoData->wsaBuf), 1, &dwTrans, &dwFlags, &(pPerIoData->ol), NULL))
                        {
                            if(WSA_IO_PENDING != WSAGetLastError())
                            {
                                printf("WSARecv failed with error code: %d./n", WSAGetLastError());
                                continue;
                            }
                        }
                    }
                    break;

                default:
                    break;
                }
            }
        }
    }
    return 0;
}

 

转载于:https://www.cnblogs.com/thbCode/p/4434848.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/188840.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • Quartus II 使用详解

    Quartus II 使用详解今天早上做了《计算机组成原理》课的第一次实验。在这介绍一下QuartusⅡ如何使用,希望能帮到有需要的人。1、新建工程项目。2、填写项目存储路径和工程名,不要出现中文路径。3、添加已存在文件(可选),在【Filename】下选择已经存在的工程项目,利用【Add】或【Addall】命令添加文件到新工程,点击【Next】4、选择设备系列,并在【de…

    2022年10月16日
    4
  • 每天一道算法_3_487-3279_对电话号码格式化统计批处理

    早上弄了一道求高精度幂的算法,偷懒用了内部类,总觉得过意不去,所以今天重新做了一道算法题,做完心里舒服好多。题目如下: Description企业喜欢用容易被记住的电话号码。让电话号码容易被记住的一个办法是将它写成一个容易记住的单词或者短语。例如,你需要给滑铁卢大学打电话时,可以拨打TUT-GLOP。有时,只将电话号码中部分数字拼写成单词。当你晚上回到酒店,可以通过拨打310-GI

    2022年3月10日
    357
  • Azkaban简介

    Azkaban简介一 Azkaban 概论 1 1Azkaban 是什么 1 Azkaban 是由 Linkedin 公司推出的一个批量工作流任务调度器 2 主要用于在一个工作流内以一个特定的顺序运行一组工作和流程 它的配置是通过简单的 key value 对的方式 3 通过配置中的 dependencies 来设置依赖关系 这个依赖关系必须是无环的 否则会被视为无效的工作流 4 Azkaban 使用 job 配置文件建立任务之间的依赖关系 并提供一个易于使用的 web 用户界面维护和跟踪你的工作流 1 2 为什么需要工作流调度

    2025年7月24日
    2
  • kafuka生产者和消费者及配置

    kafuka生产者和消费者及配置#kafka生产者配置#kafka集群kafka.bootstrap.servers=ip:端口#发送端确认模式kafka.acks=all#发送失败重试次数kafka.retries=10#批处理条数kafka.batch.size=16384#延迟统一收集,产生聚合,然后批量发送kafka.linger.ms=100#批处理缓冲区kafka.buffer.memo…

    2022年6月3日
    50
  • 现有的人脸数据库介绍及下载链接

    现有的人脸数据库介绍及下载链接

    2021年9月15日
    43
  • 离散数学总复习精华版(最全 最简单易懂)已完结

    离散数学总复习精华版(最全 最简单易懂)已完结离散数学期末总复习精华版P1命题逻辑的基本概念虽然是不确定但是可以是命题就是无法判断真假优先级P2命题逻辑等值演算第一种方法:真值表求第二种用等值演算求P3命题逻辑推理理论下面给出例题后面的可以写成前提引入T12下面给出反证法附加前提证明:…………

    2022年6月18日
    22

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号