#include "download.hh"
#include "util.hh"
#include "globals.hh"
#include "hash.hh"
#include "store-api.hh"
#include "archive.hh"
#include "s3.hh"
#include "compression.hh"

#ifdef ENABLE_S3
#include <aws/core/client/ClientConfiguration.h>
#endif

#include <unistd.h>
#include <fcntl.h>

#include <curl/curl.h>

#include <queue>
#include <iostream>
#include <thread>
#include <cmath>
#include <random>

namespace nix {
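
/* Return the current wall-clock time in seconds.  Used to compute and
   throttle the progress display. */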
double getTime()
{
    struct timeval tv;
    gettimeofday(&tv, 0);
    return tv.tv_sec + (tv.tv_usec / 1000000.0);
}

std::string resolveUri(const std::string & uri)
{
    if (uri.compare(0, 8, "channel:") == 0)
        return "https://nixos.org/channels/" + std::string(uri, 8) + "/nixexprs.tar.xz";
    else
        return uri;
}
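
/* Decode a response body according to its Content-Encoding header.
   Only the identity encoding and Brotli ("br") are currently
   supported. */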
ref<std::string> decodeContent(const std::string & encoding, ref<std::string> data)
{
    if (encoding == "")
        return data;
    else if (encoding == "br")
        return decompress(encoding, *data);
    else
        throw Error("unsupported Content-Encoding ‘%s’", encoding);
}

struct CurlDownloader : public Downloader
{
    CURLM * curlm = 0;

    std::random_device rd;
    std::mt19937 mt19937;

    bool enableHttp2;
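
    /* A single download in progress.  Items are kept alive via
       shared_ptr both by the incoming queue and by the worker thread's
       table of active handles. */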
    struct DownloadItem : public std::enable_shared_from_this<DownloadItem>
    {
        CurlDownloader & downloader;
        DownloadRequest request;
        DownloadResult result;

        bool done = false; // whether either the success or failure function has been called
        std::function<void(const DownloadResult &)> success;
        std::function<void(std::exception_ptr exc)> failure;

        CURL * req = 0;
        bool active = false; // whether the handle has been added to the multi object
        std::string status;

        bool showProgress = false;
        double prevProgressTime{0}, startTime{0};
        unsigned int moveBack{1};

        unsigned int attempt = 0;

        /* Don't start this download until the specified time point
           has been reached. */
        std::chrono::steady_clock::time_point embargo;

        struct curl_slist * requestHeaders = 0;

        std::string encoding;

        DownloadItem(CurlDownloader & downloader, const DownloadRequest & request)
            : downloader(downloader), request(request)
        {
            showProgress =
                request.showProgress == DownloadRequest::yes ||
                (request.showProgress == DownloadRequest::automatic && isatty(STDERR_FILENO));

            if (!request.expectedETag.empty())
                requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str());
        }

        ~DownloadItem()
        {
            if (req) {
                if (active)
                    curl_multi_remove_handle(downloader.curlm, req);
                curl_easy_cleanup(req);
            }
            if (requestHeaders) curl_slist_free_all(requestHeaders);
            try {
                if (!done)
                    fail(DownloadError(Interrupted, format("download of ‘%s’ was interrupted") % request.uri));
            } catch (...) {
                ignoreException();
            }
        }

        template<class T>
        void fail(const T & e)
        {
            assert(!done);
            done = true;
            callFailure(failure, std::make_exception_ptr(e));
        }

        size_t writeCallback(void * contents, size_t size, size_t nmemb)
        {
            size_t realSize = size * nmemb;
            result.data->append((char *) contents, realSize);
            return realSize;
        }

        static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
        {
            return ((DownloadItem *) userp)->writeCallback(contents, size, nmemb);
        }

        size_t headerCallback(void * contents, size_t size, size_t nmemb)
        {
            size_t realSize = size * nmemb;
            std::string line((char *) contents, realSize);
            printMsg(lvlVomit, format("got header for ‘%s’: %s") % request.uri % trim(line));
            if (line.compare(0, 5, "HTTP/") == 0) { // new response starts
                result.etag = "";
                auto ss = tokenizeString<vector<string>>(line, " ");
                status = ss.size() >= 2 ? ss[1] : "";
                result.data = std::make_shared<std::string>();
                encoding = "";
            } else {
                auto i = line.find(':');
                if (i != string::npos) {
                    string name = toLower(trim(string(line, 0, i)));
                    if (name == "etag") {
                        result.etag = trim(string(line, i + 1));
                        /* Hack to work around a GitHub bug: it sends
                           ETags, but ignores If-None-Match. So if we get
                           the expected ETag on a 200 response, then shut
                           down the connection because we already have the
                           data. */
                        if (result.etag == request.expectedETag && status == "200") {
                            debug(format("shutting down on 200 HTTP response with expected ETag"));
                            return 0;
                        }
                    } else if (name == "content-encoding")
                        encoding = trim(string(line, i + 1));
                }
            }
            return realSize;
        }

        static size_t headerCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
        {
            return ((DownloadItem *) userp)->headerCallback(contents, size, nmemb);
        }
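
        /* Progress callback.  This doubles as the interruption hook:
           returning a non-zero value makes curl abort the transfer
           with CURLE_ABORTED_BY_CALLBACK, which finish() maps to an
           ‘interrupted’ error. */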
        int progressCallback(double dltotal, double dlnow)
        {
            if (showProgress) {
                double now = getTime();
                if (prevProgressTime <= now - 1) {
                    string s = (format(" [%1$.0f/%2$.0f KiB, %3$.1f KiB/s]")
                        % (dlnow / 1024.0)
                        % (dltotal / 1024.0)
                        % (now == startTime ? 0 : dlnow / 1024.0 / (now - startTime))).str();
                    std::cerr << "\e[" << moveBack << "D" << s;
                    moveBack = s.size();
                    std::cerr.flush();
                    prevProgressTime = now;
                }
            }
            return _isInterrupted;
        }

        static int progressCallbackWrapper(void * userp, double dltotal, double dlnow, double ultotal, double ulnow)
        {
            return ((DownloadItem *) userp)->progressCallback(dltotal, dlnow);
        }
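
        /* Forward curl's own debug output (CURLINFO_TEXT) to the
           ‘vomit’ log level; only installed when verbosity >= lvlVomit. */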
        static int debugCallback(CURL * handle, curl_infotype type, char * data, size_t size, void * userptr)
        {
            if (type == CURLINFO_TEXT)
                vomit("curl: %s", chomp(std::string(data, size)));
            return 0;
        }
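
        /* Configure the curl easy handle for this request.  Called on
           the worker thread just before the handle is added to the
           multi handle. */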
        void init()
        {
            // FIXME: handle parallel downloads.
            if (showProgress) {
                std::cerr << (format("downloading ‘%1%’... ") % request.uri);
                std::cerr.flush();
                startTime = getTime();
            }

            if (!req) req = curl_easy_init();

            curl_easy_reset(req);

            if (verbosity >= lvlVomit) {
                curl_easy_setopt(req, CURLOPT_VERBOSE, 1);
                curl_easy_setopt(req, CURLOPT_DEBUGFUNCTION, DownloadItem::debugCallback);
            }

            curl_easy_setopt(req, CURLOPT_URL, request.uri.c_str());
            curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L);
            curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1);
            curl_easy_setopt(req, CURLOPT_USERAGENT, ("curl/" LIBCURL_VERSION " Nix/" + nixVersion).c_str());
#if LIBCURL_VERSION_NUM >= 0x072b00
            curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1);
#endif
#if LIBCURL_VERSION_NUM >= 0x072f00
            if (downloader.enableHttp2)
                curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
#endif
            curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, DownloadItem::writeCallbackWrapper);
            curl_easy_setopt(req, CURLOPT_WRITEDATA, this);
            curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, DownloadItem::headerCallbackWrapper);
            curl_easy_setopt(req, CURLOPT_HEADERDATA, this);

            curl_easy_setopt(req, CURLOPT_PROGRESSFUNCTION, progressCallbackWrapper);
            curl_easy_setopt(req, CURLOPT_PROGRESSDATA, this);
            curl_easy_setopt(req, CURLOPT_NOPROGRESS, 0);

            curl_easy_setopt(req, CURLOPT_HTTPHEADER, requestHeaders);

            if (request.head)
                curl_easy_setopt(req, CURLOPT_NOBODY, 1);

            if (request.verifyTLS)
                curl_easy_setopt(req, CURLOPT_CAINFO, settings.caFile.c_str());
            else {
                curl_easy_setopt(req, CURLOPT_SSL_VERIFYPEER, 0);
                curl_easy_setopt(req, CURLOPT_SSL_VERIFYHOST, 0);
            }

            /* If no file exists in the specified path, curl continues to work
               anyway as if netrc support was disabled. */
            curl_easy_setopt(req, CURLOPT_NETRC_FILE, settings.netrcFile.c_str());
            curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL);

            result.data = std::make_shared<std::string>();
        }
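
        /* Called by the worker thread when curl reports this transfer
           as finished.  Classifies the result and either invokes the
           success callback, schedules a retry for transient errors, or
           fails the download. */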
        void finish(CURLcode code)
        {
            if (showProgress)
                //std::cerr << "\e[" << moveBack << "D\e[K\n";
                std::cerr << "\n";

            long httpStatus = 0;
            curl_easy_getinfo(req, CURLINFO_RESPONSE_CODE, &httpStatus);

            char * effectiveUrlCStr;
            curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUrlCStr);
            if (effectiveUrlCStr)
                result.effectiveUrl = effectiveUrlCStr;

            debug(format("finished download of ‘%s’; curl status = %d, HTTP status = %d, body = %d bytes")
                % request.uri % code % httpStatus % (result.data ? result.data->size() : 0));

            if (code == CURLE_WRITE_ERROR && result.etag == request.expectedETag) {
                code = CURLE_OK;
                httpStatus = 304;
            }

            if (code == CURLE_OK &&
                (httpStatus == 200 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */))
            {
                result.cached = httpStatus == 304;
                done = true;

                try {
                    result.data = decodeContent(encoding, ref<std::string>(result.data));
                    callSuccess(success, failure, const_cast<const DownloadResult &>(result));
                } catch (...) {
                    done = true;
                    callFailure(failure, std::current_exception());
                }
            } else {
                Error err =
                    (httpStatus == 404 || code == CURLE_FILE_COULDNT_READ_FILE) ? NotFound :
                    httpStatus == 403 ? Forbidden :
                    (httpStatus == 408 || httpStatus == 500 || httpStatus == 503
                        || httpStatus == 504 || httpStatus == 522 || httpStatus == 524
                        || code == CURLE_COULDNT_RESOLVE_HOST
                        || code == CURLE_RECV_ERROR

                        /* This is a generic SSL failure that in some
                           cases (e.g. certificate error) is permanent
                           but also appears in transient cases, so we
                           consider it retryable. */
                        || code == CURLE_SSL_CONNECT_ERROR
#if LIBCURL_VERSION_NUM >= 0x073200
                        || code == CURLE_HTTP2
                        || code == CURLE_HTTP2_STREAM
#endif
                        ) ? Transient :
                    Misc;

                attempt++;

                auto exc =
                    code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
                    ? DownloadError(Interrupted, format("download of ‘%s’ was interrupted") % request.uri)
                    : httpStatus != 0
                    ? DownloadError(err, format("unable to download ‘%s’: HTTP error %d (curl error: %s)") % request.uri % httpStatus % curl_easy_strerror(code))
                    : DownloadError(err, format("unable to download ‘%s’: %s (%d)") % request.uri % curl_easy_strerror(code) % code);

                /* If this is a transient error, then maybe retry the
                   download after a while. */
                if (err == Transient && attempt < request.tries) {
                    int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937));
                    printError(format("warning: %s; retrying in %d ms") % exc.what() % ms);
                    embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
                    downloader.enqueueItem(shared_from_this());
                }
                else
                    fail(exc);
            }
        }
    };
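
    /* State shared between client threads and the worker thread,
       protected by Sync.  ‘incoming’ is a min-heap ordered by embargo
       time, so the item that may be started soonest is always at the
       top. */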
    struct State
    {
        struct EmbargoComparator {
            bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) {
                return i1->embargo > i2->embargo;
            }
        };
        bool quit = false;
        std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming;
    };

    Sync<State> state_;

    /* We can't use a std::condition_variable to wake up the curl
       thread, because it only monitors file descriptors. So use a
       pipe instead. */
    Pipe wakeupPipe;

    std::thread workerThread;

    CurlDownloader()
        : mt19937(rd())
    {
        static std::once_flag globalInit;
        std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);

        curlm = curl_multi_init();

#if LIBCURL_VERSION_NUM >= 0x072b00 // correct?
        curl_multi_setopt(curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
#endif
        curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS,
            settings.get("binary-caches-parallel-connections", 25));

        enableHttp2 = settings.get("enable-http2", true);

        wakeupPipe.create();
        fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK);

        workerThread = std::thread([&]() { workerThreadEntry(); });
    }

    ~CurlDownloader()
    {
        stopWorkerThread();

        workerThread.join();

        if (curlm) curl_multi_cleanup(curlm);
    }

    void stopWorkerThread()
    {
        /* Signal the worker thread to exit. */
        {
            auto state(state_.lock());
            state->quit = true;
        }
        writeFull(wakeupPipe.writeSide.get(), " ", false);
    }
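
    /* The worker thread's main loop: run curl_multi_perform, hand
       finished transfers to finish(), wait for socket activity or a
       wakeup on the pipe, and move newly enqueued items whose embargo
       has expired into the curl multi handle. */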
    void workerThreadMain()
    {
        /* Cause this thread to be notified on SIGINT. */
        auto callback = createInterruptCallback([&]() {
            stopWorkerThread();
        });

        std::map<CURL *, std::shared_ptr<DownloadItem>> items;

        bool quit = false;

        std::chrono::steady_clock::time_point nextWakeup;

        while (!quit) {
            checkInterrupt();

            /* Let curl do its thing. */
            int running;
            CURLMcode mc = curl_multi_perform(curlm, &running);
            if (mc != CURLM_OK)
                throw nix::Error(format("unexpected error from curl_multi_perform(): %s") % curl_multi_strerror(mc));

            /* Set the promises of any finished requests. */
            CURLMsg * msg;
            int left;
            while ((msg = curl_multi_info_read(curlm, &left))) {
                if (msg->msg == CURLMSG_DONE) {
                    auto i = items.find(msg->easy_handle);
                    assert(i != items.end());
                    i->second->finish(msg->data.result);
                    curl_multi_remove_handle(curlm, i->second->req);
                    i->second->active = false;
                    items.erase(i);
                }
            }

            /* Wait for activity, including wakeup events. */
            int numfds = 0;
            struct curl_waitfd extraFDs[1];
            extraFDs[0].fd = wakeupPipe.readSide.get();
            extraFDs[0].events = CURL_WAIT_POLLIN;
            extraFDs[0].revents = 0;
            auto sleepTimeMs =
                nextWakeup != std::chrono::steady_clock::time_point()
                ? std::max(0, (int) std::chrono::duration_cast<std::chrono::milliseconds>(nextWakeup - std::chrono::steady_clock::now()).count())
                : 1000000000;
            vomit("download thread waiting for %d ms", sleepTimeMs);
            mc = curl_multi_wait(curlm, extraFDs, 1, sleepTimeMs, &numfds);
            if (mc != CURLM_OK)
                throw nix::Error(format("unexpected error from curl_multi_wait(): %s") % curl_multi_strerror(mc));

            nextWakeup = std::chrono::steady_clock::time_point();

            /* Add new curl requests from the incoming requests queue,
               except for requests that are embargoed (waiting for a
               retry timeout to expire). */
            if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
                char buf[1024];
                auto res = read(extraFDs[0].fd, buf, sizeof(buf));
                if (res == -1 && errno != EINTR)
                    throw SysError("reading curl wakeup socket");
            }

            std::vector<std::shared_ptr<DownloadItem>> incoming;
            auto now = std::chrono::steady_clock::now();

            {
                auto state(state_.lock());
                while (!state->incoming.empty()) {
                    auto item = state->incoming.top();
                    if (item->embargo <= now) {
                        incoming.push_back(item);
                        state->incoming.pop();
                    } else {
                        if (nextWakeup == std::chrono::steady_clock::time_point()
                            || item->embargo < nextWakeup)
                            nextWakeup = item->embargo;
                        break;
                    }
                }
                quit = state->quit;
            }

            for (auto & item : incoming) {
                debug(format("starting download of %s") % item->request.uri);
                item->init();
                curl_multi_add_handle(curlm, item->req);
                item->active = true;
                items[item->req] = item;
            }
        }

        debug("download thread shutting down");
    }

    void workerThreadEntry()
    {
        try {
            workerThreadMain();
        } catch (nix::Interrupted & e) {
        } catch (std::exception & e) {
            printError(format("unexpected error in download thread: %s") % e.what());
        }

        {
            auto state(state_.lock());
            while (!state->incoming.empty()) state->incoming.pop();
            state->quit = true;
        }
    }
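
    /* Add an item to the incoming queue and wake up the worker thread
       by writing to the pipe.  Also used by finish() to reschedule an
       item for a retry. */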
    void enqueueItem(std::shared_ptr<DownloadItem> item)
    {
        {
            auto state(state_.lock());
            if (state->quit)
                throw nix::Error("cannot enqueue download request because the download thread is shutting down");
            state->incoming.push(item);
        }
        writeFull(wakeupPipe.writeSide.get(), " ");
    }
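
    /* Client-facing entry point.  s3:// URIs are handled synchronously
       via S3Helper; everything else becomes a DownloadItem processed
       by the worker thread. */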
    void enqueueDownload(const DownloadRequest & request,
        std::function<void(const DownloadResult &)> success,
        std::function<void(std::exception_ptr exc)> failure) override
    {
        /* Ugly hack to support s3:// URIs. */
        if (hasPrefix(request.uri, "s3://")) {
            // FIXME: do this on a worker thread
            sync2async<DownloadResult>(success, failure, [&]() -> DownloadResult {
#ifdef ENABLE_S3
                S3Helper s3Helper(Aws::Region::US_EAST_1); // FIXME: make configurable
                auto slash = request.uri.find('/', 5);
                if (slash == std::string::npos)
                    throw nix::Error("bad S3 URI ‘%s’", request.uri);
                std::string bucketName(request.uri, 5, slash - 5);
                std::string key(request.uri, slash + 1);
                // FIXME: implement ETag
                auto s3Res = s3Helper.getObject(bucketName, key);
                DownloadResult res;
                if (!s3Res.data)
                    throw DownloadError(NotFound, fmt("S3 object ‘%s’ does not exist", request.uri));
                res.data = s3Res.data;
                return res;
#else
                throw nix::Error("cannot download ‘%s’ because Nix is not built with S3 support", request.uri);
#endif
            });
            return;
        }

        auto item = std::make_shared<DownloadItem>(*this, request);
        item->success = success;
        item->failure = failure;
        enqueueItem(item);
    }
};
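
/* Return the process-wide Downloader instance, created lazily on first
   use. */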
ref<Downloader> getDownloader()
{
    static std::shared_ptr<Downloader> downloader;
    static std::once_flag downloaderCreated;
    std::call_once(downloaderCreated, [&]() { downloader = makeDownloader(); });
    return ref<Downloader>(downloader);
}

ref<Downloader> makeDownloader()
{
    return make_ref<CurlDownloader>();
}
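
/* Adapt the callback-based interface to a std::future.  A synchronous
   caller can then simply do, e.g. (URL hypothetical):

       DownloadRequest request("https://example.org/foo.nar.xz");
       DownloadResult res = getDownloader()->download(request);

   which blocks until the transfer finishes or throws a DownloadError. */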
std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest & request)
{
    auto promise = std::make_shared<std::promise<DownloadResult>>();
    enqueueDownload(request,
        [promise](const DownloadResult & result) { promise->set_value(result); },
        [promise](std::exception_ptr exc) { promise->set_exception(exc); });
    return promise->get_future();
}

DownloadResult Downloader::download(const DownloadRequest & request)
{
    return enqueueDownload(request).get();
}
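
/* Download ‘url_’ to the Nix store, using an on-disk cache under
   getCacheDir() + "/nix/tarballs" keyed on the hash of the URL.  Each
   entry consists of a ‘<hash>.info’ file recording the URL, ETag and
   time of the last check, and a ‘<hash>-file’ symlink to the store
   path.  Within the TTL (‘tarball-ttl’, one hour by default) the cached
   path is reused as-is; after that the ETag is revalidated via
   If-None-Match.  If ‘unpack’ is set, the result is additionally
   unpacked and the unpacked store path is returned. */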
Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpack, string name, const Hash & expectedHash, string * effectiveUrl)
{
    auto url = resolveUri(url_);

    if (name == "") {
        auto p = url.rfind('/');
        if (p != string::npos) name = string(url, p + 1);
    }

    Path expectedStorePath;
    if (expectedHash) {
        expectedStorePath = store->makeFixedOutputPath(unpack, expectedHash, name);
        if (store->isValidPath(expectedStorePath))
            return expectedStorePath;
    }

    Path cacheDir = getCacheDir() + "/nix/tarballs";
    createDirs(cacheDir);

    string urlHash = printHash32(hashString(htSHA256, url));

    Path dataFile = cacheDir + "/" + urlHash + ".info";
    Path fileLink = cacheDir + "/" + urlHash + "-file";

    Path storePath;

    string expectedETag;

    int ttl = settings.get("tarball-ttl", 60 * 60);
    bool skip = false;

    if (pathExists(fileLink) && pathExists(dataFile)) {
        storePath = readLink(fileLink);
        store->addTempRoot(storePath);
        if (store->isValidPath(storePath)) {
            auto ss = tokenizeString<vector<string>>(readFile(dataFile), "\n");
            if (ss.size() >= 3 && ss[0] == url) {
                time_t lastChecked;
                if (string2Int(ss[2], lastChecked) && lastChecked + ttl >= time(0)) {
                    skip = true;
                    if (effectiveUrl)
                        *effectiveUrl = url_;
                } else if (!ss[1].empty()) {
                    debug(format("verifying previous ETag ‘%1%’") % ss[1]);
                    expectedETag = ss[1];
                }
            }
        } else
            storePath = "";
    }

    if (!skip) {

        try {

            DownloadRequest request(url);
            request.expectedETag = expectedETag;
            auto res = download(request);
            if (effectiveUrl)
                *effectiveUrl = res.effectiveUrl;

            if (!res.cached) {
                ValidPathInfo info;
                StringSink sink;
                dumpString(*res.data, sink);
                Hash hash = hashString(expectedHash ? expectedHash.type : htSHA256, *res.data);
                info.path = store->makeFixedOutputPath(false, hash, name);
                info.narHash = hashString(htSHA256, *sink.s);
                info.ca = makeFixedOutputCA(false, hash);
                store->addToStore(info, sink.s, false, true);
                storePath = info.path;
            }

            assert(!storePath.empty());
            replaceSymlink(storePath, fileLink);

            writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n");

        } catch (DownloadError & e) {
            if (storePath.empty()) throw;
            printError(format("warning: %1%; using cached result") % e.msg());
        }
    }

    if (unpack) {
        Path unpackedLink = cacheDir + "/" + baseNameOf(storePath) + "-unpacked";
        Path unpackedStorePath;
        if (pathExists(unpackedLink)) {
            unpackedStorePath = readLink(unpackedLink);
            store->addTempRoot(unpackedStorePath);
            if (!store->isValidPath(unpackedStorePath))
                unpackedStorePath = "";
        }
        if (unpackedStorePath.empty()) {
            printInfo(format("unpacking ‘%1%’...") % url);
            Path tmpDir = createTempDir();
            AutoDelete autoDelete(tmpDir, true);
            // FIXME: this requires GNU tar for decompression.
            runProgram("tar", true, {"xf", storePath, "-C", tmpDir, "--strip-components", "1"});
            unpackedStorePath = store->addToStore(name, tmpDir, true, htSHA256, defaultPathFilter, false);
        }
        replaceSymlink(unpackedStorePath, unpackedLink);
        storePath = unpackedStorePath;
    }

    if (expectedStorePath != "" && storePath != expectedStorePath)
        throw nix::Error(format("hash mismatch in file downloaded from ‘%s’") % url);

    return storePath;
}
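
/* Return true if ‘s’ looks like a URI scheme understood by the
   downloader (http, https, file, channel:, git, s3). */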
bool isUri(const string & s)
{
    if (s.compare(0, 8, "channel:") == 0) return true;
    size_t pos = s.find("://");
    if (pos == string::npos) return false;
    string scheme(s, 0, pos);
    return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3";
}

}