stats.c 30.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org, 
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
11
 * Copyright 2012-2014, Philipp "ph3-der-loewe" Schafft <lion@lion.leolix.org>,
12 13
 */

14 15 16 17
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

Jack Moffitt's avatar
Jack Moffitt committed
18 19 20 21
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdarg.h>
22
#include <ctype.h>
Jack Moffitt's avatar
Jack Moffitt committed
23

Michael Smith's avatar
Michael Smith committed
24 25 26
#include <libxml/xmlmemory.h>
#include <libxml/parser.h>
#include <libxml/tree.h>
Jack Moffitt's avatar
Jack Moffitt committed
27

Marvin Scholz's avatar
Marvin Scholz committed
28 29 30 31
#include "common/thread/thread.h"
#include "common/avl/avl.h"
#include "common/httpp/httpp.h"
#include "common/net/sock.h"
Jack Moffitt's avatar
Jack Moffitt committed
32 33 34

#include "connection.h"

35
#include "source.h"
Jack Moffitt's avatar
Jack Moffitt committed
36 37 38
#include "global.h"
#include "refbuf.h"
#include "client.h"
39
#include "admin.h"
Jack Moffitt's avatar
Jack Moffitt committed
40
#include "stats.h"
41
#include "xslt.h"
42
#include "util.h"
43 44
#define CATMODULE "stats"
#include "logging.h"
Jack Moffitt's avatar
Jack Moffitt committed
45

46
#ifdef _WIN32
47
#define atoll _atoi64
48
#define vsnprintf _vsnprintf
49
#define snprintf _snprintf
50
#endif
Jack Moffitt's avatar
Jack Moffitt committed
51

52 53 54 55
#define STATS_EVENT_SET     0
#define STATS_EVENT_INC     1
#define STATS_EVENT_DEC     2
#define STATS_EVENT_ADD     3
Karl Heyes's avatar
Karl Heyes committed
56 57 58
#define STATS_EVENT_SUB     4
#define STATS_EVENT_REMOVE  5
#define STATS_EVENT_HIDDEN  6
59

60 61 62 63 64 65 66 67
typedef struct _event_queue_tag
{
    volatile stats_event_t *head;
    volatile stats_event_t **tail;
} event_queue_t;

#define event_queue_init(qp)    { (qp)->head = NULL; (qp)->tail = &(qp)->head; }

Jack Moffitt's avatar
Jack Moffitt committed
68 69
typedef struct _event_listener_tag
{
70 71
    event_queue_t queue;
    mutex_t mutex;
Jack Moffitt's avatar
Jack Moffitt committed
72

73
    struct _event_listener_tag *next;
Jack Moffitt's avatar
Jack Moffitt committed
74 75
} event_listener_t;

76
static volatile int _stats_running = 0;
77
static thread_type *_stats_thread_id;
78
static volatile int _stats_threads = 0;
Jack Moffitt's avatar
Jack Moffitt committed
79

80 81
static stats_t _stats;
static mutex_t _stats_mutex;
Jack Moffitt's avatar
Jack Moffitt committed
82

83
static event_queue_t _global_event_queue;
Jack Moffitt's avatar
Jack Moffitt committed
84 85
mutex_t _global_event_mutex;

86
static volatile event_listener_t *_event_listeners;
Jack Moffitt's avatar
Jack Moffitt committed
87 88 89 90 91 92 93


static void *_stats_thread(void *arg);
static int _compare_stats(void *a, void *b, void *arg);
static int _compare_source_stats(void *a, void *b, void *arg);
static int _free_stats(void *key);
static int _free_source_stats(void *key);
94
static void _add_event_to_queue(stats_event_t *event, event_queue_t *queue);
95 96
static stats_node_t *_find_node(avl_tree *tree, const char *name);
static stats_source_t *_find_source(avl_tree *tree, const char *source);
Jack Moffitt's avatar
Jack Moffitt committed
97
static void _free_event(stats_event_t *event);
98
static stats_event_t *_get_event_from_queue(event_queue_t *queue);
Jack Moffitt's avatar
Jack Moffitt committed
99

100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123

/* simple helper function for creating an event */
static stats_event_t *build_event (const char *source, const char *name, const char *value)
{
    stats_event_t *event;

    event = (stats_event_t *)calloc(1, sizeof(stats_event_t));
    if (event)
    {
        if (source)
            event->source = (char *)strdup(source);
        if (name)
            event->name = (char *)strdup(name);
        if (value)
            event->value = (char *)strdup(value);
        else
            event->action = STATS_EVENT_REMOVE;
    }
    return event;
}

static void queue_global_event (stats_event_t *event)
{
    thread_mutex_lock(&_global_event_mutex);
124
    _add_event_to_queue (event, &_global_event_queue);
125 126 127
    thread_mutex_unlock(&_global_event_mutex);
}

128
void stats_initialize(void)
Jack Moffitt's avatar
Jack Moffitt committed
129
{
130
    _event_listeners = NULL;
131

132 133 134
    /* set up global struct */
    _stats.global_tree = avl_tree_new(_compare_stats, NULL);
    _stats.source_tree = avl_tree_new(_compare_source_stats, NULL);
Jack Moffitt's avatar
Jack Moffitt committed
135

136 137
    /* set up global mutex */
    thread_mutex_create(&_stats_mutex);
Jack Moffitt's avatar
Jack Moffitt committed
138

139
    /* set up stats queues */
140
    event_queue_init(&_global_event_queue);
141
    thread_mutex_create(&_global_event_mutex);
Jack Moffitt's avatar
Jack Moffitt committed
142

143 144 145
    /* fire off the stats thread */
    _stats_running = 1;
    _stats_thread_id = thread_create("Stats Thread", _stats_thread, NULL, THREAD_ATTACHED);
Jack Moffitt's avatar
Jack Moffitt committed
146 147
}

148
void stats_shutdown(void)
Jack Moffitt's avatar
Jack Moffitt committed
149
{
150
    int n;
Jack Moffitt's avatar
Jack Moffitt committed
151

152
    if (!_stats_running) /* We can't shutdown if we're not running. */
153 154
        return;

155 156 157
    /* wait for thread to exit */
    _stats_running = 0;
    thread_join(_stats_thread_id);
Jack Moffitt's avatar
Jack Moffitt committed
158

159 160 161 162 163 164 165
    /* wait for other threads to shut down */
    do {
        thread_sleep(300000);
        thread_mutex_lock(&_stats_mutex);
        n = _stats_threads;
        thread_mutex_unlock(&_stats_mutex);
    } while (n > 0);
166
    ICECAST_LOG_INFO("stats thread finished");
167

168
    /* free the queues */
Jack Moffitt's avatar
Jack Moffitt committed
169

170 171
    /* destroy the queue mutexes */
    thread_mutex_destroy(&_global_event_mutex);
Jack Moffitt's avatar
Jack Moffitt committed
172

173 174 175
    thread_mutex_destroy(&_stats_mutex);
    avl_tree_free(_stats.source_tree, _free_source_stats);
    avl_tree_free(_stats.global_tree, _free_stats);
176

177 178 179 180
    while (1)
    {
        stats_event_t *event = _get_event_from_queue (&_global_event_queue);
        if (event == NULL) break;
181 182 183 184
        if(event->source)
            free(event->source);
        if(event->value)
            free(event->value);
185 186
        if(event->name)
            free(event->name);
187 188
        free(event);
    }
Jack Moffitt's avatar
Jack Moffitt committed
189 190
}

191
stats_t *stats_get_stats(void)
Jack Moffitt's avatar
Jack Moffitt committed
192
{
193
    /* lock global stats
194
     
195
     copy stats
Jack Moffitt's avatar
Jack Moffitt committed
196

197
     unlock global stats
Jack Moffitt's avatar
Jack Moffitt committed
198

199
     return copied stats */
Jack Moffitt's avatar
Jack Moffitt committed
200

201
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
202 203
}

204 205
/* simple name=tag stat create/update */
void stats_event(const char *source, const char *name, const char *value)
Jack Moffitt's avatar
Jack Moffitt committed
206
{
207 208
    stats_event_t *event;

209 210
    if (value && xmlCheckUTF8 ((unsigned char *)value) == 0)
    {
211
        ICECAST_LOG_WARN("seen non-UTF8 data, probably incorrect metadata (%s, %s)", name, value);
212 213
        return;
    }
214
    event = build_event(source, name, value);
215
    if (event)
216
        queue_global_event(event);
Jack Moffitt's avatar
Jack Moffitt committed
217 218
}

219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239

/* wrapper for stats_event, this takes a charset to convert from */
void stats_event_conv(const char *mount, const char *name, const char *value, const char *charset)
{
    const char *metadata = value;
    xmlBufferPtr conv = xmlBufferCreate ();

    if (charset)
    {
        xmlCharEncodingHandlerPtr handle = xmlFindCharEncodingHandler (charset);

        if (handle)
        {
            xmlBufferPtr raw = xmlBufferCreate ();
            xmlBufferAdd (raw, (const xmlChar *)value, strlen (value));
            if (xmlCharEncInFunc (handle, conv, raw) > 0)
                metadata = (char *)xmlBufferContent (conv);
            xmlBufferFree (raw);
            xmlCharEncCloseFunc (handle);
        }
        else
240
            ICECAST_LOG_WARN("No charset found for \"%s\"", charset);
241 242 243 244 245 246
    }

    stats_event (mount, name, metadata);
    xmlBufferFree (conv);
}

247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
/* make stat hidden (non-zero). name can be NULL if it applies to a whole
 * source stats tree. */
void stats_event_hidden (const char *source, const char *name, int hidden)
{
    stats_event_t *event;
    const char *str = NULL;

    if (hidden)
        str = "";
    event = build_event (source, name, str);
    if (event)
    {
        event->action = STATS_EVENT_HIDDEN;
        queue_global_event (event);
    }
}
263 264 265

/* printf style formatting for stat create/update */
void stats_event_args(const char *source, char *name, char *format, ...)
Jack Moffitt's avatar
Jack Moffitt committed
266
{
267 268
    char buf[1024];
    va_list val;
269 270 271 272
    int ret;

    if (name == NULL)
        return;
273
    va_start(val, format);
274
    ret = vsnprintf(buf, sizeof(buf), format, val);
275
    va_end(val);
Jack Moffitt's avatar
Jack Moffitt committed
276

277 278
    if (ret < 0 || (unsigned int)ret >= sizeof (buf))
    {
279
        ICECAST_LOG_WARN("problem with formatting %s stat %s",
280 281 282
                source==NULL ? "global" : source, name);
        return;
    }
283
    stats_event(source, name, buf);
Jack Moffitt's avatar
Jack Moffitt committed
284 285
}

286
static char *_get_stats(const char *source, const char *name)
Jack Moffitt's avatar
Jack Moffitt committed
287
{
288 289 290
    stats_node_t *stats = NULL;
    stats_source_t *src = NULL;
    char *value = NULL;
Jack Moffitt's avatar
Jack Moffitt committed
291

292
    thread_mutex_lock(&_stats_mutex);
Jack Moffitt's avatar
Jack Moffitt committed
293

294 295 296 297 298 299 300 301
    if (source == NULL) {
        stats = _find_node(_stats.global_tree, name);
    } else {
        src = _find_source(_stats.source_tree, source);
        if (src) {
            stats = _find_node(src->stats_tree, name);
        }
    }
Jack Moffitt's avatar
Jack Moffitt committed
302

303
    if (stats) value = (char *)strdup(stats->value);
Jack Moffitt's avatar
Jack Moffitt committed
304

305
    thread_mutex_unlock(&_stats_mutex);
Jack Moffitt's avatar
Jack Moffitt committed
306

307
    return value;
Jack Moffitt's avatar
Jack Moffitt committed
308 309
}

310
char *stats_get_value(const char *source, const char *name)
311
{
312
    return(_get_stats(source, name));
313
}
314 315 316

/* increase the value in the provided stat by 1 */
void stats_event_inc(const char *source, const char *name)
Jack Moffitt's avatar
Jack Moffitt committed
317
{
318
    stats_event_t *event = build_event (source, name, NULL);
319
    /* ICECAST_LOG_DEBUG("%s on %s", name, source==NULL?"global":source); */
320 321 322 323
    if (event)
    {
        event->action = STATS_EVENT_INC;
        queue_global_event (event);
324
    }
Jack Moffitt's avatar
Jack Moffitt committed
325 326
}

327
void stats_event_add(const char *source, const char *name, unsigned long value)
Jack Moffitt's avatar
Jack Moffitt committed
328
{
329
    stats_event_t *event = build_event (source, name, NULL);
330
    /* ICECAST_LOG_DEBUG("%s on %s", name, source==NULL?"global":source); */
331 332 333 334 335 336
    if (event)
    {
        event->value = malloc (16);
        snprintf (event->value, 16, "%ld", value);
        event->action = STATS_EVENT_ADD;
        queue_global_event (event);
337
    }
Jack Moffitt's avatar
Jack Moffitt committed
338 339
}

Karl Heyes's avatar
Karl Heyes committed
340 341 342 343 344 345 346 347 348 349 350 351
void stats_event_sub(const char *source, const char *name, unsigned long value)
{
    stats_event_t *event = build_event (source, name, NULL);
    if (event)
    {
        event->value = malloc (16);
        snprintf (event->value, 16, "%ld", value);
        event->action = STATS_EVENT_SUB;
        queue_global_event (event);
    }
}

352 353
/* decrease the value in the provided stat by 1 */
void stats_event_dec(const char *source, const char *name)
Jack Moffitt's avatar
Jack Moffitt committed
354
{
355
    /* ICECAST_LOG_DEBUG("%s on %s", name, source==NULL?"global":source); */
356 357 358 359 360
    stats_event_t *event = build_event (source, name, NULL);
    if (event)
    {
        event->action = STATS_EVENT_DEC;
        queue_global_event (event);
361
    }
Jack Moffitt's avatar
Jack Moffitt committed
362 363 364 365 366
}

/* note: you must call this function only when you have exclusive access
** to the avl_tree
*/
367
static stats_node_t *_find_node(avl_tree *stats_tree, const char *name)
Jack Moffitt's avatar
Jack Moffitt committed
368
{
369 370 371 372 373 374
    stats_node_t *stats;
    avl_node *node;
    int cmp;

    /* get the root node */
    node = stats_tree->root->right;
375

376
    while (node) {
377
        stats = (stats_node_t *) node->key;
378 379 380 381 382 383 384 385 386 387 388
        cmp = strcmp(name, stats->name);
        if (cmp < 0) 
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return stats;
    }
    
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
389 390 391 392 393
}

/* note: you must call this function only when you have exclusive access
** to the avl_tree
*/
394
static stats_source_t *_find_source(avl_tree *source_tree, const char *source)
Jack Moffitt's avatar
Jack Moffitt committed
395
{
396 397 398 399 400 401 402
    stats_source_t *stats;
    avl_node *node;
    int cmp;

    /* get the root node */
    node = source_tree->root->right;
    while (node) {
403
        stats = (stats_source_t *) node->key;
404 405 406 407 408 409 410 411
        cmp = strcmp(source, stats->source);
        if (cmp < 0)
            node = node->left;
        else if (cmp > 0)
            node = node->right;
        else
            return stats;
    }
Jack Moffitt's avatar
Jack Moffitt committed
412

413 414
    /* didn't find it */
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
415 416 417 418
}

static stats_event_t *_copy_event(stats_event_t *event)
{
419
    stats_event_t *copy = (stats_event_t *)calloc(1, sizeof(stats_event_t));
420 421 422 423
    if (event->source) 
        copy->source = (char *)strdup(event->source);
    else
        copy->source = NULL;
424 425
    if (event->name)
        copy->name = (char *)strdup(event->name);
426 427 428 429
    if (event->value)
        copy->value = (char *)strdup(event->value);
    else
        copy->value = NULL;
430
    copy->hidden = event->hidden;
431
    copy->next = NULL;
Jack Moffitt's avatar
Jack Moffitt committed
432

433
    return copy;
Jack Moffitt's avatar
Jack Moffitt committed
434 435
}

436 437

/* helper to apply specialised changes to a stats node */
438
static void modify_node_event(stats_node_t *node, stats_event_t *event)
439 440 441
{
    char *str;

442 443 444 445 446 447 448 449
    if (event->action == STATS_EVENT_HIDDEN)
    {
        if (event->value)
            node->hidden = 1;
        else
            node->hidden = 0;
        return;
    }
450 451
    if (event->action != STATS_EVENT_SET)
    {
452
        int64_t value = 0;
453 454 455 456 457 458 459 460 461 462 463 464

        switch (event->action)
        {
            case STATS_EVENT_INC:
                value = atoi (node->value)+1;
                break;
            case STATS_EVENT_DEC:
                value = atoi (node->value)-1;
                break;
            case STATS_EVENT_ADD:
                value = atoi (node->value)+atoi (event->value);
                break;
465 466 467
            case STATS_EVENT_SUB:
                value = atoll (node->value) - atoll (event->value);
                break;
468
            default:
469
                ICECAST_LOG_WARN("unhandled event (%d) for %s", event->action, event->source);
470 471 472
                break;
        }
        str = malloc (16);
473
        snprintf (str, 16, "%" PRId64, value);
474 475
        if (event->value == NULL)
            event->value = strdup (str);
476 477 478 479 480
    }
    else
        str = (char *)strdup (event->value);
    free (node->value);
    node->value = str;
481
    if (event->source)
482
        ICECAST_LOG_DEBUG("update \"%s\" %s (%s)", event->source, node->name, node->value);
483
    else
484
        ICECAST_LOG_DEBUG("update global %s (%s)", node->name, node->value);
485 486 487 488 489 490 491
}


static void process_global_event (stats_event_t *event)
{
    stats_node_t *node;

492
    /* ICECAST_LOG_DEBUG("global event %s %s %d", event->name, event->value, event->action); */
493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527
    if (event->action == STATS_EVENT_REMOVE)
    {
        /* we're deleting */
        node = _find_node(_stats.global_tree, event->name);
        if (node != NULL)
            avl_delete(_stats.global_tree, (void *)node, _free_stats);
        return;
    }
    node = _find_node(_stats.global_tree, event->name);
    if (node)
    {
        modify_node_event (node, event);
    }
    else
    {
        /* add node */
        node = (stats_node_t *)calloc(1, sizeof(stats_node_t));
        node->name = (char *)strdup(event->name);
        node->value = (char *)strdup(event->value);

        avl_insert(_stats.global_tree, (void *)node);
    }
}


static void process_source_event (stats_event_t *event)
{
    stats_source_t *snode = _find_source(_stats.source_tree, event->source);
    if (snode == NULL)
    {
        if (event->action == STATS_EVENT_REMOVE)
            return;
        snode = (stats_source_t *)calloc(1,sizeof(stats_source_t));
        if (snode == NULL)
            return;
528
        ICECAST_LOG_DEBUG("new source stat %s", event->source);
529 530
        snode->source = (char *)strdup(event->source);
        snode->stats_tree = avl_tree_new(_compare_stats, NULL);
531 532 533 534
        if (event->action == STATS_EVENT_HIDDEN)
            snode->hidden = 1;
        else
            snode->hidden = 0;
535

536
        avl_insert(_stats.source_tree, (void *) snode);
537 538 539 540 541 542 543 544 545 546 547
    }
    if (event->name)
    {
        stats_node_t *node = _find_node(snode->stats_tree, event->name);
        if (node == NULL)
        {
            if (event->action == STATS_EVENT_REMOVE)
                return;
            /* adding node */
            if (event->value)
            {
548
                ICECAST_LOG_DEBUG("new node %s (%s)", event->name, event->value);
549 550 551
                node = (stats_node_t *)calloc(1,sizeof(stats_node_t));
                node->name = (char *)strdup(event->name);
                node->value = (char *)strdup(event->value);
552
                node->hidden = snode->hidden;
553 554 555 556 557 558 559

                avl_insert(snode->stats_tree, (void *)node);
            }
            return;
        }
        if (event->action == STATS_EVENT_REMOVE)
        {
560
            ICECAST_LOG_DEBUG("delete node %s", event->name);
561 562 563 564 565 566
            avl_delete(snode->stats_tree, (void *)node, _free_stats);
            return;
        }
        modify_node_event (node, event);
        return;
    }
567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582
    if (event->action == STATS_EVENT_HIDDEN)
    {
        avl_node *node = avl_get_first (snode->stats_tree);

        if (event->value)
            snode->hidden = 1;
        else
            snode->hidden = 0;
        while (node)
        {
            stats_node_t *stats = (stats_node_t*)node->key;
            stats->hidden = snode->hidden;
            node = avl_get_next (node);
        }
        return;
    }
583 584
    if (event->action == STATS_EVENT_REMOVE)
    {
585
        ICECAST_LOG_DEBUG("delete source node %s", event->source);
586 587 588 589
        avl_delete(_stats.source_tree, (void *)snode, _free_source_stats);
    }
}

590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637
/* NOTE: implicit %z is added to format string. */
static inline void __format_time(char * buffer, size_t len, const char * format) {
    time_t now = time(NULL);
    struct tm local;
    char tzbuffer[32];
    char timebuffer[128];
#ifdef _WIN32
    struct tm *thetime;
    int time_days, time_hours, time_tz;
    int tempnum1, tempnum2;
    char sign;
#endif

    localtime_r (&now, &local);
#ifndef _WIN32
    strftime (tzbuffer, sizeof(tzbuffer), "%z", &local);
#else
    thetime = gmtime (&now);
    time_days = local.tm_yday - thetime->tm_yday;

    if (time_days < -1) {
        tempnum1 = 24;
    } else {
        tempnum1 = 1;
    }

    if (tempnum1 < time_days) {
        tempnum2 = -24;
    } else {
        tempnum2 = time_days*24;
    }

    time_hours = (tempnum2 + local.tm_hour - thetime->tm_hour);
    time_tz = time_hours * 60 + local.tm_min - thetime->tm_min;

    if (time_tz < 0) {
        sign = '-';
        time_tz = -time_tz;
    } else {
        sign = '+';
    }

    snprintf(tzbuffer, sizeof(tzbuffer), "%c%.2d%.2d", sign, time_tz / 60, time_tz % 60);
#endif
    strftime (timebuffer, sizeof(timebuffer), format, &local);

    snprintf(buffer, len, "%s%s", timebuffer, tzbuffer);
}
638

Karl Heyes's avatar
Karl Heyes committed
639 640 641 642
void stats_event_time (const char *mount, const char *name)
{
    char buffer[100];

643
    __format_time(buffer, sizeof(buffer), "%a, %d %b %Y %H:%M:%S ");
Karl Heyes's avatar
Karl Heyes committed
644 645 646 647
    stats_event (mount, name, buffer);
}


648 649 650 651
void stats_event_time_iso8601 (const char *mount, const char *name)
{
    char buffer[100];

652
    __format_time(buffer, sizeof(buffer), "%Y-%m-%dT%H:%M:%S");
653 654 655 656
    stats_event (mount, name, buffer);
}


657 658
void stats_global (ice_config_t *config)
{
659
    stats_event (NULL, "server_id", config->server_id);
660 661 662 663 664 665
    stats_event (NULL, "host", config->hostname);
    stats_event (NULL, "location", config->location);
    stats_event (NULL, "admin", config->admin);
}


Jack Moffitt's avatar
Jack Moffitt committed
666 667
static void *_stats_thread(void *arg)
{
668 669 670 671
    stats_event_t *event;
    stats_event_t *copy;
    event_listener_t *listener;

Karl Heyes's avatar
Karl Heyes committed
672
    stats_event_time (NULL, "server_start");
673
    stats_event_time_iso8601 (NULL, "server_start_iso8601");
674 675 676 677 678 679

    /* global currently active stats */
    stats_event (NULL, "clients", "0");
    stats_event (NULL, "connections", "0");
    stats_event (NULL, "sources", "0");
    stats_event (NULL, "stats", "0");
Karl Heyes's avatar
Karl Heyes committed
680
    stats_event (NULL, "listeners", "0");
681 682 683 684 685 686 687

    /* global accumulating stats */
    stats_event (NULL, "client_connections", "0");
    stats_event (NULL, "source_client_connections", "0");
    stats_event (NULL, "source_relay_connections", "0");
    stats_event (NULL, "source_total_connections", "0");
    stats_event (NULL, "stats_connections", "0");
688
    stats_event (NULL, "listener_connections", "0");
689

690
    ICECAST_LOG_INFO("stats thread started");
691
    while (_stats_running) {
692
        thread_mutex_lock(&_global_event_mutex);
693
        if (_global_event_queue.head != NULL) {
694
            /* grab the next event from the queue */
695
            event = _get_event_from_queue (&_global_event_queue);
696 697
            thread_mutex_unlock(&_global_event_mutex);

698 699
            if (event == NULL)
                continue;
700 701 702
            event->next = NULL;

            thread_mutex_lock(&_stats_mutex);
703 704 705 706 707 708

            /* check if we are dealing with a global or source event */
            if (event->source == NULL)
                process_global_event (event);
            else
                process_source_event (event);
709 710 711
            
            /* now we have an event that's been processed into the running stats */
            /* this event should get copied to event listeners' queues */
712
            listener = (event_listener_t *)_event_listeners;
713 714
            while (listener) {
                copy = _copy_event(event);
715 716 717
                thread_mutex_lock (&listener->mutex);
                _add_event_to_queue (copy, &listener->queue);
                thread_mutex_unlock (&listener->mutex);
718 719 720 721 722 723 724 725

                listener = listener->next;
            }

            /* now we need to destroy the event */
            _free_event(event);

            thread_mutex_unlock(&_stats_mutex);
726
            continue;
727
        }
728 729 730 731
        else
        {
            thread_mutex_unlock(&_global_event_mutex);
        }
732

733
        thread_sleep(300000);
734 735 736
    }

    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
737 738
}

739
/* you must have the _stats_mutex locked here */
740
static void _unregister_listener(event_listener_t *listener)
741 742 743 744 745
{
    event_listener_t **prev = (event_listener_t **)&_event_listeners,
                     *current = *prev;
    while (current)
    {
746
        if (current == listener)
747 748 749 750 751 752 753 754 755 756
        {
            *prev = current->next;
            break;
        }
        prev = &current->next;
        current = *prev;
    }
}


Jack Moffitt's avatar
Jack Moffitt committed
757 758
static stats_event_t *_make_event_from_node(stats_node_t *node, char *source)
{
759 760 761 762 763 764 765 766
    stats_event_t *event = (stats_event_t *)malloc(sizeof(stats_event_t));
    
    if (source != NULL)
        event->source = (char *)strdup(source);
    else
        event->source = NULL;
    event->name = (char *)strdup(node->name);
    event->value = (char *)strdup(node->value);
767
    event->hidden = node->hidden;
768
    event->action = STATS_EVENT_SET;
769
    event->next = NULL;
Jack Moffitt's avatar
Jack Moffitt committed
770

771
    return event;
Jack Moffitt's avatar
Jack Moffitt committed
772 773 774
}


775 776 777 778
static void _add_event_to_queue(stats_event_t *event, event_queue_t *queue)
{
    *queue->tail = event;
    queue->tail = (volatile stats_event_t **)&event->next;
Jack Moffitt's avatar
Jack Moffitt committed
779 780 781
}


782 783 784
static stats_event_t *_get_event_from_queue (event_queue_t *queue)
{
    stats_event_t *event = NULL;
Jack Moffitt's avatar
Jack Moffitt committed
785

786 787 788 789 790 791 792
    if (queue && queue->head)
    {
        event = (stats_event_t *)queue->head;
        queue->head = event->next;
        if (queue->head == NULL)
            queue->tail = &queue->head;
    }
Jack Moffitt's avatar
Jack Moffitt committed
793

794
    return event;
Jack Moffitt's avatar
Jack Moffitt committed
795 796
}

797
static int _send_event_to_client(stats_event_t *event, client_t *client)
Jack Moffitt's avatar
Jack Moffitt committed
798
{
799
    int len;
800
    char buf [200];
Jack Moffitt's avatar
Jack Moffitt committed
801

802
    /* send data to the client!!!! */
803
    len = snprintf (buf, sizeof (buf), "EVENT %s %s %s\n",
804 805 806
            (event->source != NULL) ? event->source : "global",
            event->name ? event->name : "null",
            event->value ? event->value : "null");
807
    if (len > 0 && len < (int)sizeof (buf))
808 809 810 811 812 813
    {
        client_send_bytes (client, buf, len);
        if (client->con->error)
            return -1;
    }
    return 0;
Jack Moffitt's avatar
Jack Moffitt committed
814 815
}

816 817

static xmlNodePtr _dump_stats_to_doc (xmlNodePtr root, const char *show_mount, int hidden)
Jack Moffitt's avatar
Jack Moffitt committed
818
{
819 820
    avl_node *avlnode;
    xmlNodePtr ret = NULL;
821 822

    thread_mutex_lock(&_stats_mutex);
823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841
    /* general stats first */
    avlnode = avl_get_first(_stats.global_tree);
    while (avlnode)
    {
        stats_node_t *stat = avlnode->key;
        if (stat->hidden <=  hidden)
            xmlNewTextChild (root, NULL, XMLSTR(stat->name), XMLSTR(stat->value));
        avlnode = avl_get_next (avlnode);
    }
    /* now per mount stats */
    avlnode = avl_get_first(_stats.source_tree);
    while (avlnode)
    {
        stats_source_t *source = (stats_source_t *)avlnode->key;
        if (source->hidden <= hidden &&
                (show_mount == NULL || strcmp (show_mount, source->source) == 0))
        {
            avl_node *avlnode2 = avl_get_first (source->stats_tree);
            xmlNodePtr xmlnode = xmlNewTextChild (root, NULL, XMLSTR("source"), NULL);
842

843 844 845 846 847 848 849 850 851
            xmlSetProp (xmlnode, XMLSTR("mount"), XMLSTR(source->source));
            if (ret == NULL)
                ret = xmlnode;
            while (avlnode2)
            {
                stats_node_t *stat = avlnode2->key;
                xmlNewTextChild (xmlnode, NULL, XMLSTR(stat->name), XMLSTR(stat->value));
                avlnode2 = avl_get_next (avlnode2);
            }
852
        }
853
        avlnode = avl_get_next (avlnode);
854 855
    }
    thread_mutex_unlock(&_stats_mutex);
856
    return ret;
Jack Moffitt's avatar
Jack Moffitt committed
857 858
}

859

860 861 862 863 864
/* factoring out code for stats loops
** this function copies all stats to queue, and registers 
** the queue for all new events atomically.
** note: mutex must already be created!
*/
865
static void _register_listener (event_listener_t *listener)
Jack Moffitt's avatar
Jack Moffitt committed
866
{
867 868 869 870
    avl_node *node;
    avl_node *node2;
    stats_event_t *event;
    stats_source_t *source;
Jack Moffitt's avatar
Jack Moffitt committed
871

872
    thread_mutex_lock(&_stats_mutex);
Jack Moffitt's avatar
Jack Moffitt committed
873

874
    /* first we fill our queue with the current stats */
875

876 877 878
    /* start with the global stats */
    node = avl_get_first(_stats.global_tree);
    while (node) {
879 880
        event = _make_event_from_node((stats_node_t *) node->key, NULL);
        _add_event_to_queue(event, &listener->queue);
Jack Moffitt's avatar
Jack Moffitt committed
881

882 883
        node = avl_get_next(node);
    }
Jack Moffitt's avatar
Jack Moffitt committed
884

885 886 887 888 889 890 891
    /* now the stats for each source */
    node = avl_get_first(_stats.source_tree);
    while (node) {
        source = (stats_source_t *)node->key;
        node2 = avl_get_first(source->stats_tree);
        while (node2) {
            event = _make_event_from_node((stats_node_t *)node2->key, source->source);
892
            _add_event_to_queue (event, &listener->queue);
893 894 895 896 897 898

            node2 = avl_get_next(node2);
        }
        
        node = avl_get_next(node);
    }
Jack Moffitt's avatar
Jack Moffitt committed
899

900
    /* now we register to receive future event notices */
901 902
    listener->next = (event_listener_t *)_event_listeners;
    _event_listeners = listener;
Jack Moffitt's avatar
Jack Moffitt committed
903

904
    thread_mutex_unlock(&_stats_mutex);
905 906 907 908
}

void *stats_connection(void *arg)
{
909
    client_t *client = (client_t *)arg;
910
    stats_event_t *event;
911
    event_listener_t listener;
912

913
    ICECAST_LOG_INFO("stats client starting");
914

915
    event_queue_init (&listener.queue);
916 917 918
    /* increment the thread count */
    thread_mutex_lock(&_stats_mutex);
    _stats_threads++;
919
    stats_event_args (NULL, "stats", "%d", _stats_threads);
920 921
    thread_mutex_unlock(&_stats_mutex);

922
    thread_mutex_create (&(listener.mutex));
923

924
    _register_listener (&listener);
925 926

    while (_stats_running) {
927 928 929
        thread_mutex_lock (&listener.mutex);
        event = _get_event_from_queue (&listener.queue);
        thread_mutex_unlock (&listener.mutex);
930
        if (event != NULL) {
931
            if (_send_event_to_client(event, client) < 0) {
932 933 934 935 936 937
                _free_event(event);
                break;
            }
            _free_event(event);
            continue;
        }
938
        thread_sleep (500000);
939 940 941
    }

    thread_mutex_lock(&_stats_mutex);
942
    _unregister_listener (&listener);
943
    _stats_threads--;
944
    stats_event_args (NULL, "stats", "%d", _stats_threads);
945 946
    thread_mutex_unlock(&_stats_mutex);

947
    thread_mutex_destroy (&listener.mutex);
948
    client_destroy (client);
949
    ICECAST_LOG_INFO("stats client finished");
950

951
    return NULL;
Jack Moffitt's avatar
Jack Moffitt committed
952 953
}

954 955 956 957 958 959 960 961 962 963 964 965 966

void stats_callback (client_t *client, void *notused)
{
    if (client->con->error)
    {
        client_destroy (client);
        return;
    }
    client_set_queue (client, NULL);
    thread_create("Stats Connection", stats_connection, (void *)client, THREAD_DETACHED);
}


Jack Moffitt's avatar
Jack Moffitt committed
967
typedef struct _source_xml_tag {
968 969
    char *mount;
    xmlNodePtr node;
Jack Moffitt's avatar
Jack Moffitt committed
970

971
    struct _source_xml_tag *next;
Jack Moffitt's avatar
Jack Moffitt committed
972 973 974
} source_xml_t;


975
void stats_transform_xslt(client_t *client, const char *uri)
976 977
{
    xmlDocPtr doc;
978 979
    char *xslpath = util_get_path_from_normalised_uri(uri);
    const char *mount = httpp_get_query_param(client->parser, "mount");
980

981
    doc = stats_get_xml(0, mount, client->mode);
982

983
    xslt_transform(doc, xslpath, client);
984 985

    xmlFreeDoc(doc);
986
    free(xslpath);
987 988
}

989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011
static void __add_metadata(xmlNodePtr node, const char *tag) {
    const char *value = strstr(tag, "=");
    char *name = NULL;
    size_t namelen = value - tag + 1;
    size_t i;

    if (!value)
        return;

    name = malloc(namelen);
    if (!name)
        return;

    for (i = 0; i < (namelen - 1); i++)
        name[i] = tolower(tag[i]);

    name[namelen-1] = 0;

    xmlNewTextChild(node, NULL, XMLSTR(name), XMLSTR(value+1));

    free(name);
}

1012
xmlDocPtr stats_get_xml(int show_hidden, const char *show_mount, operation_mode mode)
Jack Moffitt's avatar
Jack Moffitt committed
1013
{
1014
    xmlDocPtr doc;
1015
    xmlNodePtr node;
1016
    source_t * source;
1017

1018 1019
    doc = xmlNewDoc (XMLSTR("1.0"));
    node = xmlNewDocNode (doc, NULL, XMLSTR("icestats"), NULL);
1020 1021
    xmlDocSetRootElement(doc, node);

1022
    node = _dump_stats_to_doc (node, show_mount, show_hidden);
1023

1024
    if (show_mount && node) {
1025 1026 1027
        xmlNodePtr metadata = xmlNewTextChild(node, NULL, XMLSTR("metadata"), NULL);
        int i;

1028 1029
        avl_tree_rlock(global.source_tree);
        source = source_find_mount_raw(show_mount);
1030 1031
        for (i = 0; i < source->format->vc.comments; i++)
            __add_metadata(metadata, source->format->vc.user_comments[i]);
1032
        admin_add_listeners_to_mount(source, node, mode);
1033 1034 1035
        avl_tree_unlock(global.source_tree);
    }

1036
    return doc;
Jack Moffitt's avatar
Jack Moffitt committed
1037 1038
}

1039

Jack Moffitt's avatar
Jack Moffitt committed
1040 1041
static int _compare_stats(void *arg, void *a, void *b)
{
1042 1043
    stats_node_t *nodea = (stats_node_t *)a;
    stats_node_t *nodeb = (stats_node_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
1044

1045
    return strcmp(nodea->name, nodeb->name);
Jack Moffitt's avatar
Jack Moffitt committed
1046 1047 1048 1049
}

static int _compare_source_stats(void *arg, void *a, void *b)
{
1050 1051
    stats_source_t *nodea = (stats_source_t *)a;
    stats_source_t *nodeb = (stats_source_t *)b;
Jack Moffitt's avatar
Jack Moffitt committed
1052

1053
    return strcmp(nodea->source, nodeb->source);
Jack Moffitt's avatar
Jack Moffitt committed
1054 1055 1056 1057
}

static int _free_stats(void *key)
{
1058 1059 1060 1061 1062 1063
    stats_node_t *node = (stats_node_t *)key;
    free(node->value);
    free(node->name);
    free(node);
    
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
1064 1065 1066 1067
}

static int _free_source_stats(void *key)
{
1068 1069 1070
    stats_source_t *node = (stats_source_t *)key;
    avl_tree_free(node->stats_tree, _free_stats);
    free(node->source);
1071
    free(node);
Jack Moffitt's avatar
Jack Moffitt committed
1072

1073
    return 1;
Jack Moffitt's avatar
Jack Moffitt committed
1074 1075 1076 1077
}

static void _free_event(stats_event_t *event)
{
1078 1079 1080 1081
    if (event->source) free(event->source);
    if (event->name) free(event->name);
    if (event->value) free(event->value);
    free(event);
Jack Moffitt's avatar
Jack Moffitt committed
1082
}
1083 1084


1085
refbuf_t *stats_get_streams (void)
1086
{
1087
#define STREAMLIST_BLKSIZE  4096
1088
    avl_node *node;
1089
    unsigned int remaining = STREAMLIST_BLKSIZE;
1090 1091
    refbuf_t *start = refbuf_new (remaining), *cur = start;
    char *buffer = cur->data;
1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102

    /* now the stats for each source */
    thread_mutex_lock (&_stats_mutex);
    node = avl_get_first(_stats.source_tree);
    while (node)
    {
        int ret;
        stats_source_t *source = (stats_source_t *)node->key;

        if (source->hidden == 0)
        {
1103
            if (remaining <= strlen (source->source) + 3)
1104
            {
1105 1106 1107 1108 1109
                cur->len = STREAMLIST_BLKSIZE - remaining;
                cur->next = refbuf_new (STREAMLIST_BLKSIZE);
                remaining = STREAMLIST_BLKSIZE;
                cur = cur->next;
                buffer = cur->data;
1110 1111 1112 1113 1114 1115 1116 1117 1118 1119
            }
            ret = snprintf (buffer, remaining, "%s\r\n", source->source);
            if (ret > 0)
            {
                buffer += ret;
                remaining -= ret;
            }
        }
        node = avl_get_next(node);
    }
1120
    thread_mutex_unlock(&_stats_mutex);
1121 1122
    cur->len = STREAMLIST_BLKSIZE - remaining;
    return start;
1123 1124
}

1125 1126


1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145
/* This removes any source stats from virtual mountpoints, ie mountpoints
 * where no source_t exists. This function requires the global sources lock
 * to be held before calling.
 */
void stats_clear_virtual_mounts (void)
{
    avl_node *snode;

    thread_mutex_lock (&_stats_mutex);
    snode = avl_get_first(_stats.source_tree);
    while (snode)
    {
        stats_source_t *src = (stats_source_t *)snode->key;
        source_t *source = source_find_mount_raw (src->source);

        if (source == NULL)
        {
            /* no source_t is reserved so remove them now */
            snode = avl_get_next (snode);
1146
            ICECAST_LOG_DEBUG("releasing %s stats", src->source);
1147 1148 1149 1150 1151 1152 1153 1154 1155
            avl_delete (_stats.source_tree, src, _free_source_stats);
            continue;
        }

        snode = avl_get_next (snode);
    }
    thread_mutex_unlock (&_stats_mutex);
}