#include "tcp_server_libevent.h"

#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <thread>

extern "C" {
#include "event2/bufferevent.h"
#include "event2/buffer.h"
#include "event2/listener.h"
#include "event2/util.h"
#include "event2/event.h"
#include "event2/thread.h"
}

/**
 * @description: default receive handler; echoes the received bytes to stdout.
 * @param {char*} p    start of the received data
 * @param {uint32_t} len number of valid bytes at p
 * @return {*}
 */
void defaultConnRead(char* p, uint32_t len) {
    if (nullptr == p || 0 == len) {
        return;
    }
    // Honour len instead of relying on a terminating NUL being present.
    std::cout.write(p, static_cast<std::streamsize>(len));
}

/**
 * File-local glue between libevent's C callbacks and TcpServerLibevent.
 * It reads and writes TcpServerLibevent members directly.
 */
class ServerCallbacks {
public:
    /** Listener callback: invoked by libevent for every accepted socket. */
    static void cb_listener(struct evconnlistener* listener, evutil_socket_t fd,
                            struct sockaddr* addr, int len, void* ptr);
    /** Thread entry point used by TcpServerLibevent::StartServerAsync. */
    static void server_run(TcpServerLibevent* p);
    /** Frees the connections, the listener and the event base of p (idempotent). */
    static void release(TcpServerLibevent* p);
};

/**
 * @description: build a connection that already knows its owning server.
 * @param {TcpServerLibevent*} p  owning server
 * @param {bufferevent*} ev       bufferevent wrapping the socket
 * @param {uint32_t} fd           identifier stored in m_fd
 * @param {sockaddr_in*} p1       peer address
 */
ConnectionLibevent::ConnectionLibevent(TcpServerLibevent* p, struct bufferevent* ev,
                                       uint32_t fd, struct sockaddr_in* p1)
    : m_parent_server(p),
      m_event(ev),
      m_fd(fd),
      m_addr(p1),
      m_recv_handle(defaultConnRead) {
}

/**
 * @description: build a connection without a server; attach one later with SetServer().
 * @param {bufferevent*} ev  bufferevent wrapping the socket
 * @param {uint32_t} fd      identifier stored in m_fd
 * @param {sockaddr_in*} p1  peer address
 */
ConnectionLibevent::ConnectionLibevent(struct bufferevent* ev, uint32_t fd,
                                       struct sockaddr_in* p1)
    : m_parent_server(nullptr),
      m_event(ev),
      m_fd(fd),
      m_addr(p1),
      // This overload used to leave the receive handler uninitialised even
      // though it is the one cb_listener uses.
      m_recv_handle(defaultConnRead) {
    if (nullptr != m_addr) {
        std::cout << "\r\n " << m_addr << " ConnectionLibevent "
                  << inet_ntoa(m_addr->sin_addr) << std::endl;
    }
}

/**
 * @description: default accept handler; closes the new connection and prints
 *               the result of Close().
 * NOTE(review): closing every accepted connection looks surprising for a
 * default handler, but it is what the surviving source does; confirm intent.
 */
void defaultConnAccept(ConnectionLibevent* p1) {
    std::cout << "defaultConnAccept " << p1->Close();
}

// NOTE(review): the source text between here and ConnectionLibevent::OnWrite
// was damaged (everything between a '<' and the following '>' appears to have
// been stripped). The tail of defaultConnClose, the whole of OnRecv and the
// head of OnClose are reconstructed from the surviving tokens and from the
// call sites in read_cb/event_cb. Compare against version control before
// merging.

/**
 * @description: default disconnect handler; logs the peer address and id.
 */
void defaultConnClose(ConnectionLibevent* p) {
    std::cout << "defaultConnClose close connection " << p->IpAddress()
              << p->SocketFd() << std::endl;
}

/**
 * @description: forwards received data to the handler set by SetRecvHandler().
 * @return {int} always 0
 * NOTE(review): reconstructed; signature inferred from read_cb and
 * defaultConnRead.
 */
int ConnectionLibevent::OnRecv(char* p, uint32_t len) {
    if (m_recv_handle) {
        m_recv_handle(p, len);
    }
    return 0;
}

/**
 * @description: logs the connection that is being closed.
 * @return {int} always 0
 * NOTE(review): the beginning of this statement was lost; only the part from
 * m_fd onwards is original.
 */
int ConnectionLibevent::OnClose() {
    std::cout << this->m_fd << " " << this->IpAddress() << std::endl;
    return 0;
}

/** @return {int} always 0; nothing is done on write completion. */
int ConnectionLibevent::OnWrite() {
    return 0;
}

/**
 * @description: queue len bytes from p on the connection's bufferevent.
 * @return {int} -1 when p is null or the connection was closed, otherwise the
 *               result of bufferevent_write().
 */
int ConnectionLibevent::WriteData(const char* p, uint16_t len) {
    if (nullptr == p || nullptr == m_event) {
        return -1;
    }
    return bufferevent_write(m_event, p, len);
}

/** @return {uint32_t} the identifier given to the constructor. */
uint32_t ConnectionLibevent::SocketFd() {
    return m_fd;
}

/**
 * @description: free the bufferevent. Safe to call more than once.
 * @return {int} always 0
 */
int ConnectionLibevent::Close() {
    if (nullptr != m_event) {
        bufferevent_free(m_event);
        m_event = nullptr;  // prevents a double free on a second Close()
    }
    return 0;
}

/** @return {int} -1 when p is null, 0 once the handler is stored. */
int ConnectionLibevent::SetRecvHandler(OnRecvHandle p) {
    if (nullptr == p) {
        return -1;
    }
    m_recv_handle = p;
    return 0;
}

/** @return {int} -1 when p is null, 0 once the server is stored. */
int ConnectionLibevent::SetServer(TcpServerLibevent* p) {
    if (nullptr == p) {
        return -1;
    }
    m_parent_server = p;
    return 0;
}

/**
 * @description: dotted-quad text of the peer address; also logs address and port.
 * @return {string} empty when no peer address is attached.
 */
std::string ConnectionLibevent::IpAddress() {
    if (nullptr == m_addr) {
        return std::string();
    }
    std::string ip(inet_ntoa(m_addr->sin_addr));
    // sin_port is in network byte order, so ntohs is the correct conversion.
    std::cout << m_addr << " IpAddress: " << ip << "port "
              << ntohs(m_addr->sin_port) << std::endl;
    return ip;
}

/** @return {TcpServerLibevent*} the owning server, or nullptr. */
TcpServerLibevent* ConnectionLibevent::Server() {
    return m_parent_server;
}

/**
 * @description: bufferevent read callback. Drains the input buffer, logs the
 *               data and hands it to the connection together with its length.
 */
void read_cb(struct bufferevent* bev, void* arg) {
    ConnectionLibevent* conn = static_cast<ConnectionLibevent*>(arg);
    if (nullptr == conn) {
        return;
    }

    // Collect everything first: the receive handler may close the connection,
    // after which bev must not be touched again.
    std::string data;
    char chunk[1024];
    size_t got = 0;
    while ((got = bufferevent_read(bev, chunk, sizeof(chunk))) > 0) {
        data.append(chunk, got);
    }
    if (data.empty()) {
        return;
    }

    std::cout << "client " << conn->IpAddress().c_str() << " say:" << data.c_str()
              << std::endl;
    conn->OnRecv(&data[0], static_cast<uint32_t>(data.size()));
}

/** @description: bufferevent write callback; only logs. */
void write_cb(struct bufferevent* bev, void* arg) {
    (void)bev;
    ConnectionLibevent* conn = static_cast<ConnectionLibevent*>(arg);
    if (nullptr == conn) {
        return;
    }
    std::cout << "connection " << conn->IpAddress() << " sended data " << std::endl;
}

/**
 * @description: bufferevent status callback. On EOF or error the connection is
 *               closed and removed from its server; other events are ignored.
 */
void event_cb(struct bufferevent* bev, short events, void* arg) {
    (void)bev;  // same object as the connection's own bufferevent
    ConnectionLibevent* conn = static_cast<ConnectionLibevent*>(arg);
    if (nullptr == conn) {
        return;
    }

    if (events & BEV_EVENT_EOF) {
        conn->OnClose();
        std::cout << "connection closed BEV_EVENT_EOF: " << conn->IpAddress() << " "
                  << conn->SocketFd() << std::endl;
    } else if (events & BEV_EVENT_ERROR) {
        std::cout << "BEV_EVENT_ERROR !" << std::endl;
        conn->OnClose();
        std::cout << "connection closed: " << conn->IpAddress() << " "
                  << conn->SocketFd() << std::endl;
    } else {
        return;
    }

    TcpServerLibevent* server = conn->Server();
    const uint32_t key = conn->SocketFd();
    conn->Close();
    // RemoveConnection deletes the connection when it is registered; otherwise
    // nobody else owns it and it is deleted here.
    if (nullptr == server || 0 != server->RemoveConnection(key)) {
        delete conn;
    }
}

/**
 * @description: accept callback. Wraps the socket in a bufferevent, registers a
 *               ConnectionLibevent with the server and notifies the accept handler.
 * @param {void*} ptr the TcpServerLibevent that owns the listener, or nullptr
 *                    (the socket is then closed immediately).
 */
void ServerCallbacks::cb_listener(struct evconnlistener* listener, evutil_socket_t fd,
                                  struct sockaddr* addr, int len, void* ptr) {
    (void)listener;

    struct sockaddr_in peer;
    memset(&peer, 0, sizeof(peer));
    if (nullptr != addr && len > 0) {
        const size_t avail = static_cast<size_t>(len);
        memcpy(&peer, addr, avail < sizeof(peer) ? avail : sizeof(peer));
    }
    std::cout << "connect new client: " << inet_ntoa(peer.sin_addr) << " port: "
              << " ::" << ntohs(peer.sin_port) << std::endl;

    TcpServerLibevent* server = static_cast<TcpServerLibevent*>(ptr);
    if (nullptr == server) {
        std::cout << "null 1" << std::endl;
        evutil_closesocket(fd);  // nobody will own this socket
        return;
    }
    std::cout << "null 2" << std::endl;

    struct bufferevent* bev =
        bufferevent_socket_new(server->m_event_base, fd, BEV_OPT_CLOSE_ON_FREE);
    std::cout << "null 4" << bev << std::endl;
    if (nullptr == bev) {
        evutil_closesocket(fd);
        return;
    }

    // This is a bad design: connections are keyed by the peer's source port,
    // not by the socket descriptor, so two peers using the same port collide.
    // TODO(review): consider keying by fd.
    // NOTE(review): ownership of `client` passes to the connection; whether
    // ~ConnectionLibevent releases it is not visible in this file.
    struct sockaddr_in* client = new sockaddr_in(peer);
    const uint32_t key = ntohs(client->sin_port);
    ConnectionLibevent* conn = new ConnectionLibevent(bev, key, client);
    conn->SetServer(server);
    if (0 != server->AddConnection(key, conn)) {
        // Key already registered: refuse the newcomer instead of leaving an
        // untracked connection that would later remove the wrong entry.
        conn->Close();
        delete conn;
        return;
    }

    bufferevent_setcb(bev, read_cb, write_cb, event_cb, conn);
    // Enable before notifying: the accept handler may close the connection,
    // and bev must not be used after that.
    bufferevent_enable(bev, EV_READ | EV_WRITE);
    if (server->m_handle_accept) {
        server->m_handle_accept(conn);
    }
}

/**
 * @description: stand-alone smoke test; listens on port 8888 and runs the event
 *               loop on the calling thread. No server object is attached, so
 *               accepted sockets are closed by cb_listener.
 * @return {int} 0 after the loop ends, -1 when the base or listener cannot be created.
 */
int test_tcp_server() {
#ifdef WIN32
    WSADATA wsaData;
    const WORD wVersionRequested = MAKEWORD(2, 2);
    (void)WSAStartup(wVersionRequested, &wsaData);
#endif
    // init server
    struct sockaddr_in serv;
    memset(&serv, 0, sizeof(serv));
    serv.sin_family = AF_INET;
    serv.sin_port = htons(8888);
    serv.sin_addr.s_addr = htonl(INADDR_ANY);

    struct event_base* base = event_base_new();
    if (nullptr == base) {
        return -1;
    }

    // The callback argument must be a TcpServerLibevent* or nullptr; passing
    // the event_base here would be reinterpreted as a server object.
    struct evconnlistener* listener = evconnlistener_new_bind(
        base, &ServerCallbacks::cb_listener, nullptr,
        LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, 30000,
        reinterpret_cast<struct sockaddr*>(&serv), sizeof(serv));
    if (nullptr == listener) {
        event_base_free(base);
        return -1;
    }

    event_base_dispatch(base);
    evconnlistener_free(listener);
    event_base_free(base);
    return 0;
}

/**
 * @description: body of the background thread; runs the event loop until it is
 *               asked to exit. Resources are released by the destructor after
 *               it has joined this thread.
 */
void ServerCallbacks::server_run(TcpServerLibevent* p) {
    if (nullptr == p || nullptr == p->m_event_base) {
        return;
    }
    event_base_dispatch(p->m_event_base);
}

/**
 * @description: close and delete all registered connections, then free the
 *               listener and the event base. Must not run while the event loop
 *               is still dispatching.
 */
void ServerCallbacks::release(TcpServerLibevent* p) {
    if (nullptr == p) {
        return;
    }
    // Bufferevents have to go before the base they are attached to.
    for (auto& entry : p->m_map_client) {
        ConnectionLibevent* conn = entry.second;
        if (nullptr != conn) {
            conn->Close();
            delete conn;
        }
    }
    p->m_map_client.clear();

    if (nullptr != p->m_event_listener) {
        evconnlistener_free(p->m_event_listener);
        p->m_event_listener = nullptr;
    }
    if (nullptr != p->m_event_base) {
        event_base_free(p->m_event_base);
        p->m_event_base = nullptr;
    }
}

/**
 * @description: current server state.
 * @param {*}
 * @return {SERVER_STATUS}
 */
TcpServerLibevent::SERVER_STATUS TcpServerLibevent::Status() {
    return m_status;
}

/**
 * @description: register a connection under the given key.
 * @return {int} 0 when stored, -1 when p is null or the key is already in use.
 */
int TcpServerLibevent::AddConnection(uint32_t fd, ConnectionLibevent* p) {
    if (nullptr == p) {
        return -1;
    }
    return m_map_client.emplace(fd, p).second ? 0 : -1;
}

/**
 * @description: unregister the connection stored under fd, notify the
 *               disconnect handler and delete the connection.
 * @return {int} 0 when a connection was removed, -1 when the key is unknown.
 */
int TcpServerLibevent::RemoveConnection(uint32_t fd) {
    auto found = m_map_client.find(fd);
    if (found == m_map_client.end()) {
        return -1;
    }
    ConnectionLibevent* client = found->second;
    m_map_client.erase(found);
    if (m_handle_disconnect) {
        m_handle_disconnect(client);
    }
    delete client;
    return 0;
}

/** @return {int} number of registered connections. */
int TcpServerLibevent::ConnectionCount() {
    return static_cast<int>(m_map_client.size());
}

/** @description: set the handler invoked for every accepted connection. @return {int} 0 */
int TcpServerLibevent::SetNewConnectionHandle(OnAccept p) {
    m_handle_accept = p;
    return 0;
}

/** @description: set the handler invoked before a connection is deleted. @return {int} 0 */
int TcpServerLibevent::SetConnectionLeaveHandle(OnDisconnect p) {
    m_handle_disconnect = p;
    return 0;
}

/**
 * @description: create the event base and a listener bound to INADDR_ANY:port.
 *               Status() is STOP on success and FAIL otherwise.
 * @param {int} port      TCP port to listen on
 * @param {string} bindip stored in m_bind_ip
 *        NOTE(review): the listener always binds INADDR_ANY; bindip is not
 *        applied to the socket address.
 * @return {*}
 */
TcpServerLibevent::TcpServerLibevent(int port, std::string bindip)
    : m_thread(nullptr), m_event_base(nullptr), m_event_listener(nullptr) {
    m_status = FAIL;  // stays FAIL unless both base and listener are created
    m_handle_accept = defaultConnAccept;
    m_handle_disconnect = defaultConnClose;
    m_backlog = 10000;
    m_bind_ip = bindip;
    m_port = port;

    memset(&m_server_addr, 0, sizeof(m_server_addr));
    m_server_addr.sin_family = AF_INET;
    m_server_addr.sin_port = htons(static_cast<uint16_t>(port));
    m_server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    // Thread support has to be switched on before the base is created,
    // otherwise the base is built without locking and cannot be stopped
    // from another thread.
#if defined(EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED)
    evthread_use_windows_threads();
#elif defined(EVTHREAD_USE_PTHREADS_IMPLEMENTED)
    evthread_use_pthreads();
#endif

    // create the event_base
    m_event_base = event_base_new();
    if (nullptr == m_event_base) {
        return;
    }

    m_event_listener = evconnlistener_new_bind(
        m_event_base, &ServerCallbacks::cb_listener, this,
        LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, m_backlog,
        reinterpret_cast<struct sockaddr*>(&m_server_addr), sizeof(m_server_addr));
    if (nullptr == m_event_listener) {
        return;
    }
    m_status = STOP;
}

/**
 * @description: start server synchronous; blocks in the event loop and releases
 *               the listener and event base once the loop returns.
 * @param {*}
 * @return {int} 0 after the loop ended, -1 when the server is not startable.
 */
int TcpServerLibevent::StartServerAndRunSync() {
    if (m_status != STOP || nullptr == m_event_base || nullptr == m_event_listener) {
        return -1;
    }
    m_status = RUNNING;
    event_base_dispatch(m_event_base);
    ServerCallbacks::release(this);
    m_status = STOP;
    return 0;
}

/**
 * @description: start server asynchronous; the event loop runs on a background
 *               thread that the destructor stops and joins.
 * @param {*}
 * @return {int} 0 when the thread was started, -1 when the server is not startable.
 */
int TcpServerLibevent::StartServerAsync() {
    if (m_status != STOP || nullptr != m_thread || nullptr == m_event_base ||
        nullptr == m_event_listener) {
        return -1;
    }
    // Set before the thread exists so a second call cannot start another loop.
    m_status = RUNNING;
    m_thread = new std::thread(ServerCallbacks::server_run, this);
    return 0;
}

/**
 * @description: stop the background loop (if any), wait for its thread, then
 *               free connections, listener and event base.
 */
TcpServerLibevent::~TcpServerLibevent() {
    if (nullptr != m_thread) {
        if (nullptr != m_event_base) {
            // loopexit is queued on the base, so it also takes effect when the
            // thread has not entered the loop yet.
            event_base_loopexit(m_event_base, nullptr);
        }
        if (m_thread->joinable()) {
            m_thread->join();
        }
        delete m_thread;
        m_thread = nullptr;
    }
    ServerCallbacks::release(this);
    m_status = STOP;
}

/**
 * @return {uint64_t} value of mSocketFD.
 * NOTE(review): nothing in this file assigns mSocketFD; confirm where it is set.
 */
uint64_t TcpServerLibevent::SocketFd() {
    return mSocketFD;
}