slave.c 4.22 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#define snprintf _snprintf
#define strcasecmp stricmp
#define strncasecmp strnicmp
#endif

#include "os.h"

#include "thread.h"
#include "avl.h"
#include "sock.h"
#include "log.h"
#include "httpp.h"

#include "config.h"
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "format.h"
#include "logging.h"

#include "source.h"

#define CATMODULE "slave"

static void *_slave_thread(void *arg);
Michael Smith's avatar
Michael Smith committed
48
thread_t *_slave_thread_id;
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
static int _initialized = 0;

void slave_initialize(void) {
	if (_initialized) return;
    /* Don't create a slave thread if it isn't configured */
    if (config_get_config()->master_server == NULL)
        return;

	_initialized = 1;
	_slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}

void slave_shutdown(void) {
	if (!_initialized) return;
	_initialized = 0;
	thread_join(_slave_thread_id);
}

static void *_slave_thread(void *arg) {
	sock_t mastersock, streamsock;
	char buf[256];
	char header[4096];
	connection_t *con;
	http_parser_t *parser;
73
    client_t *client;
74
    int interval = config_get_config()->master_update_interval;
75 76 77
    char *authheader, *data;
    int len;
    char *username = "relay";
78 79 80 81 82
    char *password = config_get_config()->master_password;

    if(password == NULL)
        password = config_get_config()->source_password;

83 84 85 86 87 88 89 90 91 92 93

	while (_initialized) {
        if (config_get_config()->master_update_interval > ++interval) {
		    thread_sleep(1000000);
            continue;
        }
        else
            interval = 0;

		mastersock = sock_connect_wto(config_get_config()->master_server, config_get_config()->master_server_port, 0);
		if (mastersock == SOCK_ERROR) {
94
            WARN0("Relay slave failed to contact master server to fetch stream list");
95 96
			continue;
		}
97

98
        len = strlen(username) + strlen(password) + 1;
99 100 101
        authheader = malloc(len+1);
        strcpy(authheader, username);
        strcat(authheader, ":");
102
        strcat(authheader, password);
103 104 105
        data = util_base64_encode(authheader);
		sock_write(mastersock, "GET /allstreams.txt HTTP/1.0\r\nAuthorization: Basic %s\r\n\r\n", data);
        free(data);
106 107 108 109 110
		while (sock_read_line(mastersock, buf, sizeof(buf))) {
			buf[strlen(buf)] = 0;
			avl_tree_rlock(global.source_tree);
			if (!source_find_mount(buf)) {
				avl_tree_unlock(global.source_tree);
111 112

                DEBUG1("Adding source at mountpoint \"%s\"", buf);
113 114
				streamsock = sock_connect_wto(config_get_config()->master_server, config_get_config()->master_server_port, 0);
				if (streamsock == SOCK_ERROR) {
115
                    WARN0("Failed to relay stream from master server");
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
					continue;
				}
				con = create_connection(streamsock, NULL);
				sock_write(streamsock, "GET %s HTTP/1.0\r\n\r\n", buf);
				memset(header, 0, sizeof(header));
				if (util_read_header(con->sock, header, 4096) == 0) {
					connection_close(con);
					continue;
				}
				parser = httpp_create_parser();
				httpp_initialize(parser, NULL);
				if(!httpp_parse_response(parser, header, strlen(header), buf)) {
                    if(httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)) {
                        ERROR1("Error parsing relay request: %s", 
                                httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
                    }
                    else
                        ERROR0("Error parsing relay request");
					connection_close(con);
                    httpp_destroy(parser);
                    continue;
                }

139 140
                client = client_create(con, parser);
				if (!connection_create_source(client, con, parser, 
141
                            httpp_getvar(parser, HTTPP_VAR_URI))) {
142
                    client_destroy(client);
143 144 145 146 147 148 149 150 151 152 153
				}
				continue;

			}
			avl_tree_unlock(global.source_tree);
		}
		sock_close(mastersock);
	}
	thread_exit(0);
	return NULL;
}