SRS2.0一次writev多个packet支持10000客户端。
listen 1935; vhost __defaultVhost__ { tcp_nodelay on; min_latency on; play { gop_cache off; queue_length 10; mw_latency 100; } publish { mr off; } }
Nagle算法,不再缓存发送
srs_error_t SrsTcpConnection::set_tcp_nodelay(bool v) { srs_error_t err = srs_success; int r0 = 0; socklen_t nb_v = sizeof(int); int fd = srs_netfd_fileno(stfd); int ov = 0; if ((r0 = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &ov, &nb_v)) != 0) { return srs_error_new(ERROR_SOCKET_NO_NODELAY, "getsockopt fd=%d, r0=%d", fd, r0); } #ifndef SRS_PERF_TCP_NODELAY srs_warn("ignore TCP_NODELAY, fd=%d, ov=%d", fd, ov); return err; #endif int iv = (v? 1:0); if ((r0 = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &iv, nb_v)) != 0) { return srs_error_new(ERROR_SOCKET_NO_NODELAY, "setsockopt fd=%d, r0=%d", fd, r0); } if ((r0 = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &iv, &nb_v)) != 0) { return srs_error_new(ERROR_SOCKET_NO_NODELAY, "getsockopt fd=%d, r0=%d", fd, r0); } srs_trace("set fd=%d TCP_NODELAY %d=>%d", fd, ov, iv); return err; }
vhost mrw.srs.com { # whether enable min delay mode for vhost. # for min latence mode: # 1. disable the publish.mr for vhost. # 2. use timeout for cond wait for consumer queue. # @see https://github.com/ossrs/srs/issues/257 # default: off min_latency off; }
srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); publish_edge->set_queue_size(queue_size); ... void SrsMessageQueue::set_queue_size(srs_utime_t queue_size) { max_queue_size = queue_size; } ... srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow) { srs_error_t err = srs_success; msgs.push_back(msg); // If jitter is off, the timestamp of first sequence header is zero, which wll cause SRS to shrink and drop the // keyframes even if there is not overflow packets in queue, so we must ignore the zero timestamps, please // @see https://github.com/ossrs/srs/pull/2186#issuecomment- if (msg->is_av() && msg->timestamp != 0) { if (av_start_time == -1) { av_start_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS); } av_end_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS); } if (max_queue_size <= 0) { return err; } while (av_end_time - av_start_time > max_queue_size) { // notice the caller queue already overflow and shrinked. if (is_overflow) { *is_overflow = true; } shrink(); } return err; } ... srs_error_t SrsLiveConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag) { srs_error_t err = srs_success; SrsSharedPtrMessage* msg = shared_msg->copy(); if (!atc) { if ((err = jitter->correct(msg, ag)) != srs_success) { return srs_error_wrap(err, "consume message"); } } if ((err = queue->enqueue(msg, NULL)) != srs_success) { return srs_error_wrap(err, "enqueue message"); } return err; }
srs_error_t SrsRtmpConn::stream_service_cycle() { ... bool enabled_cache = _srs_config->get_gop_cache(req->vhost); srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%s/%s", req->get_stream_url().c_str(), ip.c_str(), enabled_cache, info->edge, source->source_id().c_str(), source->pre_source_id().c_str()); source->set_cache(enabled_cache); ... void SrsLiveSource::set_cache(bool enabled) { gop_cache->set(enabled); } ... void SrsGopCache::set(bool v) { enable_gop_cache = v; if (!v) { clear(); return; } } ... srs_error_t SrsGopCache::cache(SrsSharedPtrMessage* shared_msg) { srs_error_t err = srs_success; if (!enable_gop_cache) { return err; } // the gop cache know when to gop it. SrsSharedPtrMessage* msg = shared_msg; // got video, update the video count if acceptable if (msg->is_video()) { // drop video when not h.264 if (!SrsFlvVideo::h264(msg->payload, msg->size)) { return err; } cached_video_count++; audio_after_last_video_count = 0; } // no acceptable video or pure audio, disable the cache. if (pure_audio()) { return err; } // ok, gop cache enabled, and got an audio. if (msg->is_audio()) { audio_after_last_video_count++; } // clear gop cache when pure audio count overflow if (audio_after_last_video_count > SRS_PURE_AUDIO_GUESS_COUNT) { srs_warn("clear gop cache for guess pure audio overflow"); clear(); return err; } // clear gop cache when got key frame if (msg->is_video() && SrsFlvVideo::keyframe(msg->payload, msg->size)) { clear(); // curent msg is video frame, so we set to 1. cached_video_count = 1; } // cache the frame. gop_cache.push_back(msg->copy()); return err; }
srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { ... srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost); if ((err = rohc->set_socket_buffer(mw_sleep)) != srs_success) { return srs_error_wrap(err, "set mw_sleep %" PRId64, mw_sleep); } ... srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd) { ... mw_sleep = _srs_config->get_mw_sleep(req->vhost); skt->set_socket_buffer(mw_sleep); ... srs_error_t SrsTcpConnection::set_socket_buffer(srs_utime_t buffer_v) { srs_error_t err = srs_success; int r0 = 0; int fd = srs_netfd_fileno(stfd); socklen_t nb_v = sizeof(int); int ov = 0; if ((r0 = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &ov, &nb_v)) != 0) { return srs_error_new(ERROR_SOCKET_SNDBUF, "getsockopt fd=%d, r0=%d", fd, r0); } // the bytes: // 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536, // 128KB=, 256KB=, 512KB= // the buffer should set to sleep*kbps/8, // for example, your system delivery stream in 1000kbps, // sleep 800ms for small bytes, the buffer should set to: // 800*1000/8=B(about 128KB). // other examples: // 2000*3000/8=B(about 732KB). // 2000*5000/8=B(about 1220KB). int kbps = 4000; int iv = srsu2ms(buffer_v) * kbps / 8; // socket send buffer, system will double it. iv = iv / 2; // set the socket send buffer when required larger buffer if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &iv, nb_v) < 0) { return srs_error_new(ERROR_SOCKET_SNDBUF, "setsockopt fd=%d, r0=%d", fd, r0); } if ((r0 = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &iv, &nb_v)) != 0) { return srs_error_new(ERROR_SOCKET_SNDBUF, "getsockopt fd=%d, r0=%d", fd, r0); } srs_trace("set fd=%d, SO_SNDBUF=%d=>%d, buffer=%dms", fd, ov, iv, srsu2ms(buffer_v)); return err; }
SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsLiveSource* source, SrsContextId parent_cid) : trd(this, rtmp_sdk, tm, parent_cid) { ... mr_sleep = _srs_config->get_mr_sleep(req->vhost); ... void SrsPublishRecvThread::on_start() { // we donot set the auto response to false, // for the main thread never send message. #ifdef SRS_PERF_MERGED_READ if (mr) { // set underlayer buffer size set_socket_buffer(mr_sleep); // disable the merge read rtmp->set_merge_read(true, this); } #endif } ... void SrsPublishRecvThread::set_socket_buffer(srs_utime_t sleep_v) { // the bytes: // 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536, // 128KB=, 256KB=, 512KB= // the buffer should set to sleep*kbps/8, // for example, your system delivery stream in 1000kbps, // sleep 800ms for small bytes, the buffer should set to: // 800*1000/8=B(about 128KB). // other examples: // 2000*3000/8=B(about 732KB). // 2000*5000/8=B(about 1220KB). int kbps = 5000; int socket_buffer_size = srsu2msi(sleep_v) * kbps / 8; int fd = mr_fd; int onb_rbuf = 0; socklen_t sock_buf_size = sizeof(int); getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &onb_rbuf, &sock_buf_size); // socket recv buffer, system will double it. int nb_rbuf = socket_buffer_size / 2; if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, sock_buf_size) < 0) { srs_warn("set sock SO_RCVBUF=%d failed.", nb_rbuf); } getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, &sock_buf_size); srs_trace("mr change sleep %d=>%d, erbuf=%d, rbuf %d=>%d, sbytes=%d, realtime=%d", srsu2msi(mr_sleep), srsu2msi(sleep_v), socket_buffer_size, onb_rbuf, nb_rbuf, SRS_MR_SMALL_BYTES, realtime); rtmp->set_recv_buffer(nb_rbuf); }
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/208187.html原文链接:https://javaforall.net
