|
| 1 | +#include "selfdrive/loggerd/loggerd.h" |
| 2 | + |
| 3 | +ExitHandler do_exit; |
| 4 | + |
| 5 | +struct EncoderdState { |
| 6 | + int max_waiting = 0; |
| 7 | + |
| 8 | + // Sync logic for startup |
| 9 | + std::atomic<int> encoders_ready = 0; |
| 10 | + std::atomic<uint32_t> start_frame_id = 0; |
| 11 | + bool camera_ready[WideRoadCam + 1] = {}; |
| 12 | + bool camera_synced[WideRoadCam + 1] = {}; |
| 13 | +}; |
| 14 | + |
| 15 | +// Handle initial encoder syncing by waiting for all encoders to reach the same frame id |
| 16 | +bool sync_encoders(EncoderdState *s, CameraType cam_type, uint32_t frame_id) { |
| 17 | + if (s->camera_synced[cam_type]) return true; |
| 18 | + |
| 19 | + if (s->max_waiting > 1 && s->encoders_ready != s->max_waiting) { |
| 20 | + // add a small margin to the start frame id in case one of the encoders already dropped the next frame |
| 21 | + update_max_atomic(s->start_frame_id, frame_id + 2); |
| 22 | + if (std::exchange(s->camera_ready[cam_type], true) == false) { |
| 23 | + ++s->encoders_ready; |
| 24 | + LOGD("camera %d encoder ready", cam_type); |
| 25 | + } |
| 26 | + return false; |
| 27 | + } else { |
| 28 | + if (s->max_waiting == 1) update_max_atomic(s->start_frame_id, frame_id); |
| 29 | + bool synced = frame_id >= s->start_frame_id; |
| 30 | + s->camera_synced[cam_type] = synced; |
| 31 | + if (!synced) LOGD("camera %d waiting for frame %d, cur %d", cam_type, (int)s->start_frame_id, frame_id); |
| 32 | + return synced; |
| 33 | + } |
| 34 | +} |
| 35 | + |
| 36 | + |
| 37 | +void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) { |
| 38 | + util::set_thread_name(cam_info.filename); |
| 39 | + |
| 40 | + std::vector<Encoder *> encoders; |
| 41 | + VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false); |
| 42 | + |
| 43 | + int cur_seg = 0; |
| 44 | + while (!do_exit) { |
| 45 | + if (!vipc_client.connect(false)) { |
| 46 | + util::sleep_for(5); |
| 47 | + continue; |
| 48 | + } |
| 49 | + |
| 50 | + // init encoders |
| 51 | + if (encoders.empty()) { |
| 52 | + VisionBuf buf_info = vipc_client.buffers[0]; |
| 53 | + LOGD("encoder init %dx%d", buf_info.width, buf_info.height); |
| 54 | + |
| 55 | + // main encoder |
| 56 | + encoders.push_back(new Encoder(cam_info.filename, cam_info.type, buf_info.width, buf_info.height, |
| 57 | + cam_info.fps, cam_info.bitrate, cam_info.is_h265, |
| 58 | + buf_info.width, buf_info.height, false)); |
| 59 | + // qcamera encoder |
| 60 | + if (cam_info.has_qcamera) { |
| 61 | + encoders.push_back(new Encoder(qcam_info.filename, cam_info.type, buf_info.width, buf_info.height, |
| 62 | + qcam_info.fps, qcam_info.bitrate, qcam_info.is_h265, |
| 63 | + qcam_info.frame_width, qcam_info.frame_height, false)); |
| 64 | + } |
| 65 | + } |
| 66 | + |
| 67 | + for (int i = 0; i < encoders.size(); ++i) { |
| 68 | + encoders[i]->encoder_open(NULL); |
| 69 | + } |
| 70 | + |
| 71 | + bool lagging = false; |
| 72 | + while (!do_exit) { |
| 73 | + VisionIpcBufExtra extra; |
| 74 | + VisionBuf* buf = vipc_client.recv(&extra); |
| 75 | + if (buf == nullptr) continue; |
| 76 | + |
| 77 | + // detect loop around and drop the frames |
| 78 | + if (buf->get_frame_id() != extra.frame_id) { |
| 79 | + if (!lagging) { |
| 80 | + LOGE("encoder %s lag buffer id: %d extra id: %d", cam_info.filename, buf->get_frame_id(), extra.frame_id); |
| 81 | + lagging = true; |
| 82 | + } |
| 83 | + continue; |
| 84 | + } |
| 85 | + lagging = false; |
| 86 | + |
| 87 | + if (cam_info.trigger_rotate) { |
| 88 | + if (!sync_encoders(s, cam_info.type, extra.frame_id)) { |
| 89 | + continue; |
| 90 | + } |
| 91 | + if (do_exit) break; |
| 92 | + } |
| 93 | + |
| 94 | + // encode a frame |
| 95 | + for (int i = 0; i < encoders.size(); ++i) { |
| 96 | + int out_id = encoders[i]->encode_frame(buf->y, buf->u, buf->v, |
| 97 | + buf->width, buf->height, &extra); |
| 98 | + |
| 99 | + if (out_id == -1) { |
| 100 | + LOGE("Failed to encode frame. frame_id: %d", extra.frame_id); |
| 101 | + } |
| 102 | + } |
| 103 | + |
| 104 | + const int frames_per_seg = SEGMENT_LENGTH * MAIN_FPS; |
| 105 | + if (cur_seg >= 0 && extra.frame_id >= ((cur_seg + 1) * frames_per_seg) + s->start_frame_id) { |
| 106 | + for (auto &e : encoders) { |
| 107 | + e->encoder_close(); |
| 108 | + e->encoder_open(NULL); |
| 109 | + } |
| 110 | + ++cur_seg; |
| 111 | + } |
| 112 | + } |
| 113 | + } |
| 114 | + |
| 115 | + LOG("encoder destroy"); |
| 116 | + for(auto &e : encoders) { |
| 117 | + e->encoder_close(); |
| 118 | + delete e; |
| 119 | + } |
| 120 | +} |
| 121 | + |
| 122 | +void encoderd_thread() { |
| 123 | + EncoderdState s; |
| 124 | + |
| 125 | + std::vector<std::thread> encoder_threads; |
| 126 | + for (const auto &cam : cameras_logged) { |
| 127 | + if (cam.enable) { |
| 128 | + encoder_threads.push_back(std::thread(encoder_thread, &s, cam)); |
| 129 | + if (cam.trigger_rotate) s.max_waiting++; |
| 130 | + } |
| 131 | + } |
| 132 | + for (auto &t : encoder_threads) t.join(); |
| 133 | +} |
| 134 | + |
| 135 | +int main() { |
| 136 | + if (Hardware::TICI()) { |
| 137 | + int ret; |
| 138 | + ret = util::set_realtime_priority(52); |
| 139 | + assert(ret == 0); |
| 140 | + ret = util::set_core_affinity({7}); |
| 141 | + assert(ret == 0); |
| 142 | + } |
| 143 | + encoderd_thread(); |
| 144 | + return 0; |
| 145 | +} |
0 commit comments