From 37f764055625a23f058166a4d5b042ff53daecc2 Mon Sep 17 00:00:00 2001 From: zcy <290198252@qq.com> Date: Mon, 6 Dec 2021 00:43:06 +0800 Subject: [PATCH] add janus video room demp --- .../janus_gateway_win/usocket_test/main.cpp | 63 ++ .../usocket_test/peer_connection.cpp | 1 - .../usocket_test/peer_connection_wsclient.cpp | 7 +- .../janus_gateway_win/usocket_test/readme.md | 9 + .../usocket_test/usocket_test.vcxproj | 5 +- .../usocket_test/video_room_client.cpp | 824 ++++++++++++++++++ .../usocket_test/video_room_client.h | 130 +++ 7 files changed, 1034 insertions(+), 5 deletions(-) create mode 100644 client/janus_gateway_win/usocket_test/main.cpp create mode 100644 client/janus_gateway_win/usocket_test/readme.md create mode 100644 client/janus_gateway_win/usocket_test/video_room_client.cpp create mode 100644 client/janus_gateway_win/usocket_test/video_room_client.h diff --git a/client/janus_gateway_win/usocket_test/main.cpp b/client/janus_gateway_win/usocket_test/main.cpp new file mode 100644 index 0000000..56736c5 --- /dev/null +++ b/client/janus_gateway_win/usocket_test/main.cpp @@ -0,0 +1,63 @@ +// usocket_test.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。 +// + +#include +#include "defaults.h" +#include "uWS.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "peer_connection_wsclient.h" +#include "rtc_base/json.h" +#include "JanusHandle.h" +#include "JanusTransaction.h" +#include +#include "video_room_client.h" + +#include "defaults.h" +#include "peer_connection.h" + + +#if defined(WEBRTC_WIN) +#include "rtc_base/win32.h" +#endif // WEBRTC_WIN + + + +int main() +{ + rtc::EnsureWinsockInit(); + rtc::Win32SocketServer w32_ss; + rtc::Win32Thread w32_thread(&w32_ss); + rtc::ThreadManager::Instance()->SetCurrentThread(&w32_thread); + + auto network_thread_ = rtc::Thread::CreateWithSocketServer(); + network_thread_->SetName("network_thread", nullptr); + network_thread_->Start(); + + auto worker_thread_ = rtc::Thread::Create(); + worker_thread_->SetName("worker_thread", nullptr); + worker_thread_->Start(); + + std::unique_ptr signaling_thread_ = rtc::Thread::Create(); + signaling_thread_->SetName("signaling_thread", nullptr); + signaling_thread_->Start(); + + VideoRoomClient*client = new VideoRoomClient(); + + client->ConectToServer("janusdemo.com", 8188); + while (true) { + Sleep(1000); + + } + +} + diff --git a/client/janus_gateway_win/usocket_test/peer_connection.cpp b/client/janus_gateway_win/usocket_test/peer_connection.cpp index a48e3f1..98b42f0 100644 --- a/client/janus_gateway_win/usocket_test/peer_connection.cpp +++ b/client/janus_gateway_win/usocket_test/peer_connection.cpp @@ -80,7 +80,6 @@ void PeerConnection::OnRemoveTrack( rtc::scoped_refptr receiver) { RTC_LOG(INFO) << __FUNCTION__ << " " << receiver->id(); m_pConductorCallback->PCQueueUIThreadCallback(TRACK_REMOVED, receiver->track().release()); - //main_wnd_->QueueUIThreadCallback(TRACK_REMOVED, receiver->track().release()); } void PeerConnection::OnIceCandidate(const webrtc::IceCandidateInterface* candidate) { diff --git a/client/janus_gateway_win/usocket_test/peer_connection_wsclient.cpp b/client/janus_gateway_win/usocket_test/peer_connection_wsclient.cpp index 53ab629..f4f3a3d 100644 --- a/client/janus_gateway_win/usocket_test/peer_connection_wsclient.cpp +++ b/client/janus_gateway_win/usocket_test/peer_connection_wsclient.cpp @@ -22,7 +22,6 @@ PeerConnectionWsClient::~PeerConnectionWsClient() { } - int PeerConnectionWsClient::id() const { return my_id_; } @@ -100,7 +99,8 @@ void PeerConnectionWsClient::Connect(const std::string& server, } }); - this->m_hub.onDisconnection([](uWS::WebSocket *ws, int code, char *message, size_t length) { + this->m_hub.onDisconnection([](uWS::WebSocket *ws, + int code, char *message, size_t length) { RTC_LOG(WARNING) << "Client got disconnected"; PeerConnectionWsClient* pws = (PeerConnectionWsClient*)(ws->getUserData()); pws->state_ = NOT_CONNECTED; @@ -109,7 +109,8 @@ void PeerConnectionWsClient::Connect(const std::string& server, std::cout << "Client got disconnected with data: " << ws->getUserData() << ", code: " << code << ", message: <" << std::string(message, length) << ">" << std::endl; }); - this->m_hub.onMessage([](uWS::WebSocket *ws, char *message, size_t length, uWS::OpCode opCode) { + this->m_hub.onMessage([](uWS::WebSocket *ws, + char *message, size_t length, uWS::OpCode opCode) { PeerConnectionWsClient* pws = (PeerConnectionWsClient*)(ws->getUserData()); if (nullptr == pws) { return; diff --git a/client/janus_gateway_win/usocket_test/readme.md b/client/janus_gateway_win/usocket_test/readme.md new file mode 100644 index 0000000..38d5077 --- /dev/null +++ b/client/janus_gateway_win/usocket_test/readme.md @@ -0,0 +1,9 @@ +### 介绍 +janus video room插件客户端实现。 + +依赖: +- uwebsocket库 +- jsoncpp +- webrtc + + diff --git a/client/janus_gateway_win/usocket_test/usocket_test.vcxproj b/client/janus_gateway_win/usocket_test/usocket_test.vcxproj index 9c257f7..c5b08ac 100644 --- a/client/janus_gateway_win/usocket_test/usocket_test.vcxproj +++ b/client/janus_gateway_win/usocket_test/usocket_test.vcxproj @@ -24,6 +24,7 @@ {7921778a-8d53-4646-917c-c32d58d691b6} usockettest 10.0 + video_room_client @@ -149,7 +150,8 @@ - + + @@ -157,6 +159,7 @@ + diff --git a/client/janus_gateway_win/usocket_test/video_room_client.cpp b/client/janus_gateway_win/usocket_test/video_room_client.cpp new file mode 100644 index 0000000..eaecc2b --- /dev/null +++ b/client/janus_gateway_win/usocket_test/video_room_client.cpp @@ -0,0 +1,824 @@ +#include "video_room_client.h" + +static std::string OptString(std::string message, list keyList) { + //parse json + Json::Reader reader; + Json::Value jmessage; + if (!reader.parse(message, jmessage)) { + RTC_LOG(WARNING) << "Received unknown message. " << message; + return std::string("");//FIXME should return by another param with type enum + } + Json::Value jvalue = jmessage; + Json::Value jvalue2; + for (auto key : keyList) { + if (rtc::GetValueFromJsonObject(jvalue, key, + &jvalue2)) { + jvalue = jvalue2; + } + else { + return std::string(""); + } + } + std::string tmp_str; + rtc::GetStringFromJson(jvalue, &tmp_str); + //std::string tmp_str = rtc::JsonValueToString(jvalue);//this result sdp parse error beacause /r/n + return tmp_str; +} +static Json::Value optJSONValue(std::string message, list keyList) { + //parse json + Json::Reader reader; + Json::Value jmessage; + if (!reader.parse(message, jmessage)) { + RTC_LOG(WARNING) << "Received unknown message. " << message; + return NULL;//FIXME should return by another param with type enum + } + Json::Value jvalue = jmessage; + Json::Value jvalue2; + for (auto key : keyList) { + if (rtc::GetValueFromJsonObject(jvalue, key, + &jvalue2)) { + jvalue = jvalue2; + } + else { + return NULL; + } + } + return jvalue; +} + + +static long long int OptLLInt(std::string message, list keyList) { + //parse json + Json::Reader reader; + Json::Value jmessage; + if (!reader.parse(message, jmessage)) { + RTC_LOG(WARNING) << "Received unknown message. " << message; + return 0;//FIXME should return by another param with type enum + } + Json::Value jvalue = jmessage; + Json::Value jvalue2; + for (auto key : keyList) { + if (rtc::GetValueFromJsonObject(jvalue, key, + &jvalue2)) { + jvalue = jvalue2; + } + else { + return 0; + } + } + std::string tmp_str = rtc::JsonValueToString(jvalue); + return std::stoll(tmp_str); +} + + + VideoRoomClient::VideoRoomClient() { + mWsClient = new PeerConnectionWsClient(); + + m_network_thread = rtc::Thread::CreateWithSocketServer(); + m_network_thread->SetName("network_thread", nullptr); + m_network_thread->Start(); + + m_worker_thread = rtc::Thread::Create(); + m_worker_thread->SetName("worker_thread", nullptr); + m_worker_thread->Start(); + + m_signaling_thread = rtc::Thread::Create(); + m_signaling_thread->SetName("signaling_thread", nullptr); + m_signaling_thread->Start(); +} + + void VideoRoomClient::SendBitrateConstraint(long long int handleId) { + std::string transactionID = RandomString(12); + std::shared_ptr jt(new JanusTransaction()); + jt->transactionId = transactionID; + jt->Success = [=](std::string message) { + + }; + + jt->Event = [=](std::string message) { + list resultList = { "plugindata","data","result" }; + std::string jsep_str = OptString(message, resultList); + if (jsep_str != "ok") { + //ûóɹ + } + }; + + jt->Error = [=](std::string, std::string) { + RTC_LOG(INFO) << "CreateHandle error:"; + }; + + + m_transactionMap[transactionID] = jt; + + Json::StyledWriter writer; + Json::Value jmessage; + Json::Value jbody; + + jbody["bitrate"] = 128000; + jbody["request"] = "configure"; + + jmessage["janus"] = "message"; + jmessage["body"] = jbody; + jmessage["transaction"] = transactionID; + jmessage["session_id"] = m_SessionId; + jmessage["handle_id"] = handleId; + //client_->SendToJanusAsync(writer.write(jmessage)); +} + + std::unique_ptr VideoRoomClient::OpenVideoCaptureDevice() { + std::vector device_names; + { + std::unique_ptr info( + webrtc::VideoCaptureFactory::CreateDeviceInfo()); + if (!info) { + return nullptr; + } + int num_devices = info->NumberOfDevices(); + for (int i = 0; i < num_devices; ++i) { + const uint32_t kSize = 256; + char name[kSize] = { 0 }; + char id[kSize] = { 0 }; + if (info->GetDeviceName(i, name, kSize, id, kSize) != -1) { + device_names.push_back(name); + } + } + } + + cricket::WebRtcVideoDeviceCapturerFactory factory; + std::unique_ptr capturer; + for (const auto& name : device_names) { + capturer = factory.Create(cricket::Device(name, 0)); + if (capturer) { + break; + } + } + return capturer; +} + + void VideoRoomClient::AddTracks(long long int handleId) { + if (!m_peer_connection_map[handleId]->peer_connection_->GetSenders().empty()) { + return; // Already added tracks. + } + + rtc::scoped_refptr audio_track( + peer_connection_factory_->CreateAudioTrack( + kAudioLabel, peer_connection_factory_->CreateAudioSource( + cricket::AudioOptions()))); + auto result_or_error = m_peer_connection_map[handleId]->peer_connection_->AddTrack(audio_track, { kStreamId }); + if (!result_or_error.ok()) { + RTC_LOG(LS_ERROR) << "Failed to add audio track to PeerConnection: " + << result_or_error.error().message(); + } + + std::unique_ptr video_device = + OpenVideoCaptureDevice(); + if (video_device) { + webrtc::FakeConstraints constraints; + std::list keyList = { webrtc::MediaConstraintsInterface::kMinWidth, webrtc::MediaConstraintsInterface::kMaxWidth, + webrtc::MediaConstraintsInterface::kMinHeight, webrtc::MediaConstraintsInterface::kMaxHeight, + webrtc::MediaConstraintsInterface::kMinFrameRate, webrtc::MediaConstraintsInterface::kMaxFrameRate, + webrtc::MediaConstraintsInterface::kMinAspectRatio, webrtc::MediaConstraintsInterface::kMaxAspectRatio }; + + //set media constraints + std::map opts; + opts[webrtc::MediaConstraintsInterface::kMaxFrameRate] = 18; + opts[webrtc::MediaConstraintsInterface::kMaxWidth] = 1280; + opts[webrtc::MediaConstraintsInterface::kMaxHeight] = 720; + + for (auto key : keyList) { + if (opts.find(key) != opts.end()) { + constraints.AddMandatory(key, opts.at(key)); + } + } + rtc::scoped_refptr video_track_( + peer_connection_factory_->CreateVideoTrack( + kVideoLabel, peer_connection_factory_->CreateVideoSource( + std::move(video_device), nullptr))); + //main_wnd_->StartLocalRenderer(video_track_); + // + //m_peer_connection_map[handleId]->StartRenderer(MainWnd_, video_track_); + + result_or_error = m_peer_connection_map[handleId]->peer_connection_->AddTrack(video_track_, { kStreamId }); + if (!result_or_error.ok()) { + RTC_LOG(LS_ERROR) << "Failed to add video track to PeerConnection: " + << result_or_error.error().message(); + } + } + else { + RTC_LOG(LS_ERROR) << "OpenVideoCaptureDevice failed"; + } +} + + bool VideoRoomClient::CreatePeerConnection(long long int handleId, bool dtls) { + RTC_DCHECK(peer_connection_factory_); + if (m_peer_connection_map.find(handleId) != m_peer_connection_map.end()) { + //existed + return false; + } + + webrtc::PeerConnectionInterface::RTCConfiguration config; + config.tcp_candidate_policy = webrtc::PeerConnectionInterface::TcpCandidatePolicy::kTcpCandidatePolicyDisabled; + config.bundle_policy = webrtc::PeerConnectionInterface::BundlePolicy::kBundlePolicyMaxBundle; + config.rtcp_mux_policy = webrtc::PeerConnectionInterface::RtcpMuxPolicy::kRtcpMuxPolicyRequire; + config.continual_gathering_policy = webrtc::PeerConnectionInterface::ContinualGatheringPolicy::GATHER_CONTINUALLY; + config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan; + config.enable_dtls_srtp = dtls; + //additonal setting + if (!config.prerenderer_smoothing()) { + config.set_prerenderer_smoothing(true); + } + config.disable_ipv6 = true; + config.enable_rtp_data_channel = false; + webrtc::PeerConnectionInterface::IceServer server; + server.uri = GetPeerConnectionString(); + config.servers.push_back(server); + + webrtc::PeerConnectionInterface::BitrateParameters bitrateParam; + bitrateParam.min_bitrate_bps = absl::optional(512000); + bitrateParam.current_bitrate_bps = absl::optional(1256000); + bitrateParam.max_bitrate_bps = absl::optional(1512000); + + rtc::scoped_refptr peer_connection( + new rtc::RefCountedObject()); + + peer_connection->peer_connection_ = peer_connection_factory_->CreatePeerConnection( + config, nullptr, nullptr, peer_connection); + //set max/min bitrate + peer_connection->peer_connection_->SetBitrate(bitrateParam); + //add to the map + peer_connection->RegisterObserver(this); + peer_connection->SetHandleId(handleId); + m_peer_connection_map[handleId] = peer_connection; + + return m_peer_connection_map[handleId]->peer_connection_ != nullptr; +} + + bool VideoRoomClient::InitializePeerConnection(long long int handleId, bool bPublisher) { + if (m_peer_connection_map.find(handleId) != m_peer_connection_map.end()) { + //existing + return false; + } + + if (!peer_connection_factory_) { + peer_connection_factory_ = webrtc::CreatePeerConnectionFactory( + this->m_network_thread.get() /* network_thread */, + this->m_worker_thread.get() /* worker_thread */, + this->m_signaling_thread.get() /* signaling_thread */, + nullptr /* default_adm */, + webrtc::CreateBuiltinAudioEncoderFactory(), + webrtc::CreateBuiltinAudioDecoderFactory(), + webrtc::CreateBuiltinVideoEncoderFactory(), + webrtc::CreateBuiltinVideoDecoderFactory(), nullptr /* audio_mixer */, + nullptr /* audio_processing */); + } + + + if (!peer_connection_factory_) { + std::cout << "Failed to initialize PeerConnectionFactory" << std::endl; + DeletePeerConnection(handleId); + return false; + } + + if (!CreatePeerConnection(handleId,/*dtls=*/true)) { + std::cout << "CreatePeerConnection failed" << std::endl; + DeletePeerConnection(handleId); + } + //subscriber no need local tracks(audio and video) + if (bPublisher) { + AddTracks(handleId); + } + m_peer_connection_map[handleId]->b_publisher_ = true; + + return m_peer_connection_map[handleId]->peer_connection_ != nullptr; +} + + void VideoRoomClient::DeletePeerConnection(long long int handleId) { + m_peer_connection_map[handleId]->StopRenderer(); + m_peer_connection_map[handleId]->peer_connection_ = nullptr; + //peer_connection_factory_ = nullptr; //TODO should destroy before quit +} + + void VideoRoomClient::ConectToServer(std::string ip, int port) { + auto ws_server = std::string("ws://") + ip + std::string(":") + std::to_string(port); + mWsClient->RegisterObserver(this); + mWsClient->Connect(ws_server, "1111"); +} + + void VideoRoomClient::UIThreadCallback(int msg_id, void* data) { + switch (msg_id) { + case PEER_CONNECTION_CLOSED: + RTC_LOG(INFO) << "PEER_CONNECTION_CLOSED"; + for (auto& key : m_peer_connection_map) { + DeletePeerConnection(key.first); + } + + + case NEW_TRACK_ADDED: { + NEW_TRACK* pTrack = (NEW_TRACK*)data; + long long int handleId = pTrack->handleId; + auto* track = reinterpret_cast(pTrack->pInterface); + if (track->kind() == webrtc::MediaStreamTrackInterface::kVideoKind) { + auto* video_track = static_cast(track); + //main_wnd_->StartRemoteRenderer(video_track); + //m_peer_connection_map[handleId]->StartRenderer(MainWnd_, video_track); + } + track->Release(); + delete pTrack; + break; + } + + case TRACK_REMOVED: { + // Remote peer stopped sending a track. + auto* track = reinterpret_cast(data); + track->Release(); + break; + } + + case CREATE_OFFER: { + long long int* pHandleId = (long long int*)(data); + long long int handleId = *pHandleId; + if (InitializePeerConnection(handleId, true)) { + m_peer_connection_map[handleId]->CreateOffer(); + } + else { + std::cout << "Error" << "Failed to initialize PeerConnection"; + } + break; + } + + case SET_REMOTE_ANSWER: { + REMOTE_SDP_INFO* pInfo = (REMOTE_SDP_INFO*)(data); + long long int handleId = pInfo->handleId; + std::string jsep_str = pInfo->jsep_str; + std::unique_ptr session_description = + webrtc::CreateSessionDescription(webrtc::SdpType::kAnswer, jsep_str); + //auto* session_description = reinterpret_cast(data); + //auto* session_description = reinterpret_cast(pInfo->pInterface); + m_peer_connection_map[handleId]->SetRemoteDescription(session_description.release()); + //delete pInfo; + //TODO fixme suitable here? + SendBitrateConstraint(handleId); + break; + } + + case SET_REMOTE_OFFER: { + REMOTE_SDP_INFO* pInfo = (REMOTE_SDP_INFO*)(data); + long long int handleId = pInfo->handleId; + std::string jsep_str = pInfo->jsep_str; + std::unique_ptr session_description = + webrtc::CreateSessionDescription(webrtc::SdpType::kOffer, jsep_str); + //as subscriber + if (InitializePeerConnection(handleId, false)) { + m_peer_connection_map[handleId]->SetRemoteDescription(session_description.release()); + m_peer_connection_map[handleId]->CreateAnswer(); + + } + else { + std::cout << "Error" << "Failed to initialize PeerConnection"; + + } + //peerConnection.setRemoteDescription(sdpObserver, sdp); + //peerConnection.createAnswer(connection.sdpObserver, sdpMediaConstraints); + break; + } + + default: + RTC_NOTREACHED(); + break; + } +} + + void VideoRoomClient::SendOffer(long long int handleId, std::string sdp_type, std::string sdp_desc) { + std::string transactionID = RandomString(12); + std::shared_ptr jt(new JanusTransaction()); + jt->transactionId = transactionID; + + jt->Event = [=](std::string message) { + list resultList = { "jsep","sdp" }; + std::string jsep_str = OptString(message, resultList); + REMOTE_SDP_INFO* pInfo = new REMOTE_SDP_INFO; + pInfo->handleId = handleId; + pInfo->jsep_str = jsep_str; + UIThreadCallback(SET_REMOTE_ANSWER, pInfo); + }; + + m_transactionMap[transactionID] = jt; + + Json::StyledWriter writer; + Json::Value jmessage; + Json::Value jbody; + Json::Value jjsep; + + jbody["request"] = "configure"; + jbody["audio"] = true; + jbody["video"] = true; + + jjsep["type"] = sdp_type; + jjsep["sdp"] = sdp_desc; + + jmessage["body"] = jbody; + jmessage["jsep"] = jjsep; + jmessage["janus"] = "message"; + jmessage["transaction"] = transactionID; + jmessage["session_id"] = m_SessionId; + jmessage["handle_id"] = handleId; + //beacause the thread is on UI,so shift thread to ws thread + mWsClient->SendToJanusAsync(writer.write(jmessage)); +} + + void VideoRoomClient::SendAnswer(long long int handleId, std::string sdp_type, std::string sdp_desc) { + std::string transactionID = RandomString(12); + std::shared_ptr jt(new JanusTransaction()); + jt->transactionId = transactionID; + + jt->Event = [=](std::string message) { + + }; + + m_transactionMap[transactionID] = jt; + + Json::StyledWriter writer; + Json::Value jmessage; + Json::Value jbody; + Json::Value jjsep; + + jbody["request"] = "start"; + jbody["room"] = "1234"; + + jjsep["type"] = sdp_type; + jjsep["sdp"] = sdp_desc; + + jmessage["body"] = jbody; + jmessage["jsep"] = jjsep; + jmessage["janus"] = "message"; + jmessage["transaction"] = transactionID; + jmessage["session_id"] = m_SessionId; + jmessage["handle_id"] = handleId; + //beacause the thread is on UI,so shift thread to ws thread + mWsClient->SendToJanusAsync(writer.write(jmessage)); +} + + void VideoRoomClient::trickleCandidate(long long int handleId, const webrtc::IceCandidateInterface* candidate) { + std::string transactionID = RandomString(12); + Json::StyledWriter writer; + Json::Value jmessage; + Json::Value jcandidate; + + std::string sdp; + if (!candidate->ToString(&sdp)) { + RTC_LOG(LS_ERROR) << "Failed to serialize candidate"; + return; + } + + jcandidate["sdpMid"] = candidate->sdp_mid(); + jcandidate["sdpMLineIndex"] = candidate->sdp_mline_index(); + jcandidate["candidate"] = sdp; + + jmessage["janus"] = "trickle"; + jmessage["candidate"] = jcandidate; + jmessage["transaction"] = transactionID; + jmessage["session_id"] = m_SessionId; + jmessage["handle_id"] = handleId; + mWsClient->SendToJanusAsync(writer.write(jmessage)); +} + + void VideoRoomClient::trickleCandidateComplete(long long int handleId) { + std::string transactionID = RandomString(12); + Json::StyledWriter writer; + Json::Value jmessage; + Json::Value jcandidate; + + jcandidate["completed"] = true; + + jmessage["janus"] = "trickle"; + jmessage["candidate"] = jcandidate; + jmessage["transaction"] = transactionID; + jmessage["session_id"] = m_SessionId; + jmessage["handle_id"] = handleId; + mWsClient->SendToJanusAsync(writer.write(jmessage)); +} + + void VideoRoomClient::PCSendSDP(long long int handleId, std::string sdpType, std::string sdp) { + if (sdpType == "offer") { + SendOffer(handleId, sdpType, sdp); + } + else { + SendAnswer(handleId, sdpType, sdp); + } +} + + void VideoRoomClient::PCQueueUIThreadCallback(int msg_id, void* data) { + std::cout << msg_id << std::endl; + UIThreadCallback(msg_id, + data); +} + + void VideoRoomClient::PCTrickleCandidate(long long int handleId, const webrtc::IceCandidateInterface* candidate) { + std::cout << "PCTrickleCandidate" << std::endl; + trickleCandidate(handleId, candidate); + +} + + void VideoRoomClient::PCTrickleCandidateComplete(long long int handleId) { + std::cout << "PCTrickleCandidateComplete" << std::endl; + trickleCandidateComplete(handleId); +} + + void VideoRoomClient::OnSignedIn() { + std::cout << "siginedIn called\r\n"; +} + + void VideoRoomClient::OnDisconnected() { + std::cout << "OnDisconnected\r\n"; +} + + void VideoRoomClient::OnPeerConnected(int id, const std::string& name) { + std::cout << "peerconnect" << id << name << "\r\n"; +} + + void VideoRoomClient::OnMessageFromJanus(int peer_id, const std::string& message) { + RTC_DCHECK(!message.empty()); + RTC_LOG(INFO) << "got msg: " << message; + //TODO make sure in right state + //parse json + Json::Reader reader; + Json::Value jmessage; + if (!reader.parse(message, jmessage)) { + RTC_LOG(WARNING) << "Received unknown message. " << message; + return; + } + std::string janus_str; + std::string json_object; + + rtc::GetStringFromJsonObject(jmessage, "janus", + &janus_str); + if (!janus_str.empty()) { + if (janus_str == "ack") { + // Just an ack, we can probably ignore + RTC_LOG(INFO) << "Got an ack on session. "; + } + else if (janus_str == "success") { + rtc::GetStringFromJsonObject(jmessage, "transaction", + &janus_str); + std::shared_ptr jt = m_transactionMap.at(janus_str); + //call signal + if (jt) { + jt->Success(message);//handle_id not ready yet + } + m_transactionMap.erase(janus_str); + } + else if (janus_str == "trickle") { + RTC_LOG(INFO) << "Got a trickle candidate from Janus. "; + } + else if (janus_str == "webrtcup") { + RTC_LOG(INFO) << "The PeerConnection with the gateway is up!"; + } + else if (janus_str == "hangup") { + RTC_LOG(INFO) << "A plugin asked the core to hangup a PeerConnection on one of our handles! "; + } + else if (janus_str == "detached") { + RTC_LOG(INFO) << "A plugin asked the core to detach one of our handles! "; + } + else if (janus_str == "media") { + RTC_LOG(INFO) << "Media started/stopped flowing. "; + } + else if (janus_str == "slowlink") { + RTC_LOG(INFO) << "Got a slowlink event! "; + } + else if (janus_str == "error") { + RTC_LOG(INFO) << "Got an error. "; + // Oops, something wrong happened + rtc::GetStringFromJsonObject(jmessage, "transaction", + &janus_str); + std::shared_ptr jt = m_transactionMap.at(janus_str); + //call signal + if (jt) { + jt->Error("123", "456");//TODO need to parse the error code and desc + } + m_transactionMap.erase(janus_str); + } + else { + + if (janus_str == "event") { + RTC_LOG(INFO) << "Got a plugin event! "; + //get publishers + list str_publishers{ "plugindata" ,"data","publishers" }; + Json::Value value_publishers = optJSONValue(message, str_publishers); + std::vector PublisherVec; + + rtc::JsonArrayToValueVector(value_publishers, &PublisherVec); + //constrain the max publishers count to 5 + for (auto pub : PublisherVec) { + std::string str_feedid; + std::string display; + Json::Value jvalue; + rtc::GetValueFromJsonObject(pub, "id", &jvalue); + rtc::GetStringFromJsonObject(pub, "display", &display); + str_feedid = rtc::JsonValueToString(jvalue); + long long int feedId = std::stoll(str_feedid); + CreateHandle("janus.plugin.videoroom", feedId, display); + } + + bool bSuccess = rtc::GetStringFromJsonObject(jmessage, "transaction", + &janus_str); + if (bSuccess) { + std::shared_ptr jt = m_transactionMap.at(janus_str); + if (jt) { + jt->Event(message); + } + } + } + } + } +} + + void VideoRoomClient::OnMessageSent(int err) { + std::cout << "message sent" << err << "\r\n"; +} + + void VideoRoomClient::OnServerConnectionFailure() { + std::cout << "OnServerConnectionFailure" << "\r\n"; +} + + void VideoRoomClient::OnJanusConnected() { + std::cout << "OnJanusConnected" << "\r\n"; + // janus sessionԽ + this->CreateSession(); + + +} + + void VideoRoomClient::OnJanusDisconnected() { + std::cout << "OnJanusDisconnected" << "\r\n"; +} + + void VideoRoomClient::OnSendKeepAliveToJanus() { + std::cout << "OnSendKeepAliveToJanus" << "\r\n"; + KeepAlive(); +} + + void VideoRoomClient::KeepAlive() +{ + auto signaling_thread_ = rtc::Thread::Create(); + signaling_thread_->SetName("signaling_thread", nullptr); + signaling_thread_->Start(); + if (m_SessionId > 0) { + std::string transactionID = RandomString(12); + Json::StyledWriter writer; + Json::Value jmessage; + + jmessage["janus"] = "keepalive"; + jmessage["session_id"] = m_SessionId; + jmessage["transaction"] = transactionID; + mWsClient->SendToJanus(writer.write(jmessage)); + } +} + + void VideoRoomClient::JoinRoom(std::string pluginName, long long int handleId, long long int feedId) { + //rtcEvents.onPublisherJoined(handle.handleId); + std::string transactionID = RandomString(12); + std::shared_ptr jt(new JanusTransaction()); + jt->transactionId = transactionID; + + jt->Event = [=](std::string message) { + //get sender + list senderList = { "sender" }; + long long int sender = OptLLInt(message, senderList); + //get room + list resultList = { "plugindata","data","result" }; + list roomList = { "plugindata","data","videoroom" }; + list publisherList = { "plugindata","data","videoroom" }; + + //echotest return result=ok + std::string result = OptString(message, resultList); + if (result == "ok") { + RTC_LOG(WARNING) << "echotest negotiation ok! "; + } + + std::string videoroom = OptString(message, roomList); + //joined the room as a publisher + if (videoroom == "joined") { + UIThreadCallback(CREATE_OFFER, (void*)(&handleId)); + //for each search every publisher and create handle to attach them + + } + //joined the room as a subscriber + if (videoroom == "attached") { + //TODO make sure this sdp is offer from remote peer + list resultList = { "jsep","sdp" }; + std::string jsep_str = OptString(message, resultList); + REMOTE_SDP_INFO* pInfo = new REMOTE_SDP_INFO; + pInfo->handleId = handleId; + pInfo->jsep_str = jsep_str; + UIThreadCallback(SET_REMOTE_OFFER, pInfo); + } + }; + + m_transactionMap[transactionID] = jt; + + Json::StyledWriter writer; + Json::Value jmessage; + Json::Value jbody; + if (pluginName == "janus.plugin.videoroom") { + jbody["request"] = "join"; + jbody["room"] = 1234;//FIXME should be variable + if (feedId == 0) { + jbody["ptype"] = "publisher"; + jbody["display"] = "pcg";//FIXME should be variable + } + else { + jbody["ptype"] = "subscriber"; + jbody["feed"] = feedId; + jbody["private_id"] = 0;//FIXME should be variable + } + jmessage["body"] = jbody; + jmessage["janus"] = "message"; + jmessage["transaction"] = transactionID; + jmessage["session_id"] = m_SessionId; + jmessage["handle_id"] = handleId; + mWsClient->SendToJanus(writer.write(jmessage)); + //After joined,Then create offer + } + else if (pluginName == "janus.plugin.audiobridge") { + + } + else if (pluginName == "janus.plugin.echotest") { + jbody["audio"] = true; + jbody["video"] = true; + jmessage["body"] = jbody; + jmessage["janus"] = "message"; + jmessage["transaction"] = transactionID; + jmessage["session_id"] = m_SessionId; + jmessage["handle_id"] = handleId; + mWsClient->SendToJanus(writer.write(jmessage)); + //shift the process to UI thread to createOffer + UIThreadCallback(CREATE_OFFER, (void*)(handleId)); + } + + +} + + void VideoRoomClient::CreateHandle(std::string pluginName, long long int feedId, std::string display) { + std::string transactionID = RandomString(12); + std::shared_ptr jt(new JanusTransaction()); + jt->transactionId = transactionID; + jt->Success = [=](std::string message) { + list handleList = { "data","id" }; + long long int handle_id = OptLLInt(message, handleList); + //add handle to the map + std::shared_ptr jh(new JanusHandle()); + jh->handleId = handle_id; + jh->display = display; + jh->feedId = feedId; + m_handleMap[handle_id] = jh; + JoinRoom(pluginName, handle_id, feedId);//TODO feedid means nothing in echotest,else? + }; + + jt->Event = [=](std::string message) { + + }; + + jt->Error = [=](std::string, std::string) { + RTC_LOG(INFO) << "CreateHandle error:"; + }; + + m_transactionMap[transactionID] = jt; + + Json::StyledWriter writer; + Json::Value jmessage; + jmessage["janus"] = "attach"; + jmessage["plugin"] = pluginName; + jmessage["transaction"] = transactionID; + jmessage["session_id"] = m_SessionId; + mWsClient->SendToJanus(writer.write(jmessage)); +} + + void VideoRoomClient::CreateSession() { + + int rev_tid1 = GetCurrentThreadId(); + std::string transactionID = RandomString(12); + std::shared_ptr jt(new JanusTransaction()); + jt->transactionId = transactionID; + + //TODO Is it possible for lamda expression here? + jt->Success = [=](std::string message) mutable { + list sessionList = { "data","id" }; + m_SessionId = OptLLInt(message, sessionList); + //lauch the timer for keep alive breakheart + //Then Create the handle + CreateHandle("janus.plugin.videoroom", 0, "pcg"); + }; + + jt->Error = [=](std::string code, std::string reason) { + RTC_LOG(INFO) << "Ooops: " << code << " " << reason; + }; + + + m_transactionMap[transactionID] = jt; + Json::StyledWriter writer; + Json::Value jmessage; + jmessage["janus"] = "create"; + jmessage["transaction"] = transactionID; + mWsClient->SendToJanus(writer.write(jmessage)); +} + diff --git a/client/janus_gateway_win/usocket_test/video_room_client.h b/client/janus_gateway_win/usocket_test/video_room_client.h new file mode 100644 index 0000000..fd55dfd --- /dev/null +++ b/client/janus_gateway_win/usocket_test/video_room_client.h @@ -0,0 +1,130 @@ +#pragma once + +#include +#include "defaults.h" +#include "uWS.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "peer_connection_wsclient.h" +#include "rtc_base/json.h" +#include "JanusHandle.h" +#include "JanusTransaction.h" +#include +#include "rtc_base/checks.h" +#include "rtc_base/json.h" +#include "rtc_base/logging.h" + +#include "api/audio_codecs/builtin_audio_decoder_factory.h" +#include "api/audio_codecs/builtin_audio_encoder_factory.h" +#include "api/video_codecs/builtin_video_decoder_factory.h" +#include "api/video_codecs/builtin_video_encoder_factory.h" +#include "api/test/fakeconstraints.h" +#include "defaults.h" +#include "media/engine/webrtcvideocapturerfactory.h" +#include "modules/audio_device/include/audio_device.h" +#include "modules/audio_processing/include/audio_processing.h" +#include "modules/video_capture/video_capture_factory.h" +#include "peer_connection.h" + + +#include "rtc_base/arraysize.h" +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" +#include "rtc_base/stringutils.h" +#include "rtc_base/win32socketserver.h" +#include "rtc_base/win32socketinit.h" + +#include "third_party/libyuv/include/libyuv/convert_argb.h" + +#if defined(WEBRTC_WIN) +#include "rtc_base/win32.h" + +#endif + + + +using namespace std; +using namespace rtc; + +struct REMOTE_SDP_INFO { + long long int handleId; + std::string jsep_str; +}; + + + + +class ImplPeerConnectionWsClientObserver : + public sigslot::has_slots<>, + public PeerConnectionWsClientObserver { +public: + ImplPeerConnectionWsClientObserver() { + + }; + +}; + +class VideoRoomClient : + public PeerConnectionWsClientObserver, + public PeerConnectionCallback, + public sigslot::has_slots<> { +public: + VideoRoomClient(); + void SendBitrateConstraint(long long int handleId); + std::unique_ptr OpenVideoCaptureDevice(); + void AddTracks(long long int handleId); + bool CreatePeerConnection(long long int handleId, bool dtls); + bool InitializePeerConnection(long long int handleId, bool bPublisher); + + void DeletePeerConnection(long long int handleId); + void ConectToServer(std::string ip, int port); + void UIThreadCallback(int msg_id, void* data); + void SendOffer(long long int handleId, std::string sdp_type, std::string sdp_desc); + void SendAnswer(long long int handleId, std::string sdp_type, std::string sdp_desc); + void trickleCandidate(long long int handleId, const webrtc::IceCandidateInterface* candidate); + void trickleCandidateComplete(long long int handleId); + +protected: + virtual void PCSendSDP(long long int handleId, std::string sdpType, std::string sdp); + virtual void PCQueueUIThreadCallback(int msg_id, void* data); + virtual void PCTrickleCandidate(long long int handleId, const webrtc::IceCandidateInterface* candidate); + + virtual void PCTrickleCandidateComplete(long long int handleId); + + virtual void OnSignedIn();; + virtual void OnDisconnected(); + virtual void OnPeerConnected(int id, const std::string& name); + virtual void OnMessageFromJanus(int peer_id, const std::string& message); + virtual void OnMessageSent(int err); + virtual void OnServerConnectionFailure(); + virtual void OnJanusConnected(); + virtual void OnJanusDisconnected(); + virtual void OnSendKeepAliveToJanus(); + +private: + + void KeepAlive(); + void JoinRoom(std::string pluginName, long long int handleId, long long int feedId); + void CreateHandle(std::string pluginName, long long int feedId, std::string display); + void CreateSession();; + + PeerConnectionWsClient* mWsClient; + long long int m_SessionId = 0; + std::map> m_transactionMap; + std::map> m_handleMap; + std::map> m_peer_connection_map; + rtc::scoped_refptr peer_connection_factory_; + std::unique_ptr m_signaling_thread; + std::unique_ptr m_worker_thread; + std::unique_ptr m_network_thread; + +}; \ No newline at end of file