// // Created by 29019 on 2020/4/18. // #define _WSPIAPI_H_ #define _WINSOCKAPI_ #include "tcp_client.h" #include #include #include #include using namespace std::chrono; static void conn_writecb(struct bufferevent *, void *); static void conn_readcb(struct bufferevent *, void *); static void conn_eventcb(struct bufferevent *, short, void *); void delay(int ms); int ThreadRun(TcpClientLibevent *p); void conn_writecb(struct bufferevent *bev, void *user_data) { } // 运行线程 int ThreadRun(TcpClientLibevent *p) { if (nullptr != p) { int ret = p->Dispatch(); if (0 > ret){ } while ((p->mStatus != TcpClientLibevent::STOP)) { if ((p->mStatus == TcpClientLibevent::FAIL) || (p->mStatus == TcpClientLibevent::UNCONNECTED)){ //连接失败,如果有设置自动重连就一直重连 p->ConnectServer(); #ifdef _WIN32 Sleep(100); #else //todo linux版本sleep #endif }else { std::cout << "p->Dispatch()"; ret = p->Dispatch(); } } } std::cout << "p->Dispatch() finished"; return 0; } void conn_readcb(struct bufferevent *bev, void *user_data) { TcpClientLibevent *server = (TcpClientLibevent*)user_data; struct evbuffer *input = bufferevent_get_input(bev); size_t sz = evbuffer_get_length(input); if (sz > 0) { uint8_t *msg = new uint8_t[sz + 1]; int ret = bufferevent_read(bev, msg, sz); printf("%s\n", msg); msg[sz] = '\0'; if(server->mObserver != nullptr){ server->mObserver->OnData(msg, ret); } delete[] msg; } } void conn_eventcb(struct bufferevent *bev, short events, void *user_data) { TcpClientLibevent *p; p = (TcpClientLibevent *)user_data; if (p == nullptr) { return; } if (events & BEV_EVENT_EOF) { if (nullptr != p->mObserver) p->mObserver->OnDisConnected("服务器主动断开连接"); if (p != nullptr) if (p->mStatus != TcpClientLibevent::STOP) p->mStatus = TcpClientLibevent::UNCONNECTED; printf("Connection closed\n"); } else if (events & BEV_EVENT_ERROR) { printf("Got an error on the connection: %s\n", strerror(errno)); if (nullptr != p) { if (nullptr != p->mObserver) p->mObserver->OnDisConnected("连接失败"); if(p->mStatus != TcpClientLibevent::STOP) p->mStatus = TcpClientLibevent::FAIL; } } else if (events & BEV_EVENT_CONNECTED) { p->mSocketFD = (uint64_t)event_get_fd(&(bev->ev_read)); //客户端链接成功后,给服务器发送第一条消息 std::cout << "socket fd" << p->mSocketFD; if (nullptr != p->mObserver) p->mObserver->OnConnected(); p->mStatus = TcpClientLibevent::CONNECTED; return; } bufferevent_free(bev); } void delay(int ms) { clock_t start = clock(); while (clock() - start < ms); } bool TcpClientLibevent::Connected() { return (((mStatus != UNCONNECTED)&& (mStatus != FAIL) )?true : false); } TcpClientLibevent::TcpClientLibevent(std::string addrinfo, int port, TcpClientLibevent::TcpClientObserver *p) : mStatus(UNCONNECTED), mObserver(nullptr) { memset(&mSrv, 0, sizeof(mSrv)); #ifdef linux mSrv.sin_addr.s_addr = inet_addr(addrinfo.c_str()); mSrv.sin_family = AF_INET; #endif #ifdef _WIN32 mSrv.sin_addr.S_un.S_addr = inet_addr(addrinfo.c_str()); mSrv.sin_family = AF_INET; #endif mSrv.sin_port = htons(port); mBase = event_base_new(); if (!mBase) { printf("Could not initialize libevent\n"); } #ifdef WIN32 evthread_use_windows_threads(); #else evthread_use_pthreads(); #endif this->mThread = new thread(ThreadRun, this); this->mObserver = p; mByteRecv = 0; mByteSend = 0; } int TcpClientLibevent::ConnectServer() { printf("server conecting...\r\n"); evthread_make_base_notifiable(mBase); mMux.lock(); mBev = bufferevent_socket_new(mBase, -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); if (nullptr == mBev) { this->mStatus = TcpClientLibevent::FAIL; return - 1; } bufferevent_setcb(mBev, conn_readcb, conn_writecb, conn_eventcb, this); int flag = bufferevent_socket_connect(mBev, (struct sockaddr *)&mSrv, sizeof(mSrv)); bufferevent_enable(mBev, EV_READ | EV_WRITE); if (-1 == flag) { this->mStatus = TcpClientLibevent::FAIL; bufferevent_free(mBev); mBev = nullptr; printf("Connect failed\n"); mMux.unlock(); return -1; } this->mStatus = TcpClientLibevent::CONNECTING; mMux.unlock(); return 0; } int TcpClientLibevent::ConnectServerSync() { mMux.lock(); evthread_make_base_notifiable(mBase); if (nullptr != mBev) { delete mBev; } mBev = bufferevent_socket_new(mBase, -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); if (nullptr == mBev) { this->mStatus = TcpClientLibevent::FAIL; mMux.unlock(); return -1; } bufferevent_setcb(mBev, conn_readcb, conn_writecb, conn_eventcb, this); int flag = bufferevent_socket_connect(mBev, (struct sockaddr*)&mSrv, sizeof(mSrv)); bufferevent_enable(mBev, EV_READ | EV_WRITE); if (-1 == flag) { this->mStatus = TcpClientLibevent::FAIL; bufferevent_free(mBev); mBev = nullptr; printf("Connect failed\n"); mMux.unlock(); return -1; } this->mStatus = TcpClientLibevent::CONNECTING; auto start = system_clock::to_time_t(system_clock::now()); while (this->mStatus != TcpClientLibevent::CONNECTED) { auto end = system_clock::to_time_t(system_clock::now()); if ((end - start) > 5) { this->mStatus = TcpClientLibevent::FAIL; break; } } mMux.unlock(); return 0; } int TcpClientLibevent::SetReconnect(bool reconn) { this->mReConnect = reconn; return 0; } int TcpClientLibevent::SetObserver(TcpClientLibevent::TcpClientObserver *ob) { this->mObserver = ob; return 0; } int TcpClientLibevent::Dispatch() { std::cout << "Dispatch\r\n"; return event_base_dispatch(mBase);; } int TcpClientLibevent::Close() { event_base_free(mBase); return 0; } int TcpClientLibevent::SendDataAsync(const char* data, int len) { return bufferevent_write(this->mBev, data, len); } uint64_t TcpClientLibevent::SocketFd() { return mSocketFD; }