Code

[PATCH] Parallelize pulling by ssh
authorbarkalow@iabervon.org <barkalow@iabervon.org>
Tue, 2 Aug 2005 23:46:29 +0000 (19:46 -0400)
committerJunio C Hamano <junkio@cox.net>
Wed, 3 Aug 2005 05:53:11 +0000 (22:53 -0700)
This causes ssh-pull to request objects in prefetch() and read then in
fetch(), such that it reduces the unpipelined round-trip time.

This also makes sha1_write_from_fd() support having a buffer of data
which it accidentally read from the fd after the object; this was
formerly not a problem, because it would always get a short read at
the end of an object, because the next object had not been
requested. This is no longer true.

Signed-off-by: Daniel Barkalow <barkalow@iabervon.org>
Signed-off-by: Junio C Hamano <junkio@cox.net>
cache.h
sha1_file.c
ssh-pull.c

diff --git a/cache.h b/cache.h
index 381db3584ef5d5610f3f9e26cb6b643b457bdc63..1b49f0f4c3c5bef6e9fe6dcf68b117880c1a956a 100644 (file)
--- a/cache.h
+++ b/cache.h
@@ -198,7 +198,8 @@ extern int check_sha1_signature(const unsigned char *sha1, void *buf, unsigned l
 /* Read a tree into the cache */
 extern int read_tree(void *buffer, unsigned long size, int stage, const char **paths);
 
-extern int write_sha1_from_fd(const unsigned char *sha1, int fd);
+extern int write_sha1_from_fd(const unsigned char *sha1, int fd, char *buffer,
+                             size_t bufsize, size_t *bufposn);
 extern int write_sha1_to_fd(int fd, const unsigned char *sha1);
 
 extern int has_sha1_pack(const unsigned char *sha1);
index e808c91316975e08a0932887dabe2d10dab852d0..df5eb2ac670e2969fcb611148929fef68e3747cc 100644 (file)
@@ -1389,14 +1389,14 @@ int write_sha1_to_fd(int fd, const unsigned char *sha1)
        return 0;
 }
 
-int write_sha1_from_fd(const unsigned char *sha1, int fd)
+int write_sha1_from_fd(const unsigned char *sha1, int fd, char *buffer,
+                      size_t bufsize, size_t *bufposn)
 {
        char *filename = sha1_file_name(sha1);
 
        int local;
        z_stream stream;
        unsigned char real_sha1[20];
-       unsigned char buf[4096];
        unsigned char discard[4096];
        int ret;
        SHA_CTX c;
@@ -1414,7 +1414,24 @@ int write_sha1_from_fd(const unsigned char *sha1, int fd)
 
        do {
                ssize_t size;
-               size = read(fd, buf, 4096);
+               if (*bufposn) {
+                       stream.avail_in = *bufposn;
+                       stream.next_in = buffer;
+                       do {
+                               stream.next_out = discard;
+                               stream.avail_out = sizeof(discard);
+                               ret = inflate(&stream, Z_SYNC_FLUSH);
+                               SHA1_Update(&c, discard, sizeof(discard) -
+                                           stream.avail_out);
+                       } while (stream.avail_in && ret == Z_OK);
+                       write(local, buffer, *bufposn - stream.avail_in);
+                       memmove(buffer, buffer + *bufposn - stream.avail_in,
+                               stream.avail_in);
+                       *bufposn = stream.avail_in;
+                       if (ret != Z_OK)
+                               break;
+               }
+               size = read(fd, buffer + *bufposn, bufsize - *bufposn);
                if (size <= 0) {
                        close(local);
                        unlink(filename);
@@ -1423,18 +1440,8 @@ int write_sha1_from_fd(const unsigned char *sha1, int fd)
                        perror("Reading from connection");
                        return -1;
                }
-               write(local, buf, size);
-               stream.avail_in = size;
-               stream.next_in = buf;
-               do {
-                       stream.next_out = discard;
-                       stream.avail_out = sizeof(discard);
-                       ret = inflate(&stream, Z_SYNC_FLUSH);
-                       SHA1_Update(&c, discard, sizeof(discard) -
-                                   stream.avail_out);
-               } while (stream.avail_in && ret == Z_OK);
-               
-       } while (ret == Z_OK);
+               *bufposn += size;
+       } while (1);
        inflateEnd(&stream);
 
        close(local);
index 4cf9b6a1677542c74c99e88281417e3cc0d52bb1..bdc99dfdc532755622e9cfc5688ee0bba9e705de 100644 (file)
@@ -10,24 +10,49 @@ static int fd_out;
 static unsigned char remote_version = 0;
 static unsigned char local_version = 1;
 
+ssize_t force_write(int fd, void *buffer, size_t length)
+{
+       ssize_t ret = 0;
+       while (ret < length) {
+               ssize_t size = write(fd, buffer + ret, length - ret);
+               if (size < 0) {
+                       return size;
+               }
+               if (size == 0) {
+                       return ret;
+               }
+               ret += size;
+       }
+       return ret;
+}
+
 void prefetch(unsigned char *sha1)
 {
+       char type = 'o';
+       force_write(fd_out, &type, 1);
+       force_write(fd_out, sha1, 20);
+       //memcpy(requested + 20 * prefetches++, sha1, 20);
 }
 
+static char conn_buf[4096];
+static size_t conn_buf_posn = 0;
+
 int fetch(unsigned char *sha1)
 {
        int ret;
        signed char remote;
-       char type = 'o';
-       if (has_sha1_file(sha1))
-               return 0;
-       write(fd_out, &type, 1);
-       write(fd_out, sha1, 20);
-       if (read(fd_in, &remote, 1) < 1)
-               return -1;
+
+       if (conn_buf_posn) {
+               remote = conn_buf[0];
+               memmove(conn_buf, conn_buf + 1, --conn_buf_posn);
+       } else {
+               if (read(fd_in, &remote, 1) < 1)
+                       return -1;
+       }
+       //fprintf(stderr, "Got %d\n", remote);
        if (remote < 0)
                return remote;
-       ret = write_sha1_from_fd(sha1, fd_in);
+       ret = write_sha1_from_fd(sha1, fd_in, conn_buf, 4096, &conn_buf_posn);
        if (!ret)
                pull_say("got %s\n", sha1_to_hex(sha1));
        return ret;