format_ebml.c 14.7 KB
Newer Older
1 2
/* Icecast
 *
3
 * This program is distributed under the GNU General Public License,
4 5 6
 * version 2. A copy of this license is included with this source.
 * At your option, this specific source file can also be distributed
 * under the GNU GPL version 3.
7
 *
8
 * Copyright 2012,      David Richards, Mozilla Foundation,
9
 *                      and others (see AUTHORS for details).
10
 * Copyright 2014,      Philipp "ph3-der-loewe" Schafft <lion@lion.leolix.org>.
11 12 13 14
 */

/* format_ebml.c
 *
giles's avatar
giles committed
15
 * format plugin for WebM/EBML
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
 *
 */

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "refbuf.h"
#include "source.h"
#include "client.h"

#include "stats.h"
#include "format.h"
#include "format_ebml.h"

#define CATMODULE "format-ebml"

#include "logging.h"

#define EBML_HEADER_MAX_SIZE 131072
#define EBML_SLICE_SIZE 4096

42 43 44 45
typedef enum ebml_read_mode {
    EBML_STATE_READING_HEADER = 0,
    EBML_STATE_READING_CLUSTERS
} ebml_read_mode;
46

47 48 49 50 51 52 53

typedef enum ebml_chunk_type {
    EBML_CHUNK_HEADER = 0,
    EBML_CHUNK_CLUSTER_START,
    EBML_CHUNK_CLUSTER_CONTINUE
} ebml_chunk_type;

54 55 56 57 58 59 60 61 62 63 64
typedef struct ebml_client_data_st ebml_client_data_t;

struct ebml_client_data_st {

    refbuf_t *header;
    int header_pos;

};

struct ebml_st {

65 66
    ebml_read_mode output_state;
    
67 68
    char *cluster_id;
    int cluster_start;
69

70 71
    int position;
    unsigned char *buffer;
72 73 74

    int input_position;
    unsigned char *input_buffer;
75
    
76 77 78 79 80 81 82
    int header_size;
    int header_position;
    int header_read_position;
    unsigned char *header;

};

83 84 85 86 87 88
static void ebml_free_plugin(format_plugin_t *plugin);
static refbuf_t *ebml_get_buffer(source_t *source);
static int ebml_write_buf_to_client(client_t *client);
static void ebml_write_buf_to_file(source_t *source, refbuf_t *refbuf);
static int ebml_create_client_data(source_t *source, client_t *client);
static void ebml_free_client_data(client_t *client);
89 90 91 92

static ebml_t *ebml_create();
static void ebml_destroy(ebml_t *ebml);
static int ebml_read_space(ebml_t *ebml);
93
static int ebml_read(ebml_t *ebml, char *buffer, int len, ebml_chunk_type *chunk_type);
94
static unsigned char *ebml_get_write_buffer(ebml_t *ebml, int *bytes);
95 96
static int ebml_wrote(ebml_t *ebml, int len);

97
int format_ebml_get_plugin(source_t *source)
98 99 100 101 102 103 104 105 106 107 108 109 110
{

    ebml_source_state_t *ebml_source_state = calloc(1, sizeof(ebml_source_state_t));
    format_plugin_t *plugin = calloc(1, sizeof(format_plugin_t));

    plugin->get_buffer = ebml_get_buffer;
    plugin->write_buf_to_client = ebml_write_buf_to_client;
    plugin->create_client_data = ebml_create_client_data;
    plugin->free_plugin = ebml_free_plugin;
    plugin->write_buf_to_file = ebml_write_buf_to_file;
    plugin->set_tag = NULL;
    plugin->apply_settings = NULL;

111
    plugin->contenttype = httpp_getvar(source->parser, "content-type");
112 113

    plugin->_state = ebml_source_state;
114
    vorbis_comment_init(&plugin->vc);
115 116 117
    source->format = plugin;

    ebml_source_state->ebml = ebml_create();
118
    
119 120 121
    return 0;
}

122
static void ebml_free_plugin(format_plugin_t *plugin)
123 124 125 126
{

    ebml_source_state_t *ebml_source_state = plugin->_state;

127
    refbuf_release(ebml_source_state->header);
128
    ebml_destroy(ebml_source_state->ebml);
129
    free(ebml_source_state);
130
    vorbis_comment_clear(&plugin->vc);
131
    free(plugin);
132 133
}

134 135
/* Write to a client from the header buffer.
 */
136
static int send_ebml_header(client_t *client)
137 138 139 140 141 142
{

    ebml_client_data_t *ebml_client_data = client->format_data;
    int len = EBML_SLICE_SIZE;
    int ret;

143
    if (ebml_client_data->header->len - ebml_client_data->header_pos < len)
144 145 146
    {
        len = ebml_client_data->header->len - ebml_client_data->header_pos;
    }
147
    ret = client_send_bytes (client,
148 149 150 151 152 153 154 155 156 157 158 159
                             ebml_client_data->header->data + ebml_client_data->header_pos,
                             len);

    if (ret > 0)
    {
        ebml_client_data->header_pos += ret;
    }

    return ret;

}

160 161
/* Initial write-to-client function.
 */
162 163 164 165 166 167 168 169 170 171 172
static int ebml_write_buf_to_client (client_t *client)
{

    ebml_client_data_t *ebml_client_data = client->format_data;

    if (ebml_client_data->header_pos != ebml_client_data->header->len)
    {
        return send_ebml_header (client);
    }
    else
    {
173 174
        /* Now that the header's sent, short-circuit to the generic
         * write-refbufs function. */
175 176 177 178 179 180
        client->write_to_client = format_generic_write_to_client;
        return client->write_to_client(client);
    }

}

181 182
/* Return a refbuf to add to the queue.
 */
183
static refbuf_t *ebml_get_buffer(source_t *source)
184 185 186 187
{

    ebml_source_state_t *ebml_source_state = source->format->_state;
    format_plugin_t *format = source->format;
188 189 190
    unsigned char *write_buffer = NULL;
    int read_bytes = 0;
    int write_bytes = 0;
191
    ebml_chunk_type chunk_type;
192 193 194 195 196
    refbuf_t *refbuf;
    int ret;

    while (1)
    {
197 198
        read_bytes = ebml_read_space(ebml_source_state->ebml);
        if (read_bytes > 0) {
199
            /* A chunk is available for reading */
200 201
            refbuf = refbuf_new(read_bytes);
            ebml_read(ebml_source_state->ebml, refbuf->data, read_bytes, &chunk_type);
202 203 204

            if (ebml_source_state->header == NULL)
            {
205
                /* Capture header before adding clusters to the queue */
206 207 208 209
                ebml_source_state->header = refbuf;
                continue;
            }

210
/*            ICECAST_LOG_DEBUG("EBML: generated refbuf, size %i : %hhi %hhi %hhi",
211
 *                            read_bytes, refbuf->data[0], refbuf->data[1], refbuf->data[2]);
212 213 214
 */
                              
            if (chunk_type == EBML_CHUNK_CLUSTER_START)
215 216
            {
                refbuf->sync_point = 1;
217
/*                ICECAST_LOG_DEBUG("EBML: ^ was sync point"); */
218 219 220
            }
            return refbuf;

221
        } else if(read_bytes == 0) {
222
            /* Feed more bytes into the parser */
223 224 225
            write_buffer = ebml_get_write_buffer(ebml_source_state->ebml, &write_bytes);
            read_bytes = client_read_bytes (source->client, write_buffer, write_bytes);
            if (read_bytes <= 0) {
226 227 228
                ebml_wrote (ebml_source_state->ebml, 0);
                return NULL;
            }
229 230 231
            format->read_bytes += read_bytes;
            ret = ebml_wrote (ebml_source_state->ebml, read_bytes);
            if (ret != read_bytes) {
232
                ICECAST_LOG_ERROR("Problem processing stream");
233 234 235
                source->running = 0;
                return NULL;
            }
236 237 238 239
        } else {
            ICECAST_LOG_ERROR("Problem processing stream");
            source->running = 0;
            return NULL;
240 241 242 243
        }
    }
}

244 245
/* Initialize client state.
 */
246
static int ebml_create_client_data(source_t *source, client_t *client)
247
{
248
    ebml_client_data_t *ebml_client_data;
249 250
    ebml_source_state_t *ebml_source_state = source->format->_state;

251 252
    if (!ebml_source_state->header)
        return -1;
253

254 255 256
    ebml_client_data = calloc(1, sizeof(ebml_client_data_t));
    if (!ebml_client_data)
        return -1;
257

258 259 260 261 262
    ebml_client_data->header = ebml_source_state->header;
    refbuf_addref(ebml_client_data->header);
    client->format_data = ebml_client_data;
    client->free_client_data = ebml_free_client_data;
    return 0;
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
}


static void ebml_free_client_data (client_t *client)
{

    ebml_client_data_t *ebml_client_data = client->format_data;

    refbuf_release (ebml_client_data->header);
    free (client->format_data);
    client->format_data = NULL;
}


static void ebml_write_buf_to_file_fail (source_t *source)
{
279
    ICECAST_LOG_WARN("Write to dump file failed, disabling");
280 281 282 283 284 285 286 287 288 289 290 291 292
    fclose (source->dumpfile);
    source->dumpfile = NULL;
}


static void ebml_write_buf_to_file (source_t *source, refbuf_t *refbuf)
{

    ebml_source_state_t *ebml_source_state = source->format->_state;

    if (ebml_source_state->file_headers_written == 0)
    {
        if (fwrite (ebml_source_state->header->data, 1,
293
                    ebml_source_state->header->len,
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
                    source->dumpfile) != ebml_source_state->header->len)
            ebml_write_buf_to_file_fail(source);
        else
            ebml_source_state->file_headers_written = 1;
    }

    if (fwrite (refbuf->data, 1, refbuf->len, source->dumpfile) != refbuf->len)
    {
        ebml_write_buf_to_file_fail(source);
    }

}


/* internal ebml parsing */

static void ebml_destroy(ebml_t *ebml)
{

    free(ebml->header);
    free(ebml->input_buffer);
    free(ebml->buffer);
    free(ebml);

}

static ebml_t *ebml_create()
{

    ebml_t *ebml = calloc(1, sizeof(ebml_t));

325 326
    ebml->output_state = EBML_STATE_READING_HEADER;

327 328 329 330 331 332
    ebml->header = calloc(1, EBML_HEADER_MAX_SIZE);
    ebml->buffer = calloc(1, EBML_SLICE_SIZE * 4);
    ebml->input_buffer = calloc(1, EBML_SLICE_SIZE);

    ebml->cluster_id = "\x1F\x43\xB6\x75";

333
    ebml->cluster_start = -1;
334 335 336 337 338

    return ebml;

}

339 340 341
/* Return the size of a buffer needed to store the next
 * chunk that ebml_read can yield.
 */
342 343 344 345 346
static int ebml_read_space(ebml_t *ebml)
{

    int read_space;

347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364
    switch (ebml->output_state) {
        case EBML_STATE_READING_HEADER:
        
            if (ebml->header_size != 0) {
                /* The header can be read */
                return ebml->header_size;
            } else {
                /* The header's not ready yet */
                return 0;
            }
            break;
            
        case EBML_STATE_READING_CLUSTERS:
            
            if (ebml->cluster_start > 0) {
                /* return up until just before a new cluster starts */
                read_space = ebml->cluster_start;
            } else {
365 366
                /* return what we have */
                read_space = ebml->position;
367
            }
368

369
            return read_space;
370
    }
371 372 373
    
    ICECAST_LOG_ERROR("EBML: Invalid parser read state");
    return 0;
374 375
}

376
/* Return a chunk of the EBML/MKV/WebM stream.
377
 * The header will be buffered until it can be returned as one chunk.
378
 * A cluster element's opening tag will always start a new chunk.
379 380 381
 * 
 * chunk_type will be set to indicate if the chunk is the header,
 * the start of a cluster, or continuing the current cluster.
382
 */
383
static int ebml_read(ebml_t *ebml, char *buffer, int len, ebml_chunk_type *chunk_type)
384 385 386 387
{

    int read_space;
    int to_read;
388 389
    
    *chunk_type = EBML_CHUNK_HEADER;
390

391
    if (len < 1) {
392
        return 0;
393
    }
394

395 396 397 398 399 400 401
    switch (ebml->output_state) {
        case EBML_STATE_READING_HEADER:
        
            if (ebml->header_size != 0)
            {
                /* Can read a chunk of the header */
                read_space = ebml->header_size - ebml->header_read_position;
402

403 404 405 406 407
                if (read_space >= len) {
                    to_read = len;
                } else {
                    to_read = read_space;
                }
408

409 410
                memcpy(buffer, ebml->header, to_read);
                ebml->header_read_position += to_read;
411 412 413
                
                *chunk_type = EBML_CHUNK_HEADER;
                
414 415 416 417 418 419 420 421 422 423 424
                if (ebml->header_read_position == ebml->header_size) {
                    ebml->output_state = EBML_STATE_READING_CLUSTERS;
                }
            } else {
                /* The header's not ready yet */
                return 0;
            }
        
            break;
            
        case EBML_STATE_READING_CLUSTERS:
425
        
426 427 428 429 430 431 432 433 434 435
            *chunk_type = EBML_CHUNK_CLUSTER_CONTINUE;
            read_space = ebml->position;
            
            if (ebml->cluster_start == 0) {
                /* new cluster is starting now */
                *chunk_type = EBML_CHUNK_CLUSTER_START;
                
                /* mark end of cluster */
                ebml->cluster_start = -1;
            } else if (ebml->cluster_start > 0) {
436 437 438
                /* return up until just before a new cluster starts */
                read_space = ebml->cluster_start;
            }
439

440 441 442
            if (read_space < 1) {
                return 0;
            }
443

444
            if (read_space >= len ) {
445
                to_read = len;
446
            } else {
447
                to_read = read_space;
448
            }
449

450 451 452 453 454
            memcpy(buffer, ebml->buffer, to_read);
            
            /* Shift unread data down to the start of the buffer */
            memmove(ebml->buffer, ebml->buffer + to_read, ebml->position - to_read);
            ebml->position -= to_read;
455

456 457
            if (ebml->cluster_start > 0) {
                ebml->cluster_start -= to_read;
458
            }
459 460
        
            break;
461 462 463 464 465 466
    }

    return to_read;

}

467 468 469 470 471 472
/* Get pointer & length of the buffer able to accept input.
 * 
 * Returns the start of the writable space;
 * Sets bytes to the amount of space available.
 */
static unsigned char *ebml_get_write_buffer(ebml_t *ebml, int *bytes)
473
{
474 475
    *bytes = EBML_SLICE_SIZE - ebml->input_position;
    return ebml->input_buffer + ebml->input_position;
476 477
}

478 479
/* Process data that has been written to the EBML parser's input buffer.
 */
480 481 482 483 484
static int ebml_wrote(ebml_t *ebml, int len)
{

    int b;

485 486 487
    if (ebml->header_size == 0) {
        /* Still reading header */
        if ((ebml->header_position + len) > EBML_HEADER_MAX_SIZE) {
488
            ICECAST_LOG_ERROR("EBML Header too large, failing");
489 490
            return -1;
        }
491

492 493
/*
        ICECAST_LOG_DEBUG("EBML: Adding to header, ofset is %d size is %d adding %d",
494 495
                          ebml->header_size, ebml->header_position, len);
*/
496

497 498
        memcpy(ebml->header + ebml->header_position, ebml->input_buffer, len);
        ebml->header_position += len;
499 500
    } else {
        /* Header's already been determined, read into data buffer */
501 502
        memcpy(ebml->buffer + ebml->position, ebml->input_buffer, len);
    }
503

504
    for (b = 0; b <= len - 4; b++)
505
    {
506 507 508 509 510 511
        /* Scan for cluster start marker.
         * False positives are possible, but unlikely, and only
         * permanently corrupt a stream if they occur while scanning
         * the initial header. Else, a client can reconnect in a few
         * seconds and get a real sync point.
         */
512 513
        if (!memcmp(ebml->input_buffer + b, ebml->cluster_id, 4))
        {
514 515 516
/*
            ICECAST_LOG_DEBUG("EBML: found cluster");
*/
517

518 519
            if (ebml->header_size == 0)
            {
520
                /* We were looking for the header end; now we've found it */
521
                ebml->header_size = ebml->header_position - len + b;
522 523
                
                /* Shift data after the header into the data buffer */
524 525
                memcpy(ebml->buffer, ebml->input_buffer + b, len - b);
                ebml->position = len - b;
526 527
                
                /* Mark the start of the data as the first sync point */
528
                ebml->cluster_start = 0;
529
                return len;
530 531
            } else {
                /* We've located a sync point in the data stream */
532 533 534 535
                ebml->cluster_start = ebml->position + b;
            }
        }
    }
536

537 538 539 540 541
    ebml->position += len;

    return len;

}