From 58fa33d121fb6e96b253fffb60fc6968baf385ff Mon Sep 17 00:00:00 2001 From: Andrew Eikum Date: Thu, 11 Nov 2021 11:16:06 -0600 Subject: [PATCH] media-converter: Clean up already-converted entries CW-Bug-Id: #19614 --- media-converter/src/audioconv.rs | 131 ++++++++++++++++++++++++++----- media-converter/src/fossilize.rs | 123 +++++++++++++++++++++++++++++ media-converter/src/videoconv.rs | 96 ++++++++++++++++++---- 3 files changed, 318 insertions(+), 32 deletions(-) diff --git a/media-converter/src/audioconv.rs b/media-converter/src/audioconv.rs index 5d5ec232..291cc06d 100644 --- a/media-converter/src/audioconv.rs +++ b/media-converter/src/audioconv.rs @@ -47,6 +47,7 @@ use std::io; use std::io::Read; use std::fs; use std::fs::OpenOptions; +use std::collections::HashSet; #[cfg(target_arch = "x86")] use crate::murmur3_x86_128::murmur3_x86_128_full as murmur3_128_full; @@ -176,8 +177,8 @@ const _AUDIOCONV_FLAG_RESERVED2: u32 = 0x40000000; /* not yet used */ const _AUDIOCONV_FLAG_V2: u32 = 0x80000000; /* indicates a "version 2" header, process somehow differently (TBD) */ /* properties of the "blank" audio file */ -const BLANK_AUDIO_FILE_LENGTH_MS: f32 = 10.0; -const BLANK_AUDIO_FILE_RATE: f32 = 48000.0; +const _BLANK_AUDIO_FILE_LENGTH_MS: f32 = 10.0; +const _BLANK_AUDIO_FILE_RATE: f32 = 48000.0; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( @@ -186,22 +187,112 @@ static CAT: Lazy = Lazy::new(|| { Some("Proton audio converter")) }); -static DUMP_FOZDB: Lazy>> = Lazy::new(|| { - let dump_file_path = match std::env::var("MEDIACONV_AUDIO_DUMP_FILE") { - Err(_) => { return Mutex::new(None); }, - Ok(c) => c, - }; +struct AudioConverterDumpFozdb { + fozdb: Option, + already_cleaned: bool, +} - let dump_file_path = std::path::Path::new(&dump_file_path); - - if fs::create_dir_all(&dump_file_path.parent().unwrap()).is_err() { - return Mutex::new(None); +impl AudioConverterDumpFozdb { + fn new() -> Self { + Self { + fozdb: None, + already_cleaned: false, + } } - match fossilize::StreamArchive::new(&dump_file_path, OpenOptions::new().write(true).read(true).create(true), AUDIOCONV_FOZ_NUM_TAGS) { - Ok(newdb) => Mutex::new(Some(newdb)), - Err(_) => Mutex::new(None), + fn open(&mut self, create: bool) -> &mut Self { + if self.fozdb.is_none() { + let dump_file_path = match std::env::var("MEDIACONV_AUDIO_DUMP_FILE") { + Err(_) => { return self; }, + Ok(c) => c, + }; + + let dump_file_path = std::path::Path::new(&dump_file_path); + + if fs::create_dir_all(&dump_file_path.parent().unwrap()).is_err() { + return self; + } + + match fossilize::StreamArchive::new(&dump_file_path, OpenOptions::new().write(true).read(true).create(create), AUDIOCONV_FOZ_NUM_TAGS) { + Ok(newdb) => { + self.fozdb = Some(newdb); + }, + Err(_) => { + return self; + }, + } + } + self } + + fn close(&mut self) { + self.fozdb = None + } + + fn discard_transcoded(&mut self) { + if self.already_cleaned { + return; + } + if let Some(fozdb) = &mut self.open(false).fozdb { + if let Ok(read_fozdb_path) = std::env::var("MEDIACONV_AUDIO_TRANSCODED_FILE") { + if let Ok(read_fozdb) = fossilize::StreamArchive::new(&read_fozdb_path, OpenOptions::new().read(true), AUDIOCONV_FOZ_NUM_TAGS) { + let mut chunks_to_discard = HashSet::<(u32, u128)>::new(); + let mut chunks_to_keep = HashSet::<(u32, u128)>::new(); + + for stream_id in fozdb.iter_tag(AUDIOCONV_FOZ_TAG_STREAM).cloned().collect::>() { + if let Ok(chunks_size) = fozdb.entry_size(AUDIOCONV_FOZ_TAG_STREAM, stream_id) { + let mut buf = vec![0u8; chunks_size].into_boxed_slice(); + if fozdb.read_entry(AUDIOCONV_FOZ_TAG_STREAM, stream_id, 0, &mut buf, fossilize::CRCCheck::WithCRC).is_ok() { + + let mut has_all = true; + let mut stream_chunks = Vec::<(u32, u128)>::new(); + + for i in 0..(chunks_size / 16) { + let offs = i * 16; + let chunk_id = u128::from_le_bytes(copy_into_array(&buf[offs..offs + 16])); + + if !read_fozdb.has_entry(AUDIOCONV_FOZ_TAG_PTNADATA, chunk_id) { + has_all = false; + break; + } + + stream_chunks.push((AUDIOCONV_FOZ_TAG_AUDIODATA, chunk_id)); + } + + for x in stream_chunks { + if has_all { + chunks_to_discard.insert(x); + chunks_to_discard.insert((AUDIOCONV_FOZ_TAG_CODECINFO, x.1)); + } else { + chunks_to_keep.insert(x); + chunks_to_keep.insert((AUDIOCONV_FOZ_TAG_CODECINFO, x.1)); + } + } + + if has_all { + chunks_to_discard.insert((AUDIOCONV_FOZ_TAG_STREAM, stream_id)); + } + } + } + } + + let mut chunks = Vec::<(u32, u128)>::new(); + for x in chunks_to_discard.difference(&chunks_to_keep) { + chunks.push(*x); + } + + if fozdb.discard_entries(&chunks).is_err() { + self.close(); + } + } + } + } + self.already_cleaned = true; + } +} + +static DUMP_FOZDB: Lazy> = Lazy::new(|| { + Mutex::new(AudioConverterDumpFozdb::new()) }); static DUMPING_DISABLED: Lazy = Lazy::new(|| { @@ -333,8 +424,9 @@ impl StreamState { fn write_to_foz(&self) -> Result<(), gst::LoggableError> { if self.needs_dump && !self.buffers.is_empty() { - let mut db = (*DUMP_FOZDB).lock().unwrap(); - let db = match &mut *db { + let db = &mut (*DUMP_FOZDB).lock().unwrap(); + let mut db = &mut db.open(true).fozdb; + let db = match &mut db { Some(d) => d, None => { return Err(gst_loggable_error!(CAT, "Failed to open fossilize db!")) }, }; @@ -660,8 +752,11 @@ impl ElementImpl for AudioConv { { /* open fozdb here; this is the right place to fail and opening may be * expensive */ - let db = (*DUMP_FOZDB).lock().unwrap(); - if (*db).is_none() { + (*DUMP_FOZDB).lock().unwrap().discard_transcoded(); + + let db = &mut (*DUMP_FOZDB).lock().unwrap(); + let db = &mut db.open(true).fozdb; + if db.is_none() { gst_error!(CAT, "Failed to open fossilize db!"); return Err(gst::StateChangeError); } diff --git a/media-converter/src/fossilize.rs b/media-converter/src/fossilize.rs index d9f10421..79248ba1 100644 --- a/media-converter/src/fossilize.rs +++ b/media-converter/src/fossilize.rs @@ -427,4 +427,127 @@ impl StreamArchive { Ok(()) } + + /* rewrites the database, discarding entries listed in 'to_discard' */ + pub fn discard_entries(&mut self, to_discard: &Vec<(FossilizeTag, FossilizeHash)>) -> Result<(), Error> { + self.write_pos = self.file.seek(io::SeekFrom::Start(0))?; + for v in self.seen_blobs.iter_mut() { + v.clear(); + } + + let mut magic_and_version = [0 as u8; MAGIC_LEN_BYTES]; + self.file.read_exact(&mut magic_and_version)?; + + let version = magic_and_version[15]; + + if magic_and_version[0..12] != FOSSILIZE_MAGIC || + version < FOSSILIZE_MIN_COMPAT_VERSION || + version > FOSSILIZE_VERSION { + return Err(Error::CorruptDatabase); + } + + self.write_pos = MAGIC_LEN_BYTES as u64; + + loop { + let mut name_and_header = [0u8; PAYLOAD_NAME_LEN_BYTES + PAYLOAD_HEADER_LEN_BYTES]; + let res = self.file.read_exact(&mut name_and_header); + + if let Err(fail) = res { + if fail.kind() == io::ErrorKind::UnexpectedEof { + break; + } + return Err(Error::IOError(fail)); + } + + let name = &name_and_header[0..PAYLOAD_NAME_LEN_BYTES]; + + let tag = FossilizeTag::from_ascii_bytes(&name[0..FOSSILIZETAG_ASCII_LEN])?; + let hash = FossilizeHash::from_ascii_bytes(&name[FOSSILIZETAG_ASCII_LEN..])?; + + let payload_entry = PayloadEntry::new_from_slice( + self.file.seek(io::SeekFrom::Current(0))?, + &name_and_header[PAYLOAD_NAME_LEN_BYTES..] + ); + + if to_discard.contains(&(tag, hash)) { + /* skip over this entry */ + let res = self.file.seek(io::SeekFrom::Current(payload_entry.payload_info.size as i64)); + match res { + Ok(_) => { + }, + + Err(e) => { + /* truncated chunk is not fatal */ + if e.kind() != io::ErrorKind::UnexpectedEof { + return Err(Error::IOError(e)); + } + }, + } + } else { + let mut read_pos = self.file.seek(io::SeekFrom::Current(0))?; + if self.write_pos == read_pos - name_and_header.len() as u64 { + /* if we haven't dropped any chunks, we can just skip it rather than rewrite it */ + let res = self.file.seek(io::SeekFrom::Current(payload_entry.payload_info.size as i64)); + match res { + Ok(p) => { + self.write_pos = p; + }, + + Err(e) => { + /* truncated chunk is not fatal */ + if e.kind() != io::ErrorKind::UnexpectedEof { + return Err(Error::IOError(e)); + } + }, + } + } else { + /* we're offset, so we have to rewrite */ + self.file.seek(io::SeekFrom::Start(self.write_pos))?; + + { + /* write header */ + let mut name = [0u8; PAYLOAD_NAME_LEN_BYTES]; + name[0..FOSSILIZETAG_ASCII_LEN].copy_from_slice(&tag.to_ascii_bytes()); + name[FOSSILIZETAG_ASCII_LEN..].copy_from_slice(&hash.to_ascii_bytes()); + self.file.write_all(&name)?; + self.write_pos += name.len() as u64; + + let buf = payload_entry.payload_info.to_slice(); + self.file.write_all(&buf)?; + self.write_pos += buf.len() as u64; + } + + /* copy contents */ + const BUFFER_COPY_BYTES: usize = 8 * 1024 * 1024; /* tuneable */ + let mut buf = box_array![0u8; BUFFER_COPY_BYTES]; + let end_read = read_pos + payload_entry.payload_info.size as u64; + loop { + let to_read = std::cmp::min((end_read - read_pos) as usize, BUFFER_COPY_BYTES); + if to_read == 0 { + break; + } + + self.file.seek(io::SeekFrom::Start(read_pos))?; + + let readed = self.file.read(&mut (*buf)[0..to_read])?; + if readed == 0 { + break; + } + + read_pos += readed as u64; + + self.file.seek(io::SeekFrom::Start(self.write_pos))?; + self.file.write_all(&buf[0..readed])?; + self.write_pos += readed as u64; + } + + self.file.seek(io::SeekFrom::Start(read_pos))?; + } + } + } + + self.file.set_len(self.write_pos)?; + + self.prepare() + } } diff --git a/media-converter/src/videoconv.rs b/media-converter/src/videoconv.rs index 658452fb..a123ef04 100644 --- a/media-converter/src/videoconv.rs +++ b/media-converter/src/videoconv.rs @@ -35,6 +35,7 @@ use glib::subclass::prelude::*; use crate::format_hash; use crate::HASH_SEED; use crate::box_array; +use crate::copy_into_array; use crate::BufferedReader; use gst; @@ -112,22 +113,85 @@ const VIDEOCONV_FOZ_TAG_OGVDATA: u32 = 1; const VIDEOCONV_FOZ_TAG_STREAM: u32 = 2; const VIDEOCONV_FOZ_NUM_TAGS: usize = 3; -static DUMP_FOZDB: Lazy>> = Lazy::new(|| { - let dump_file_path = match std::env::var("MEDIACONV_VIDEO_DUMP_FILE") { - Err(_) => { return Mutex::new(None); }, - Ok(c) => c, - }; +struct VideoConverterDumpFozdb { + fozdb: Option, + already_cleaned: bool, +} - let dump_file_path = std::path::Path::new(&dump_file_path); - - if fs::create_dir_all(&dump_file_path.parent().unwrap()).is_err() { - return Mutex::new(None); +impl VideoConverterDumpFozdb { + fn new() -> Self { + Self { + fozdb: None, + already_cleaned: false, + } } - match fossilize::StreamArchive::new(&dump_file_path, OpenOptions::new().write(true).read(true).create(true), VIDEOCONV_FOZ_NUM_TAGS) { - Ok(newdb) => Mutex::new(Some(newdb)), - Err(_) => Mutex::new(None), + fn open(&mut self, create: bool) -> &mut Self { + if self.fozdb.is_none() { + let dump_file_path = match std::env::var("MEDIACONV_VIDEO_DUMP_FILE") { + Err(_) => { return self; }, + Ok(c) => c, + }; + + let dump_file_path = std::path::Path::new(&dump_file_path); + + if fs::create_dir_all(&dump_file_path.parent().unwrap()).is_err() { + return self; + } + + match fossilize::StreamArchive::new(&dump_file_path, OpenOptions::new().write(true).read(true).create(create), VIDEOCONV_FOZ_NUM_TAGS) { + Ok(newdb) => { + self.fozdb = Some(newdb); + }, + Err(_) => { + return self; + }, + } + } + self } + + fn close(&mut self) { + self.fozdb = None + } + + fn discard_transcoded(&mut self) { + if self.already_cleaned { + return + } + if let Some(fozdb) = &mut self.open(false).fozdb { + if let Ok(read_fozdb_path) = std::env::var("MEDIACONV_VIDEO_TRANSCODED_FILE") { + if let Ok(read_fozdb) = fossilize::StreamArchive::new(&read_fozdb_path, OpenOptions::new().read(true), VIDEOCONV_FOZ_NUM_TAGS) { + let mut chunks = Vec::<(u32, u128)>::new(); + + for stream_id in fozdb.iter_tag(VIDEOCONV_FOZ_TAG_STREAM).cloned().collect::>() { + if read_fozdb.has_entry(VIDEOCONV_FOZ_TAG_OGVDATA, stream_id) { + if let Ok(chunks_size) = fozdb.entry_size(VIDEOCONV_FOZ_TAG_STREAM, stream_id) { + let mut buf = vec![0u8; chunks_size].into_boxed_slice(); + if fozdb.read_entry(VIDEOCONV_FOZ_TAG_STREAM, stream_id, 0, &mut buf, fossilize::CRCCheck::WithCRC).is_ok() { + for i in 0..(chunks_size / 16) { + let offs = i * 16; + let chunk_id = u128::from_le_bytes(copy_into_array(&buf[offs..offs + 16])); + chunks.push((VIDEOCONV_FOZ_TAG_VIDEODATA, chunk_id)); + } + } + } + chunks.push((VIDEOCONV_FOZ_TAG_STREAM, stream_id)); + } + } + + if fozdb.discard_entries(&chunks).is_err() { + self.close(); + } + } + } + } + self.already_cleaned = true; + } +} + +static DUMP_FOZDB: Lazy> = Lazy::new(|| { + Mutex::new(VideoConverterDumpFozdb::new()) }); struct PadReader<'a> { @@ -611,8 +675,10 @@ impl VideoConv { } fn dump_upstream_data(&self, hash: u128) -> io::Result<()> { - let mut db = (*DUMP_FOZDB).lock().unwrap(); - let db = match &mut *db { + + let db = &mut (*DUMP_FOZDB).lock().unwrap(); + let mut db = &mut db.open(true).fozdb; + let db = match &mut db { Some(d) => d, None => { gst_error!(CAT, "Unable to open fozdb!"); return Err(io::Error::new(io::ErrorKind::Other, "unable to open fozdb")); }, }; @@ -646,6 +712,8 @@ impl VideoConv { state: &mut VideoConvState ) -> Result<(), gst::LoggableError> { + (*DUMP_FOZDB).lock().unwrap().discard_transcoded(); + let hash = self.hash_upstream_data(); if let Ok(hash) = hash {