add janus video room demp

master
zcy 2021-12-06 00:43:06 +08:00
parent 40e97d314b
commit 37f7640556
7 changed files with 1034 additions and 5 deletions

View File

@ -0,0 +1,63 @@
// usocket_test.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//
#include <iostream>
#include "defaults.h"
#include "uWS.h"
#include <iostream>
#include <chrono>
#include <cmath>
#include <thread>
#include <fstream>
#include <vector>
#include <set>
#include <unordered_set>
#include <unordered_map>
#include <map>
#include <atomic>
#include "peer_connection_wsclient.h"
#include "rtc_base/json.h"
#include "JanusHandle.h"
#include "JanusTransaction.h"
#include <list>
#include "video_room_client.h"
#include "defaults.h"
#include "peer_connection.h"
#if defined(WEBRTC_WIN)
#include "rtc_base/win32.h"
#endif // WEBRTC_WIN
int main()
{
rtc::EnsureWinsockInit();
rtc::Win32SocketServer w32_ss;
rtc::Win32Thread w32_thread(&w32_ss);
rtc::ThreadManager::Instance()->SetCurrentThread(&w32_thread);
auto network_thread_ = rtc::Thread::CreateWithSocketServer();
network_thread_->SetName("network_thread", nullptr);
network_thread_->Start();
auto worker_thread_ = rtc::Thread::Create();
worker_thread_->SetName("worker_thread", nullptr);
worker_thread_->Start();
std::unique_ptr<Thread> signaling_thread_ = rtc::Thread::Create();
signaling_thread_->SetName("signaling_thread", nullptr);
signaling_thread_->Start();
VideoRoomClient*client = new VideoRoomClient();
client->ConectToServer("janusdemo.com", 8188);
while (true) {
Sleep(1000);
}
}

View File

@ -80,7 +80,6 @@ void PeerConnection::OnRemoveTrack(
rtc::scoped_refptr<webrtc::RtpReceiverInterface> receiver) {
RTC_LOG(INFO) << __FUNCTION__ << " " << receiver->id();
m_pConductorCallback->PCQueueUIThreadCallback(TRACK_REMOVED, receiver->track().release());
//main_wnd_->QueueUIThreadCallback(TRACK_REMOVED, receiver->track().release());
}
void PeerConnection::OnIceCandidate(const webrtc::IceCandidateInterface* candidate) {

View File

@ -22,7 +22,6 @@ PeerConnectionWsClient::~PeerConnectionWsClient() {
}
int PeerConnectionWsClient::id() const {
return my_id_;
}
@ -100,7 +99,8 @@ void PeerConnectionWsClient::Connect(const std::string& server,
}
});
this->m_hub.onDisconnection([](uWS::WebSocket<uWS::CLIENT> *ws, int code, char *message, size_t length) {
this->m_hub.onDisconnection([](uWS::WebSocket<uWS::CLIENT> *ws,
int code, char *message, size_t length) {
RTC_LOG(WARNING) << "Client got disconnected";
PeerConnectionWsClient* pws = (PeerConnectionWsClient*)(ws->getUserData());
pws->state_ = NOT_CONNECTED;
@ -109,7 +109,8 @@ void PeerConnectionWsClient::Connect(const std::string& server,
std::cout << "Client got disconnected with data: " << ws->getUserData() << ", code: " << code << ", message: <" << std::string(message, length) << ">" << std::endl;
});
this->m_hub.onMessage([](uWS::WebSocket<uWS::CLIENT> *ws, char *message, size_t length, uWS::OpCode opCode) {
this->m_hub.onMessage([](uWS::WebSocket<uWS::CLIENT> *ws,
char *message, size_t length, uWS::OpCode opCode) {
PeerConnectionWsClient* pws = (PeerConnectionWsClient*)(ws->getUserData());
if (nullptr == pws) {
return;

View File

@ -0,0 +1,9 @@
### 介绍
janus video room插件客户端实现。
依赖:
- uwebsocket库
- jsoncpp
- webrtc

View File

@ -24,6 +24,7 @@
<ProjectGuid>{7921778a-8d53-4646-917c-c32d58d691b6}</ProjectGuid>
<RootNamespace>usockettest</RootNamespace>
<WindowsTargetPlatformVersion>10.0</WindowsTargetPlatformVersion>
<ProjectName>video_room_client</ProjectName>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
@ -149,7 +150,8 @@
<ClCompile Include="..\janus_win\JanusTransaction.cpp" />
<ClCompile Include="peer_connection.cpp" />
<ClCompile Include="peer_connection_wsclient.cpp" />
<ClCompile Include="usocket_test.cpp" />
<ClCompile Include="main.cpp" />
<ClCompile Include="video_room_client.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\janus_win\conductor_ws.h" />
@ -157,6 +159,7 @@
<ClInclude Include="..\janus_win\JanusTransaction.h" />
<ClInclude Include="..\janus_win\peer_connection.h" />
<ClInclude Include="..\janus_win\peer_connection_wsclient.h" />
<ClInclude Include="video_room_client.h" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">

View File

@ -0,0 +1,824 @@
#include "video_room_client.h"
static std::string OptString(std::string message, list<string> keyList) {
//parse json
Json::Reader reader;
Json::Value jmessage;
if (!reader.parse(message, jmessage)) {
RTC_LOG(WARNING) << "Received unknown message. " << message;
return std::string("");//FIXME should return by another param with type enum
}
Json::Value jvalue = jmessage;
Json::Value jvalue2;
for (auto key : keyList) {
if (rtc::GetValueFromJsonObject(jvalue, key,
&jvalue2)) {
jvalue = jvalue2;
}
else {
return std::string("");
}
}
std::string tmp_str;
rtc::GetStringFromJson(jvalue, &tmp_str);
//std::string tmp_str = rtc::JsonValueToString(jvalue);//this result sdp parse error beacause /r/n
return tmp_str;
}
static Json::Value optJSONValue(std::string message, list<string> keyList) {
//parse json
Json::Reader reader;
Json::Value jmessage;
if (!reader.parse(message, jmessage)) {
RTC_LOG(WARNING) << "Received unknown message. " << message;
return NULL;//FIXME should return by another param with type enum
}
Json::Value jvalue = jmessage;
Json::Value jvalue2;
for (auto key : keyList) {
if (rtc::GetValueFromJsonObject(jvalue, key,
&jvalue2)) {
jvalue = jvalue2;
}
else {
return NULL;
}
}
return jvalue;
}
static long long int OptLLInt(std::string message, list<string> keyList) {
//parse json
Json::Reader reader;
Json::Value jmessage;
if (!reader.parse(message, jmessage)) {
RTC_LOG(WARNING) << "Received unknown message. " << message;
return 0;//FIXME should return by another param with type enum
}
Json::Value jvalue = jmessage;
Json::Value jvalue2;
for (auto key : keyList) {
if (rtc::GetValueFromJsonObject(jvalue, key,
&jvalue2)) {
jvalue = jvalue2;
}
else {
return 0;
}
}
std::string tmp_str = rtc::JsonValueToString(jvalue);
return std::stoll(tmp_str);
}
VideoRoomClient::VideoRoomClient() {
mWsClient = new PeerConnectionWsClient();
m_network_thread = rtc::Thread::CreateWithSocketServer();
m_network_thread->SetName("network_thread", nullptr);
m_network_thread->Start();
m_worker_thread = rtc::Thread::Create();
m_worker_thread->SetName("worker_thread", nullptr);
m_worker_thread->Start();
m_signaling_thread = rtc::Thread::Create();
m_signaling_thread->SetName("signaling_thread", nullptr);
m_signaling_thread->Start();
}
void VideoRoomClient::SendBitrateConstraint(long long int handleId) {
std::string transactionID = RandomString(12);
std::shared_ptr<JanusTransaction> jt(new JanusTransaction());
jt->transactionId = transactionID;
jt->Success = [=](std::string message) {
};
jt->Event = [=](std::string message) {
list<string> resultList = { "plugindata","data","result" };
std::string jsep_str = OptString(message, resultList);
if (jsep_str != "ok") {
//ûÓÐÉèÖóɹ¦
}
};
jt->Error = [=](std::string, std::string) {
RTC_LOG(INFO) << "CreateHandle error:";
};
m_transactionMap[transactionID] = jt;
Json::StyledWriter writer;
Json::Value jmessage;
Json::Value jbody;
jbody["bitrate"] = 128000;
jbody["request"] = "configure";
jmessage["janus"] = "message";
jmessage["body"] = jbody;
jmessage["transaction"] = transactionID;
jmessage["session_id"] = m_SessionId;
jmessage["handle_id"] = handleId;
//client_->SendToJanusAsync(writer.write(jmessage));
}
std::unique_ptr<cricket::VideoCapturer> VideoRoomClient::OpenVideoCaptureDevice() {
std::vector<std::string> device_names;
{
std::unique_ptr<webrtc::VideoCaptureModule::DeviceInfo> info(
webrtc::VideoCaptureFactory::CreateDeviceInfo());
if (!info) {
return nullptr;
}
int num_devices = info->NumberOfDevices();
for (int i = 0; i < num_devices; ++i) {
const uint32_t kSize = 256;
char name[kSize] = { 0 };
char id[kSize] = { 0 };
if (info->GetDeviceName(i, name, kSize, id, kSize) != -1) {
device_names.push_back(name);
}
}
}
cricket::WebRtcVideoDeviceCapturerFactory factory;
std::unique_ptr<cricket::VideoCapturer> capturer;
for (const auto& name : device_names) {
capturer = factory.Create(cricket::Device(name, 0));
if (capturer) {
break;
}
}
return capturer;
}
void VideoRoomClient::AddTracks(long long int handleId) {
if (!m_peer_connection_map[handleId]->peer_connection_->GetSenders().empty()) {
return; // Already added tracks.
}
rtc::scoped_refptr<webrtc::AudioTrackInterface> audio_track(
peer_connection_factory_->CreateAudioTrack(
kAudioLabel, peer_connection_factory_->CreateAudioSource(
cricket::AudioOptions())));
auto result_or_error = m_peer_connection_map[handleId]->peer_connection_->AddTrack(audio_track, { kStreamId });
if (!result_or_error.ok()) {
RTC_LOG(LS_ERROR) << "Failed to add audio track to PeerConnection: "
<< result_or_error.error().message();
}
std::unique_ptr<cricket::VideoCapturer> video_device =
OpenVideoCaptureDevice();
if (video_device) {
webrtc::FakeConstraints constraints;
std::list<std::string> keyList = { webrtc::MediaConstraintsInterface::kMinWidth, webrtc::MediaConstraintsInterface::kMaxWidth,
webrtc::MediaConstraintsInterface::kMinHeight, webrtc::MediaConstraintsInterface::kMaxHeight,
webrtc::MediaConstraintsInterface::kMinFrameRate, webrtc::MediaConstraintsInterface::kMaxFrameRate,
webrtc::MediaConstraintsInterface::kMinAspectRatio, webrtc::MediaConstraintsInterface::kMaxAspectRatio };
//set media constraints
std::map<std::string, std::string> opts;
opts[webrtc::MediaConstraintsInterface::kMaxFrameRate] = 18;
opts[webrtc::MediaConstraintsInterface::kMaxWidth] = 1280;
opts[webrtc::MediaConstraintsInterface::kMaxHeight] = 720;
for (auto key : keyList) {
if (opts.find(key) != opts.end()) {
constraints.AddMandatory(key, opts.at(key));
}
}
rtc::scoped_refptr<webrtc::VideoTrackInterface> video_track_(
peer_connection_factory_->CreateVideoTrack(
kVideoLabel, peer_connection_factory_->CreateVideoSource(
std::move(video_device), nullptr)));
//main_wnd_->StartLocalRenderer(video_track_);
//
//m_peer_connection_map[handleId]->StartRenderer(MainWnd_, video_track_);
result_or_error = m_peer_connection_map[handleId]->peer_connection_->AddTrack(video_track_, { kStreamId });
if (!result_or_error.ok()) {
RTC_LOG(LS_ERROR) << "Failed to add video track to PeerConnection: "
<< result_or_error.error().message();
}
}
else {
RTC_LOG(LS_ERROR) << "OpenVideoCaptureDevice failed";
}
}
bool VideoRoomClient::CreatePeerConnection(long long int handleId, bool dtls) {
RTC_DCHECK(peer_connection_factory_);
if (m_peer_connection_map.find(handleId) != m_peer_connection_map.end()) {
//existed
return false;
}
webrtc::PeerConnectionInterface::RTCConfiguration config;
config.tcp_candidate_policy = webrtc::PeerConnectionInterface::TcpCandidatePolicy::kTcpCandidatePolicyDisabled;
config.bundle_policy = webrtc::PeerConnectionInterface::BundlePolicy::kBundlePolicyMaxBundle;
config.rtcp_mux_policy = webrtc::PeerConnectionInterface::RtcpMuxPolicy::kRtcpMuxPolicyRequire;
config.continual_gathering_policy = webrtc::PeerConnectionInterface::ContinualGatheringPolicy::GATHER_CONTINUALLY;
config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
config.enable_dtls_srtp = dtls;
//additonal setting
if (!config.prerenderer_smoothing()) {
config.set_prerenderer_smoothing(true);
}
config.disable_ipv6 = true;
config.enable_rtp_data_channel = false;
webrtc::PeerConnectionInterface::IceServer server;
server.uri = GetPeerConnectionString();
config.servers.push_back(server);
webrtc::PeerConnectionInterface::BitrateParameters bitrateParam;
bitrateParam.min_bitrate_bps = absl::optional<int>(512000);
bitrateParam.current_bitrate_bps = absl::optional<int>(1256000);
bitrateParam.max_bitrate_bps = absl::optional<int>(1512000);
rtc::scoped_refptr<PeerConnection> peer_connection(
new rtc::RefCountedObject<PeerConnection>());
peer_connection->peer_connection_ = peer_connection_factory_->CreatePeerConnection(
config, nullptr, nullptr, peer_connection);
//set max/min bitrate
peer_connection->peer_connection_->SetBitrate(bitrateParam);
//add to the map
peer_connection->RegisterObserver(this);
peer_connection->SetHandleId(handleId);
m_peer_connection_map[handleId] = peer_connection;
return m_peer_connection_map[handleId]->peer_connection_ != nullptr;
}
bool VideoRoomClient::InitializePeerConnection(long long int handleId, bool bPublisher) {
if (m_peer_connection_map.find(handleId) != m_peer_connection_map.end()) {
//existing
return false;
}
if (!peer_connection_factory_) {
peer_connection_factory_ = webrtc::CreatePeerConnectionFactory(
this->m_network_thread.get() /* network_thread */,
this->m_worker_thread.get() /* worker_thread */,
this->m_signaling_thread.get() /* signaling_thread */,
nullptr /* default_adm */,
webrtc::CreateBuiltinAudioEncoderFactory(),
webrtc::CreateBuiltinAudioDecoderFactory(),
webrtc::CreateBuiltinVideoEncoderFactory(),
webrtc::CreateBuiltinVideoDecoderFactory(), nullptr /* audio_mixer */,
nullptr /* audio_processing */);
}
if (!peer_connection_factory_) {
std::cout << "Failed to initialize PeerConnectionFactory" << std::endl;
DeletePeerConnection(handleId);
return false;
}
if (!CreatePeerConnection(handleId,/*dtls=*/true)) {
std::cout << "CreatePeerConnection failed" << std::endl;
DeletePeerConnection(handleId);
}
//subscriber no need local tracks(audio and video)
if (bPublisher) {
AddTracks(handleId);
}
m_peer_connection_map[handleId]->b_publisher_ = true;
return m_peer_connection_map[handleId]->peer_connection_ != nullptr;
}
void VideoRoomClient::DeletePeerConnection(long long int handleId) {
m_peer_connection_map[handleId]->StopRenderer();
m_peer_connection_map[handleId]->peer_connection_ = nullptr;
//peer_connection_factory_ = nullptr; //TODO should destroy before quit
}
void VideoRoomClient::ConectToServer(std::string ip, int port) {
auto ws_server = std::string("ws://") + ip + std::string(":") + std::to_string(port);
mWsClient->RegisterObserver(this);
mWsClient->Connect(ws_server, "1111");
}
void VideoRoomClient::UIThreadCallback(int msg_id, void* data) {
switch (msg_id) {
case PEER_CONNECTION_CLOSED:
RTC_LOG(INFO) << "PEER_CONNECTION_CLOSED";
for (auto& key : m_peer_connection_map) {
DeletePeerConnection(key.first);
}
case NEW_TRACK_ADDED: {
NEW_TRACK* pTrack = (NEW_TRACK*)data;
long long int handleId = pTrack->handleId;
auto* track = reinterpret_cast<webrtc::MediaStreamTrackInterface*>(pTrack->pInterface);
if (track->kind() == webrtc::MediaStreamTrackInterface::kVideoKind) {
auto* video_track = static_cast<webrtc::VideoTrackInterface*>(track);
//main_wnd_->StartRemoteRenderer(video_track);
//m_peer_connection_map[handleId]->StartRenderer(MainWnd_, video_track);
}
track->Release();
delete pTrack;
break;
}
case TRACK_REMOVED: {
// Remote peer stopped sending a track.
auto* track = reinterpret_cast<webrtc::MediaStreamTrackInterface*>(data);
track->Release();
break;
}
case CREATE_OFFER: {
long long int* pHandleId = (long long int*)(data);
long long int handleId = *pHandleId;
if (InitializePeerConnection(handleId, true)) {
m_peer_connection_map[handleId]->CreateOffer();
}
else {
std::cout << "Error" << "Failed to initialize PeerConnection";
}
break;
}
case SET_REMOTE_ANSWER: {
REMOTE_SDP_INFO* pInfo = (REMOTE_SDP_INFO*)(data);
long long int handleId = pInfo->handleId;
std::string jsep_str = pInfo->jsep_str;
std::unique_ptr<webrtc::SessionDescriptionInterface> session_description =
webrtc::CreateSessionDescription(webrtc::SdpType::kAnswer, jsep_str);
//auto* session_description = reinterpret_cast<webrtc::SessionDescriptionInterface*>(data);
//auto* session_description = reinterpret_cast<webrtc::SessionDescriptionInterface*>(pInfo->pInterface);
m_peer_connection_map[handleId]->SetRemoteDescription(session_description.release());
//delete pInfo;
//TODO fixme suitable here?
SendBitrateConstraint(handleId);
break;
}
case SET_REMOTE_OFFER: {
REMOTE_SDP_INFO* pInfo = (REMOTE_SDP_INFO*)(data);
long long int handleId = pInfo->handleId;
std::string jsep_str = pInfo->jsep_str;
std::unique_ptr<webrtc::SessionDescriptionInterface> session_description =
webrtc::CreateSessionDescription(webrtc::SdpType::kOffer, jsep_str);
//as subscriber
if (InitializePeerConnection(handleId, false)) {
m_peer_connection_map[handleId]->SetRemoteDescription(session_description.release());
m_peer_connection_map[handleId]->CreateAnswer();
}
else {
std::cout << "Error" << "Failed to initialize PeerConnection";
}
//peerConnection.setRemoteDescription(sdpObserver, sdp);
//peerConnection.createAnswer(connection.sdpObserver, sdpMediaConstraints);
break;
}
default:
RTC_NOTREACHED();
break;
}
}
void VideoRoomClient::SendOffer(long long int handleId, std::string sdp_type, std::string sdp_desc) {
std::string transactionID = RandomString(12);
std::shared_ptr<JanusTransaction> jt(new JanusTransaction());
jt->transactionId = transactionID;
jt->Event = [=](std::string message) {
list<string> resultList = { "jsep","sdp" };
std::string jsep_str = OptString(message, resultList);
REMOTE_SDP_INFO* pInfo = new REMOTE_SDP_INFO;
pInfo->handleId = handleId;
pInfo->jsep_str = jsep_str;
UIThreadCallback(SET_REMOTE_ANSWER, pInfo);
};
m_transactionMap[transactionID] = jt;
Json::StyledWriter writer;
Json::Value jmessage;
Json::Value jbody;
Json::Value jjsep;
jbody["request"] = "configure";
jbody["audio"] = true;
jbody["video"] = true;
jjsep["type"] = sdp_type;
jjsep["sdp"] = sdp_desc;
jmessage["body"] = jbody;
jmessage["jsep"] = jjsep;
jmessage["janus"] = "message";
jmessage["transaction"] = transactionID;
jmessage["session_id"] = m_SessionId;
jmessage["handle_id"] = handleId;
//beacause the thread is on UI,so shift thread to ws thread
mWsClient->SendToJanusAsync(writer.write(jmessage));
}
void VideoRoomClient::SendAnswer(long long int handleId, std::string sdp_type, std::string sdp_desc) {
std::string transactionID = RandomString(12);
std::shared_ptr<JanusTransaction> jt(new JanusTransaction());
jt->transactionId = transactionID;
jt->Event = [=](std::string message) {
};
m_transactionMap[transactionID] = jt;
Json::StyledWriter writer;
Json::Value jmessage;
Json::Value jbody;
Json::Value jjsep;
jbody["request"] = "start";
jbody["room"] = "1234";
jjsep["type"] = sdp_type;
jjsep["sdp"] = sdp_desc;
jmessage["body"] = jbody;
jmessage["jsep"] = jjsep;
jmessage["janus"] = "message";
jmessage["transaction"] = transactionID;
jmessage["session_id"] = m_SessionId;
jmessage["handle_id"] = handleId;
//beacause the thread is on UI,so shift thread to ws thread
mWsClient->SendToJanusAsync(writer.write(jmessage));
}
void VideoRoomClient::trickleCandidate(long long int handleId, const webrtc::IceCandidateInterface* candidate) {
std::string transactionID = RandomString(12);
Json::StyledWriter writer;
Json::Value jmessage;
Json::Value jcandidate;
std::string sdp;
if (!candidate->ToString(&sdp)) {
RTC_LOG(LS_ERROR) << "Failed to serialize candidate";
return;
}
jcandidate["sdpMid"] = candidate->sdp_mid();
jcandidate["sdpMLineIndex"] = candidate->sdp_mline_index();
jcandidate["candidate"] = sdp;
jmessage["janus"] = "trickle";
jmessage["candidate"] = jcandidate;
jmessage["transaction"] = transactionID;
jmessage["session_id"] = m_SessionId;
jmessage["handle_id"] = handleId;
mWsClient->SendToJanusAsync(writer.write(jmessage));
}
void VideoRoomClient::trickleCandidateComplete(long long int handleId) {
std::string transactionID = RandomString(12);
Json::StyledWriter writer;
Json::Value jmessage;
Json::Value jcandidate;
jcandidate["completed"] = true;
jmessage["janus"] = "trickle";
jmessage["candidate"] = jcandidate;
jmessage["transaction"] = transactionID;
jmessage["session_id"] = m_SessionId;
jmessage["handle_id"] = handleId;
mWsClient->SendToJanusAsync(writer.write(jmessage));
}
void VideoRoomClient::PCSendSDP(long long int handleId, std::string sdpType, std::string sdp) {
if (sdpType == "offer") {
SendOffer(handleId, sdpType, sdp);
}
else {
SendAnswer(handleId, sdpType, sdp);
}
}
void VideoRoomClient::PCQueueUIThreadCallback(int msg_id, void* data) {
std::cout << msg_id << std::endl;
UIThreadCallback(msg_id,
data);
}
void VideoRoomClient::PCTrickleCandidate(long long int handleId, const webrtc::IceCandidateInterface* candidate) {
std::cout << "PCTrickleCandidate" << std::endl;
trickleCandidate(handleId, candidate);
}
void VideoRoomClient::PCTrickleCandidateComplete(long long int handleId) {
std::cout << "PCTrickleCandidateComplete" << std::endl;
trickleCandidateComplete(handleId);
}
void VideoRoomClient::OnSignedIn() {
std::cout << "siginedIn called\r\n";
}
void VideoRoomClient::OnDisconnected() {
std::cout << "OnDisconnected\r\n";
}
void VideoRoomClient::OnPeerConnected(int id, const std::string& name) {
std::cout << "peerconnect" << id << name << "\r\n";
}
void VideoRoomClient::OnMessageFromJanus(int peer_id, const std::string& message) {
RTC_DCHECK(!message.empty());
RTC_LOG(INFO) << "got msg: " << message;
//TODO make sure in right state
//parse json
Json::Reader reader;
Json::Value jmessage;
if (!reader.parse(message, jmessage)) {
RTC_LOG(WARNING) << "Received unknown message. " << message;
return;
}
std::string janus_str;
std::string json_object;
rtc::GetStringFromJsonObject(jmessage, "janus",
&janus_str);
if (!janus_str.empty()) {
if (janus_str == "ack") {
// Just an ack, we can probably ignore
RTC_LOG(INFO) << "Got an ack on session. ";
}
else if (janus_str == "success") {
rtc::GetStringFromJsonObject(jmessage, "transaction",
&janus_str);
std::shared_ptr<JanusTransaction> jt = m_transactionMap.at(janus_str);
//call signal
if (jt) {
jt->Success(message);//handle_id not ready yet
}
m_transactionMap.erase(janus_str);
}
else if (janus_str == "trickle") {
RTC_LOG(INFO) << "Got a trickle candidate from Janus. ";
}
else if (janus_str == "webrtcup") {
RTC_LOG(INFO) << "The PeerConnection with the gateway is up!";
}
else if (janus_str == "hangup") {
RTC_LOG(INFO) << "A plugin asked the core to hangup a PeerConnection on one of our handles! ";
}
else if (janus_str == "detached") {
RTC_LOG(INFO) << "A plugin asked the core to detach one of our handles! ";
}
else if (janus_str == "media") {
RTC_LOG(INFO) << "Media started/stopped flowing. ";
}
else if (janus_str == "slowlink") {
RTC_LOG(INFO) << "Got a slowlink event! ";
}
else if (janus_str == "error") {
RTC_LOG(INFO) << "Got an error. ";
// Oops, something wrong happened
rtc::GetStringFromJsonObject(jmessage, "transaction",
&janus_str);
std::shared_ptr<JanusTransaction> jt = m_transactionMap.at(janus_str);
//call signal
if (jt) {
jt->Error("123", "456");//TODO need to parse the error code and desc
}
m_transactionMap.erase(janus_str);
}
else {
if (janus_str == "event") {
RTC_LOG(INFO) << "Got a plugin event! ";
//get publishers
list<string> str_publishers{ "plugindata" ,"data","publishers" };
Json::Value value_publishers = optJSONValue(message, str_publishers);
std::vector<Json::Value> PublisherVec;
rtc::JsonArrayToValueVector(value_publishers, &PublisherVec);
//constrain the max publishers count to 5
for (auto pub : PublisherVec) {
std::string str_feedid;
std::string display;
Json::Value jvalue;
rtc::GetValueFromJsonObject(pub, "id", &jvalue);
rtc::GetStringFromJsonObject(pub, "display", &display);
str_feedid = rtc::JsonValueToString(jvalue);
long long int feedId = std::stoll(str_feedid);
CreateHandle("janus.plugin.videoroom", feedId, display);
}
bool bSuccess = rtc::GetStringFromJsonObject(jmessage, "transaction",
&janus_str);
if (bSuccess) {
std::shared_ptr<JanusTransaction> jt = m_transactionMap.at(janus_str);
if (jt) {
jt->Event(message);
}
}
}
}
}
}
void VideoRoomClient::OnMessageSent(int err) {
std::cout << "message sent" << err << "\r\n";
}
void VideoRoomClient::OnServerConnectionFailure() {
std::cout << "OnServerConnectionFailure" << "\r\n";
}
void VideoRoomClient::OnJanusConnected() {
std::cout << "OnJanusConnected" << "\r\n";
// janus session¶Ô½Ó
this->CreateSession();
}
void VideoRoomClient::OnJanusDisconnected() {
std::cout << "OnJanusDisconnected" << "\r\n";
}
void VideoRoomClient::OnSendKeepAliveToJanus() {
std::cout << "OnSendKeepAliveToJanus" << "\r\n";
KeepAlive();
}
void VideoRoomClient::KeepAlive()
{
auto signaling_thread_ = rtc::Thread::Create();
signaling_thread_->SetName("signaling_thread", nullptr);
signaling_thread_->Start();
if (m_SessionId > 0) {
std::string transactionID = RandomString(12);
Json::StyledWriter writer;
Json::Value jmessage;
jmessage["janus"] = "keepalive";
jmessage["session_id"] = m_SessionId;
jmessage["transaction"] = transactionID;
mWsClient->SendToJanus(writer.write(jmessage));
}
}
void VideoRoomClient::JoinRoom(std::string pluginName, long long int handleId, long long int feedId) {
//rtcEvents.onPublisherJoined(handle.handleId);
std::string transactionID = RandomString(12);
std::shared_ptr<JanusTransaction> jt(new JanusTransaction());
jt->transactionId = transactionID;
jt->Event = [=](std::string message) {
//get sender
list<string> senderList = { "sender" };
long long int sender = OptLLInt(message, senderList);
//get room
list<string> resultList = { "plugindata","data","result" };
list<string> roomList = { "plugindata","data","videoroom" };
list<string> publisherList = { "plugindata","data","videoroom" };
//echotest return result=ok
std::string result = OptString(message, resultList);
if (result == "ok") {
RTC_LOG(WARNING) << "echotest negotiation ok! ";
}
std::string videoroom = OptString(message, roomList);
//joined the room as a publisher
if (videoroom == "joined") {
UIThreadCallback(CREATE_OFFER, (void*)(&handleId));
//for each search every publisher and create handle to attach them
}
//joined the room as a subscriber
if (videoroom == "attached") {
//TODO make sure this sdp is offer from remote peer
list<string> resultList = { "jsep","sdp" };
std::string jsep_str = OptString(message, resultList);
REMOTE_SDP_INFO* pInfo = new REMOTE_SDP_INFO;
pInfo->handleId = handleId;
pInfo->jsep_str = jsep_str;
UIThreadCallback(SET_REMOTE_OFFER, pInfo);
}
};
m_transactionMap[transactionID] = jt;
Json::StyledWriter writer;
Json::Value jmessage;
Json::Value jbody;
if (pluginName == "janus.plugin.videoroom") {
jbody["request"] = "join";
jbody["room"] = 1234;//FIXME should be variable
if (feedId == 0) {
jbody["ptype"] = "publisher";
jbody["display"] = "pcg";//FIXME should be variable
}
else {
jbody["ptype"] = "subscriber";
jbody["feed"] = feedId;
jbody["private_id"] = 0;//FIXME should be variable
}
jmessage["body"] = jbody;
jmessage["janus"] = "message";
jmessage["transaction"] = transactionID;
jmessage["session_id"] = m_SessionId;
jmessage["handle_id"] = handleId;
mWsClient->SendToJanus(writer.write(jmessage));
//After joined,Then create offer
}
else if (pluginName == "janus.plugin.audiobridge") {
}
else if (pluginName == "janus.plugin.echotest") {
jbody["audio"] = true;
jbody["video"] = true;
jmessage["body"] = jbody;
jmessage["janus"] = "message";
jmessage["transaction"] = transactionID;
jmessage["session_id"] = m_SessionId;
jmessage["handle_id"] = handleId;
mWsClient->SendToJanus(writer.write(jmessage));
//shift the process to UI thread to createOffer
UIThreadCallback(CREATE_OFFER, (void*)(handleId));
}
}
void VideoRoomClient::CreateHandle(std::string pluginName, long long int feedId, std::string display) {
std::string transactionID = RandomString(12);
std::shared_ptr<JanusTransaction> jt(new JanusTransaction());
jt->transactionId = transactionID;
jt->Success = [=](std::string message) {
list<string> handleList = { "data","id" };
long long int handle_id = OptLLInt(message, handleList);
//add handle to the map
std::shared_ptr<JanusHandle> jh(new JanusHandle());
jh->handleId = handle_id;
jh->display = display;
jh->feedId = feedId;
m_handleMap[handle_id] = jh;
JoinRoom(pluginName, handle_id, feedId);//TODO feedid means nothing in echotest,else?
};
jt->Event = [=](std::string message) {
};
jt->Error = [=](std::string, std::string) {
RTC_LOG(INFO) << "CreateHandle error:";
};
m_transactionMap[transactionID] = jt;
Json::StyledWriter writer;
Json::Value jmessage;
jmessage["janus"] = "attach";
jmessage["plugin"] = pluginName;
jmessage["transaction"] = transactionID;
jmessage["session_id"] = m_SessionId;
mWsClient->SendToJanus(writer.write(jmessage));
}
void VideoRoomClient::CreateSession() {
int rev_tid1 = GetCurrentThreadId();
std::string transactionID = RandomString(12);
std::shared_ptr<JanusTransaction> jt(new JanusTransaction());
jt->transactionId = transactionID;
//TODO Is it possible for lamda expression here?
jt->Success = [=](std::string message) mutable {
list<string> sessionList = { "data","id" };
m_SessionId = OptLLInt(message, sessionList);
//lauch the timer for keep alive breakheart
//Then Create the handle
CreateHandle("janus.plugin.videoroom", 0, "pcg");
};
jt->Error = [=](std::string code, std::string reason) {
RTC_LOG(INFO) << "Ooops: " << code << " " << reason;
};
m_transactionMap[transactionID] = jt;
Json::StyledWriter writer;
Json::Value jmessage;
jmessage["janus"] = "create";
jmessage["transaction"] = transactionID;
mWsClient->SendToJanus(writer.write(jmessage));
}

View File

@ -0,0 +1,130 @@
#pragma once
#include <iostream>
#include "defaults.h"
#include "uWS.h"
#include <iostream>
#include <chrono>
#include <cmath>
#include <thread>
#include <fstream>
#include <vector>
#include <set>
#include <unordered_set>
#include <unordered_map>
#include <map>
#include <atomic>
#include "peer_connection_wsclient.h"
#include "rtc_base/json.h"
#include "JanusHandle.h"
#include "JanusTransaction.h"
#include <list>
#include "rtc_base/checks.h"
#include "rtc_base/json.h"
#include "rtc_base/logging.h"
#include "api/audio_codecs/builtin_audio_decoder_factory.h"
#include "api/audio_codecs/builtin_audio_encoder_factory.h"
#include "api/video_codecs/builtin_video_decoder_factory.h"
#include "api/video_codecs/builtin_video_encoder_factory.h"
#include "api/test/fakeconstraints.h"
#include "defaults.h"
#include "media/engine/webrtcvideocapturerfactory.h"
#include "modules/audio_device/include/audio_device.h"
#include "modules/audio_processing/include/audio_processing.h"
#include "modules/video_capture/video_capture_factory.h"
#include "peer_connection.h"
#include "rtc_base/arraysize.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/stringutils.h"
#include "rtc_base/win32socketserver.h"
#include "rtc_base/win32socketinit.h"
#include "third_party/libyuv/include/libyuv/convert_argb.h"
#if defined(WEBRTC_WIN)
#include "rtc_base/win32.h"
#endif
using namespace std;
using namespace rtc;
struct REMOTE_SDP_INFO {
long long int handleId;
std::string jsep_str;
};
class ImplPeerConnectionWsClientObserver :
public sigslot::has_slots<>,
public PeerConnectionWsClientObserver {
public:
ImplPeerConnectionWsClientObserver() {
};
};
class VideoRoomClient :
public PeerConnectionWsClientObserver,
public PeerConnectionCallback,
public sigslot::has_slots<> {
public:
VideoRoomClient();
void SendBitrateConstraint(long long int handleId);
std::unique_ptr<cricket::VideoCapturer> OpenVideoCaptureDevice();
void AddTracks(long long int handleId);
bool CreatePeerConnection(long long int handleId, bool dtls);
bool InitializePeerConnection(long long int handleId, bool bPublisher);
void DeletePeerConnection(long long int handleId);
void ConectToServer(std::string ip, int port);
void UIThreadCallback(int msg_id, void* data);
void SendOffer(long long int handleId, std::string sdp_type, std::string sdp_desc);
void SendAnswer(long long int handleId, std::string sdp_type, std::string sdp_desc);
void trickleCandidate(long long int handleId, const webrtc::IceCandidateInterface* candidate);
void trickleCandidateComplete(long long int handleId);
protected:
virtual void PCSendSDP(long long int handleId, std::string sdpType, std::string sdp);
virtual void PCQueueUIThreadCallback(int msg_id, void* data);
virtual void PCTrickleCandidate(long long int handleId, const webrtc::IceCandidateInterface* candidate);
virtual void PCTrickleCandidateComplete(long long int handleId);
virtual void OnSignedIn();;
virtual void OnDisconnected();
virtual void OnPeerConnected(int id, const std::string& name);
virtual void OnMessageFromJanus(int peer_id, const std::string& message);
virtual void OnMessageSent(int err);
virtual void OnServerConnectionFailure();
virtual void OnJanusConnected();
virtual void OnJanusDisconnected();
virtual void OnSendKeepAliveToJanus();
private:
void KeepAlive();
void JoinRoom(std::string pluginName, long long int handleId, long long int feedId);
void CreateHandle(std::string pluginName, long long int feedId, std::string display);
void CreateSession();;
PeerConnectionWsClient* mWsClient;
long long int m_SessionId = 0;
std::map<std::string, std::shared_ptr<JanusTransaction>> m_transactionMap;
std::map<long long int, std::shared_ptr<JanusHandle>> m_handleMap;
std::map<long long int, rtc::scoped_refptr<PeerConnection>> m_peer_connection_map;
rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> peer_connection_factory_;
std::unique_ptr<Thread> m_signaling_thread;
std::unique_ptr<Thread> m_worker_thread;
std::unique_ptr<Thread> m_network_thread;
};