format_ebml.c 13.1 KB
Newer Older
1 2
/* Icecast
 *
3
 * This program is distributed under the GNU General Public License,
4 5 6
 * version 2. A copy of this license is included with this source.
 * At your option, this specific source file can also be distributed
 * under the GNU GPL version 3.
7
 *
8
 * Copyright 2012,      David Richards, Mozilla Foundation,
9
 *                      and others (see AUTHORS for details).
10
 * Copyright 2014,      Philipp "ph3-der-loewe" Schafft <lion@lion.leolix.org>.
11 12 13 14
 */

/* format_ebml.c
 *
giles's avatar
giles committed
15
 * format plugin for WebM/EBML
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
 *
 */

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "refbuf.h"
#include "source.h"
#include "client.h"

#include "stats.h"
#include "format.h"
#include "format_ebml.h"

#define CATMODULE "format-ebml"

#include "logging.h"

#define EBML_HEADER_MAX_SIZE 131072
#define EBML_SLICE_SIZE 4096


typedef struct ebml_client_data_st ebml_client_data_t;

struct ebml_client_data_st {

    refbuf_t *header;
    int header_pos;

};

struct ebml_st {

    char *cluster_id;
    int cluster_start;
56

57 58 59 60 61 62 63 64 65 66 67 68
    int position;
    unsigned char *input_buffer;
    unsigned char *buffer;

    int header_read;
    int header_size;
    int header_position;
    int header_read_position;
    unsigned char *header;

};

69 70 71 72 73 74
static void ebml_free_plugin(format_plugin_t *plugin);
static refbuf_t *ebml_get_buffer(source_t *source);
static int ebml_write_buf_to_client(client_t *client);
static void ebml_write_buf_to_file(source_t *source, refbuf_t *refbuf);
static int ebml_create_client_data(source_t *source, client_t *client);
static void ebml_free_client_data(client_t *client);
75 76 77 78 79 80 81 82 83

static ebml_t *ebml_create();
static void ebml_destroy(ebml_t *ebml);
static int ebml_read_space(ebml_t *ebml);
static int ebml_read(ebml_t *ebml, char *buffer, int len);
static int ebml_last_was_sync(ebml_t *ebml);
static char *ebml_write_buffer(ebml_t *ebml, int len);
static int ebml_wrote(ebml_t *ebml, int len);

84
int format_ebml_get_plugin(source_t *source)
85 86 87 88 89 90 91 92 93 94 95 96 97
{

    ebml_source_state_t *ebml_source_state = calloc(1, sizeof(ebml_source_state_t));
    format_plugin_t *plugin = calloc(1, sizeof(format_plugin_t));

    plugin->get_buffer = ebml_get_buffer;
    plugin->write_buf_to_client = ebml_write_buf_to_client;
    plugin->create_client_data = ebml_create_client_data;
    plugin->free_plugin = ebml_free_plugin;
    plugin->write_buf_to_file = ebml_write_buf_to_file;
    plugin->set_tag = NULL;
    plugin->apply_settings = NULL;

98
    plugin->contenttype = httpp_getvar(source->parser, "content-type");
99 100

    plugin->_state = ebml_source_state;
101
    vorbis_comment_init(&plugin->vc);
102 103 104
    source->format = plugin;

    ebml_source_state->ebml = ebml_create();
105
    
106 107 108
    return 0;
}

109
static void ebml_free_plugin(format_plugin_t *plugin)
110 111 112 113
{

    ebml_source_state_t *ebml_source_state = plugin->_state;

114
    refbuf_release(ebml_source_state->header);
115
    ebml_destroy(ebml_source_state->ebml);
116
    free(ebml_source_state);
117
    vorbis_comment_clear(&plugin->vc);
118
    free(plugin);
119 120
}

121 122
/* Write to a client from the header buffer.
 */
123
static int send_ebml_header(client_t *client)
124 125 126 127 128 129
{

    ebml_client_data_t *ebml_client_data = client->format_data;
    int len = EBML_SLICE_SIZE;
    int ret;

130
    if (ebml_client_data->header->len - ebml_client_data->header_pos < len)
131 132 133
    {
        len = ebml_client_data->header->len - ebml_client_data->header_pos;
    }
134
    ret = client_send_bytes (client,
135 136 137 138 139 140 141 142 143 144 145 146
                             ebml_client_data->header->data + ebml_client_data->header_pos,
                             len);

    if (ret > 0)
    {
        ebml_client_data->header_pos += ret;
    }

    return ret;

}

147 148
/* Initial write-to-client function.
 */
149 150 151 152 153 154 155 156 157 158 159
static int ebml_write_buf_to_client (client_t *client)
{

    ebml_client_data_t *ebml_client_data = client->format_data;

    if (ebml_client_data->header_pos != ebml_client_data->header->len)
    {
        return send_ebml_header (client);
    }
    else
    {
160 161
        /* Now that the header's sent, short-circuit to the generic
         * write-refbufs function. */
162 163 164 165 166 167
        client->write_to_client = format_generic_write_to_client;
        return client->write_to_client(client);
    }

}

168 169
/* Return a refbuf to add to the queue.
 */
170
static refbuf_t *ebml_get_buffer(source_t *source)
171 172 173 174 175 176 177 178 179 180 181 182 183 184
{

    ebml_source_state_t *ebml_source_state = source->format->_state;
    format_plugin_t *format = source->format;
    char *data = NULL;
    int bytes = 0;
    refbuf_t *refbuf;
    int ret;

    while (1)
    {

        if ((bytes = ebml_read_space(ebml_source_state->ebml)) > 0)
        {
185
            /* A chunk is available for reading */
186 187 188 189 190
            refbuf = refbuf_new(bytes);
            ebml_read(ebml_source_state->ebml, refbuf->data, bytes);

            if (ebml_source_state->header == NULL)
            {
191
                /* Capture header before adding clusters to the queue */
192 193 194 195 196 197 198 199 200 201 202 203 204
                ebml_source_state->header = refbuf;
                continue;
            }

            if (ebml_last_was_sync(ebml_source_state->ebml))
            {
                refbuf->sync_point = 1;
            }
            return refbuf;

        }
        else
        {
205
            /* Feed more bytes into the parser */
206 207 208 209 210 211 212 213 214 215
            data = ebml_write_buffer(ebml_source_state->ebml, EBML_SLICE_SIZE);
            bytes = client_read_bytes (source->client, data, EBML_SLICE_SIZE);
            if (bytes <= 0)
            {
                ebml_wrote (ebml_source_state->ebml, 0);
                return NULL;
            }
            format->read_bytes += bytes;
            ret = ebml_wrote (ebml_source_state->ebml, bytes);
            if (ret != bytes) {
216
                ICECAST_LOG_ERROR("Problem processing stream");
217 218 219 220 221 222 223
                source->running = 0;
                return NULL;
            }
        }
    }
}

224 225
/* Initialize client state.
 */
226
static int ebml_create_client_data(source_t *source, client_t *client)
227
{
228
    ebml_client_data_t *ebml_client_data;
229 230
    ebml_source_state_t *ebml_source_state = source->format->_state;

231 232
    if (!ebml_source_state->header)
        return -1;
233

234 235 236
    ebml_client_data = calloc(1, sizeof(ebml_client_data_t));
    if (!ebml_client_data)
        return -1;
237

238 239 240 241 242
    ebml_client_data->header = ebml_source_state->header;
    refbuf_addref(ebml_client_data->header);
    client->format_data = ebml_client_data;
    client->free_client_data = ebml_free_client_data;
    return 0;
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
}


static void ebml_free_client_data (client_t *client)
{

    ebml_client_data_t *ebml_client_data = client->format_data;

    refbuf_release (ebml_client_data->header);
    free (client->format_data);
    client->format_data = NULL;
}


static void ebml_write_buf_to_file_fail (source_t *source)
{
259
    ICECAST_LOG_WARN("Write to dump file failed, disabling");
260 261 262 263 264 265 266 267 268 269 270 271 272
    fclose (source->dumpfile);
    source->dumpfile = NULL;
}


static void ebml_write_buf_to_file (source_t *source, refbuf_t *refbuf)
{

    ebml_source_state_t *ebml_source_state = source->format->_state;

    if (ebml_source_state->file_headers_written == 0)
    {
        if (fwrite (ebml_source_state->header->data, 1,
273
                    ebml_source_state->header->len,
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
                    source->dumpfile) != ebml_source_state->header->len)
            ebml_write_buf_to_file_fail(source);
        else
            ebml_source_state->file_headers_written = 1;
    }

    if (fwrite (refbuf->data, 1, refbuf->len, source->dumpfile) != refbuf->len)
    {
        ebml_write_buf_to_file_fail(source);
    }

}


/* internal ebml parsing */

static void ebml_destroy(ebml_t *ebml)
{

    free(ebml->header);
    free(ebml->input_buffer);
    free(ebml->buffer);
    free(ebml);

}

static ebml_t *ebml_create()
{

    ebml_t *ebml = calloc(1, sizeof(ebml_t));

    ebml->header = calloc(1, EBML_HEADER_MAX_SIZE);
    ebml->buffer = calloc(1, EBML_SLICE_SIZE * 4);
    ebml->input_buffer = calloc(1, EBML_SLICE_SIZE);

    ebml->cluster_id = "\x1F\x43\xB6\x75";

    ebml->cluster_start = -2;

    return ebml;

}

317 318 319
/* Return the size of a buffer needed to store the next
 * chunk that ebml_read can yield.
 */
320 321 322 323 324 325 326
static int ebml_read_space(ebml_t *ebml)
{

    int read_space;

    if (ebml->header_read == 1)
    {
327 328 329
        /* The header has previously been read */
        if (ebml->cluster_start > 0) {
            /* return up until just before a new cluster starts */
330
            read_space = ebml->cluster_start;
331 332 333 334
        } else {
            /* return most of what we have, but leave enough unread
             * to detect the next cluster.
             */
335
            read_space = ebml->position - 4;
336
        }
337

338 339 340 341
        return read_space;
    }
    else
    {
342 343
        if (ebml->header_size != 0) {
            /* The header can be read */
344
            return ebml->header_size;
345 346
        } else {
            /* The header's not ready yet */
347
            return 0;
348
        }
349 350 351 352
    }

}

353 354 355
/* Return a chunk of the EBML/MKV/WebM stream.
 * A cluster element's opening tag will always start a new chunk.
 */
356 357 358 359 360 361
static int ebml_read(ebml_t *ebml, char *buffer, int len)
{

    int read_space;
    int to_read;

362
    if (len < 1) {
363
        return 0;
364
    }
365

366
    if (ebml->header_read == 1)
367
    {
368 369 370
        /* The header has previously been read */
        if (ebml->cluster_start > 0) {
            /* return up until just before a new cluster starts */
371
            read_space = ebml->cluster_start;
372
        } else {
373
            read_space = ebml->position - 4;
374
        }
375

376
        if (read_space < 1) {
377
            return 0;
378
        }
379

380
        if (read_space >= len ) {
381
            to_read = len;
382
        } else {
383
            to_read = read_space;
384
        }
385 386

        memcpy(buffer, ebml->buffer, to_read);
387 388
        
        /* Shift unread data down to the start of the buffer */
389 390
        memmove(ebml->buffer, ebml->buffer + to_read, ebml->position - to_read);
        ebml->position -= to_read;
391

392
        if (ebml->cluster_start > 0) {
393
            ebml->cluster_start -= to_read;
394
        }
395 396 397 398 399
    }
    else
    {
        if (ebml->header_size != 0)
        {
400
            /* Can read a chunk of the header */
401 402
            read_space = ebml->header_size - ebml->header_read_position;

403
            if (read_space >= len) {
404
                to_read = len;
405
            } else {
406
                to_read = read_space;
407
            }
408 409 410 411

            memcpy(buffer, ebml->header, to_read);
            ebml->header_read_position += to_read;

412
            if (ebml->header_read_position == ebml->header_size) {
413
                ebml->header_read = 1;
414
            }
415 416 417
        }
        else
        {
418
            /* The header's not ready yet */
419 420 421 422 423 424 425 426
            return 0;
        }
    }

    return to_read;

}

427 428 429
/* Return nonzero if the chunk last returned by ebml_read was
 * a sync point.
 */
430 431 432
static int ebml_last_was_sync(ebml_t *ebml)
{

433 434 435 436
    if (ebml->cluster_start == 0) {
        /* The data buffer now starts with a cluster, so the chunk
         * just removed from it was (probably) not a cluster's start.
         */
437 438 439
        ebml->cluster_start -= 1;
        return 0;
    }
440

441 442 443 444
    if (ebml->cluster_start == -1) {
        /* Above logic triggered for the previous chunk, therefore the
         * chunk just removed was a cluster's start.
         */
445 446 447
        ebml->cluster_start -= 1;
        return 1;
    }
448

449 450 451 452 453 454 455
    return 0;

}

static char *ebml_write_buffer(ebml_t *ebml, int len)
{

456
    return (char *) ebml->input_buffer;
457 458 459

}

460 461
/* Process data that has been written to the EBML parser's input buffer.
 */
462 463 464 465 466
static int ebml_wrote(ebml_t *ebml, int len)
{

    int b;

467 468 469
    if (ebml->header_size == 0) {
        /* Still reading header */
        if ((ebml->header_position + len) > EBML_HEADER_MAX_SIZE) {
470
            ICECAST_LOG_ERROR("EBML Header too large, failing");
471 472
            return -1;
        }
473

474 475
/*
        ICECAST_LOG_DEBUG("EBML: Adding to header, ofset is %d size is %d adding %d",
476 477
                          ebml->header_size, ebml->header_position, len);
*/
478

479 480
        memcpy(ebml->header + ebml->header_position, ebml->input_buffer, len);
        ebml->header_position += len;
481 482
    } else {
        /* Header's already been determined, read into data buffer */
483 484
        memcpy(ebml->buffer + ebml->position, ebml->input_buffer, len);
    }
485

486 487
    for (b = 0; b < len - 4; b++)
    {
488 489 490 491 492 493
        /* Scan for cluster start marker.
         * False positives are possible, but unlikely, and only
         * permanently corrupt a stream if they occur while scanning
         * the initial header. Else, a client can reconnect in a few
         * seconds and get a real sync point.
         */
494 495
        if (!memcmp(ebml->input_buffer + b, ebml->cluster_id, 4))
        {
496 497 498
/*
            ICECAST_LOG_DEBUG("EBML: found cluster");
*/
499

500 501
            if (ebml->header_size == 0)
            {
502
                /* We were looking for the header end; now we've found it */
503
                ebml->header_size = ebml->header_position - len + b;
504 505
                
                /* Shift data after the header into the data buffer */
506 507
                memcpy(ebml->buffer, ebml->input_buffer + b, len - b);
                ebml->position = len - b;
508 509
                
                /* Mark the start of the data as the first sync point */
510 511
                ebml->cluster_start = -1;
                return len;
512 513
            } else {
                /* We've located a sync point in the data stream */
514 515 516 517
                ebml->cluster_start = ebml->position + b;
            }
        }
    }
518

519 520 521 522 523
    ebml->position += len;

    return len;

}