/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 */

/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include "common/thread/thread.h"

#include "yp.h"
#include "global.h"
#include "curl.h"
#include "logging.h"
#include "source.h"
#include "cfgfile.h"
#include "stats.h"

#ifdef WIN32
#define snprintf _snprintf
#endif

#define CATMODULE "yp"

38 39 40
struct yp_server
{
    char        *url;
41
    char        *server_id;
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
    unsigned    url_timeout;
    unsigned    touch_interval;
    int         remove;

    CURL *curl;
    struct ypdata_tag *mounts, *pending_mounts;
    struct yp_server *next;
    char curl_error[CURL_ERROR_SIZE];
};



/* State kept for one mountpoint on one YP server.  The string fields other
 * than mount, sid and error_msg are stored URL-escaped (see add_yp_info()). */
typedef struct ypdata_tag
{
    int remove;             /* non-zero: delete_marked_yp() frees this entry */
    int release;            /* non-zero: next processing step sends a remove request */
    int cmd_ok;             /* set by the header callback on "YPResponse: 1" */

    char *sid;              /* session id taken from the "SID: " response header */
    char *mount;            /* mountpoint this entry describes */
    char *url;              /* "server_url" stat of the mount */
    char *listen_url;       /* escaped "http://host:port/mount" */
    char *server_name;
    char *server_desc;
    char *server_genre;
    char *cluster_password;
    char *bitrate;
    char *audio_info;
    char *server_type;
    char *current_song;     /* "artist - title" as sent with touch requests */
    char *subtype;

    struct yp_server *server;   /* directory server this entry belongs to */
    time_t next_update;         /* earliest time the process handler runs again */
    unsigned touch_interval;    /* seconds between touches; may be raised by "TouchFreq: " */
    char *error_msg;            /* text of the last "YPMessage: " header, if any */
    /* handler building the next request into s (len bytes): add, touch or remove */
    int (*process)(struct ypdata_tag *yp, char *s, unsigned len);

    struct ypdata_tag *next;    /* next entry in the server's list */
} ypdata_t;


/* yp_lock: taken for reading while entries are processed or looked up and for
 * writing while the YP thread restructures the lists.
 * yp_pending_lock: serialises concurrent yp_add() calls. */
static rwlock_t yp_lock;
static mutex_t yp_pending_lock;

/* servers in use, and servers created by yp_recheck_config() awaiting activation */
static volatile struct yp_server *active_yps = NULL, *pending_yps = NULL;
/* non-zero when the YP thread has list maintenance to do */
static volatile int yp_update = 0;
/* cleared by yp_shutdown() to stop the YP thread */
static int yp_running;
/* time stamp refreshed by yp_process_server() and used for scheduling */
static time_t now;
static thread_type *yp_thread;
/* configured client limit, used when a mount has no usable max_listeners */
static volatile unsigned client_limit = 0;
/* heap copy of the configured server id */
static volatile char *server_version = NULL;
/* forward declarations for the file-local helpers defined further down */
static void *yp_update_thread(void *arg);
static void add_yp_info(ypdata_t *yp, void *info, int type);
static int do_yp_remove(ypdata_t *yp, char *s, unsigned len);
static int do_yp_add(ypdata_t *yp, char *s, unsigned len);
static int do_yp_touch(ypdata_t *yp, char *s, unsigned len);
static void add_pending_yp (struct yp_server *server);
static void delete_marked_yp(struct yp_server *server);
static void yp_destroy_ypdata(ypdata_t *ypdata);


/* curl callback used to parse headers coming back from the YP server */
106
static size_t handle_returned_header (void *ptr, size_t size, size_t nmemb, void *stream)
107 108
{
    ypdata_t *yp = stream;
109
    size_t bytes = size * nmemb;
110

111
    /* ICECAST_LOG_DEBUG("header from YP is \"%.*s\"", bytes, ptr); */
112
    if (strncasecmp (ptr, "YPResponse: 1", 13) == 0)
113 114
        yp->cmd_ok = 1;

115
    if (strncasecmp (ptr, "YPMessage: ", 11) == 0)
116
    {
117
        size_t len = bytes - 11;
118 119 120
        free (yp->error_msg);
        yp->error_msg = calloc (1, len);
        if (yp->error_msg)
121
            sscanf (ptr + 11, "%[^\r\n]", yp->error_msg);
122 123 124 125
    }

    if (yp->process == do_yp_add)
    {
126
        if (strncasecmp (ptr, "SID: ", 5) == 0)
127
        {
128
            size_t len = bytes - 5;
129 130 131
            free (yp->sid);
            yp->sid = calloc (1, len);
            if (yp->sid)
132
                sscanf (ptr + 5, "%[^\r\n]", yp->sid);
133
        }
134
    }
135
    if (strncasecmp (ptr, "TouchFreq: ", 11) == 0)
136 137
    {
        unsigned secs;
138 139
        if ( sscanf (ptr + 11, "%u", &secs) != 1 )
            secs = 0;
140 141
        if (secs < 30)
            secs = 30;
142
        ICECAST_LOG_DEBUG("server touch interval is %u", secs);
143
        yp->touch_interval = secs;
144
    }
145
    return (size_t)bytes;
146 147 148 149 150 151 152 153
}


/* search the active and pending YP server lists */
static struct yp_server *find_yp_server (const char *url)
{
    struct yp_server *server;

154
    server = (struct yp_server *)active_yps;
155 156 157 158 159 160
    while (server)
    {
        if (strcmp (server->url, url) == 0)
            return server;
        server = server->next;
    }
161
    server = (struct yp_server *)pending_yps;
162 163 164 165 166 167 168 169 170 171 172 173
    while (server)
    {
        if (strcmp (server->url, url) == 0)
            break;
        server = server->next;
    }
    return server;
}


static void destroy_yp_server (struct yp_server *server)
{
174 175
    ypdata_t *yp;

176 177
    if (server == NULL)
        return;
178
    ICECAST_LOG_DEBUG("Removing YP server entry for %s", server->url);
179 180 181 182 183 184 185 186 187 188 189 190 191 192

    /* delete yps:
     * first move all pendings into main queue.
     * then mark all main queue entries for deleting.
     * then remove all marked entries.
     */
    add_pending_yp(server);
    yp = server->mounts;
    while (yp) {
        yp->remove = 1;
        yp = yp->next;
    }
    delete_marked_yp(server);

193
    icecast_curl_free(server->curl);
194 195
    if (server->mounts) ICECAST_LOG_WARN("active ypdata not freed");
    if (server->pending_mounts) ICECAST_LOG_WARN("pending ypdata not freed");
196
    free (server->url);
197
    free (server->server_id);
198 199 200 201 202 203
    free (server);
}



/* search for a ypdata entry corresponding to a specific mountpoint */
204
static ypdata_t *find_yp_mount (ypdata_t *mounts, const char *mount)
205
{
206
    ypdata_t *yp = mounts;
207 208 209 210 211 212 213 214 215
    while (yp)
    {
        if (strcmp (yp->mount, mount) == 0)
            break;
        yp = yp->next;
    }
    return yp;
}

216 217 218

void yp_recheck_config (ice_config_t *config)
{
219
    size_t i;
220 221
    struct yp_server *server;

222
    ICECAST_LOG_DEBUG("Updating YP configuration");
223 224
    thread_rwlock_rlock (&yp_lock);

225
    server = (struct yp_server *)active_yps;
226 227 228 229 230
    while (server)
    {
        server->remove = 1;
        server = server->next;
    }
Karl Heyes's avatar
Karl Heyes committed
231
    client_limit = config->client_limit;
232 233
    free ((char*)server_version);
    server_version = strdup (config->server_id);
234
    /* for each yp url in config, check to see if one exists
235 236 237 238 239 240 241 242 243 244 245 246 247
       if not, then add it. */
    for (i=0 ; i < config->num_yp_directories; i++)
    {
        server = find_yp_server (config->yp_url[i]);
        if (server == NULL)
        {
            server = calloc (1, sizeof (struct yp_server));

            if (server == NULL)
            {
                destroy_yp_server (server);
                break;
            }
248
            server->server_id = strdup ((char *)server_version);
249 250
            server->url = strdup (config->yp_url[i]);
            server->url_timeout = config->yp_url_timeout[i];
251
            server->touch_interval = config->yp_touch_interval[i];
252
            server->curl = icecast_curl_new(server->url, &(server->curl_error[0]));
253 254 255 256 257
            if (server->curl == NULL)
            {
                destroy_yp_server (server);
                break;
            }
258 259
            if (server->url_timeout > 10 || server->url_timeout < 1)
                server->url_timeout = 6;
260 261 262
            if (server->touch_interval < 30)
                server->touch_interval = 30;
            curl_easy_setopt (server->curl, CURLOPT_HEADERFUNCTION, handle_returned_header);
263
            server->next = (struct yp_server *)pending_yps;
264
            pending_yps = server;
265
            ICECAST_LOG_INFO("Adding new YP server \"%s\" (timeout %ds, default interval %ds)",
266 267 268 269 270 271 272 273
                    server->url, server->url_timeout, server->touch_interval);
        }
        else
        {
            server->remove = 0;
        }
    }
    thread_rwlock_unlock (&yp_lock);
274
    thread_rwlock_wlock(&yp_lock);
275
    yp_update = 1;
276
    thread_rwlock_unlock(&yp_lock);
277 278
}

279

280
void yp_initialize(void)
281
{
282
    ice_config_t *config = config_get_config();
283 284
    thread_rwlock_create (&yp_lock);
    thread_mutex_create (&yp_pending_lock);
285 286
    yp_recheck_config (config);
    config_release_config ();
287 288
    yp_thread = thread_create("YP Touch Thread", yp_update_thread,
                            (void *)NULL, THREAD_ATTACHED);
289
}
290 291 292



293
/* handler for curl, checks if successful handling occurred
294 295
 * return 0 for ok, -1 for this entry failed, -2 for server fail.
 * On failure case, update and process are modified
296
 */
297
static int send_to_yp (const char *cmd, ypdata_t *yp, char *post)
298
{
299 300 301
    int curlcode;
    struct yp_server *server = yp->server;

302
    /* ICECAST_LOG_DEBUG("send YP (%s):%s", cmd, post); */
303 304 305 306 307 308 309
    yp->cmd_ok = 0;
    curl_easy_setopt (server->curl, CURLOPT_POSTFIELDS, post);
    curl_easy_setopt (server->curl, CURLOPT_WRITEHEADER, yp);
    curlcode = curl_easy_perform (server->curl);
    if (curlcode)
    {
        yp->process = do_yp_add;
310
        yp->next_update = now + 1200;
311
        ICECAST_LOG_ERROR("connection to %s failed with \"%s\"", server->url, server->curl_error);
312
        return -2;
313
    }
314 315
    if (yp->cmd_ok == 0)
    {
316 317
        if (yp->error_msg == NULL)
            yp->error_msg = strdup ("no response from server");
318 319
        if (yp->process == do_yp_add)
        {
320
            ICECAST_LOG_ERROR("YP %s on %s failed: %s", cmd, server->url, yp->error_msg);
321
            yp->next_update = now + 7200;
322
        }
323
        if (yp->process == do_yp_touch)
324
        {
325 326 327 328 329 330 331
            /* At this point the touch request failed, either because they rejected our session
             * or the server isn't accessible. This means we have to wait before doing another
             * add request. We have a minimum delay but we could allow the directory server to
             * give us a wait time using the TouchFreq header. This time could be given in such
             * cases as a firewall block or incorrect listenurl.
             */
            if (yp->touch_interval < 1200)
332
                yp->next_update = now + 1200;
333
            else
334
                yp->next_update = now + yp->touch_interval;
335
            ICECAST_LOG_INFO("YP %s on %s failed: %s", cmd, server->url, yp->error_msg);
336
        }
337
        yp->process = do_yp_add;
338
        free(yp->sid);
339
        yp->sid = NULL;
340
        return -1;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
341
    }
342
    ICECAST_LOG_DEBUG("YP %s at %s succeeded", cmd, server->url);
343 344 345 346 347
    return 0;
}


/* routines for building and issues requests to the YP server */
348
static int do_yp_remove (ypdata_t *yp, char *s, unsigned len)
349
{
350 351
    int ret = 0;

352 353
    if (yp->sid)
    {
354
        ret = snprintf (s, len, "action=remove&sid=%s", yp->sid);
355 356 357
        if (ret >= (signed)len)
            return ret+1;

358
        ICECAST_LOG_INFO("clearing up YP entry for %s", yp->mount);
359
        ret = send_to_yp ("remove", yp, s);
360 361
        free (yp->sid);
        yp->sid = NULL;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
362
    }
363 364
    yp->remove = 1;
    yp->process = do_yp_add;
365
    yp_update = 1;
366

367
    return ret;
368 369
}

370

371
static int do_yp_add (ypdata_t *yp, char *s, unsigned len)
372
{
373 374
    int ret;
    char *value;
375 376 377 378 379 380
    ice_config_t *config;
    char *admin;

    config = config_get_config();
    admin = util_url_escape(config->admin);
    config_release_config();
381

382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
    value = stats_get_value (yp->mount, "server_type");
    add_yp_info (yp, value, YP_SERVER_TYPE);
    free (value);

    value = stats_get_value (yp->mount, "server_name");
    add_yp_info (yp, value, YP_SERVER_NAME);
    free (value);

    value = stats_get_value (yp->mount, "server_url");
    add_yp_info (yp, value, YP_SERVER_URL);
    free (value);

    value = stats_get_value (yp->mount, "genre");
    add_yp_info (yp, value, YP_SERVER_GENRE);
    free (value);

    value = stats_get_value (yp->mount, "bitrate");
399 400
    if (value == NULL)
        value = stats_get_value (yp->mount, "ice-bitrate");
401 402 403 404 405 406 407
    add_yp_info (yp, value, YP_BITRATE);
    free (value);

    value = stats_get_value (yp->mount, "server_description");
    add_yp_info (yp, value, YP_SERVER_DESC);
    free (value);

408
    value = stats_get_value (yp->mount, "subtype");
409 410 411 412 413 414
    add_yp_info (yp, value, YP_SUBTYPE);
    free (value);

    value = stats_get_value (yp->mount, "audio_info");
    add_yp_info (yp, value, YP_AUDIO_INFO);
    free (value);
415

416
    ret = snprintf (s, len, "action=add&admin=%s&sn=%s&genre=%s&cpswd=%s&desc="
417
                    "%s&url=%s&listenurl=%s&type=%s&stype=%s&b=%s&%s\r\n",
418
                    admin,
419 420
                    yp->server_name, yp->server_genre, yp->cluster_password,
                    yp->server_desc, yp->url, yp->listen_url,
421
                    yp->server_type, yp->subtype, yp->bitrate, yp->audio_info);
422 423
    free(admin);

424 425
    if (ret >= (signed)len)
        return ret+1;
426 427
    ret = send_to_yp ("add", yp, s);
    if (ret == 0)
428 429 430 431 432
    {
        yp->process = do_yp_touch;
        /* force first touch in 5 secs */
        yp->next_update = time(NULL) + 5;
    }
433
    return ret;
434 435 436
}


437
static int do_yp_touch (ypdata_t *yp, char *s, unsigned len)
438
{
Karl Heyes's avatar
Karl Heyes committed
439
    unsigned listeners = 0, max_listeners = 1;
440 441 442 443 444 445 446 447
    char *val, *artist, *title;
    int ret;

    artist = (char *)stats_get_value (yp->mount, "artist");
    title = (char *)stats_get_value (yp->mount, "title");
    if (artist || title)
    {
         char *song;
448
         const char *separator = " - ";
449 450 451 452 453 454 455 456 457 458
         if (artist == NULL)
         {
             artist = strdup("");
             separator = "";
         }
         if (title == NULL) title = strdup("");
         song = malloc (strlen (artist) + strlen (title) + strlen (separator) +1);
         if (song)
         {
             sprintf (song, "%s%s%s", artist, separator, title);
459 460
             add_yp_info(yp, song, YP_CURRENT_SONG);
             stats_event (yp->mount, "yp_currently_playing", song);
461 462 463 464 465 466 467 468 469 470 471 472
             free (song);
         }
    }
    free (artist);
    free (title);

    val = (char *)stats_get_value (yp->mount, "listeners");
    if (val)
    {
        listeners = atoi (val);
        free (val);
    }
Karl Heyes's avatar
Karl Heyes committed
473
    val = stats_get_value (yp->mount, "max_listeners");
474
    if (val == NULL || strcmp (val, "unlimited") == 0 || atoi(val) < 0)
Karl Heyes's avatar
Karl Heyes committed
475 476 477
        max_listeners = client_limit;
    else
        max_listeners = atoi (val);
478
    free (val);
Karl Heyes's avatar
Karl Heyes committed
479

480 481 482
    val = stats_get_value (yp->mount, "subtype");
    if (val)
    {
483
        add_yp_info (yp, val, YP_SUBTYPE);
484 485
        free (val);
    }
486 487

    ret = snprintf (s, len, "action=touch&sid=%s&st=%s"
Karl Heyes's avatar
Karl Heyes committed
488
            "&listeners=%u&max_listeners=%u&stype=%s\r\n",
489
            yp->sid, yp->current_song, listeners, max_listeners, yp->subtype);
490 491 492 493

    if (ret >= (signed)len)
        return ret+1; /* space required for above text and nul*/

494 495 496 497 498 499
    if (send_to_yp ("touch", yp, s) == 0)
    {
        yp->next_update = now + yp->touch_interval;
        return 0;
    }
    return -1;
500
}
501

502 503


504
static int process_ypdata(struct yp_server *server, ypdata_t *yp)
505
{
506
    unsigned len = 1024;
507
    char *s = NULL, *tmp;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
508

509
    if (now < yp->next_update)
510
        return 0;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
511

512 513 514
    /* loop just in case the memory area isn't big enough */
    while (1)
    {
515
        int ret;
516
        if ((tmp = realloc(s, len)) == NULL)
517
            return 0;
518
        s = tmp;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
519

520 521 522 523 524 525
        if (yp->release)
        {
            yp->process = do_yp_remove;
            yp->next_update = 0;
        }

526
        ret = yp->process (yp, s, len);
527
        if (ret <= 0)
528 529
        {
           free (s);
530
           return ret;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
531
        }
532
        len = ret;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
533
    }
534
    return 0;
535
}
536 537 538


static void yp_process_server (struct yp_server *server)
539
{
540
    ypdata_t *yp;
541
    int state = 0;
542

543
    /* ICECAST_LOG_DEBUG("processing yp server %s", server->url); */
544 545 546 547
    yp = server->mounts;
    while (yp)
    {
        now = time (NULL);
548 549 550 551 552
        /* if one of the streams shows that the server cannot be contacted then mark the
         * other entries for an update later. Assume YP server is dead and skip it for now
         */
        if (state == -2)
        {
553
            ICECAST_LOG_DEBUG("skipping %s on %s", yp->mount, server->url);
554 555 556 557 558
            yp->process = do_yp_add;
            yp->next_update += 900;
        }
        else
            state = process_ypdata (server, yp);
559 560 561 562 563 564
        yp = yp->next;
    }
}



565
static ypdata_t *create_yp_entry (const char *mount)
566 567
{
    ypdata_t *yp;
568
    char *s;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
569

570 571 572 573 574 575
    yp = calloc (1, sizeof (ypdata_t));
    do
    {
        unsigned len = 512;
        int ret;
        char *url;
576
        mount_proxy *mountproxy = NULL;
577 578 579 580
        ice_config_t *config;

        if (yp == NULL)
            break;
581
        yp->mount = strdup (mount);
582 583 584 585 586 587 588 589 590
        yp->server_name = strdup ("");
        yp->server_desc = strdup ("");
        yp->server_genre = strdup ("");
        yp->bitrate = strdup ("");
        yp->server_type = strdup ("");
        yp->cluster_password = strdup ("");
        yp->url = strdup ("");
        yp->current_song = strdup ("");
        yp->audio_info = strdup ("");
591
        yp->subtype = strdup ("");
592 593 594 595 596 597
        yp->process = do_yp_add;

        url = malloc (len);
        if (url == NULL)
            break;
        config = config_get_config();
598
        ret = snprintf (url, len, "http://%s:%d%s", config->hostname, config->port, mount);
599
        if (ret >= (signed)len)
600
        {
601 602
            s = realloc (url, ++ret);
            if (s) url = s;
603
            snprintf (url, ret, "http://%s:%d%s", config->hostname, config->port, mount);
604
        }
605

606
        mountproxy = config_find_mount (config, mount, MOUNT_TYPE_NORMAL);
607
        if (mountproxy && mountproxy->cluster_password)
608
            add_yp_info (yp, mountproxy->cluster_password, YP_CLUSTER_PASSWORD);
609
        config_release_config();
610

611 612 613 614 615 616 617 618 619 620 621 622 623 624
        yp->listen_url = util_url_escape (url);
        free (url);
        if (yp->listen_url == NULL)
            break;

        return yp;
    } while (0);

    yp_destroy_ypdata (yp);
    return NULL;
}


/* Check for changes in the YP servers configured */
625
static void check_servers (void)
626
{
627 628
    struct yp_server *server = (struct yp_server *)active_yps,
                     **server_p = (struct yp_server **)&active_yps;
629 630 631 632 633 634

    while (server)
    {
        if (server->remove)
        {
            struct yp_server *to_go = server;
635
            ICECAST_LOG_DEBUG("YP server \"%s\"removed", server->url);
636 637
            *server_p = server->next;
            server = server->next;
638
            destroy_yp_server(to_go);
639
            continue;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
640
        }
641 642 643 644 645 646 647 648
        server_p = &server->next;
        server = server->next;
    }
    /* add new server entries */
    while (pending_yps)
    {
        avl_node *node;

649
        server = (struct yp_server *)pending_yps;
650 651
        pending_yps = server->next;

652
        ICECAST_LOG_DEBUG("Add pending yps %s", server->url);
653
        server->next = (struct yp_server *)active_yps;
654 655 656 657 658 659 660 661 662 663
        active_yps = server;

        /* new YP server configured, need to populate with existing sources */
        avl_tree_rlock (global.source_tree);
        node = avl_get_first (global.source_tree);
        while (node)
        {
            ypdata_t *yp;

            source_t *source = node->key;
664
            if (source->yp_public && (yp = create_yp_entry (source->mount)) != NULL)
665
            {
666
                ICECAST_LOG_DEBUG("Adding existing mount %s", source->mount);
667 668 669 670
                yp->server = server;
                yp->touch_interval = server->touch_interval;
                yp->next = server->mounts;
                server->mounts = yp;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
671
            }
672
            node = avl_get_next (node);
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
673
        }
674
        avl_tree_unlock (global.source_tree);
675
    }
676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695
}


/* Move the server's pending entries to the front of its main entry list.
 * Does nothing when no entries are pending. */
static void add_pending_yp (struct yp_server *server)
{
    ypdata_t *previous_head, *tail;
    unsigned added = 1;

    if (server->pending_mounts == NULL)
        return;

    previous_head = server->mounts;
    server->mounts = server->pending_mounts;
    server->pending_mounts = NULL;

    /* find the last pending entry, counting entries on the way */
    for (tail = server->mounts; tail->next != NULL; tail = tail->next)
        added++;

    tail->next = previous_head;
    ICECAST_LOG_DEBUG("%u YP entries added to %s", added, server->url);
}

701

702
static void delete_marked_yp(struct yp_server *server)
703 704
{
    ypdata_t *yp = server->mounts, **prev = &server->mounts;
705

706 707 708 709 710
    while (yp)
    {
        if (yp->remove)
        {
            ypdata_t *to_go = yp;
711
            ICECAST_LOG_DEBUG("removed %s from YP server %s", yp->mount, server->url);
712 713
            *prev = yp->next;
            yp = yp->next;
714
            yp_destroy_ypdata(to_go);
715 716 717 718 719
            continue;
        }
        prev = &yp->next;
        yp = yp->next;
    }
720
}
721 722 723


static void *yp_update_thread(void *arg)
724
{
725
    ICECAST_LOG_INFO("YP update thread started");
726
    int running;
727 728

    yp_running = 1;
729 730 731
    running = 1;

    while (running) {
732 733 734 735 736 737
        struct yp_server *server;

        thread_sleep (200000);

        /* do the YP communication */
        thread_rwlock_rlock (&yp_lock);
738
        server = (struct yp_server *)active_yps;
739 740
        while (server)
        {
741
            /* ICECAST_LOG_DEBUG("trying %s", server->url); */
742 743
            yp_process_server (server);
            server = server->next;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
744
        }
745 746 747
        /* update the local YP structure */
        if (yp_update)
        {
748
            thread_rwlock_unlock(&yp_lock);
749 750
            thread_rwlock_wlock (&yp_lock);
            check_servers ();
751
            server = (struct yp_server *)active_yps;
752 753
            while (server)
            {
754
                /* ICECAST_LOG_DEBUG("Checking yps %s", server->url); */
755 756 757
                add_pending_yp (server);
                delete_marked_yp (server);
                server = server->next;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
758
            }
759
            yp_update = 0;
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
760
        }
761 762
        running = yp_running;
        thread_rwlock_unlock(&yp_lock);
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
763
    }
764 765 766 767 768
    thread_rwlock_destroy (&yp_lock);
    thread_mutex_destroy (&yp_pending_lock);
    /* free server and ypdata left */
    while (active_yps)
    {
769
        struct yp_server *server = (struct yp_server *)active_yps;
770 771 772
        active_yps = server->next;
        destroy_yp_server (server);
    }
773

774
    return NULL;
775
}
776

777 778 779


static void yp_destroy_ypdata(ypdata_t *ypdata)
780
{
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
781
    if (ypdata) {
782
        if (ypdata->mount) {
783
            free(ypdata->mount);
784 785
        }
        if (ypdata->url) {
786
            free(ypdata->url);
787
        }
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814
        if (ypdata->sid) {
            free(ypdata->sid);
        }
        if (ypdata->server_name) {
            free(ypdata->server_name);
        }
        if (ypdata->server_desc) {
            free(ypdata->server_desc);
        }
        if (ypdata->server_genre) {
            free(ypdata->server_genre);
        }
        if (ypdata->cluster_password) {
            free(ypdata->cluster_password);
        }
        if (ypdata->listen_url) {
            free(ypdata->listen_url);
        }
        if (ypdata->current_song) {
            free(ypdata->current_song);
        }
        if (ypdata->bitrate) {
            free(ypdata->bitrate);
        }
        if (ypdata->server_type) {
            free(ypdata->server_type);
        }
815 816 817
        if (ypdata->audio_info) {
            free(ypdata->audio_info);
        }
818 819 820
        free(ypdata->subtype);
        free(ypdata->error_msg);
        free(ypdata);
Ed "oddsock" Zaleski's avatar
Ed "oddsock" Zaleski committed
821
    }
822
}
823

824
static void add_yp_info(ypdata_t *yp, void *info, int type)
825 826
{
    char *escaped;
827 828

    if (!info)
829
        return;
830

831 832 833 834
    escaped = util_url_escape(info);
    if (escaped == NULL)
        return;

835 836
    switch (type)
    {
837
        case YP_SERVER_NAME:
838 839
            free (yp->server_name);
            yp->server_name = escaped;
840
            break;
841
        case YP_SERVER_DESC:
842 843
            free (yp->server_desc);
            yp->server_desc = escaped;
844
            break;
845
        case YP_SERVER_GENRE:
846 847
            free (yp->server_genre);
            yp->server_genre = escaped;
848
            break;
849
        case YP_SERVER_URL:
850 851
            free (yp->url);
            yp->url = escaped;
852
            break;
853
        case YP_BITRATE:
854 855
            free (yp->bitrate);
            yp->bitrate = escaped;
856
            break;
857
        case YP_AUDIO_INFO:
858 859
            free (yp->audio_info);
            yp->audio_info = escaped;
860
            break;
861
        case YP_SERVER_TYPE:
862 863
            free (yp->server_type);
            yp->server_type = escaped;
864
            break;
865
        case YP_CURRENT_SONG:
866 867
            free (yp->current_song);
            yp->current_song = escaped;
868
            break;
869
        case YP_CLUSTER_PASSWORD:
870 871
            free (yp->cluster_password);
            yp->cluster_password = escaped;
872
            break;
873
        case YP_SUBTYPE:
874 875
            free (yp->subtype);
            yp->subtype = escaped;
876
            break;
877 878
        default:
            free (escaped);
879 880 881 882 883
    }
}


/* Add YP entries to active servers */
884
void yp_add (const char *mount)
885 886 887 888 889 890 891 892
{
    struct yp_server *server;

    /* make sure YP thread is not modifying the lists */
    thread_rwlock_rlock (&yp_lock);

    /* make sure we don't race against another yp_add */
    thread_mutex_lock (&yp_pending_lock);
893
    server = (struct yp_server *)active_yps;
894 895 896
    while (server)
    {
        ypdata_t *yp;
897 898 899 900

        /* on-demand relays may already have a YP entry */
        yp = find_yp_mount (server->mounts, mount);
        if (yp == NULL)
901
        {
902 903 904 905
            /* add new ypdata to each servers pending yp */
            yp = create_yp_entry (mount);
            if (yp)
            {
906
                ICECAST_LOG_DEBUG("Adding %s to %s", mount, server->url);
907 908 909
                yp->server = server;
                yp->touch_interval = server->touch_interval;
                yp->next = server->pending_mounts;
910
                yp->next_update = time(NULL) + 60;
911 912 913
                server->pending_mounts = yp;
                yp_update = 1;
            }
914
        }
915
        else
916
            ICECAST_LOG_DEBUG("YP entry %s already exists", mount);
917 918 919 920 921 922 923 924 925 926 927
        server = server->next;
    }
    thread_mutex_unlock (&yp_pending_lock);
    thread_rwlock_unlock (&yp_lock);
}



/* Mark an existing entry in the YP list as to be marked for deletion */
void yp_remove (const char *mount)
{
928
    struct yp_server *server = (struct yp_server *)active_yps;
929 930 931 932

    thread_rwlock_rlock (&yp_lock);
    while (server)
    {
933 934 935
        ypdata_t *list = server->mounts;

        while (1)
936
        {
937 938 939 940 941 942 943 944
            ypdata_t *yp = find_yp_mount (list, mount);
            if (yp == NULL)
                break;
            if (yp->release || yp->remove)
            {
                list = yp->next;
                continue;   /* search again these are old entries */
            }
945
            ICECAST_LOG_DEBUG("release %s on YP %s", mount, server->url);
946
            yp->release = 1;
947
            yp->next_update = 0;
948
        }
949
        server = server->next;
950
    }
951
    thread_rwlock_unlock (&yp_lock);
952
}
953 954 955 956 957 958


/* This is similar to yp_remove, but we force a touch
 * attempt */
void yp_touch (const char *mount)
{
959
    struct yp_server *server = (struct yp_server *)active_yps;
960
    ypdata_t *search_list = NULL;
961 962

    thread_rwlock_rlock (&yp_lock);
963 964 965
    if (server)
        search_list = server->mounts;

966 967
    while (server)
    {
968
        ypdata_t *yp = find_yp_mount (search_list, mount);
969 970
        if (yp)
        {
971
            /* we may of found old entries not purged yet, so skip them */
972
            if (yp->release != 0 || yp->remove != 0)
973 974 975 976
            {
                search_list = yp->next;
                continue;
            }
977 978 979
            /* don't update the directory if there is a touch scheduled soon */
            if (yp->process == do_yp_touch && now + yp->touch_interval - yp->next_update > 60)
                yp->next_update = now + 3;
980 981
        }
        server = server->next;
982 983
        if (server)
            search_list = server->mounts;
984 985 986 987 988
    }
    thread_rwlock_unlock (&yp_lock);
}


989
void yp_shutdown (void)
990
{
991
    thread_rwlock_wlock(&yp_lock);
992 993
    yp_running = 0;
    yp_update = 1;
994 995
    thread_rwlock_unlock(&yp_lock);

996 997
    if (yp_thread)
        thread_join (yp_thread);
998 999
    free ((char*)server_version);
    server_version = NULL;
1000
    ICECAST_LOG_INFO("YP thread down");
1001 1002
}