Reactor 反应堆模式

news/2024/9/28 13:34:05 标签: java, 服务器, 网络

Reactor 反应堆模式

1、概念

Reactor(反应堆)模式是一种事件驱动的设计模式,通常用于处理高并发的I/O操作,尤其是在服务器网络编程中。它基于事件多路复用机制,使得单个线程能够同时管理大量并发连接,而不需要为每个连接创建一个独立的线程。

1.1、核心思想

Reactor 模式的核心思想是通过一个事件分发器(reactor)来监听和管理不同的 I/O 事件,当事件发生时,分发器会将该事件分发给对应的事件处理器来处理。


1.2、关键组件

  • 事件分发器 (Reactor)
    负责监听各种事件源(如 socket、文件描述符)并将事件分发给相应的处理器。它通过使用 I/O 多路复用机制(如 selectpollepoll)来同时监听多个 I/O 事件。

  • 事件处理器 (Event Handler)
    针对不同类型的事件(如连接、读、写),每个事件都会有一个对应的处理器,处理器内部会定义如何响应该事件。

  • 资源(Handle)
    代表系统中的 I/O 资源,例如网络 socket。事件分发器监听这些资源上的事件,当有 I/O 事件发生时,调用相应的处理器。

  • 回调函数 (Callback)
    在 Reactor 模式中,处理事件的方式通常是通过回调机制。事件处理器定义了如何处理特定事件,当事件分发器检测到某个事件时,就会触发相应的回调函数。


1.3、工作流程

  • 注册事件
    事件分发器注册需要监听的 I/O 事件(如连接、读写),并关联相应的事件处理器。

  • 事件循环
    事件分发器进入循环,使用 I/O 多路复用机制来监听注册的 I/O 事件。

  • 事件发生
    一旦某个 I/O 事件发生,事件分发器会将该事件分发给对应的事件处理器,事件处理器执行预定义的操作。

  • 处理完毕
    事件处理器完成事件处理后,可能会重新注册事件或关闭连接。


2、Reactor 反应堆模式代码

这里我们用到了 ET 模式

  • Reactor.hpp文件:
#pragma once
#include <string>
#include <unordered_map>
#include "Connection.hpp"
#include "Epoller.hpp"

// TcpServer就是Reactor(反应堆)
// class TcpServer // 对Connection和Epoller管理就行
class Reactor
{
  const static int gnum = 64;

public:
  Reactor() : _is_running(false) {}

  void AddConnection(int sockfd, uint32_t events, func_t recver, func_t sender, func_t excepter)
  {
      // 1.构建Connection对象
      Connection *conn = new Connection(sockfd);
      conn->SetEvent(events);
      conn->Register(recver, sender, excepter);
      conn->SetSelf(this);

      // 2.向内核表示对fd的关心
      _epoller.AddEvent(conn->SockFd(), conn->Events());

      // std::cout << "sockfd : " << sockfd << " , events : " << (events & EPOLLIN) << std::endl;

      // 3.向_connections添加Connection对象
      _connections.insert(std::make_pair(conn->SockFd(), conn));
  }

  bool ConnectionIsExist(int sockfd)
  {
      auto iter = _connections.find(sockfd);

      return iter != _connections.end();
  }

  void EnableReadWrite(int sockfd, bool wr, bool rd)
  {
      uint32_t events = (wr ? EPOLLOUT : 0) | (rd ? EPOLLIN : 0) | EPOLLET;
      if (ConnectionIsExist(sockfd))
      {
          // 修改对事件的关心
          _connections[sockfd]->SetEvent(events);
          // 设置到内核
          _epoller.ModEvent(sockfd, events);
      }
  }

  void RemoveConnection(int sockfd)
  {
      if (!ConnectionIsExist(sockfd))
          return;
      // 解除对文件描述符的关心
      _epoller.DelEvent(sockfd);
      // 关闭文件描述符
      ::close(sockfd);
      // 去除该连接
      delete _connections[sockfd];
      _connections.erase(sockfd);
  }

  // 一次派发
  void LoopOnce(int timeout)
  {
      int n = _epoller.Wait(recv, gnum, timeout); // n个事件就绪
      for (int i = 0; i < n; i++)
      {
          int sockfd = recv[i].data.fd;
          uint32_t revents = recv[i].events;

          // std::cout << "sockfd : " << sockfd << " , revents : " << revents << std::endl;

          // 挂起或者出错了转为读写事件就绪
          if (revents & EPOLLHUP)
              revents |= (EPOLLIN | EPOLLOUT);
          if (revents & EPOLLERR)
              revents |= (EPOLLIN | EPOLLOUT);

          // 读事件就绪
          if (revents & EPOLLIN)
          {
              // 文件描述符得在_connections存在(比如客户端可能退出了,这个文件描述符就没有了)
              if (ConnectionIsExist(sockfd) && (_connections[sockfd]->_recver != nullptr))
                  _connections[sockfd]->_recver(_connections[sockfd]); // 处理读事件就绪,这里_recver已经在AddConnection注册了!
          }
          // 写事件就绪
          if (revents & EPOLLOUT)
          {
              if (ConnectionIsExist(sockfd) && (_connections[sockfd]->_sender != nullptr))
                  _connections[sockfd]->_sender(_connections[sockfd]); // 处理写事件就绪,这里_sender已经在AddConnection注册了!
          }
      }
  }

  // 只负责事件派发
  void Despatcher()
  {
      _is_running = true;
      int timeout = -1; // 阻塞等
      while (true)
      {
          LoopOnce(timeout);
          // 处理其他事情
          Debug();
      }
      _is_running = false;
  }

  void Debug()
  {
      for (auto &connection : _connections)
      {
          std::cout << "------------------------------------" << std::endl;
          std::cout << "fd : " << connection.second->SockFd() << " , ";
          uint32_t events = connection.second->Events();
          if ((events & EPOLLIN) && (events & EPOLLET))
              std::cout << "EPOLLIN | EPOLLET";
          if ((events & EPOLLIN) && (events & EPOLLET))
              std::cout << "EPOLLIN | EPOLLET";
          std::cout << std::endl;
      }
      std::cout << "------------------------------------" << std::endl;
  }
  ~Reactor() {}

private:
  std::unordered_map<int, Connection *> _connections; // 保存fd 和 对应的连接
  Epoller _epoller;

  struct epoll_event recv[gnum];
  bool _is_running;
};
  • Socket.hpp文件:
#pragma once

#include <string.h>
#include <memory>

#include "Log.hpp"
#include "Comm.hpp"

namespace socket_ns
{
  const static int gbacklog = 8;

  class Socket;
  using socket_sptr = std::shared_ptr<Socket>; // 定义智能指针,以便于后面多态

  // 使用
  // std::unique_ptr<Socket> listensocket = std::make_unique<TcpSocket>();
  // listensocket->BuildListenSocket();
  // socket_sptr retsock = listensocket->Accepter();
  // retsock->Recv();
  // retsock->Send();

  // std::unique_ptr<Socket> clientsocket = std::make_unique<TcpSocket>();
  // clientsocket->BuildClientSocket();
  // clientsocket->Send();
  // clientsocket->Recv();

  class Socket
  {
  public:
      virtual void CreateSocketOrDie() = 0;
      virtual void BindSocketOrDie(InetAddr &addr) = 0;
      virtual void ListenSocketOrDie() = 0;
      virtual int Accepter(InetAddr *addr, int *errcode) = 0;
      virtual bool Connector(InetAddr &addr) = 0;
      virtual void SetSocketAddrReuse() = 0;
      virtual int SockFd() = 0;

      virtual ssize_t Recv(std::string *out) = 0;
      virtual ssize_t Send(std::string &in) = 0;
      // virtual void Other() = 0;

  public:
      void BuildListenSocket(InetAddr &addr)
      {
          CreateSocketOrDie();
          SetSocketAddrReuse();
          BindSocketOrDie(addr);
          ListenSocketOrDie();
      }

      bool BuildClientSocket(InetAddr &addr)
      {
          CreateSocketOrDie();
          return Connector(addr);
      }
  };

  class TcpSocket : public Socket
  {
  public:
      TcpSocket(int sockfd = -1) : _socktfd(sockfd)
      {
      }

      virtual void SetSocketAddrReuse() override
      {
          int opt = 1;
          ::setsockopt(_socktfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
      }

      virtual void CreateSocketOrDie() override
      {
          // 创建
          _socktfd = socket(AF_INET, SOCK_STREAM, 0); // 这个就是文件描述符
          if (_socktfd < 0)
          {
              LOG(FATAL, "create sockfd error, error code : %d, error string : %s", errno, strerror(errno));
              exit(CREATE_ERROR);
          }
          LOG(INFO, "create sockfd success");
      }
      virtual void BindSocketOrDie(InetAddr &addr) override
      {
          struct sockaddr_in local;
          bzero(&local, sizeof(local));
          local.sin_family = AF_INET;
          local.sin_port = htons(addr.Port());
          local.sin_addr.s_addr = INADDR_ANY;
          // 绑定
          int n = ::bind(_socktfd, CONV(&local), sizeof(local));
          if (n < 0)
          {
              LOG(FATAL, "bind sockfd error, error code : %d, error string : %s", errno, strerror(errno));
              exit(BIND_ERROR);
          }
          LOG(INFO, "bind sockfd success");
      }
      virtual void ListenSocketOrDie() override
      {
          // 监听
          int ret = ::listen(_socktfd, gbacklog);
          if (ret < 0)
          {
              LOG(FATAL, "listen error, error code : %d , error string : %s", errno, strerror(errno));
              exit(LISTEN_ERROR);
          }
          LOG(INFO, "listen success!");
      }
      virtual int Accepter(InetAddr *addr, int *errcode) override
      {
          struct sockaddr_in peer;
          socklen_t len = sizeof(peer);

          // 获取新连接
          int newsockfd = accept(_socktfd, CONV(&peer), &len); // 建立连接成功,创建新文件描述符进行通信

          *errcode = errno;
          LOG(DEBUG, "errno : ", errno);

          if (newsockfd < 0)
          {
              LOG(WARNING, "accept error, error code : %d , error string : %s", errno, strerror(errno));
              return -1;
          }
          LOG(INFO, "accept success! new sockfd : %d", newsockfd);

          SetNonBlock(_socktfd); // 这里不是newsockfd,这里是对listensock进行非阻塞

          *addr = peer;
          // socket_sptr sock = std::make_shared<TcpSocket>(newsockfd); // 创建新的文件描述符,传出去以便于后面的Recv和Send
          return newsockfd;
      }

      virtual bool Connector(InetAddr &addr) override
      {
          struct sockaddr_in local;
          bzero(&local, sizeof(local));
          local.sin_family = AF_INET;
          local.sin_port = htons(addr.Port());
          local.sin_addr.s_addr = inet_addr(addr.Ip().c_str());

          // 发起连接
          int n = ::connect(_socktfd, CONV(&local), sizeof(local));
          if (n < 0)
          {
              LOG(WARNING, "create connect error, error code : %d, error string : %s", errno, strerror(errno));
              return false;
          }
          LOG(INFO, "create connect success");
          return true;
      }

      virtual int SockFd() override
      {
          return _socktfd;
      }

      virtual ssize_t Recv(std::string *out) override
      {
          char buff[1024];
          ssize_t n = recv(_socktfd, buff, sizeof(buff) - 1, 0);
          if (n > 0)
          {
              buff[n] = 0;
              *out += buff; // 方便当数据到来不是刚好1条数据的时候,进行合并后来的数据
          }
          return n;
      }
      virtual ssize_t Send(std::string &in) override
      {
          ssize_t n = send(_socktfd, in.c_str(), in.size(), 0);
          return n;
      }

  private:
      int _socktfd; // 用同一个_socket
  };
}
  • Connection.hpp文件:
#pragma once
#include <string>
#include <functional>
#include <sys/epoll.h>
#include "InetAddr.hpp"

class Reactor;
class Connection;
using func_t = std::function<void(Connection *)>;

class Connection
{
public:
  Connection(int sockfd) : _sockfd(sockfd), _R(nullptr) {}

  void SetEvent(uint32_t events)
  {
      _events = events;
  }

  void Register(func_t recver, func_t sender, func_t excepter)
  {
      _recver = recver;
      _sender = sender;
      _excepter = excepter;
  }

  void SetSelf(Reactor *R)
  {
      _R = R;
  }

  int SockFd()
  {
      return _sockfd;
  }

  void AppendInbuff(const std::string &buff)
  {
      _inbuffer += buff;
  }

  void AppendOutbuff(const std::string &buff)
  {
      _outbuffer += buff;
  }

  std::string &Inbuffer() // 返回引用,后面Decode得字符串切割
  {
      return _inbuffer;
  }

  std::string &Outbuffer() // 返回引用,后面Decode得字符串切割
  {
      return _outbuffer;
  }

  void OutBufferRemove(int n)
  {
      _outbuffer.erase(0, n);
  }

  uint32_t Events()
  {
      return _events;
  }

  ~Connection() {}

private:
  int _sockfd;

  // 输入输出缓冲区
  std::string _inbuffer;
  std::string _outbuffer;

  // 已经准备好的事件
  uint32_t _events;

  InetAddr _clientaddr;

public:
  // 处理事件
  func_t _recver;
  func_t _sender;
  func_t _excepter;

  Reactor *_R;
};
  • Epoller.hpp文件:
#pragma once
#include <sys/epoll.h>
#include "Log.hpp"

class Epoller
{
  bool EventCore(int sockfd, uint32_t event, int type)
  {
      struct epoll_event ep_event;
      ep_event.data.fd = sockfd;
      ep_event.events = event;
      int n = ::epoll_ctl(_epfd, type, sockfd, &ep_event);
      if (n < 0)
      {
          LOG(ERROR, "epoll_ctl error");
          return false;
      }
      LOG(DEBUG, "epoll_ctl add %d fd success", sockfd);
      return true;
  }

public:
  Epoller()
  {
      _epfd = ::epoll_create(128);
      if (_epfd < 0)
      {
          LOG(FATAL, "create epfd error");
          exit(EPOLL_CREATE_ERROR);
      }
      LOG(DEBUG, "create epfd success, epfd : %d", _epfd);
  }

  bool AddEvent(int sockfd, uint32_t event)
  {
      return EventCore(sockfd, event, EPOLL_CTL_ADD);
  }

  bool ModEvent(int sockfd, uint32_t event)
  {
      return EventCore(sockfd, event, EPOLL_CTL_MOD);
  }

  bool DelEvent(int sockfd)
  {
      return ::epoll_ctl(_epfd, EPOLL_CTL_DEL, sockfd, nullptr);
  }

  int Wait(struct epoll_event *recv, int num, int timeout)
  {
      int n = ::epoll_wait(_epfd, recv, num, timeout);
      return n;
  }
  ~Epoller()
  {
      if (_epfd > 0)
          ::close(_epfd);
  }

private:
  int _epfd;
};
  • IOService.hpp文件:
#pragma once
#include "Connection.hpp"
#include "Comm.hpp"

// 处理IO,recver、sender、excepter
class IOService
{
public:
  IOService(func_t func) : _func(func) {}
  void HandlerRecv(Connection *conn)
  {
      // 处理读事件
      errno = 0;
      while (true)
      {
          char buff[1024];
          ssize_t n = ::recv(conn->SockFd(), buff, sizeof(buff) - 1, 0);
          SetNonBlock(conn->SockFd()); // 这里也得非阻塞,不然会阻塞
          if (n > 0)
          {
              buff[n] = 0;
              conn->AppendInbuff(buff);
          }
          else
          {
              if (errno == EWOULDBLOCK || errno == EAGAIN)
              {
                  break;
              }
              else if (errno == EINTR)
              {
                  continue;
              }
              else
              {
                  conn->_excepter(conn); // 统一处理异常
                  return;                // 一定要提前返回
              }
          }
      }
      LOG(DEBUG, "debug");
      _func(conn);
  }

  void HandlerSend(Connection *conn)
  {
      // errno
      errno = 0;

      while (true)
      {
          ssize_t n = ::send(conn->SockFd(), conn->Outbuffer().c_str(), conn->Outbuffer().size(), 0);
          if (n > 0)
          {
              // 发送的数据的字节数小于Outbuffer的大小
              // n即实际发了多少
              conn->OutBufferRemove(n);
              if (conn->Outbuffer().empty())
                  break; // 发完了
          }
          else if (n == 0)
          {
              break;
          }
          else
          {
              // 写到文件结尾了
              if (errno == EWOULDBLOCK || errno == EAGAIN)
              {
                  break;
              }
              else if (errno == EINTR)
              {
                  continue;
              }
              else
              {
                  conn->_excepter(conn); // 统一处理异常
                  return;                // 一定要提前返回
              }
          }
      }

      // 一定遇到了缓冲区被写满
      if (!conn->Outbuffer().empty())
      {
          // 开启对写事件关心
          conn->_R->EnableReadWrite(conn->SockFd(), true, true);
      }
      else
      {
          // 重置对写事件的不关心
          conn->_R->EnableReadWrite(conn->SockFd(), false, true);
      }
  }

  void HandlerExcept(Connection *conn)
  {
      conn->_R->RemoveConnection(conn->SockFd());
  }

private:
  func_t _func;
};
  • Listener.hpp文件:
#pragma once
#include <iostream>
#include <memory>
#include "Socket.hpp"
#include "IOService.hpp"

using namespace socket_ns;

class Listener
{
public:
  Listener(uint16_t port, IOService &io) : _port(port), _listensock(std::make_unique<TcpSocket>()), _io(io)
  {
      InetAddr clientaddr("0", port);
      _listensock->BuildListenSocket(clientaddr);
      // LOG(DEBUG,"Listen sock : %d",_listensock->SockFd());
  }

  void Accepter(Connection *conn) // conn一定是listensock
  {
      LOG(DEBUG, "get new link , conn fd : %d", conn->SockFd());

      // 新连接到来
      while (true)
      {
          InetAddr clientaddr;
          int code = 0;

          int listensockfd = _listensock->Accepter(&clientaddr, &code); // 第二次卡住
          // listensockfd = _listensock->Accepter(&clientaddr, &code);

          if (listensockfd >= 0)
          {
              // 添加新连接
              conn->_R->AddConnection(listensockfd, EPOLLIN | EPOLLET,
                                      std::bind(&IOService::HandlerRecv, &_io, std::placeholders::_1),
                                      std::bind(&IOService::HandlerSend, &_io, std::placeholders::_1),
                                      std::bind(&IOService::HandlerExcept, &_io, std::placeholders::_1));
              // 这里就只是添加对应的处理函数(不懂跳过去可以看AddConnection函数),用不用看到时候到的是什么信号(EPOLLIN等)
              // 使用对应的函数会传conn,比如使用_recver(conn)。
          }
          else
          {
              if (code == EWOULDBLOCK || code == EAGAIN)
              {
                  // 读完了所有就绪文件描述符
                  LOG(DEBUG, "ready fd read complete!");
                  break;
              }
              else if (code == EINTR)
              {
                  LOG(DEBUG, "accpet interupt by signal ");
                  continue;
              }
              else
              {
                  LOG(WARNING, "accpet error");
                  break;
              }
          }
      }
  }

  int SockFd()
  {
      return _listensock->SockFd();
  }

  ~Listener()
  {
      ::close(_listensock->SockFd());
  }

private:
  uint16_t _port;
  std::unique_ptr<Socket> _listensock;
  IOService &_io;
};
  • Calculate.hpp文件:
#pragma once

#include <iostream>
#include <string>
#include <memory>
#include "Protocol.hpp"

using namespace protocol_ns;

// 应用层
class Calculate
{
public:
  Calculate()
  {
  }

  std::unique_ptr<Response> Execute(const Request &req)
  {
      std::unique_ptr<Response> resptr = std::make_unique<Response>();

      switch (req._oper)
      {
      case '+':
          resptr->_result = req._x + req._y;
          resptr->_equation = std::to_string(req._x) + " " + req._oper + " " + std::to_string(req._y) + " = " + std::to_string(resptr->_result);
          break;

      case '-':
          resptr->_result = req._x - req._y;
          resptr->_equation = std::to_string(req._x) + " " + req._oper + " " + std::to_string(req._y) + " = " + std::to_string(resptr->_result);
          break;

      case '*':
          resptr->_result = req._x * req._y;
          resptr->_equation = std::to_string(req._x) + " " + req._oper + " " + std::to_string(req._y) + " = " + std::to_string(resptr->_result);
          break;
      case '/':
      {
          if (req._y == 0)
          {
              resptr->_flag = 1;
              resptr->_equation = "除0错误";
          }
          else
          {
              resptr->_result = req._x / req._y;
              resptr->_equation = std::to_string(req._x) + " " + req._oper + " " + std::to_string(req._y) + " = " + std::to_string(resptr->_result);
          }
          break;
      }

      case '%':
      {
          if (req._y == 0)
          {
              resptr->_flag = 2;
              resptr->_equation = "模0错误";
          }
          else
          {
              resptr->_result = req._x % req._y;
              resptr->_equation = std::to_string(req._x) + " " + req._oper + " " + std::to_string(req._y) + " = " + std::to_string(resptr->_result);
          }
          break;
      }
      default:
          resptr->_flag = 3;
          break;
      }
      return resptr;
  }
  ~Calculate() {}

private:
};
  • Comm.hpp文件:
#pragma once
#include <unistd.h>
#include <fcntl.h>

#include "InetAddr.hpp"


enum errorcode
{
  CREATE_ERROR = 1,
  BIND_ERROR,
  LISTEN_ERROR,
  SEND_ERROR,
  RECV_ERROR,
  CONNECT_ERROR,
  FORK_ERROR,
  USAGE_ERROR,
  EPOLL_CREATE_ERROR
};

#define CONV(ADDR) ((struct sockaddr *)ADDR)

std::string CombineIpAndPort(InetAddr addr)
{
  return "[" + addr.Ip() + ":" + std::to_string(addr.Port()) + "] ";
}


void SetNonBlock(int fd)
{
  int f1 = ::fcntl(fd, F_GETFL); // 获取标记位
  if (f1 < 0)
      return;
  ::fcntl(fd, F_SETFL, f1 | O_NONBLOCK);
}
  • LockGuard.hpp文件:
# pragma once

#include <pthread.h>


class LockGuard
{
public:
   LockGuard(pthread_mutex_t *mutex) : _mutex(mutex)
   {
       pthread_mutex_lock(_mutex); // 构造加锁
   }
   ~LockGuard()
   {
       pthread_mutex_unlock(_mutex); // 析构解锁
   }

private:
   pthread_mutex_t *_mutex;
};

  • Log.hpp文件:
#pragma once

#include <string>
#include <iostream>
#include <fstream>
#include <unistd.h>
#include <stdarg.h>
#include <sys/types.h>
#include "LockGuard.hpp"

using namespace std;

bool isSave = false; // 默认向显示器打印
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
#define FILEPATH "./log.txt"

enum level
{
   DEBUG = 0,
   INFO,
   WARNING,
   ERROR,
   FATAL
};

void SaveToFile(const string &message)
{
   ofstream out(FILEPATH, ios_base::app);
   if (!out.is_open())
       return;
   out << message;
   out.close();
}

std::string LevelToString(int level)
{
   switch (level)
   {
   case DEBUG:
       return "Debug";
   case INFO:
       return "Info";
   case WARNING:
       return "Warning";
   case ERROR:
       return "Error";
   case FATAL:
       return "Fatal";
   default:
       return "Unknow";
   }
}

std::string GetTimeString()
{
   time_t curr_time = time(nullptr);
   struct tm *format_time = localtime(&curr_time);
   if (format_time == nullptr)
       return "None";
   char buff[1024];
   snprintf(buff, sizeof(buff), "%d-%d-%d %d:%d:%d",
            format_time->tm_year + 1900,
            format_time->tm_mon + 1,
            format_time->tm_mday,
            format_time->tm_hour,
            format_time->tm_min,
            format_time->tm_sec);
   return buff;
}

void LogMessage(const std::string filename, int line, bool issave, int level, const char *format, ...)
{
   std::string levelstr = LevelToString(level);
   std::string timestr = GetTimeString();
   pid_t pid = getpid();

   char buff[1024];
   va_list arg;
   // int vsnprintf(char *str, size_t size, const char *format, va_list ap); // 使用可变参数
   va_start(arg, format);
   vsnprintf(buff, sizeof(buff), format, arg);
   va_end(arg);

   LockGuard lock(&mutex);
   std::string message = "[" + timestr + "]" + "[" + levelstr + "]" + "[pid:" + std::to_string(pid) + "]" + "[" + filename + "]" + "[" + std::to_string(line) + "] " + buff + '\n';
   if (issave == false)
       std::cout << message;
   else
       SaveToFile(message);
}

// 固定文件名和行数
#define LOG(level, format, ...)                                               \
   do                                                                        \
   {                                                                         \
       LogMessage(__FILE__, __LINE__, isSave, level, format, ##__VA_ARGS__); \
   } while (0)

#define EnableScreen()  \
   do                  \
   {                   \
       isSave = false; \
   } while (0)

#define EnableFile()   \
   do                 \
   {                  \
       isSave = true; \
   } while (0)

void Test(int num, ...)
{
   va_list arg;
   va_start(arg, num);
   while (num--)
   {
       int data = va_arg(arg, int);
       std::cout << data << " ";
   }
   std::cout << std::endl;
   va_end(arg);
}
  • Main.cc文件:
#include <iostream>
#include <string>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>

#include "Comm.hpp"
#include "Reactor.hpp"

#include "Connection.hpp"
#include "Listener.hpp"
#include "PackageParse.hpp"

int main(int argc, char *argv[])
{
   if (argc != 2)
   {
       std::cout << "Usage : ./reactor port" << std::endl;
       exit(USAGE_ERROR);
   }
   uint16_t serverport = std::stoi(argv[1]);

   std::unique_ptr<Reactor> svr = std::make_unique<Reactor>(); // 主服务

   IOService io(PackageParse::Parse); // 这里回调函数可以对报文进行解析

   Listener listener(serverport, io); // 负责连接模块

   // 注册进入
   // EPOLLET是添加到uint32_t events中的,不是options
   svr->AddConnection(listener.SockFd(), EPOLLIN | EPOLLET, std::bind(&Listener::Accepter, &listener, std::placeholders::_1), nullptr, nullptr);

   svr->Despatcher();

   return 0;
}
  • PackageParse.hpp文件:
#pragma once
#include "Connection.hpp"
#include "Protocol.hpp"
#include "Calculate.hpp"

using namespace protocol_ns;
class PackageParse
{
public:
   static void Parse(Connection *conn)
   {
       // 对数据进行解析
       // LOG(DEBUG, "inbuff : %s" , conn->Inbuffer().c_str());
       std::string package;
       Request req;
       Calculate cal;
       while (true)
       {
           package = Decode(conn->Inbuffer()); // 可能为空
           if (package.empty())
               break;
           cout << "after Decode recvmessage : " << conn->Inbuffer() << std::endl;

           // 完整的一条有效数据
           std::cout << "server Decode:" << package << std::endl;

           // 3.反序列化
           req.DeSerialize(package); // 把_x,_y,_oper赋值

           // 4.业务处理
           std::unique_ptr<Response> resptr = cal.Execute(req);

           // 5.序列化
           std::string sendmessage;
           resptr->Serialize(&sendmessage);
           std::cout << "server Serialize:" << sendmessage << std::endl;

           // 6.加上报头数据封装
           sendmessage = Encode(sendmessage);
           std::cout << "server Encode:" << sendmessage << std::endl;

           // // 7.发送数据
           // int n = sockfd->Send(sendmessage);

           // 把解析的数据放到Outbuffer -- Outbuffer并不是内核的输出缓冲区!
           conn->AppendOutbuff(sendmessage);
       }

       // 到这里,说明解析完成
       if (!conn->Outbuffer().empty())
       {
           conn->_sender(conn);
       }
   }
};
  • Protocol.hpp文件:
#pragma once

#include <string>
#include <jsoncpp/json/json.h>
#include <iostream>
#include <ctime>
#include <sys/types.h>
#include <string>
#include <unistd.h>

// #define SELF 1; // SELF=1就用自定义的序列化和反序列化,否则用默认的

// 表示层
namespace protocol_ns
{
   const std::string SEP = "\r\n";
   const std::string CAL_SEP = " ";

   // 对发送数据进行封装
   // "len\r\n{有效载荷}\r\n" -- 其中len是有效载荷的长度
   std::string Encode(const std::string &inbuff)
   {
       int inbuff_len = inbuff.size();
       std::string newstr = std::to_string(inbuff_len);
       newstr += SEP;
       newstr += inbuff;
       newstr += SEP;
       return newstr;
   }

   // 解析字符串
   std::string Decode(std::string &outbuff)
   {
       int pos = outbuff.find(SEP);
       if (pos == std::string::npos)
       {
           // 没找到分隔符
           return std::string(); // 返回空串,等待接收到完整数据
       }
       // 找到分隔符
       std::string len_str = outbuff.substr(0, pos);
       if (len_str.empty())
           return std::string(); // 返回空串,等待接收到完整数据
       int data_len = std::stoi(len_str);
       // 判断长度是否符合要求
       int total_len = pos + SEP.size() * 2 + data_len; // 包装好的一条数据的长度
       if (outbuff.size() < total_len)
       {
           return std::string(); // 小于包装好的一条数据的长度,返回空串,等待接收到完整数据
       }
       // 大于等于包装好的一条数据的长度
       std::string message = outbuff.substr(pos + SEP.size(), data_len); // 有效数据
       outbuff.erase(0, total_len);                                      // 数据长度减少包装好的一条数据的长度,从前面开始移除
       return message;
   }

   class Request
   {
   public:
       Request() {}
       Request(int x, int y, char oper)
           : _x(x),
             _y(y),
             _oper(oper)
       {
       }

       // 序列化 -- 转化为字符串发送
       // {"x":_x,"y":_y,"oper":_oper}
       // 这样发送可以吗?不行,不一定一次到达的数据刚好是1条,可能是半条,也可能是2条,因此我们需要对发送的数据进行封装:
       // "len\r\n{有效载荷}\r\n" -- 其中len是有效载荷的长度
       void Serialize(std::string *out) // 要带出来
       {
#ifdef SELF
           // "len\r\nx op y\r\n" -- 自定义序列化和反序列化
           std::string data_x = std::to_string(_x);
           std::string data_y = std::to_string(_y);
           *out = data_x + CAL_SEP + _oper + CAL_SEP + data_y;
#else
           Json::Value root;
           root["x"] = _x;
           root["y"] = _y;
           root["oper"] = _oper;

           Json::FastWriter writer;
           std::string str = writer.write(root);

           *out = str;
#endif
       }

       // 反序列化 -- 解析
       bool DeSerialize(const std::string &in)
       {
#ifdef SELF
           auto left_blank_pos = in.find(CAL_SEP);
           if (left_blank_pos == std::string::npos)
               return false;
           std::string x_str = in.substr(0, left_blank_pos);
           if (x_str.empty())
               return false;
           auto right_blank_pos = in.rfind(CAL_SEP);

           if (right_blank_pos == std::string::npos)
               return false;
           std::string y_str = in.substr(right_blank_pos + 1);
           if (y_str.empty())
               return false;
           if (left_blank_pos + 1 + CAL_SEP.size() != right_blank_pos)
               return false;

           _x = std::stoi(x_str);
           _y = std::stoi(y_str);
           _oper = in[right_blank_pos - 1];
           return true;

#else
           Json::Value root;
           Json::Reader reader;
           if (!reader.parse(in, root))
               return false;
           _x = root["x"].asInt();
           _y = root["y"].asInt();
           _oper = root["oper"].asInt();
           return true;
#endif
       }
       ~Request() {}

   public:
       int _x;
       int _y;
       char _oper; // +-*/% 如果不是这些操作法那就是非法的
   };

   class Response
   {
   public:
       Response() {}
       // 序列化 -- 转化为字符串发送
       void Serialize(std::string *out) // 要带出来
       {
#ifdef SELF
           // "len\r\nresult flag equation\r\n"
           std::string data_res = std::to_string(_result);
           std::string data_flag = std::to_string(_flag);
           *out = data_res + CAL_SEP + data_flag + CAL_SEP + _equation;
#else
           Json::Value root;
           root["result"] = _result;
           root["flag"] = _flag;
           root["equation"] = _equation;

           Json::FastWriter writer;
           std::string str = writer.write(root);

           *out = str;
#endif
       }

       // 反序列化 -- 解析
       bool DeSerialize(const std::string &in)
       {
#ifdef SELF
           // "result flag equation"

           auto left_blank_pos = in.find(CAL_SEP);
           if (left_blank_pos == std::string::npos)
               return false;
           std::string res_str = in.substr(0, left_blank_pos);
           if (res_str.empty())
               return false;

           auto second_blank_pos = in.find(CAL_SEP, left_blank_pos + 1);
           if (second_blank_pos == std::string::npos)
               return false;
           std::string equation = in.substr(second_blank_pos + 1);
           if (equation.empty())
               return false;

           if (left_blank_pos + 1 + CAL_SEP.size() != second_blank_pos)
               return false;
           _result = std::stoi(res_str);
           _flag = in[second_blank_pos - 1] - '0';
           _equation = equation;
           return true;
#else
           Json::Value root;
           Json::Reader reader;
           if (!reader.parse(in, root))
               return false;
           _result = root["result"].asInt();
           _flag = root["flag"].asInt();
           _equation = root["equation"].asString();
           return true;
#endif
       }
       ~Response() {}

   public:
       int _result = 0;
       int _flag = 0;                              // 0表示操作符正确,1表示除0错误,2表示取模0错误,3表示操作符错误
       std::string _equation = "操作符不符合要求"; // 等式
   };

   const std::string opers = "+-*/%&^";

   class CalFactory
   {
   public:
       CalFactory()
       {
           srand(time(nullptr) ^ getpid() ^ 2);
       }
       void Product(Request &req)
       {
           req._x = rand() & 5 + 1;
           usleep(req._x * 20);
           req._y = rand() % 10 + 5;
           // req._y = 0; // 测试
           usleep(req._x * req._y + 20);
           req._oper = opers[(rand() % opers.size())];
       }
       ~CalFactory() {}

   private:
   };
}

  • Makefile文件:
reactor:Main.cc
	g++ -o $@ $^ -std=c++14 -ljsoncpp
clean:
	rm -f reactor

整体代码


OKOK,Reactor 反应堆模式就到这里,如果你对Linux和C++也感兴趣的话,可以看看我的主页哦。下面是我的github主页,里面记录了我的学习代码和leetcode的一些题的题解,有兴趣的可以看看。

Xpccccc的github主页


http://www.niftyadmin.cn/n/5681187.html

相关文章

el-table给列加单位,表头加样式,加斑马纹

<el-table ref"table" class"dataTable" :data"detailList" :header-cell-style"tableHeaderColor" :row-class-name"tableRowClassName" highlight-current-row><el-table-column label"序号" al…

git 基本原理

文章内容来源于视频 举个案例&#xff0c;家族里面有一本记载祖传秘籍的菊花宝典&#xff0c;这本菊花宝典的正本存储在家族祠堂里面&#xff0c;每一个家庭从正本复制一本存在自己家中&#xff0c;称为副本。这个过程称为clone 一个家庭需要再菊花宝典中添加技能&#xff0c…

Django 配置邮箱服务,实现发送信息到指定邮箱

一、这里以qq邮箱为例&#xff0c;打开qq邮箱的SMTP服务 二、django项目目录设置setting.py 文件 setting.py 添加如下内容&#xff1a; # 发送邮件相关配置 EMAIL_BACKEND django.core.mail.backends.smtp.EmailBackend EMAIL_USE_TLS True EMAIL_HOST smtp.qq.com EMAIL…

Vue 技术入门 day1 模版语法、数据绑定、事件处理、计算属性与监视、class和style绑定、条件渲染v-if/v-show、列表渲染v-for

目录 1.Vue 核心 1.1. Vue 简介 1.1.1 介绍与描述 1.1.2 Vue 的特点 1.2 模板语法 1.2.1 模板的分类 1.2.2 插值语法 1.2.3 指令语法 1.2.4 实例 1.3 数据绑定 1.3.1 单向数据绑定 1.3.2 双向数据绑定 1.3.3 MVVM 模型 1.3.4 data与el的2种写法 1.3.5 实例 1.3.…

HTML基础用法介绍一

VS code 如何快速生成HTML骨架注释是什么&#xff1f;为什么要写注释&#xff1f;注释的标签是什么&#xff1f;标题标签段落标签换行标签与水平线标签 (都是单标签&#xff09;文本格式化标签图片标签超链接标签音频标签视频标签 &#x1f698;正片开始 VS code 如何快速生成…

Paddlets时间序列集成模型回测实战:MLPRegressor、NHiTSModel与RNNBlockRegressor

好的,我们继续深入理解代码的每个部分。以下是每个主要模块的详细解释: 1. 导入模块和库 import json import os import glob import pandas as pd from tqdm import tqdm from paddlets.datasets import TSDataset from paddlets.transform import StandardScaler from pa…

C语言开发基础新手快速入门及精通系列学习教程(系统性完整C语言学习笔记整理)

关注我&#xff0c;一起学编程 前言 作为一名拥有多年开发经验的码农&#xff0c;我的职业生涯涵盖了多种编程语言&#xff0c;包括 C 语言、C、C# 和 JavaScript。在这一过程中&#xff0c;我深刻地意识到扎实的基础对于编程学习的重要性&#xff0c;尤其是对于 C 语言…

0x09 瑞友 应用虚拟化系统 GetBSAppUrl SQL注入漏洞 - 复现

参考:瑞友 应用虚拟化系统 GetBSAppUrl SQL注入漏洞 | PeiQi文库 (wgpsec.org) 漏洞描述 瑞友应用虚拟化系统中的 GetBSAppUrl 方法存在 SQL注入漏洞。由于请求参数未经过滤,攻击者可以利用此漏洞执行恶意SQL查询,从而获取数据库中的敏感信息。 漏洞影响 受影响版本:瑞友…