From b85d302137ca0c57dc69ccb279829c516410611f Mon Sep 17 00:00:00 2001 From: Rob Vermaas Date: Thu, 6 Jun 2013 10:07:04 +0000 Subject: [PATCH] Upload script binary cache to s3. git-svn-id: https://nixos.org/repos/nix/release/trunk/channels@34735 70bd8c7a-acb8-0310-9f0d-9cc1c95dcdbb --- upload-binary-cache-s3.py | 91 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100755 upload-binary-cache-s3.py diff --git a/upload-binary-cache-s3.py b/upload-binary-cache-s3.py new file mode 100755 index 0000000..92af736 --- /dev/null +++ b/upload-binary-cache-s3.py @@ -0,0 +1,91 @@ +#! /usr/bin/env python +import os +import threading +import sys +import Queue +import random +import subprocess +import urlparse +import boto + +def run_tasks(nr_workers, tasks, worker_fun): + task_queue = Queue.Queue() + result_queue = Queue.Queue() + + nr_tasks = 0 + for t in tasks: task_queue.put(t); nr_tasks = nr_tasks + 1 + + if nr_tasks == 0: return [] + + if nr_workers == -1: nr_workers = nr_tasks + if nr_workers < 1: raise Exception("number of worker threads must be at least 1") + + def thread_fun(): + n = 0 + while True: + try: + t = task_queue.get(False) + except Queue.Empty: + break + n = n + 1 + try: + result_queue.put((worker_fun(t), None, None)) + except Exception as e: + result_queue.put((None, e, sys.exc_info()[2])) + #sys.stderr.write("thread {0} did {1} tasks\n".format(threading.current_thread(), n)) + + threads = [] + for n in range(nr_workers): + thr = threading.Thread(target=thread_fun) + thr.daemon = True + thr.start() + threads.append(thr) + + results = [] + while len(results) < nr_tasks: + try: + # Use a timeout to allow keyboard interrupts to be + # processed. The actual timeout value doesn't matter. + (res, exc, tb) = result_queue.get(True, 1000) + except Queue.Empty: + continue + if exc: + raise exc, None, tb + results.append(res) + + for thr in threads: + thr.join() + + return results + +if len(sys.argv) != 3: + print 'Usage: upload-s3.py ' + sys.exit(1) + +local_dir = sys.argv[1] +bucket_name = sys.argv[2] + +files = [ "{0}/{1}".format(root, f) for root, _, files in os.walk(local_dir) if files != [] for f in files] +files = sorted(files, key=os.path.getsize) + +files = [ (i, f) for i, f in enumerate(files) ] +total = len(files) + +__lock__ = threading.Lock() + +conn = boto.connect_s3() +bucket = boto.connect_s3().get_bucket(bucket_name) + +def upload(t): + (i, local_file) = t + remote_file = local_file.replace(local_dir+'/','') + if i % 1000 == 0: + with __lock__: + sys.stderr.write("{0}/{1}\n".format(i, total)) + + if (bucket.get_key(remote_file) is None) and not (".tmp" in remote_file): + with __lock__: + sys.stderr.write("Uploading {0}: {1} -> {2}\n".format(i, local_file, remote_file)) + subprocess.call(["s3cmd", "put", local_file, "s3://{0}/{1}".format(bucket_name,remote_file)]) + +run_tasks(15, files, upload)