Commit e4f32131 authored by Timothy B. Terriberry's avatar Timothy B. Terriberry

Estimate http connection latency and bandwidth.

This gives us a better idea when to re-use a connection.
parent 3774672b
......@@ -159,10 +159,10 @@ static const char *op_parse_file_url(const char *_src){
#if defined(OP_ENABLE_HTTP)
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/time.h>
# include <sys/timeb.h>
# include <arpa/inet.h>
# include <netient/in.h>
# include <fcntl.h>
# include <netinet/in.h>
# include <netdb.h>
# include <poll.h>
# include <unistd.h>
......@@ -443,8 +443,17 @@ static int op_sb_append_nonnegative_int64(OpusStringBuf *_sb,opus_int64 _i){
struct OpusHTTPConn{
/*The current position indicator for this connection.*/
opus_int64 pos;
/*The SSL connection, if this is https.*/
SSL *ssl_conn;
/*The next connection in either the LRU or free list.*/
OpusHTTPConn *next;
/*The last time we blocked for reading from this connection.*/
struct timeb read_time;
/*The number of bytes we've read since the last time we blocked.*/
opus_int64 read_bytes;
/*The estimated throughput of this connection, in bytes/s.*/
opus_int64 read_rate;
/*The socket we're reading from.*/
int fd;
};
......@@ -482,6 +491,8 @@ struct OpusHTTPStream{
/*The connection we're currently reading from.
This can be -1 if no connection is active.*/
int cur_conni;
/*The estimated time required to open a new connection, in milliseconds.*/
opus_int32 connect_rate;
/*Information about the address we connected to.*/
struct addrinfo addr_info;
/*The address we connected to.*/
......@@ -609,7 +620,7 @@ static int op_do_ssl_step(SSL *_ssl_conn,int _fd,op_ssl_step_func _step){
# define OP_NPROTOS (2)
static int op_http_connect(OpusHTTPStream *_stream,OpusHTTPConn *_conn,
struct addrinfo *_addrs){
struct addrinfo *_addrs,struct timeb *_start_time){
struct addrinfo *addr;
struct addrinfo *addrs[OP_NPROTOS];
struct pollfd fds[OP_NPROTOS];
......@@ -640,6 +651,11 @@ static int op_http_connect(OpusHTTPStream *_stream,OpusHTTPConn *_conn,
nprotos++;
}
}
ret=ftime(_start_time);
OP_ASSERT(!ret);
*&_conn->read_time=*_start_time;
_conn->read_bytes=0;
_conn->read_rate=0;
/*Try to start a connection to each protocol.*/
for(pi=0;pi<nprotos;pi++){
ai_family=addrs[pi]->ai_family;
......@@ -866,6 +882,36 @@ static int op_http_conn_write_fully(OpusHTTPConn *_conn,
return 0;
}
static opus_int32 op_time_diff_ms(const struct timeb *_end,
const struct timeb *_start){
opus_int64 dtime;
dtime=_end->time-_start->time;
OP_ASSERT(_end->millitm<1000);
OP_ASSERT(_start->millitm<1000);
if(OP_UNLIKELY(dtime>(0x7FFFFFFF-1000)/1000))return 0x7FFFFFFF;
if(OP_UNLIKELY(dtime<(-0x7FFFFFFF+999)/1000))return -0x7FFFFFFF-1;
return (opus_int32)dtime*1000+_end->millitm-_start->millitm;
}
/*Update the read rate for this connection.*/
static void op_http_conn_read_rate_update(OpusHTTPConn *_conn){
struct timeb read_time;
opus_int32 read_delta_ms;
opus_int64 read_delta_bytes;
opus_int64 read_rate;
int ret;
ret=ftime(&read_time);
OP_ASSERT(!ret);
read_delta_ms=op_time_diff_ms(&read_time,&_conn->read_time);
read_delta_bytes=_conn->read_bytes;
read_rate=_conn->read_rate;
read_delta_ms=OP_MAX(read_delta_ms,1);
read_rate+=read_delta_bytes*1000/read_delta_ms-read_rate+4>>3;
*&_conn->read_time=*&read_time;
_conn->read_bytes=0;
_conn->read_rate=read_rate;
}
/*Tries to read from the given connection.
[out] _buf: Returns the data read.
_size: The size of the buffer.
......@@ -875,9 +921,10 @@ static ptrdiff_t op_http_conn_read(OpusHTTPConn *_conn,
struct pollfd fd;
SSL *ssl_conn;
ptrdiff_t nread;
ptrdiff_t nread_unblocked;
fd.fd=_conn->fd;
ssl_conn=_conn->ssl_conn;
nread=0;
nread=nread_unblocked=0;
do{
int err;
if(ssl_conn!=NULL){
......@@ -887,6 +934,7 @@ static ptrdiff_t op_http_conn_read(OpusHTTPConn *_conn,
/*Read some data.
Keep going to see if there's more.*/
nread+=ret;
nread_unblocked+=ret;
continue;
}
/*Connection closed.*/
......@@ -907,6 +955,7 @@ static ptrdiff_t op_http_conn_read(OpusHTTPConn *_conn,
/*Read some data.
Keep going to see if there's more.*/
nread+=ret;
nread_unblocked+=ret;
continue;
}
/*If we already read some data, return it right now.*/
......@@ -915,11 +964,15 @@ static ptrdiff_t op_http_conn_read(OpusHTTPConn *_conn,
if(err!=EAGAIN&&err!=EWOULDBLOCK)return 0;
fd.events=POLLIN;
}
_conn->read_bytes+=nread_unblocked;
op_http_conn_read_rate_update(_conn);
nread_unblocked=0;
if(!_block)break;
/*Need to wait to get any data at all.*/
if(poll(&fd,1,-1)==-1)return 0;
}
while(nread<_size);
_conn->read_bytes+=nread_unblocked;
return nread;
}
......@@ -1169,11 +1222,13 @@ static int op_http_stream_open(OpusHTTPStream *_stream,const char *_url,
ret=op_parse_url(&_stream->url,_url);
if(OP_UNLIKELY(ret<0))return ret;
for(nredirs=0;nredirs<OP_REDIRECT_LIMIT;nredirs++){
char response[OP_RESPONSE_SIZE_MAX];
char *next;
char *status_code;
const char *host;
unsigned port;
struct timeb start_time;
struct timeb end_time;
char response[OP_RESPONSE_SIZE_MAX];
char *next;
char *status_code;
const char *host;
unsigned port;
if(_proxy_host==NULL){
host=_stream->url.host;
port=_stream->url.port;
......@@ -1217,7 +1272,7 @@ static int op_http_stream_open(OpusHTTPStream *_stream,const char *_url,
addrs=op_resolve(host,port);
if(OP_UNLIKELY(addrs==NULL))return OP_FALSE;
}
ret=op_http_connect(_stream,_stream->conns+0,addrs);
ret=op_http_connect(_stream,_stream->conns+0,addrs,&start_time);
if(addrs!=&_stream->addr_info)freeaddrinfo(addrs);
if(OP_UNLIKELY(ret<0))return ret;
/*Build the request to send.*/
......@@ -1269,6 +1324,8 @@ static int op_http_stream_open(OpusHTTPStream *_stream,const char *_url,
ret=op_http_conn_read_response(_stream->conns+0,
response,sizeof(response)/sizeof(*response));
if(OP_UNLIKELY(ret<0))return ret;
ret=ftime(&end_time);
OP_ASSERT(!ret);
next=op_http_parse_status_line(&status_code,response);
if(next==NULL)return OP_FALSE;
if(status_code[0]=='2'){
......@@ -1358,6 +1415,8 @@ static int op_http_stream_open(OpusHTTPStream *_stream,const char *_url,
_stream->content_length=content_length;
_stream->conns[0].pos=0;
_stream->cur_conni=0;
_stream->connect_rate=op_time_diff_ms(&end_time,&start_time);
_stream->connect_rate=OP_MAX(_stream->connect_rate,1);
/*The URL has been successfully opened.*/
return 0;
}
......@@ -1415,12 +1474,16 @@ static int op_http_stream_open(OpusHTTPStream *_stream,const char *_url,
static int op_http_conn_open_pos(OpusHTTPStream *_stream,
OpusHTTPConn *_conn,opus_int64 _pos){
char response[OP_RESPONSE_SIZE_MAX];
char *next;
char *status_code;
opus_int64 range_length;
int ret;
ret=op_http_connect(_stream,_conn,&_stream->addr_info);
struct timeb start_time;
struct timeb end_time;
char response[OP_RESPONSE_SIZE_MAX];
char *next;
char *status_code;
opus_int64 range_length;
opus_int32 connect_rate;
opus_int32 connect_time;
int ret;
ret=op_http_connect(_stream,_conn,&_stream->addr_info,&start_time);
if(OP_UNLIKELY(ret<0))return ret;
/*Build the request to send.*/
_stream->request.nbuf=_stream->request_tail;
......@@ -1433,6 +1496,8 @@ static int op_http_conn_open_pos(OpusHTTPStream *_stream,
ret=op_http_conn_read_response(_conn,
response,sizeof(response)/sizeof(*response));
if(OP_UNLIKELY(ret<0))return ret;
ret=ftime(&end_time);
OP_ASSERT(!ret);
next=op_http_parse_status_line(&status_code,response);
if(next==NULL)return OP_FALSE;
/*We _need_ a 206 Partial Content response.*/
......@@ -1474,7 +1539,12 @@ static int op_http_conn_open_pos(OpusHTTPStream *_stream,
_conn->pos=_pos;
_stream->cur_conni=_conn-_stream->conns;
OP_ASSERT(_stream->cur_conni>=0&&_stream->cur_conni<OP_NCONNS_MAX);
/*The connection has been successfully opened.*/
/*The connection has been successfully opened.
Update the connection time estimate.*/
connect_time=op_time_diff_ms(&end_time,&start_time);
connect_rate=_stream->connect_rate;
connect_rate+=OP_MAX(connect_time,1)-connect_rate+8>>4;
_stream->connect_rate=connect_rate;
return 0;
}
......@@ -1542,12 +1612,12 @@ static size_t op_http_stream_read(void *_ptr,size_t _size,size_t _nmemb,
return nread;
}
/*To this will need to be larger than OP_CHUNK_SIZE to be useful.*/
# define OP_READAHEAD_THRESH (128*1024)
# define OP_READAHEAD_THRESH_MIN (64*1024)
/*16 kB is the largest size OpenSSL will return at once.*/
# define OP_READAHEAD_CHUNK_SIZE (16*1024)
static int op_http_stream_seek(void *_stream,opus_int64 _offset,int _whence){
struct timeb seek_time;
OpusHTTPStream *stream;
OpusHTTPConn *conn;
OpusHTTPConn *prev;
......@@ -1582,6 +1652,15 @@ static int op_http_stream_seek(void *_stream,opus_int64 _offset,int _whence){
}break;
default:return -1;
}
/*Mark when we deactivated the active connection.*/
if(ci>=0){
op_http_conn_read_rate_update(stream->conns+ci);
*&seek_time=*&stream->conns[ci].read_time;
}
else{
ret=ftime(&seek_time);
OP_ASSERT(!ret);
}
/*If we seeked past the end of the stream, just disable the active
connection.*/
if(pos>=content_length){
......@@ -1595,11 +1674,24 @@ static int op_http_stream_seek(void *_stream,opus_int64 _offset,int _whence){
conn=stream->lru_head;
while(conn!=NULL){
opus_int64 conn_pos;
/*TODO: Estimate connection open time and current throughput, and compute
the read-ahead threshold accordingly.*/
/*TODO: Expire connections aggressively to avoid server timeouts.*/
opus_int64 read_ahead_thresh;
/*If this connection has been dormant too long, close it.
This is to prevent us from hitting server/firewall timeouts.*/
if(op_time_diff_ms(&seek_time,&conn->read_time)>5*1000){
*pnext=conn->next;
conn->next=stream->lru_head;
stream->lru_head=conn;
op_http_conn_close(stream,conn);
conn=*pnext;
continue;
}
/*Dividing by 512 instead of 1000 scales this by nearly 2, biasing towards
connection re-use (and roughly compensating for the ability of the TCP
window to open up on long reads).*/
read_ahead_thresh=OP_MAX(OP_READAHEAD_THRESH_MIN,
stream->connect_rate*conn->read_rate>>9);
conn_pos=conn->pos;
if(pos-OP_READAHEAD_THRESH<=conn_pos&&conn_pos<=pos){
if(pos-read_ahead_thresh<=conn_pos&&conn_pos<=pos){
/*Found a suitable connection to re-use.*/
*pnext=conn->next;
conn->next=stream->lru_head;
......
......@@ -2008,7 +2008,7 @@ static int op_pcm_seek_page_impl(OggOpusFile *_of,
OP_ASSERT(!ret);
/*Take a (pretty decent) guess.*/
bisect=begin+op_rescale64(diff,diff2,end-begin)-OP_CHUNK_SIZE;
if(bisect<begin+OP_CHUNK_SIZE)bisect=begin;
if(bisect-OP_CHUNK_SIZE<begin)bisect=begin;
}
ret=op_seek_helper(_of,bisect);
if(OP_UNLIKELY(ret<0))return ret;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment