/* slave.c
 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
 *
 * Periodically requests a list of streams from a master server
 * and creates source threads for any it doesn't already have.
 * */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>

#ifndef _WIN32
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#define snprintf _snprintf
#define strcasecmp stricmp
#define strncasecmp strnicmp
#endif

#include "os.h"

#include "thread.h"
#include "avl.h"
#include "sock.h"
#include "log.h"
#include "httpp.h"

#include "config.h"
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "format.h"
#include "logging.h"

#include "source.h"

#define CATMODULE "slave"

static void *_slave_thread(void *arg);
Michael Smith's avatar
Michael Smith committed
48
thread_t *_slave_thread_id;
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
static int _initialized = 0;

/* Start the slave thread, unless it is already running or no master server
 * is configured (in which case there is nothing to poll). */
void slave_initialize(void) {
    if (!_initialized && config_get_config()->master_server != NULL) {
        /* The flag must be raised before the thread starts: the thread's
         * main loop uses it as its run condition. */
        _initialized = 1;
        _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
    }
}

/* Ask the slave thread to stop and wait for it to finish.  Does nothing if
 * the thread was never started. */
void slave_shutdown(void) {
    if (_initialized) {
        /* Clearing the flag makes the thread's main loop terminate. */
        _initialized = 0;
        thread_join(_slave_thread_id);
    }
}

static void *_slave_thread(void *arg) {
	sock_t mastersock, streamsock;
	char buf[256];
	char header[4096];
	connection_t *con;
	http_parser_t *parser;
    int interval = config_get_config()->master_update_interval;

	while (_initialized) {
        if (config_get_config()->master_update_interval > ++interval) {
		    thread_sleep(1000000);
            continue;
        }
        else
            interval = 0;

		mastersock = sock_connect_wto(config_get_config()->master_server, config_get_config()->master_server_port, 0);
		if (mastersock == SOCK_ERROR) {
85
            WARN0("Relay slave failed to contact master server to fetch stream list");
86
87
88
89
90
91
92
93
			continue;
		}
		sock_write(mastersock, "GET /allstreams.txt HTTP/1.0\r\nice-password: %s\r\n\r\n", config_get_config()->source_password);
		while (sock_read_line(mastersock, buf, sizeof(buf))) {
			buf[strlen(buf)] = 0;
			avl_tree_rlock(global.source_tree);
			if (!source_find_mount(buf)) {
				avl_tree_unlock(global.source_tree);
94
95

                DEBUG1("Adding source at mountpoint \"%s\"", buf);
96
97
				streamsock = sock_connect_wto(config_get_config()->master_server, config_get_config()->master_server_port, 0);
				if (streamsock == SOCK_ERROR) {
98
                    WARN0("Failed to relay stream from master server");
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
					continue;
				}
				con = create_connection(streamsock, NULL);
				sock_write(streamsock, "GET %s HTTP/1.0\r\n\r\n", buf);
				memset(header, 0, sizeof(header));
				if (util_read_header(con->sock, header, 4096) == 0) {
					connection_close(con);
					continue;
				}
				parser = httpp_create_parser();
				httpp_initialize(parser, NULL);
				if(!httpp_parse_response(parser, header, strlen(header), buf)) {
                    if(httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)) {
                        ERROR1("Error parsing relay request: %s", 
                                httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
                    }
                    else
                        ERROR0("Error parsing relay request");
					connection_close(con);
                    httpp_destroy(parser);
                    continue;
                }

				if (!connection_create_source(con, parser, 
                            httpp_getvar(parser, HTTPP_VAR_URI))) {
					connection_close(con);
					httpp_destroy(parser);
				}
				continue;

			}
			avl_tree_unlock(global.source_tree);
		}
		sock_close(mastersock);
	}
	thread_exit(0);
	return NULL;
}