/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#define snprintf _snprintf
#define strcasecmp stricmp
#define strncasecmp strnicmp
#endif

#include "os.h"

#include "thread.h"
#include "avl.h"
#include "sock.h"
#include "log.h"
#include "httpp.h"

#include "config.h"
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
#include "geturl.h"
#include "source.h"
#include "format.h"

#define CATMODULE "slave"

static void *_slave_thread(void *arg);
48
thread_type *_slave_thread_id;
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
static int _initialized = 0;

/* Start the slave thread, once, and only when a master server is configured.
 * Calling this again after a successful start is a no-op. */
void slave_initialize(void) {
    if (!_initialized && config_get_config()->master_server != NULL) {
        _initialized = 1;
        _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
    }
}

/* Ask the slave thread to stop and wait for it to finish.
 * Does nothing if the thread was never started. */
void slave_shutdown(void) {
    if (_initialized) {
        /* Clearing the flag ends the thread's main loop. */
        _initialized = 0;
        thread_join(_slave_thread_id);
    }
}

67 68 69
static void create_relay_stream(char *server, int port, char *mount)
{
    sock_t streamsock;
70 71 72
	char header[4096];
	connection_t *con;
	http_parser_t *parser;
73
    client_t *client;
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113

    DEBUG1("Adding source at mountpoint \"%s\"", mount);

	streamsock = sock_connect_wto(server, port, 0);
	if (streamsock == SOCK_ERROR) {
        WARN0("Failed to relay stream from master server");
        return;
	}
	con = create_connection(streamsock, NULL);
	sock_write(streamsock, "GET %s HTTP/1.0\r\n\r\n", mount);
	memset(header, 0, sizeof(header));
	if (util_read_header(con->sock, header, 4096) == 0) {
		connection_close(con);
		return;
	}
	parser = httpp_create_parser();
	httpp_initialize(parser, NULL);
	if(!httpp_parse_response(parser, header, strlen(header), mount)) {
        if(httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)) {
            ERROR1("Error parsing relay request: %s", 
                    httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
        }
        else
            ERROR0("Error parsing relay request");
		connection_close(con);
        httpp_destroy(parser);
        return;
    }

    client = client_create(con, parser);
	if (!connection_create_source(client, con, parser, 
                httpp_getvar(parser, HTTPP_VAR_URI))) {
        client_destroy(client);
	}
    return;
}

static void *_slave_thread(void *arg) {
	sock_t mastersock;
	char buf[256];
114
    int interval = config_get_config()->master_update_interval;
115 116 117
    char *authheader, *data;
    int len;
    char *username = "relay";
118 119 120 121 122
    char *password = config_get_config()->master_password;

    if(password == NULL)
        password = config_get_config()->source_password;

123 124 125 126 127 128 129 130 131 132 133

	while (_initialized) {
        if (config_get_config()->master_update_interval > ++interval) {
		    thread_sleep(1000000);
            continue;
        }
        else
            interval = 0;

		mastersock = sock_connect_wto(config_get_config()->master_server, config_get_config()->master_server_port, 0);
		if (mastersock == SOCK_ERROR) {
134
            WARN0("Relay slave failed to contact master server to fetch stream list");
135 136
			continue;
		}
137

138
        len = strlen(username) + strlen(password) + 1;
139 140 141
        authheader = malloc(len+1);
        strcpy(authheader, username);
        strcat(authheader, ":");
142
        strcat(authheader, password);
143
        data = util_base64_encode(authheader);
144 145 146 147
		sock_write(mastersock, 
                "GET /admin/streamlist HTTP/1.0\r\n"
                "Authorization: Basic %s\r\n"
                "\r\n", data);
148
        free(authheader);
149
        free(data);
150
		while (sock_read_line(mastersock, buf, sizeof(buf))) {
151 152 153 154 155
            if(!strlen(buf))
                break;
        }

		while (sock_read_line(mastersock, buf, sizeof(buf))) {
156 157 158
			avl_tree_rlock(global.source_tree);
			if (!source_find_mount(buf)) {
				avl_tree_unlock(global.source_tree);
159

160 161 162 163 164 165 166
                create_relay_stream(
                        config_get_config()->master_server,
                        config_get_config()->master_server_port,
                        buf);
			} 
            else
    			avl_tree_unlock(global.source_tree);
167 168
		}
		sock_close(mastersock);
169 170 171 172 173 174 175 176 177 178 179 180 181

        /* And now, we process the individual mounts... */
        relay_server *relay = config_get_config()->relay;
        while(relay) {
            avl_tree_rlock(global.source_tree);
            if(!source_find_mount(relay->mount)) {
                avl_tree_unlock(global.source_tree);

                create_relay_stream(relay->server, relay->port, relay->mount);
            }
            else
                avl_tree_unlock(global.source_tree);
        }
182 183 184 185
	}
	thread_exit(0);
	return NULL;
}
186