From bc109648c41f8021707b55b815e68a890a09f2f6 Mon Sep 17 00:00:00 2001
From: John Ericson <John.Ericson@Obsidian.Systems>
Date: Wed, 15 Jul 2020 23:14:30 +0000
Subject: [PATCH] Get rid of `LocalStore::addToStoreCommon`

I got it to just become `LocalStore::addToStoreFromDump`, cleanly taking
a source and then doing nothing too fancy with it.

`LocalStore::addToStore(...Path...)` is now just a simple wrapper with a
bare-bones sinkToSource of the right dump command.
---
 src/libstore/local-store.cc | 93 ++++++++++++++++---------------------
 src/libstore/local-store.hh |  4 --
 src/libutil/serialise.cc    | 13 ++++++
 src/libutil/serialise.hh    | 15 +++++-
 4 files changed, 67 insertions(+), 58 deletions(-)

diff --git a/src/libstore/local-store.cc b/src/libstore/local-store.cc
index b9fae6089..07e1679da 100644
--- a/src/libstore/local-store.cc
+++ b/src/libstore/local-store.cc
@@ -1033,38 +1033,22 @@ void LocalStore::addToStore(const ValidPathInfo & info, Source & source,
 }
 
 
-StorePath LocalStore::addToStoreFromDump(Source & dump, const string & name,
-    FileIngestionMethod method, HashType hashAlgo, RepairFlag repair)
-{
-    return addToStoreCommon(name, method, hashAlgo, repair, [&](auto & sink, size_t & wanted) {
-        while (1) {
-            constexpr size_t bufSize = 1024;
-            uint8_t buf[bufSize];
-            auto n = dump.read(buf, std::min(wanted, bufSize));
-            sink(buf, n);
-            // when control is yielded back to us wanted will be updated.
-        }
-    });
-}
-
-
 StorePath LocalStore::addToStore(const string & name, const Path & _srcPath,
     FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair)
 {
     Path srcPath(absPath(_srcPath));
-
-    return addToStoreCommon(name, method, hashAlgo, repair, [&](auto & sink, size_t & _) {
+    auto source = sinkToSource([&](Sink & sink) { /* coroutine that emits the dump */
         if (method == FileIngestionMethod::Recursive)
             dumpPath(srcPath, sink, filter);
         else
             readFile(srcPath, sink);
     });
+    return addToStoreFromDump(*source, name, method, hashAlgo, repair);
 }
 
 
-StorePath LocalStore::addToStoreCommon(
-    const string & name, FileIngestionMethod method, HashType hashAlgo, RepairFlag repair,
-    std::function<void(Sink &, size_t &)> demux)
+StorePath LocalStore::addToStoreFromDump(Source & source, const string & name,
+    FileIngestionMethod method, HashType hashAlgo, RepairFlag repair)
 {
     /* For computing the store path. */
     auto hashSink = std::make_unique<HashSink>(hashAlgo);
@@ -1075,50 +1059,53 @@ StorePath LocalStore::addToStoreCommon(
        destination store path is already valid, we just delete the
        temporary path. Otherwise, we move it to the destination store
        path. */
-    bool inMemory = true;
+    bool inMemory = false;
+
     std::string dump;
 
-    auto source = sinkToSource([&](Sink & sink, size_t & wanted) {
-        LambdaSink sink2([&](const unsigned char * buf, size_t len) {
-            (*hashSink)(buf, len);
-
-            if (inMemory) {
-                if (dump.size() + len > settings.narBufferSize) {
-                    inMemory = false;
-                    sink << 1;
-                    sink((const unsigned char *) dump.data(), dump.size());
-                    dump.clear();
-                } else {
-                    dump.append((const char *) buf, len);
-                }
-            }
-
-            if (!inMemory) sink(buf, len);
-        });
-        demux(sink2, wanted);
-    });
+    /* Fill the buffer; we stay in memory iff the source hits EndOfFile
+       before it is full. `dump` must hold only bytes actually read. */
+    while (dump.size() < settings.narBufferSize) {
+        auto oldSize = dump.size();
+        constexpr size_t chunkSize = 1024;
+        auto want = std::min(chunkSize, settings.narBufferSize - oldSize);
+        dump.resize(oldSize + want);
+        size_t got = 0;
+        try {
+            got = source.read((uint8_t *) dump.data() + oldSize, want);
+        } catch (EndOfFile &) {
+            dump.resize(oldSize); /* drop the padding that was never filled */
+            inMemory = true;
+            break;
+        }
+        /* Hash exactly the bytes received, then trim the unused padding. */
+        (*hashSink)((const uint8_t *) dump.data() + oldSize, got);
+        dump.resize(oldSize + got);
+    }
 
     std::unique_ptr<AutoDelete> delTempDir;
     Path tempPath;
 
-    try {
-        /* Wait for the source coroutine to give us some dummy
-           data. This is so that we don't create the temporary
-           directory if the NAR fits in memory. */
-        readInt(*source);
+    if (!inMemory) {
+        StringSource dumpSource { dump };
+        TeeSource rest { source, *hashSink };
+        /* Replay the bytes already buffered (and hashed) in `dump`,
+           then continue with the rest of the source, which the tee
+           hashes as it is consumed so nothing is hashed twice.
+           ChainSource has a constructor, so it is not an aggregate
+           and must be initialised positionally, not by designators. */
+        ChainSource bothSource { dumpSource, rest };
 
         auto tempDir = createTempDir(realStoreDir, "add");
         delTempDir = std::make_unique<AutoDelete>(tempDir);
         tempPath = tempDir + "/x";
 
         if (method == FileIngestionMethod::Recursive)
-            restorePath(tempPath, *source);
+            restorePath(tempPath, bothSource);
         else
-            writeFile(tempPath, *source);
+            writeFile(tempPath, bothSource);
 
-    } catch (EndOfFile &) {
-        if (!inMemory) throw;
-        /* The NAR fits in memory, so we didn't do restorePath(). */
+        dump.clear();
     }
 
     auto [hash, size] = hashSink->finish();
@@ -1143,12 +1130,12 @@ StorePath LocalStore::addToStoreCommon(
             autoGC();
 
             if (inMemory) {
+                 StringSource dumpSource { dump };
                 /* Restore from the NAR in memory. */
-                StringSource source(dump);
                 if (method == FileIngestionMethod::Recursive)
-                    restorePath(realPath, source);
+                    restorePath(realPath, dumpSource);
                 else
-                    writeFile(realPath, source);
+                    writeFile(realPath, dumpSource);
             } else {
                 /* Move the temporary path we restored above. */
                 if (rename(tempPath.c_str(), realPath.c_str()))
diff --git a/src/libstore/local-store.hh b/src/libstore/local-store.hh
index ae23004c4..355c2814f 100644
--- a/src/libstore/local-store.hh
+++ b/src/libstore/local-store.hh
@@ -290,10 +290,6 @@ private:
        specified by the ‘secret-key-files’ option. */
     void signPathInfo(ValidPathInfo & info);
 
-    StorePath addToStoreCommon(
-        const string & name, FileIngestionMethod method, HashType hashAlgo, RepairFlag repair,
-        std::function<void(Sink &, size_t &)> demux);
-
     Path getRealStoreDir() override { return realStoreDir; }
 
     void createUser(const std::string & userName, uid_t userId) override;
diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc
index 141e9e976..4c72dc9f2 100644
--- a/src/libutil/serialise.cc
+++ b/src/libutil/serialise.cc
@@ -329,5 +329,18 @@ void StringSink::operator () (const unsigned char * data, size_t len)
     s->append((const char *) data, len);
 }
 
+size_t ChainSource::read(unsigned char * data, size_t len)
+{
+    /* Drain source1 first; its EndOfFile permanently switches us to
+       source2, whose own EndOfFile reaches the caller uncaught. */
+    if (!useSecond) {
+        try {
+            return source1.read(data, len);
+        } catch (EndOfFile &) {
+            useSecond = true;
+        }
+    }
+    return source2.read(data, len);
+}
 
 }
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 6cb9d1bf5..3e3735ca5 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -256,6 +256,19 @@ struct LambdaSource : Source
     }
 };
 
+/* A Source that concatenates two others: reads are served by `source1`
+   until it throws EndOfFile, and by `source2` from then on. */
+struct ChainSource : Source
+{
+    Source & source1;
+    Source & source2;
+    bool useSecond = false;
+
+    ChainSource(Source & s1, Source & s2) : source1(s1), source2(s2) { }
+
+    size_t read(unsigned char * data, size_t len) override;
+};
+
 
 /* Convert a function that feeds data into a Sink into a Source. The
    Source executes the function as a coroutine. */
@@ -271,7 +284,7 @@ static inline std::unique_ptr<Source> sinkToSource(
         throw EndOfFile("coroutine has finished");
     })
 {
-	return sinkToSource([fun](Sink & s, size_t & _) { fun(s); }, eof);
+    return sinkToSource([fun](Sink & s, size_t & _) { fun(s); }, eof);
 }