2015-04-09 10:12:50 +00:00
#include "download.hh"
#include "util.hh"
#include "globals.hh"
#include "hash.hh"
#include "store-api.hh"
#include "archive.hh"
#include "s3.hh"
#include "compression.hh"
#include "pathlocks.hh"

#ifdef ENABLE_S3
#include <aws/core/client/ClientConfiguration.h>
#endif

#include <unistd.h>
#include <fcntl.h>

#include <curl/curl.h>

#include <chrono>
#include <cmath>
#include <iostream>
#include <queue>
#include <random>
#include <thread>
2015-04-09 10:12:50 +00:00
namespace nix {
2015-10-07 15:31:50 +00:00
/* Return the current wall-clock time as fractional seconds since the
   Unix epoch. Uses std::chrono (already used throughout this file)
   instead of the POSIX gettimeofday(), which was relied on via a
   transitive include only. */
double getTime()
{
    using namespace std::chrono;
    return duration_cast<duration<double>>(
        system_clock::now().time_since_epoch()).count();
}
2016-04-14 14:27:48 +00:00
/* Expand the "channel:<name>" shorthand into the full nixos.org
   channel tarball URL; any other URI is returned unchanged. */
std::string resolveUri(const std::string & uri)
{
    static const std::string channelPrefix = "channel:";
    bool isChannel =
        uri.size() >= channelPrefix.size()
        && uri.compare(0, channelPrefix.size(), channelPrefix) == 0;
    if (!isChannel) return uri;
    return "https://nixos.org/channels/"
        + uri.substr(channelPrefix.size())
        + "/nixexprs.tar.xz";
}
2017-03-14 14:03:53 +00:00
/* Decode a response body according to its Content-Encoding header.
   An empty encoding means the data is returned as-is; "br" (Brotli)
   is decompressed; anything else is rejected. */
ref<std::string> decodeContent(const std::string & encoding, ref<std::string> data)
{
    /* No encoding: pass the body through untouched. */
    if (encoding.empty())
        return data;

    /* Brotli is the only compressed encoding we handle here. */
    if (encoding == "br")
        return decompress(encoding, *data);

    throw Error("unsupported Content-Encoding '%s'", encoding);
}
2016-02-29 17:15:20 +00:00
/* Downloader implementation based on libcurl's "multi" interface.
   All transfers are driven by a single dedicated worker thread;
   requests are handed to it through a locked queue plus a wakeup
   pipe (curl_multi_wait only watches file descriptors, so a
   condition variable cannot be used). */
struct CurlDownloader : public Downloader
{
    /* The curl multi handle shared by all transfers. Only touched by
       the worker thread after construction. */
    CURLM * curlm = 0;

    std::random_device rd;
    std::mt19937 mt19937; // jitter source for retry back-off delays

    bool enableHttp2;

    /* Per-transfer state. Kept alive via shared_ptr while queued or
       active; enable_shared_from_this lets a finished item re-enqueue
       itself for a retry. */
    struct DownloadItem : public std::enable_shared_from_this<DownloadItem>
    {
        CurlDownloader & downloader;
        DownloadRequest request;
        DownloadResult result;
        Activity act; // progress-reporting activity for this download
        bool done = false; // whether either the success or failure function has been called
        std::function<void(const DownloadResult &)> success;
        std::function<void(std::exception_ptr exc)> failure;

        CURL * req = 0;
        bool active = false; // whether the handle has been added to the multi object
        std::string status; // HTTP status code from the current response's status line

        unsigned int attempt = 0;

        /* Don't start this download until the specified time point
           has been reached. */
        std::chrono::steady_clock::time_point embargo;

        struct curl_slist * requestHeaders = 0;

        std::string encoding; // Content-Encoding of the current response

        DownloadItem(CurlDownloader & downloader, const DownloadRequest & request)
            : downloader(downloader)
            , request(request)
            , act(*logger, actDownload, fmt("downloading '%s'", request.uri), {}, request.parentAct)
        {
            /* Ask the server to skip the body if we already have the
               data matching this ETag (conditional GET). */
            if (!request.expectedETag.empty())
                requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str());
        }

        ~DownloadItem()
        {
            if (req) {
                if (active)
                    curl_multi_remove_handle(downloader.curlm, req);
                curl_easy_cleanup(req);
            }
            if (requestHeaders) curl_slist_free_all(requestHeaders);
            try {
                /* If neither callback ever ran (e.g. shutdown while
                   queued), report the download as interrupted. */
                if (!done)
                    fail(DownloadError(Interrupted, format("download of '%s' was interrupted") % request.uri));
            } catch (...) {
                ignoreException();
            }
        }

        /* Mark this item as finished and deliver 'e' to the failure
           callback. Must be called at most once. */
        template<class T>
        void fail(const T & e)
        {
            assert(!done);
            done = true;
            callFailure(failure, std::make_exception_ptr(e));
        }

        /* curl body callback: append the received chunk to
           result.data. Returning less than realSize aborts the
           transfer. */
        size_t writeCallback(void * contents, size_t size, size_t nmemb)
        {
            size_t realSize = size * nmemb;
            result.data->append((char *) contents, realSize);
            return realSize;
        }

        static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
        {
            return ((DownloadItem *) userp)->writeCallback(contents, size, nmemb);
        }

        /* curl header callback: invoked once per response header
           line (and once per status line on redirects/retries). */
        size_t headerCallback(void * contents, size_t size, size_t nmemb)
        {
            size_t realSize = size * nmemb;
            std::string line((char *) contents, realSize);
            printMsg(lvlVomit, format("got header for '%s': %s") % request.uri % trim(line));
            if (line.compare(0, 5, "HTTP/") == 0) { // new response starts
                /* Reset per-response state; an earlier (e.g. 3xx)
                   response's data must not leak into this one. */
                result.etag = "";
                auto ss = tokenizeString<vector<string>>(line, " ");
                status = ss.size() >= 2 ? ss[1] : "";
                result.data = std::make_shared<std::string>();
                encoding = "";
            } else {
                auto i = line.find(':');
                if (i != string::npos) {
                    string name = toLower(trim(string(line, 0, i)));
                    if (name == "etag") {
                        result.etag = trim(string(line, i + 1));
                        /* Hack to work around a GitHub bug: it sends
                           ETags, but ignores If-None-Match. So if we get
                           the expected ETag on a 200 response, then shut
                           down the connection because we already have the
                           data. */
                        if (result.etag == request.expectedETag && status == "200") {
                            debug(format("shutting down on 200 HTTP response with expected ETag"));
                            /* Returning 0 makes curl abort with
                               CURLE_WRITE_ERROR; finish() converts
                               that back into a 304 "cached" result. */
                            return 0;
                        }
                    } else if (name == "content-encoding")
                        encoding = trim(string(line, i + 1));
                }
            }
            return realSize;
        }

        static size_t headerCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
        {
            return ((DownloadItem *) userp)->headerCallback(contents, size, nmemb);
        }

        /* curl progress callback; a non-zero return value causes
           curl to abort the transfer (used for interrupt handling). */
        int progressCallback(double dltotal, double dlnow)
        {
            act.progress(dlnow, dltotal);
            return _isInterrupted;
        }

        static int progressCallbackWrapper(void * userp, double dltotal, double dlnow, double ultotal, double ulnow)
        {
            return ((DownloadItem *) userp)->progressCallback(dltotal, dlnow);
        }

        /* curl debug callback, only installed at lvlVomit verbosity. */
        static int debugCallback(CURL * handle, curl_infotype type, char * data, size_t size, void * userptr)
        {
            if (type == CURLINFO_TEXT)
                vomit("curl: %s", chomp(std::string(data, size)));
            return 0;
        }

        /* Configure the easy handle for this request. Called on the
           worker thread just before the handle is added to the multi
           object (and again on each retry, hence the reset). */
        void init()
        {
            if (!req) req = curl_easy_init();

            curl_easy_reset(req);

            if (verbosity >= lvlVomit) {
                curl_easy_setopt(req, CURLOPT_VERBOSE, 1);
                curl_easy_setopt(req, CURLOPT_DEBUGFUNCTION, DownloadItem::debugCallback);
            }

            curl_easy_setopt(req, CURLOPT_URL, request.uri.c_str());
            curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L);
            curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1);
            curl_easy_setopt(req, CURLOPT_USERAGENT,
                ("curl/" LIBCURL_VERSION " Nix/" + nixVersion +
                    (settings.userAgentSuffix != "" ? " " + settings.userAgentSuffix.get() : "")).c_str());
#if LIBCURL_VERSION_NUM >= 0x072b00
            /* Wait for an existing connection before opening a new
               one, so HTTP/2 multiplexing can be used. */
            curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1);
#endif
#if LIBCURL_VERSION_NUM >= 0x072f00
            if (downloader.enableHttp2)
                curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
#endif
            curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, DownloadItem::writeCallbackWrapper);
            curl_easy_setopt(req, CURLOPT_WRITEDATA, this);
            curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, DownloadItem::headerCallbackWrapper);
            curl_easy_setopt(req, CURLOPT_HEADERDATA, this);

            curl_easy_setopt(req, CURLOPT_PROGRESSFUNCTION, progressCallbackWrapper);
            curl_easy_setopt(req, CURLOPT_PROGRESSDATA, this);
            curl_easy_setopt(req, CURLOPT_NOPROGRESS, 0);

            curl_easy_setopt(req, CURLOPT_HTTPHEADER, requestHeaders);

            if (request.head)
                curl_easy_setopt(req, CURLOPT_NOBODY, 1);

            if (request.verifyTLS) {
                if (settings.caFile != "")
                    curl_easy_setopt(req, CURLOPT_CAINFO, settings.caFile.c_str());
            } else {
                /* Caller explicitly opted out of TLS verification. */
                curl_easy_setopt(req, CURLOPT_SSL_VERIFYPEER, 0);
                curl_easy_setopt(req, CURLOPT_SSL_VERIFYHOST, 0);
            }

            curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT, settings.connectTimeout.get());

            /* If no file exist in the specified path, curl continues to work
               anyway as if netrc support was disabled. */
            curl_easy_setopt(req, CURLOPT_NETRC_FILE, settings.netrcFile.get().c_str());
            curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL);

            result.data = std::make_shared<std::string>();
        }

        /* Called on the worker thread when curl reports the transfer
           as complete (successfully or not). Classifies the outcome,
           delivers success/failure, or re-enqueues the item for a
           retry with exponential back-off. */
        void finish(CURLcode code)
        {
            long httpStatus = 0;
            curl_easy_getinfo(req, CURLINFO_RESPONSE_CODE, &httpStatus);

            char * effectiveUrlCStr;
            curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUrlCStr);
            if (effectiveUrlCStr)
                result.effectiveUrl = effectiveUrlCStr;

            debug(format("finished download of '%s'; curl status = %d, HTTP status = %d, body = %d bytes")
                % request.uri % code % httpStatus % (result.data ? result.data->size() : 0));

            /* The write error produced by the GitHub ETag hack in
               headerCallback() really means "not modified". */
            if (code == CURLE_WRITE_ERROR && result.etag == request.expectedETag) {
                code = CURLE_OK;
                httpStatus = 304;
            }

            if (code == CURLE_OK &&
                (httpStatus == 200 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */))
            {
                result.cached = httpStatus == 304;
                done = true;

                try {
                    result.data = decodeContent(encoding, ref<std::string>(result.data));
                    callSuccess(success, failure, const_cast<const DownloadResult &>(result));
                    act.progress(result.data->size(), result.data->size());
                } catch (...) {
                    /* NOTE(review): 'done' is already true here; the
                       reassignment is redundant but harmless. */
                    done = true;
                    callFailure(failure, std::current_exception());
                }
            } else {
                /* Classify the failure to decide whether a retry
                   might help. */
                Error err =
                    (httpStatus == 404 || code == CURLE_FILE_COULDNT_READ_FILE) ? NotFound :
                    httpStatus == 403 ? Forbidden :
                    (httpStatus == 408 || httpStatus == 500 || httpStatus == 503
                        || httpStatus == 504 || httpStatus == 522 || httpStatus == 524
                        || code == CURLE_COULDNT_RESOLVE_HOST
                        || code == CURLE_RECV_ERROR
                        // this seems to occur occasionally for retriable reasons, and shows up in an error like this:
                        //   curl: (23) Failed writing body (315 != 16366)
                        || code == CURLE_WRITE_ERROR
                        // this is a generic SSL failure that in some cases (e.g., certificate error) is permanent
                        // but also appears in transient cases, so we consider it retryable
                        || code == CURLE_SSL_CONNECT_ERROR
#if LIBCURL_VERSION_NUM >= 0x073200
                        || code == CURLE_HTTP2
                        || code == CURLE_HTTP2_STREAM
#endif
                    ) ? Transient :
                    Misc;

                attempt++;

                auto exc =
                    code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
                    ? DownloadError(Interrupted, format("download of '%s' was interrupted") % request.uri)
                    : httpStatus != 0
                    ? DownloadError(err, format("unable to download '%s': HTTP error %d (curl error: %s)") % request.uri % httpStatus % curl_easy_strerror(code))
                    : DownloadError(err, format("unable to download '%s': %s (%d)") % request.uri % curl_easy_strerror(code) % code);

                /* If this is a transient error, then maybe retry the
                   download after a while. Back-off doubles per attempt
                   with up to half a step of random jitter. */
                if (err == Transient && attempt < request.tries) {
                    int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937));
                    printError(format("warning: %s; retrying in %d ms") % exc.what() % ms);
                    embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
                    downloader.enqueueItem(shared_from_this());
                }
                else
                    fail(exc);
            }
        }
    };

    /* Shared state between enqueueing threads and the worker thread;
       protected by the Sync wrapper below. */
    struct State
    {
        /* Orders the queue so that the item whose embargo expires
           soonest is on top. */
        struct EmbargoComparator {
            bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) {
                return i1->embargo > i2->embargo;
            }
        };
        bool quit = false;
        std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming;
    };

    Sync<State> state_;

    /* We can't use a std::condition_variable to wake up the curl
       thread, because it only monitors file descriptors. So use a
       pipe instead. */
    Pipe wakeupPipe;

    std::thread workerThread;

    CurlDownloader()
        : mt19937(rd())
    {
        /* curl_global_init is not thread-safe and must run exactly
           once per process. */
        static std::once_flag globalInit;
        std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);

        curlm = curl_multi_init();

#if LIBCURL_VERSION_NUM >= 0x072b00 // correct?
        curl_multi_setopt(curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
#endif
        curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS,
            settings.binaryCachesParallelConnections.get());

        enableHttp2 = settings.enableHttp2;

        wakeupPipe.create();
        /* Non-blocking so the worker can drain the pipe without
           stalling when it is empty. */
        fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK);

        workerThread = std::thread([&]() { workerThreadEntry(); });
    }

    ~CurlDownloader()
    {
        stopWorkerThread();

        workerThread.join();

        if (curlm) curl_multi_cleanup(curlm);
    }

    void stopWorkerThread()
    {
        /* Signal the worker thread to exit. */
        {
            auto state(state_.lock());
            state->quit = true;
        }
        writeFull(wakeupPipe.writeSide.get(), " ", false);
    }

    /* Main loop of the worker thread: drive curl, dispatch finished
       transfers, and pick up newly enqueued items. */
    void workerThreadMain()
    {
        /* Cause this thread to be notified on SIGINT. */
        auto callback = createInterruptCallback([&]() {
            stopWorkerThread();
        });

        std::map<CURL *, std::shared_ptr<DownloadItem>> items;

        bool quit = false;

        std::chrono::steady_clock::time_point nextWakeup;

        while (!quit) {
            checkInterrupt();

            /* Let curl do its thing. */
            int running;
            CURLMcode mc = curl_multi_perform(curlm, &running);
            if (mc != CURLM_OK)
                throw nix::Error(format("unexpected error from curl_multi_perform(): %s") % curl_multi_strerror(mc));

            /* Set the promises of any finished requests. */
            CURLMsg * msg;
            int left;
            while ((msg = curl_multi_info_read(curlm, &left))) {
                if (msg->msg == CURLMSG_DONE) {
                    auto i = items.find(msg->easy_handle);
                    assert(i != items.end());
                    i->second->finish(msg->data.result);
                    curl_multi_remove_handle(curlm, i->second->req);
                    i->second->active = false;
                    items.erase(i);
                }
            }

            /* Wait for activity, including wakeup events. */
            int numfds = 0;
            struct curl_waitfd extraFDs[1];
            extraFDs[0].fd = wakeupPipe.readSide.get();
            extraFDs[0].events = CURL_WAIT_POLLIN;
            extraFDs[0].revents = 0;
            /* Sleep at most until the earliest embargo expires;
               otherwise effectively forever. */
            auto sleepTimeMs =
                nextWakeup != std::chrono::steady_clock::time_point()
                ? std::max(0, (int) std::chrono::duration_cast<std::chrono::milliseconds>(nextWakeup - std::chrono::steady_clock::now()).count())
                : 1000000000;
            vomit("download thread waiting for %d ms", sleepTimeMs);
            mc = curl_multi_wait(curlm, extraFDs, 1, sleepTimeMs, &numfds);
            if (mc != CURLM_OK)
                throw nix::Error(format("unexpected error from curl_multi_wait(): %s") % curl_multi_strerror(mc));

            nextWakeup = std::chrono::steady_clock::time_point();

            /* Add new curl requests from the incoming requests queue,
               except for requests that are embargoed (waiting for a
               retry timeout to expire). */
            if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
                char buf[1024];
                auto res = read(extraFDs[0].fd, buf, sizeof(buf));
                if (res == -1 && errno != EINTR)
                    throw SysError("reading curl wakeup socket");
            }

            std::vector<std::shared_ptr<DownloadItem>> incoming;
            auto now = std::chrono::steady_clock::now();

            {
                auto state(state_.lock());
                while (!state->incoming.empty()) {
                    auto item = state->incoming.top();
                    if (item->embargo <= now) {
                        incoming.push_back(item);
                        state->incoming.pop();
                    } else {
                        /* Queue is ordered by embargo, so the first
                           still-embargoed item bounds the next wakeup. */
                        if (nextWakeup == std::chrono::steady_clock::time_point()
                            || item->embargo < nextWakeup)
                            nextWakeup = item->embargo;
                        break;
                    }
                }
                quit = state->quit;
            }

            for (auto & item : incoming) {
                debug(format("starting download of %s") % item->request.uri);
                item->init();
                curl_multi_add_handle(curlm, item->req);
                item->active = true;
                items[item->req] = item;
            }
        }

        debug("download thread shutting down");
    }

    /* Thread entry point: run the main loop and make sure any queued
       items are dropped (failing their callbacks) on the way out. */
    void workerThreadEntry()
    {
        try {
            workerThreadMain();
        } catch (nix::Interrupted & e) {
        } catch (std::exception & e) {
            printError(format("unexpected error in download thread: %s") % e.what());
        }

        {
            auto state(state_.lock());
            while (!state->incoming.empty()) state->incoming.pop();
            state->quit = true;
        }
    }

    /* Hand an item to the worker thread and wake it up. */
    void enqueueItem(std::shared_ptr<DownloadItem> item)
    {
        {
            auto state(state_.lock());
            if (state->quit)
                throw nix::Error("cannot enqueue download request because the download thread is shutting down");
            state->incoming.push(item);
        }
        writeFull(wakeupPipe.writeSide.get(), " ");
    }

    void enqueueDownload(const DownloadRequest & request,
        std::function<void(const DownloadResult &)> success,
        std::function<void(std::exception_ptr exc)> failure) override
    {
        /* Ugly hack to support s3:// URIs. */
        if (hasPrefix(request.uri, "s3://")) {
            // FIXME: do this on a worker thread
            sync2async<DownloadResult>(success, failure, [&]() -> DownloadResult {
#ifdef ENABLE_S3
                S3Helper s3Helper(Aws::Region::US_EAST_1); // FIXME: make configurable
                auto slash = request.uri.find('/', 5);
                if (slash == std::string::npos)
                    throw nix::Error("bad S3 URI '%s'", request.uri);
                std::string bucketName(request.uri, 5, slash - 5);
                std::string key(request.uri, slash + 1);
                // FIXME: implement ETag
                auto s3Res = s3Helper.getObject(bucketName, key);
                DownloadResult res;
                if (!s3Res.data)
                    throw DownloadError(NotFound, fmt("S3 object '%s' does not exist", request.uri));
                res.data = s3Res.data;
                return res;
#else
                throw nix::Error("cannot download '%s' because Nix is not built with S3 support", request.uri);
#endif
            });
            return;
        }

        auto item = std::make_shared<DownloadItem>(*this, request);
        item->success = success;
        item->failure = failure;
        enqueueItem(item);
    }
};
2016-09-14 14:00:40 +00:00
/* Return the process-wide downloader singleton, constructing it
   exactly once in a thread-safe way on first use. */
ref<Downloader> getDownloader()
{
    static std::shared_ptr<Downloader> instance;
    static std::once_flag instanceCreated;
    std::call_once(instanceCreated, []() {
        instance = makeDownloader();
    });
    return ref<Downloader>(instance);
}
2016-02-29 17:15:20 +00:00
/* Construct a fresh (non-shared) curl-based downloader instance. */
ref<Downloader> makeDownloader()
{
    return make_ref<CurlDownloader>();
}
2016-09-16 16:54:14 +00:00
std : : future < DownloadResult > Downloader : : enqueueDownload ( const DownloadRequest & request )
{
auto promise = std : : make_shared < std : : promise < DownloadResult > > ( ) ;
enqueueDownload ( request ,
[ promise ] ( const DownloadResult & result ) { promise - > set_value ( result ) ; } ,
[ promise ] ( std : : exception_ptr exc ) { promise - > set_exception ( exc ) ; } ) ;
return promise - > get_future ( ) ;
}
2016-09-14 14:00:40 +00:00
/* Synchronously download 'request', blocking until the transfer
   completes; rethrows the recorded failure on error. */
DownloadResult Downloader::download(const DownloadRequest & request)
{
    return enqueueDownload(request).get();
}
2016-08-31 13:57:56 +00:00
/* Download 'url_' into the Nix store, caching the result under
   ~/.cache/nix/tarballs (keyed by a hash of the URL). Within the
   tarball TTL the cached copy is reused without contacting the
   server; after that an ETag-conditional fetch revalidates it. If
   'unpack' is set, the (tar) file is unpacked and the unpacked store
   path is returned instead. Throws DownloadError if the download
   fails and no usable cached copy exists. */
Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpack, string name, const Hash & expectedHash, string * effectiveUrl)
{
    auto url = resolveUri(url_);

    /* Default the store name to the last path component of the URL. */
    if (name == "") {
        auto p = url.rfind('/');
        if (p != string::npos) name = string(url, p + 1);
    }

    /* If the expected output path is already valid, skip the network
       entirely. */
    Path expectedStorePath;
    if (expectedHash) {
        expectedStorePath = store->makeFixedOutputPath(unpack, expectedHash, name);
        if (store->isValidPath(expectedStorePath))
            return expectedStorePath;
    }

    Path cacheDir = getCacheDir() + "/nix/tarballs";
    createDirs(cacheDir);

    string urlHash = hashString(htSHA256, url).to_string(Base32, false);

    /* dataFile holds "url\netag\nlast-checked"; fileLink is a symlink
       to the downloaded store path (and doubles as the lock file). */
    Path dataFile = cacheDir + "/" + urlHash + ".info";
    Path fileLink = cacheDir + "/" + urlHash + "-file";

    PathLocks lock({fileLink}, fmt("waiting for lock on '%1%'...", fileLink));

    Path storePath;

    string expectedETag;

    int ttl = settings.tarballTtl;
    bool skip = false;

    if (pathExists(fileLink) && pathExists(dataFile)) {
        storePath = readLink(fileLink);
        /* Register a temp root so the path can't be GC'ed while we
           use it. */
        store->addTempRoot(storePath);
        if (store->isValidPath(storePath)) {
            auto ss = tokenizeString<vector<string>>(readFile(dataFile), "\n");
            if (ss.size() >= 3 && ss[0] == url) {
                time_t lastChecked;
                if (string2Int(ss[2], lastChecked) && lastChecked + ttl >= time(0)) {
                    /* Cache entry is still fresh: no network needed. */
                    skip = true;
                    if (effectiveUrl)
                        *effectiveUrl = url_;
                } else if (!ss[1].empty()) {
                    /* Stale: revalidate via a conditional request. */
                    debug(format("verifying previous ETag '%1%'") % ss[1]);
                    expectedETag = ss[1];
                }
            }
        } else
            storePath = "";
    }

    if (!skip) {

        try {
            DownloadRequest request(url);
            request.expectedETag = expectedETag;
            auto res = download(request);
            if (effectiveUrl)
                *effectiveUrl = res.effectiveUrl;

            /* A 304 ("cached") response means the existing storePath
               is still good; otherwise add the new data to the store. */
            if (!res.cached) {
                ValidPathInfo info;
                StringSink sink;
                dumpString(*res.data, sink);
                Hash hash = hashString(expectedHash ? expectedHash.type : htSHA256, *res.data);
                info.path = store->makeFixedOutputPath(false, hash, name);
                info.narHash = hashString(htSHA256, *sink.s);
                info.narSize = sink.s->size();
                info.ca = makeFixedOutputCA(false, hash);
                store->addToStore(info, sink.s, NoRepair, NoCheckSigs);
                storePath = info.path;
            }

            assert(!storePath.empty());
            replaceSymlink(storePath, fileLink);

            writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n");
        } catch (DownloadError & e) {
            /* Fall back to a stale cached copy if we have one. */
            if (storePath.empty()) throw;
            printError(format("warning: %1%; using cached result") % e.msg());
        }
    }

    if (unpack) {
        /* The unpacked result is cached separately, keyed by the
           downloaded store path's basename. */
        Path unpackedLink = cacheDir + "/" + baseNameOf(storePath) + "-unpacked";
        PathLocks lock2({unpackedLink}, fmt("waiting for lock on '%1%'...", unpackedLink));
        Path unpackedStorePath;
        if (pathExists(unpackedLink)) {
            unpackedStorePath = readLink(unpackedLink);
            store->addTempRoot(unpackedStorePath);
            if (!store->isValidPath(unpackedStorePath))
                unpackedStorePath = "";
        }
        if (unpackedStorePath.empty()) {
            printInfo(format("unpacking '%1%'...") % url);
            Path tmpDir = createTempDir();
            AutoDelete autoDelete(tmpDir, true);
            // FIXME: this requires GNU tar for decompression.
            runProgram("tar", true, {"xf", storePath, "-C", tmpDir, "--strip-components", "1"});
            unpackedStorePath = store->addToStore(name, tmpDir, true, htSHA256, defaultPathFilter, NoRepair);
        }
        replaceSymlink(unpackedStorePath, unpackedLink);
        storePath = unpackedStorePath;
    }

    /* Guard against the server returning different content than the
       caller's expected fixed-output hash. */
    if (expectedStorePath != "" && storePath != expectedStorePath)
        throw nix::Error("store path mismatch in file downloaded from '%s'", url);

    return storePath;
}
2015-05-06 12:54:31 +00:00
/* Return true if 's' looks like a URI this downloader understands:
   either the "channel:" shorthand or one of the known schemes
   followed by "://". */
bool isUri(const string & s)
{
    if (s.compare(0, 8, "channel:") == 0) return true;

    auto pos = s.find("://");
    if (pos == string::npos) return false;

    string scheme = s.substr(0, pos);
    for (const auto & known : {"http", "https", "file", "channel", "git", "s3"})
        if (scheme == known) return true;
    return false;
}
2015-04-09 10:12:50 +00:00
}