Commit 7b2cc5f1 authored by Timothy B. Terriberry

Some http improvements.

- Attempt to re-use connections when we've already received enough
   data to do so immediately.
- Make sure that when seeking near the end, if the current chunk size is
   such that the _next_ chunk would be half the normal size or less,
   we just ask for the rest of the resource (see the sketch below).

With these two changes, a normal open of a single-chain Opus-only
 file requires exactly two HTTP requests.

- Also use the response buffer as a dummy buffer when skipping
   data.
  This avoids helgrind errors about unlocked writes from multiple
   threads (should someone be reading multiple streams from
   different threads).
  It's also better for SMP cache contention.
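A minimal sketch of the end-of-resource heuristic from the second point. It assumes, as the diff below suggests, that chunk sizes double from one request to the next, so "half the normal size" of the next chunk equals the current chunk size; the function name is illustrative and not part of opusfile, and the real code additionally guards the addition against overflow:

  #include <opus/opus_types.h>

  /*Illustrative only: pick the exclusive end offset for the next range
     request.  Returns -1 to mean "just ask for the rest of the resource".*/
  static opus_int64 op_sketch_next_request_end(opus_int64 _pos,
   opus_int64 _chunk_size,opus_int64 _content_length){
    /*If even a doubled chunk would reach the end of the resource, a bounded
       request would leave behind a tail of at most half the next
       (normal-sized) chunk, so skip the extra round trip.*/
    if(_pos+2*_chunk_size>=_content_length)return -1;
    /*Otherwise keep the request bounded so later requests can be pipelined
       behind it.*/
    return _pos+_chunk_size;
  }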
parent 7c52622f
@@ -247,9 +247,6 @@ static const char *op_parse_file_url(const char *_src){
/*We will always attempt to read ahead at least this much in preference to
opening a new connection.*/
# define OP_READAHEAD_THRESH_MIN (32*(opus_int32)1024)
/*The amount to read ahead per iteration of the read-ahead loop.
16 kB is the largest size OpenSSL will return at once.*/
# define OP_READAHEAD_CHUNK_SIZE (16*1024)
/*The amount of data to request after a seek.
This is a trade-off between read throughput after a seek vs. the ability
@@ -866,7 +863,7 @@ static void op_http_conn_read_rate_update(OpusHTTPConn *_conn){
0: The read would block, or the connection was closed.
OP_EREAD: There was a fatal read error.*/
static int op_http_conn_read(OpusHTTPConn *_conn,
unsigned char *_buf,int _buf_size,int _blocking){
char *_buf,int _buf_size,int _blocking){
struct pollfd fd;
SSL *ssl_conn;
int nread;
@@ -1032,7 +1029,7 @@ static int op_http_conn_read_response(OpusHTTPConn *_conn,
OP_ASSERT(size<=read_limit);
OP_ASSERT(read_limit<=size+ret);
/*Actually consume that data.*/
ret=op_http_conn_read(_conn,(unsigned char *)buf+size,read_limit-size,1);
ret=op_http_conn_read(_conn,buf+size,read_limit-size,1);
if(OP_UNLIKELY(ret<=0))return OP_FALSE;
size+=ret;
buf[size]='\0';
@@ -2097,8 +2094,7 @@ static int op_http_conn_send_request(OpusHTTPStream *_stream,
_stream->request.nbuf=_stream->request_tail;
ret=op_sb_append_nonnegative_int64(&_stream->request,_pos);
ret|=op_sb_append(&_stream->request,"-",1);
if(_chunk_size>0&&_pos<=OP_INT64_MAX-_chunk_size
&&_pos+_chunk_size<_stream->content_length){
if(_chunk_size>0&&OP_ADV_OFFSET(_pos,2*_chunk_size)<_stream->content_length){
/*We shouldn't be pipelining requests with non-HTTP/1.1 servers.*/
OP_ASSERT(_stream->pipeline);
next_end=_pos+_chunk_size;
@@ -2109,6 +2105,8 @@ static int op_http_conn_send_request(OpusHTTPStream *_stream,
if(_chunk_size>OP_PIPELINE_CHUNK_SIZE_MAX)_chunk_size=-1;
}
else{
/*Either this was a non-pipelined request or we were close enough to the
end to just ask for the rest.*/
next_end=-1;
_chunk_size=-1;
}
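The new condition above folds the old explicit overflow guard into OP_ADV_OFFSET. A guess at the shape of such a clamped advance, written with a sketch name and reusing the OP_MIN/OP_INT64_MAX helpers that appear elsewhere in this diff, because this is an assumption about the macro rather than its verbatim definition:

  /*Assumed shape only: advance _offset by _amount without ever exceeding
     OP_INT64_MAX, so the comparison against content_length stays
     well-defined even for positions near the top of the range.*/
  # define OP_SKETCH_ADV_OFFSET(_offset,_amount) \
   (OP_MIN(_offset,OP_INT64_MAX-(_amount))+(_amount))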
@@ -2368,7 +2366,7 @@ static int op_http_conn_read_body(OpusHTTPStream *_stream,
OP_ASSERT(end_pos>pos);
_buf_size=OP_MIN(_buf_size,end_pos-pos);
}
nread=op_http_conn_read(_conn,_buf,_buf_size,1);
nread=op_http_conn_read(_conn,(char *)_buf,_buf_size,1);
if(OP_UNLIKELY(nread<0))return nread;
pos+=nread;
_conn->pos=pos;
@@ -2432,6 +2430,8 @@ static int op_http_stream_read(void *_stream,
}
/*Discard data until we reach the _target position.
This destroys the contents of _stream->response.buf, as we need somewhere to
read this data, and that is a convenient place.
_just_read_ahead: Whether or not this is a plain fast-forward.
If 0, we need to issue a new request for a chunk at _target
and discard all the data from our current request(s).
@@ -2440,7 +2440,6 @@ static int op_http_stream_read(void *_stream,
_target: The stream position to which to read ahead.*/
static int op_http_conn_read_ahead(OpusHTTPStream *_stream,
OpusHTTPConn *_conn,int _just_read_ahead,opus_int64 _target){
static unsigned char dummy_buf[OP_READAHEAD_CHUNK_SIZE];
opus_int64 pos;
opus_int64 end_pos;
opus_int64 next_pos;
@@ -2470,8 +2469,8 @@ static int op_http_conn_read_ahead(OpusHTTPStream *_stream,
/*We already have a request outstanding.
Finish off the current chunk.*/
while(pos<end_pos){
nread=op_http_conn_read(_conn,dummy_buf,
(int)OP_MIN(end_pos-pos,OP_READAHEAD_CHUNK_SIZE),1);
nread=op_http_conn_read(_conn,_stream->response.buf,
(int)OP_MIN(end_pos-pos,_stream->response.cbuf),1);
/*We failed to read ahead.*/
if(nread<=0)return OP_FALSE;
pos+=nread;
@@ -2495,8 +2494,8 @@ static int op_http_conn_read_ahead(OpusHTTPStream *_stream,
_conn->next_end=next_next_end;
}
while(pos<end_pos){
nread=op_http_conn_read(_conn,dummy_buf,
(int)OP_MIN(end_pos-pos,OP_READAHEAD_CHUNK_SIZE),1);
nread=op_http_conn_read(_conn,_stream->response.buf,
(int)OP_MIN(end_pos-pos,_stream->response.cbuf),1);
/*We failed to read ahead.*/
if(nread<=0)return OP_FALSE;
pos+=nread;
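Both drain loops above now reuse the stream's own response buffer instead of a static array shared by every caller. A self-contained sketch of the difference (all names here are hypothetical, not the opusfile API):

  /*Stand-in for op_http_conn_read(): read up to _size throw-away bytes into
     _buf and return the count, or <=0 on failure.*/
  typedef int (*op_sketch_read_fn)(void *_conn,unsigned char *_buf,int _size);

  /*Old pattern: one static scratch buffer shared by every stream.
    The discarded bytes are never examined, so correctness is unaffected, but
     two threads skipping data on two different streams write this array with
     no lock (a write-write race that helgrind reports) and keep bouncing the
     same cache lines between CPUs.*/
  static unsigned char op_sketch_shared_scratch[16*1024];

  static int op_sketch_skip_shared(op_sketch_read_fn _read,void *_conn,
   long _n){
    int nread;
    while(_n>0){
      nread=_read(_conn,op_sketch_shared_scratch,
       _n<(long)sizeof(op_sketch_shared_scratch)?
       (int)_n:(int)sizeof(op_sketch_shared_scratch));
      if(nread<=0)return 0;
      _n-=nread;
    }
    return 1;
  }

  /*New pattern: skip into a buffer owned by the stream doing the skipping
     (in the diff, _stream->response.buf of size _stream->response.cbuf), so
     concurrent readers of different streams never touch shared memory.*/
  static int op_sketch_skip_private(op_sketch_read_fn _read,void *_conn,
   unsigned char *_buf,int _cbuf,long _n){
    int nread;
    while(_n>0){
      nread=_read(_conn,_buf,_n<(long)_cbuf?(int)_n:_cbuf);
      if(nread<=0)return 0;
      _n-=nread;
    }
    return 1;
  }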
@@ -2564,17 +2563,13 @@ static int op_http_stream_seek(void *_stream,opus_int64 _offset,int _whence){
stream->pos=pos;
return 0;
}
close_pnext=NULL;
close_conn=NULL;
/*First try to find a connection we can use without waiting.*/
pnext=&stream->lru_head;
conn=stream->lru_head;
pipeline=stream->pipeline;
while(conn!=NULL){
opus_int64 conn_pos;
opus_int64 end_pos;
opus_int64 read_ahead_thresh;
int available;
int just_read_ahead;
/*If this connection has been dormant too long or has made too many
requests, close it.
This is to prevent us from hitting server limits/firewall timeouts.*/
@@ -2585,6 +2580,52 @@ static int op_http_stream_seek(void *_stream,opus_int64 _offset,int _whence){
conn=*pnext;
continue;
}
available=op_http_conn_estimate_available(conn);
conn_pos=conn->pos;
end_pos=conn->end_pos;
if(conn->next_pos>=0){
OP_ASSERT(end_pos>=0);
OP_ASSERT(conn->next_pos==end_pos);
end_pos=conn->next_end;
}
OP_ASSERT(end_pos<0||conn_pos<=end_pos);
/*Can we quickly read ahead without issuing a new request or waiting for
any more data?
If we have an outstanding request, we'll over-estimate the amount of data
it has available (because we'll count the response headers, too), but
that probably doesn't matter.*/
if(conn_pos<=pos&&pos-conn_pos<=available&&(end_pos<0||pos<end_pos)){
/*Found a suitable connection to re-use.*/
ret=op_http_conn_read_ahead(stream,conn,1,pos);
if(OP_UNLIKELY(ret<0)){
/*The connection might have become stale, so close it and keep going.*/
op_http_conn_close(stream,conn,pnext,1);
conn=*pnext;
continue;
}
/*Successfully resurrected this connection.*/
*pnext=conn->next;
conn->next=stream->lru_head;
stream->lru_head=conn;
stream->cur_conni=conn-stream->conns;
return 0;
}
pnext=&conn->next;
conn=conn->next;
}
/*Chances are that didn't work, so now try to find one we can use by reading
ahead a reasonable amount and/or by issuing a new request.*/
close_pnext=NULL;
close_conn=NULL;
pnext=&stream->lru_head;
conn=stream->lru_head;
pipeline=stream->pipeline;
while(conn!=NULL){
opus_int64 conn_pos;
opus_int64 end_pos;
opus_int64 read_ahead_thresh;
int available;
int just_read_ahead;
/*Dividing by 2048 instead of 1000 scales this by nearly 1/2, biasing away
from connection re-use (and roughly compensating for the lag required to
reopen the TCP window of a connection that's been idle).
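Condensed, the fast-path test in the first pass above amounts to a single predicate. A sketch, with _available standing in for the estimate returned by op_http_conn_estimate_available() (which, as the comment above notes, may over-count because it includes response headers):

  #include <opus/opus_types.h>

  /*Sketch: re-use a connection without waiting only if the seek target is at
     or past its current position, the gap is covered by bytes that have
     already arrived, and the target still lies inside the range this
     connection is serving (_end_pos<0 means "to the end of the resource").*/
  static int op_sketch_can_reuse_immediately(opus_int64 _conn_pos,
   opus_int64 _end_pos,opus_int64 _target,opus_int64 _available){
    return _conn_pos<=_target&&_target-_conn_pos<=_available
     &&(_end_pos<0||_target<_end_pos);
  }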
@@ -2606,12 +2647,7 @@ static int op_http_stream_seek(void *_stream,opus_int64 _offset,int _whence){
&&(end_pos<0||pos<end_pos);
if(just_read_ahead||pipeline&&end_pos>=0
&&end_pos-conn_pos-available<=read_ahead_thresh){
/*Found a suitable connection to re-use.
We always attempt to re-use the first suitable connection we find, even
if another one might require less read-ahead, under the assumption
more recently used connetions have TCP windows that are open wider.
This seems to give a slight performance boost over picking the one with
the shortest estimated read-ahead time.*/
/*Found a suitable connection to re-use.*/
ret=op_http_conn_read_ahead(stream,conn,just_read_ahead,pos);
if(OP_UNLIKELY(ret<0)){
/*The connection might have become stale, so close it and keep going.*/
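The read_ahead_thresh used in this second pass converts the cost of opening a new connection into a byte count. A rough sketch of the idea with illustrative parameter names, reusing the OP_READAHEAD_THRESH_MIN constant defined earlier in the diff; the real computation in http.c may differ in detail:

  #include <opus/opus_types.h>

  /*Sketch: how far is it worth reading ahead on an existing connection
     before simply issuing a new request becomes cheaper?
    _connect_ms is a measured connection set-up time in milliseconds and
     _read_rate is the connection's recent throughput in bytes per second, so
     dividing by 1000 would convert exactly; dividing by 2048 instead scales
     the result by nearly 1/2, biasing away from re-use as described above.*/
  static opus_int64 op_sketch_read_ahead_thresh(opus_int64 _connect_ms,
   opus_int64 _read_rate){
    opus_int64 thresh;
    thresh=_connect_ms*_read_rate/2048;
    return thresh<OP_READAHEAD_THRESH_MIN?OP_READAHEAD_THRESH_MIN:thresh;
  }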