Commit d05cf10f authored by hkuang's avatar hkuang

Add error handling for frame parallel decode and unit test for that.

Change-Id: I6e309e11f1641618d2424b7a2c0fe744b8974dec
parent a9a20a10
......@@ -97,7 +97,7 @@ const char *const kVP9InvalidFileTests[] = {
"invalid-vp90-01.webm",
"invalid-vp90-02.webm",
"invalid-vp90-2-00-quantizer-00.webm.ivf.s5861_r01-05_b6-.ivf",
"invalid-vp90-03.webm",
"invalid-vp90-03-v3.webm",
"invalid-vp90-2-00-quantizer-11.webm.ivf.s52984_r01-05_b6-.ivf",
"invalid-vp90-2-00-quantizer-11.webm.ivf.s52984_r01-05_b6-z.ivf",
};
......
......@@ -4,8 +4,8 @@ fe346136b9b8c1e6f6084cc106485706915795e4 invalid-vp90-01.webm
25751f5d3b05ff03f0719ad42cd625348eb8961e invalid-vp90-01.webm.res
d78e2fceba5ac942246503ec8366f879c4775ca5 invalid-vp90-02.webm
2dadee5306245fa5eeb0f99652d0e17afbcba96d invalid-vp90-02.webm.res
df1a1453feb3c00d7d89746c7003b4163523bff3 invalid-vp90-03.webm
8fe6fd82bf537340f586f97a7ae31fb37ccda302 invalid-vp90-03.webm.res
df1a1453feb3c00d7d89746c7003b4163523bff3 invalid-vp90-03-v3.webm
4935c62becc68c13642a03db1e6d3e2331c1c612 invalid-vp90-03-v3.webm.res
a432f96ff0a787268e2f94a8092ab161a18d1b06 park_joy_90p_10_420.y4m
0b194cc312c3a2e84d156a221b0a5eb615dfddc5 park_joy_90p_10_422.y4m
ff0e0a21dc2adc95b8c1b37902713700655ced17 park_joy_90p_10_444.y4m
......@@ -667,3 +667,6 @@ f97088c7359fc8d3d5aa5eafe57bc7308b3ee124 vp90-2-20-big_superframe-01.webm
7c0ed8d04c4d06c5411dd2e5de2411d37f092db5 vp90-2-20-big_superframe-02.webm.md5
667ec8718c982aef6be07eb94f083c2efb9d2d16 vp90-2-07-frame_parallel-1.webm
bfc82bf848e9c05020d61e3ffc1e62f25df81d19 vp90-2-07-frame_parallel-1.webm.md5
efd5a51d175cfdacd169ed23477729dc558030dc invalid-vp90-2-07-frame_parallel-1.webm
9f912712ec418be69adb910e2ca886a63c4cec08 invalid-vp90-2-07-frame_parallel-2.webm
445f5a53ca9555341852997ccdd480a51540bd14 invalid-vp90-2-07-frame_parallel-3.webm
......@@ -786,14 +786,17 @@ LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-01.webm
LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-01.webm.res
LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-02.webm
LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-02.webm.res
LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-03.webm
LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-03.webm.res
LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-03-v3.webm
LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-03-v3.webm.res
LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-2-00-quantizer-00.webm.ivf.s5861_r01-05_b6-.ivf
LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-2-00-quantizer-00.webm.ivf.s5861_r01-05_b6-.ivf.res
LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-2-00-quantizer-11.webm.ivf.s52984_r01-05_b6-.ivf
LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-2-00-quantizer-11.webm.ivf.s52984_r01-05_b6-.ivf.res
LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-2-00-quantizer-11.webm.ivf.s52984_r01-05_b6-z.ivf
LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-2-00-quantizer-11.webm.ivf.s52984_r01-05_b6-z.ivf.res
LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-2-07-frame_parallel-1.webm
LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-2-07-frame_parallel-2.webm
LIBVPX_TEST_DATA-$(CONFIG_VP9_DECODER) += invalid-vp90-2-07-frame_parallel-3.webm
ifeq ($(CONFIG_DECODE_PERF_TESTS),yes)
# BBB VP9 streams
......
......@@ -118,5 +118,91 @@ TEST(VP9MultiThreadedFrameParallel, PauseSeekResume) {
DecodeFiles(files);
}
struct InvalidFileList {
const char *name;
// md5 sum for decoded frames which does not include corrupted frames.
const char *expected_md5;
// Expected number of decoded frames which does not include corrupted frames.
const int expected_frame_count;
};
// Decodes |filename| with |num_threads|. Return the md5 of the decoded
// frames which does not include corrupted frames.
string DecodeInvalidFile(const string &filename, int num_threads,
int expected_frame_count) {
libvpx_test::WebMVideoSource video(filename);
video.Init();
vpx_codec_dec_cfg_t cfg = vpx_codec_dec_cfg_t();
cfg.threads = num_threads;
const vpx_codec_flags_t flags = VPX_CODEC_USE_FRAME_THREADING;
libvpx_test::VP9Decoder decoder(cfg, flags, 0);
libvpx_test::MD5 md5;
video.Begin();
int out_frames = 0;
do {
const vpx_codec_err_t res =
decoder.DecodeFrame(video.cxdata(), video.frame_size());
// TODO(hkuang): frame parallel mode should return an error on corruption.
if (res != VPX_CODEC_OK) {
EXPECT_EQ(VPX_CODEC_OK, res) << decoder.DecodeError();
break;
}
video.Next();
// Flush the decoder at the end of the video.
if (!video.cxdata())
decoder.DecodeFrame(NULL, 0);
libvpx_test::DxDataIterator dec_iter = decoder.GetDxData();
const vpx_image_t *img;
// Get decompressed data
while ((img = dec_iter.Next())) {
++out_frames;
md5.Add(img);
}
} while (video.cxdata() != NULL);
EXPECT_EQ(expected_frame_count, out_frames) <<
"Input frame count does not match expected output frame count";
return string(md5.Get());
}
void DecodeInvalidFiles(const InvalidFileList files[]) {
for (const InvalidFileList *iter = files; iter->name != NULL; ++iter) {
SCOPED_TRACE(iter->name);
for (int t = 2; t <= 8; ++t) {
EXPECT_EQ(iter->expected_md5,
DecodeInvalidFile(iter->name, t, iter->expected_frame_count))
<< "threads = " << t;
}
}
}
TEST(VP9MultiThreadedFrameParallel, InvalidFileTest) {
static const InvalidFileList files[] = {
// invalid-vp90-2-07-frame_parallel-1.webm is a 40 frame video file with
// one key frame for every ten frames. The 11th frame has corrupted data.
{ "invalid-vp90-2-07-frame_parallel-1.webm",
"0549d0f45f60deaef8eb708e6c0eb6cb", 30},
// invalid-vp90-2-07-frame_parallel-2.webm is a 40 frame video file with
// one key frame for every ten frames. The 1st and 31st frames have
// corrupted data.
{ "invalid-vp90-2-07-frame_parallel-2.webm",
"6a1f3cf6f9e7a364212fadb9580d525e", 20},
// invalid-vp90-2-07-frame_parallel-3.webm is a 40 frame video file with
// one key frame for every ten frames. The 13th frame has corrupted data.
{ "invalid-vp90-2-07-frame_parallel-3.webm",
"a567c8259d27ad32b1b7f58db5ac89dd", 32},
{ NULL, NULL, 0},
};
DecodeInvalidFiles(files);
}
#endif // CONFIG_WEBM_IO
} // namespace
......@@ -175,7 +175,7 @@ int Reset(VP9Worker *const /*worker*/) { return 1; }
int Sync(VP9Worker *const worker) { return !worker->had_error; }
void Execute(VP9Worker *const worker) {
worker->had_error |= worker->hook(worker->data1, worker->data2);
worker->had_error |= !worker->hook(worker->data1, worker->data2);
}
void Launch(VP9Worker *const worker) { Execute(worker); }
......
......@@ -648,6 +648,7 @@ static void apply_frame_size(VP9_COMMON *cm, int width, int height) {
cm->subsampling_x, cm->subsampling_y, VP9_DEC_BORDER_IN_PIXELS,
&pool->frame_bufs[cm->new_fb_idx].raw_frame_buffer, pool->get_fb_cb,
pool->cb_priv)) {
unlock_buffer_pool(pool);
vpx_internal_error(&cm->error, VPX_CODEC_MEM_ERROR,
"Failed to allocate frame buffer");
}
......@@ -1165,6 +1166,10 @@ static size_t read_uncompressed_header(VP9Decoder *pbi,
}
setup_frame_size(cm, rb);
if (pbi->need_resync) {
vpx_memset(&cm->ref_frame_map, -1, sizeof(cm->ref_frame_map));
pbi->need_resync = 0;
}
} else {
cm->intra_only = cm->show_frame ? 0 : vp9_rb_read_bit(rb);
......@@ -1176,6 +1181,10 @@ static size_t read_uncompressed_header(VP9Decoder *pbi,
pbi->refresh_frame_flags = vp9_rb_read_literal(rb, REF_FRAMES);
setup_frame_size(cm, rb);
if (pbi->need_resync) {
vpx_memset(&cm->ref_frame_map, -1, sizeof(cm->ref_frame_map));
pbi->need_resync = 0;
}
} else {
pbi->refresh_frame_flags = vp9_rb_read_literal(rb, REF_FRAMES);
for (i = 0; i < REFS_PER_FRAME; ++i) {
......@@ -1203,6 +1212,12 @@ static size_t read_uncompressed_header(VP9Decoder *pbi,
}
}
if (pbi->need_resync) {
vpx_internal_error(&cm->error, VPX_CODEC_CORRUPT_FRAME,
"Keyframe / intra-only frame required to reset decoder"
" state");
}
if (!cm->error_resilient_mode) {
cm->coding_use_prev_mi = 1;
cm->refresh_frame_context = vp9_rb_read_bit(rb);
......@@ -1239,6 +1254,7 @@ static size_t read_uncompressed_header(VP9Decoder *pbi,
++frame_bufs[cm->ref_frame_map[ref_index]].ref_count;
}
unlock_buffer_pool(pool);
pbi->hold_ref_buf = 1;
if (frame_is_intra_only(cm) || cm->error_resilient_mode)
vp9_setup_past_independence(cm);
......@@ -1457,9 +1473,7 @@ void vp9_decode_frame(VP9Decoder *pbi,
*p_data_end = decode_tiles(pbi, data + first_partition_size, data_end);
}
new_fb->corrupted |= xd->corrupted;
if (!new_fb->corrupted) {
if (!xd->corrupted) {
if (!cm->error_resilient_mode && !cm->frame_parallel_decoding_mode) {
vp9_adapt_coef_probs(cm);
......@@ -1470,6 +1484,9 @@ void vp9_decode_frame(VP9Decoder *pbi,
} else {
debug_check_frame_counts(cm);
}
} else {
vpx_internal_error(&cm->error, VPX_CODEC_CORRUPT_FRAME,
"Decode failed. Frame data is corrupted.");
}
// Non frame parallel update frame context here.
......
......@@ -58,6 +58,7 @@ VP9Decoder *vp9_decoder_create(BufferPool *const pool) {
}
cm->error.setjmp = 1;
pbi->need_resync = 1;
initialize_dec();
vp9_rtcd();
......@@ -197,16 +198,6 @@ int vp9_get_reference_dec(VP9Decoder *pbi, int index, YV12_BUFFER_CONFIG **fb) {
return 0;
}
static INLINE void decrease_ref_count(int idx, RefCntBuffer *const frame_bufs,
BufferPool *const pool) {
if (idx >= 0) {
--frame_bufs[idx].ref_count;
if (frame_bufs[idx].ref_count == 0) {
pool->release_fb_cb(pool->cb_priv, &frame_bufs[idx].raw_frame_buffer);
}
}
}
/* If any buffer updating is signaled it should be done here. */
static void swap_frame_buffers(VP9Decoder *pbi) {
int ref_index = 0, mask;
......@@ -235,7 +226,7 @@ static void swap_frame_buffers(VP9Decoder *pbi) {
cm->ref_frame_map[ref_index] = cm->next_ref_frame_map[ref_index];
}
unlock_buffer_pool(pool);
pbi->hold_ref_buf = 0;
cm->frame_to_show = get_frame_new_buffer(cm);
if (!pbi->frame_parallel_decode || !cm->show_frame) {
......@@ -256,7 +247,6 @@ int vp9_receive_compressed_data(VP9Decoder *pbi,
RefCntBuffer *const frame_bufs = cm->buffer_pool->frame_bufs;
const uint8_t *source = *psource;
int retcode = 0;
cm->error.error_code = VPX_CODEC_OK;
if (size == 0) {
......@@ -282,7 +272,7 @@ int vp9_receive_compressed_data(VP9Decoder *pbi,
&frame_bufs[cm->new_fb_idx].raw_frame_buffer);
cm->new_fb_idx = get_free_fb(cm);
pbi->hold_ref_buf = 0;
if (pbi->frame_parallel_decode) {
VP9Worker *const worker = pbi->frame_worker_owner;
vp9_frameworker_lock_stats(worker);
......@@ -300,18 +290,35 @@ int vp9_receive_compressed_data(VP9Decoder *pbi,
cm->error.setjmp = 0;
pbi->ready_for_new_data = 1;
// We do not know if the missing frame(s) was supposed to update
// any of the reference buffers, but we act conservative and
// mark only the last buffer as corrupted.
//
// TODO(jkoleszar): Error concealment is undefined and non-normative
// at this point, but if it becomes so, [0] may not always be the correct
// thing to do here.
if (cm->frame_refs[0].idx != INT_MAX && cm->frame_refs[0].buf != NULL)
cm->frame_refs[0].buf->corrupted = 1;
if (frame_bufs[cm->new_fb_idx].ref_count > 0)
--frame_bufs[cm->new_fb_idx].ref_count;
lock_buffer_pool(pool);
// Release all the reference buffers if worker thread is holding them.
if (pbi->hold_ref_buf == 1) {
int ref_index = 0, mask;
VP9_COMMON *const cm = &pbi->common;
BufferPool *const pool = cm->buffer_pool;
RefCntBuffer *const frame_bufs = cm->buffer_pool->frame_bufs;
for (mask = pbi->refresh_frame_flags; mask; mask >>= 1) {
const int old_idx = cm->ref_frame_map[ref_index];
// Current thread releases the holding of reference frame.
decrease_ref_count(old_idx, frame_bufs, pool);
// Release the reference frame in reference map.
if ((mask & 1) && old_idx >= 0) {
decrease_ref_count(old_idx, frame_bufs, pool);
}
++ref_index;
}
// Current thread releases the holding of reference frame.
for (; ref_index < REF_FRAMES && !cm->show_existing_frame; ++ref_index) {
const int old_idx = cm->ref_frame_map[ref_index];
decrease_ref_count(old_idx, frame_bufs, pool);
}
pbi->hold_ref_buf = 0;
}
// Release current frame.
decrease_ref_count(cm->new_fb_idx, frame_bufs, pool);
unlock_buffer_pool(pool);
return -1;
}
......
......@@ -65,6 +65,8 @@ typedef struct VP9Decoder {
int max_threads;
int inv_tile_order;
int need_resync; // wait for key/intra-only frame.
int hold_ref_buf; // hold the reference buffer.
} VP9Decoder;
int vp9_receive_compressed_data(struct VP9Decoder *pbi,
......@@ -88,6 +90,21 @@ struct VP9Decoder *vp9_decoder_create(BufferPool *const pool);
void vp9_decoder_remove(struct VP9Decoder *pbi);
static INLINE void decrease_ref_count(int idx, RefCntBuffer *const frame_bufs,
BufferPool *const pool) {
if (idx >= 0) {
--frame_bufs[idx].ref_count;
// A worker may only get a free framebuffer index when calling get_free_fb.
// But the private buffer is not set up until finish decoding header.
// So any error happens during decoding header, the frame_bufs will not
// have valid priv buffer.
if (frame_bufs[idx].ref_count == 0 &&
frame_bufs[idx].raw_frame_buffer.priv) {
pool->release_fb_cb(pool->cb_priv, &frame_bufs[idx].raw_frame_buffer);
}
}
}
#ifdef __cplusplus
} // extern "C"
#endif
......
......@@ -320,7 +320,7 @@ void vp9_frameworker_wait(VP9Worker *const worker, RefCntBuffer *const ref_buf,
// Enabling the following line of code will get harmless tsan error but
// will get best performance.
// if (ref_buf->row >= row) return;
// if (ref_buf->row >= row && ref_buf->buf.corrupted != 1) return;
{
// Find the worker thread that owns the reference frame. If the reference
......@@ -340,10 +340,19 @@ void vp9_frameworker_wait(VP9Worker *const worker, RefCntBuffer *const ref_buf,
#endif
vp9_frameworker_lock_stats(ref_worker);
while (ref_buf->row < row && pbi->cur_buf == ref_buf) {
while (ref_buf->row < row && pbi->cur_buf == ref_buf &&
ref_buf->buf.corrupted != 1) {
pthread_cond_wait(&ref_worker_data->stats_cond,
&ref_worker_data->stats_mutex);
}
if (ref_buf->buf.corrupted == 1) {
FrameWorkerData *const worker_data = (FrameWorkerData *)worker->data1;
vp9_frameworker_unlock_stats(ref_worker);
vpx_internal_error(&worker_data->pbi->common.error,
VPX_CODEC_CORRUPT_FRAME,
"Worker %p failed to decode frame", worker);
}
vp9_frameworker_unlock_stats(ref_worker);
}
#else
......@@ -358,8 +367,11 @@ void vp9_frameworker_broadcast(RefCntBuffer *const buf, int row) {
VP9Worker *worker = buf->frame_worker_owner;
#ifdef DEBUG_THREAD
printf("%d %p worker decode to (%d) \r\n", worker_data->worker_id,
buf->frame_worker_owner, row);
{
FrameWorkerData *const worker_data = (FrameWorkerData *)worker->data1;
printf("%d %p worker decode to (%d) \r\n", worker_data->worker_id,
buf->frame_worker_owner, row);
}
#endif
vp9_frameworker_lock_stats(worker);
......@@ -403,7 +415,7 @@ void vp9_frameworker_copy_context(VP9Worker *const dst_worker,
dst_cm->prev_mi_grid_visible = src_cm->mi_grid_visible;
dst_cm->last_frame_seg_map = src_cm->current_frame_seg_map;
}
dst_worker_data->pbi->need_resync = src_worker_data->pbi->need_resync;
vp9_frameworker_unlock_stats(src_worker);
dst_worker_data->pbi->prev_buf =
......
......@@ -290,11 +290,37 @@ static int frame_worker_hook(void *arg1, void *arg2) {
FrameWorkerData *const frame_worker_data = (FrameWorkerData *)arg1;
const uint8_t *data = frame_worker_data->data;
(void)arg2;
frame_worker_data->result =
vp9_receive_compressed_data(frame_worker_data->pbi,
frame_worker_data->data_size,
&data);
frame_worker_data->data_end = data;
if (frame_worker_data->pbi->frame_parallel_decode) {
// In frame parallel decoding, a worker thread must successfully decode all
// the compressed data.
if (frame_worker_data->result != 0 ||
frame_worker_data->data + frame_worker_data->data_size - 1 > data) {
VP9Worker *const worker = frame_worker_data->pbi->frame_worker_owner;
BufferPool *const pool = frame_worker_data->pbi->common.buffer_pool;
// Signal all the other threads that are waiting for this frame.
vp9_frameworker_lock_stats(worker);
frame_worker_data->frame_context_ready = 1;
lock_buffer_pool(pool);
frame_worker_data->pbi->cur_buf->buf.corrupted = 1;
unlock_buffer_pool(pool);
frame_worker_data->pbi->need_resync = 1;
vp9_frameworker_signal_stats(worker);
vp9_frameworker_unlock_stats(worker);
return 0;
}
} else if (frame_worker_data->result != 0) {
// Check decode result in serial decode.
frame_worker_data->pbi->cur_buf->buf.corrupted = 1;
frame_worker_data->pbi->need_resync = 1;
}
return !frame_worker_data->result;
}
......@@ -444,7 +470,6 @@ static vpx_codec_err_t decode_one(vpx_codec_alg_priv_t *ctx,
&ctx->frame_workers[ctx->last_submit_worker_id]);
frame_worker_data->pbi->ready_for_new_data = 0;
// Copy the compressed data into worker's internal buffer.
// TODO(hkuang): Will all the workers allocate the same size
// as the size of the first intra frame be better? This will
......@@ -474,6 +499,7 @@ static vpx_codec_err_t decode_one(vpx_codec_alg_priv_t *ctx,
(ctx->next_submit_worker_id + 1) % ctx->num_frame_workers;
--ctx->available_threads;
worker->had_error = 0;
winterface->launch(worker);
}
......@@ -714,11 +740,7 @@ static void release_last_output_frame(vpx_codec_alg_priv_t *ctx) {
if (ctx->frame_parallel_decode && ctx->last_show_frame >= 0) {
BufferPool *const pool = ctx->buffer_pool;
lock_buffer_pool(pool);
--frame_bufs[ctx->last_show_frame].ref_count;
if (frame_bufs[ctx->last_show_frame].ref_count == 0) {
pool->release_fb_cb(pool->cb_priv,
&frame_bufs[ctx->last_show_frame].raw_frame_buffer);
}
decrease_ref_count(ctx->last_show_frame, frame_bufs, pool);
unlock_buffer_pool(pool);
}
}
......@@ -758,8 +780,12 @@ static vpx_image_t *decoder_get_frame(vpx_codec_alg_priv_t *ctx,
ctx->next_output_worker_id =
(ctx->next_output_worker_id + 1) % ctx->num_frame_workers;
// Wait for the frame from worker thread.
winterface->sync(worker);
if (vp9_get_raw_frame(frame_worker_data->pbi, &sd, &flags) == 0) {
if (!winterface->sync(worker)) {
// Decoding failed. Release the worker thread.
++ctx->available_threads;
if (ctx->flushed != 1)
return img;
} else if (vp9_get_raw_frame(frame_worker_data->pbi, &sd, &flags) == 0) {
VP9_COMMON *const cm = &frame_worker_data->pbi->common;
RefCntBuffer *const frame_bufs = cm->buffer_pool->frame_bufs;
++ctx->available_threads;
......
......@@ -26,7 +26,7 @@ extern "C" {
* Each thread will use one work buffer.
* TODO(hkuang): Add support to set number of worker threads dynamically.
*/
#define VPX_MAXIMUM_WORK_BUFFERS 4
#define VPX_MAXIMUM_WORK_BUFFERS 8
/*!\brief The maximum number of reference buffers that a VP9 encoder may use.
*/
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment