| 1 | // Copyright © SixtyFPS GmbH <info@slint.dev> |
| 2 | // SPDX-License-Identifier: MIT |
| 3 | |
| 4 | use std::path::PathBuf; |
| 5 | |
| 6 | use futures::{future::OptionFuture, FutureExt}; |
| 7 | |
| 8 | mod audio; |
| 9 | mod video; |
| 10 | |
| 11 | #[derive (Clone, Copy)] |
| 12 | pub enum ControlCommand { |
| 13 | Play, |
| 14 | Pause, |
| 15 | } |
| 16 | |
| 17 | pub struct Player { |
| 18 | control_sender: smol::channel::Sender<ControlCommand>, |
| 19 | demuxer_thread: Option<std::thread::JoinHandle<()>>, |
| 20 | playing: bool, |
| 21 | playing_changed_callback: Box<dyn Fn(bool)>, |
| 22 | } |
| 23 | |
| 24 | impl Player { |
| 25 | pub fn start( |
| 26 | path: PathBuf, |
| 27 | video_frame_callback: impl FnMut(&ffmpeg_next::util::frame::Video) + Send + 'static, |
| 28 | playing_changed_callback: impl Fn(bool) + 'static, |
| 29 | ) -> Result<Self, anyhow::Error> { |
| 30 | let (control_sender, control_receiver) = smol::channel::unbounded(); |
| 31 | |
| 32 | let demuxer_thread = |
| 33 | std::thread::Builder::new().name("demuxer thread" .into()).spawn(move || { |
| 34 | smol::block_on(async move { |
| 35 | let mut input_context = ffmpeg_next::format::input(&path).unwrap(); |
| 36 | |
| 37 | let video_stream = |
| 38 | input_context.streams().best(ffmpeg_next::media::Type::Video).unwrap(); |
| 39 | let video_stream_index = video_stream.index(); |
| 40 | let video_playback_thread = video::VideoPlaybackThread::start( |
| 41 | &video_stream, |
| 42 | Box::new(video_frame_callback), |
| 43 | ) |
| 44 | .unwrap(); |
| 45 | |
| 46 | let audio_stream = |
| 47 | input_context.streams().best(ffmpeg_next::media::Type::Audio).unwrap(); |
| 48 | let audio_stream_index = audio_stream.index(); |
| 49 | let audio_playback_thread = |
| 50 | audio::AudioPlaybackThread::start(&audio_stream).unwrap(); |
| 51 | |
| 52 | let mut playing = true; |
| 53 | |
| 54 | // This is sub-optimal, as reading the packets from ffmpeg might be blocking |
| 55 | // and the future won't yield for that. So while ffmpeg sits on some blocking |
| 56 | // I/O operation, the caller here will also block and we won't end up polling |
| 57 | // the control_receiver future further down. |
| 58 | let packet_forwarder_impl = async { |
| 59 | for (stream, packet) in input_context.packets() { |
| 60 | if stream.index() == audio_stream_index { |
| 61 | audio_playback_thread.receive_packet(packet).await; |
| 62 | } else if stream.index() == video_stream_index { |
| 63 | video_playback_thread.receive_packet(packet).await; |
| 64 | } |
| 65 | } |
| 66 | } |
| 67 | .fuse() |
| 68 | .shared(); |
| 69 | |
| 70 | loop { |
| 71 | // This is sub-optimal, as reading the packets from ffmpeg might be blocking |
| 72 | // and the future won't yield for that. So while ffmpeg sits on some blocking |
| 73 | // I/O operation, the caller here will also block and we won't end up polling |
| 74 | // the control_receiver future further down. |
| 75 | let packet_forwarder: OptionFuture<_> = |
| 76 | if playing { Some(packet_forwarder_impl.clone()) } else { None }.into(); |
| 77 | |
| 78 | smol::pin!(packet_forwarder); |
| 79 | |
| 80 | futures::select! { |
| 81 | _ = packet_forwarder => {}, // playback finished |
| 82 | received_command = control_receiver.recv().fuse() => { |
| 83 | match received_command { |
| 84 | Ok(command) => { |
| 85 | video_playback_thread.send_control_message(command).await; |
| 86 | audio_playback_thread.send_control_message(command).await; |
| 87 | match command { |
| 88 | ControlCommand::Play => { |
| 89 | // Continue in the loop, polling the packet forwarder future to forward |
| 90 | // packets |
| 91 | playing = true; |
| 92 | }, |
| 93 | ControlCommand::Pause => { |
| 94 | playing = false; |
| 95 | } |
| 96 | } |
| 97 | } |
| 98 | Err(_) => { |
| 99 | // Channel closed -> quit |
| 100 | return; |
| 101 | } |
| 102 | } |
| 103 | } |
| 104 | } |
| 105 | } |
| 106 | }) |
| 107 | })?; |
| 108 | |
| 109 | let playing = true; |
| 110 | playing_changed_callback(playing); |
| 111 | |
| 112 | Ok(Self { |
| 113 | control_sender, |
| 114 | demuxer_thread: Some(demuxer_thread), |
| 115 | playing, |
| 116 | playing_changed_callback: Box::new(playing_changed_callback), |
| 117 | }) |
| 118 | } |
| 119 | |
| 120 | pub fn toggle_pause_playing(&mut self) { |
| 121 | if self.playing { |
| 122 | self.playing = false; |
| 123 | self.control_sender.send_blocking(ControlCommand::Pause).unwrap(); |
| 124 | } else { |
| 125 | self.playing = true; |
| 126 | self.control_sender.send_blocking(ControlCommand::Play).unwrap(); |
| 127 | } |
| 128 | (self.playing_changed_callback)(self.playing); |
| 129 | } |
| 130 | } |
| 131 | |
| 132 | impl Drop for Player { |
| 133 | fn drop(&mut self) { |
| 134 | self.control_sender.close(); |
| 135 | if let Some(decoder_thread: JoinHandle<()>) = self.demuxer_thread.take() { |
| 136 | decoder_thread.join().unwrap(); |
| 137 | } |
| 138 | } |
| 139 | } |
| 140 | |