X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=builtin-pack-objects.c;h=d3efeff03f89cc0f4d0da463ddf878c28effb31e;hb=891e85a0c08e12d3f6174d8eb10b4ef284c4b01b;hp=0be539ed7fd9bf95bb40515b560c7615ed318f37;hpb=9c51414f8efb73525a6c69917f1499dab081679d;p=git.git

diff --git a/builtin-pack-objects.c b/builtin-pack-objects.c
index 0be539ed7..d3efeff03 100644
--- a/builtin-pack-objects.c
+++ b/builtin-pack-objects.c
@@ -57,7 +57,7 @@ struct object_entry {
  * nice "minimum seek" order.
  */
 static struct object_entry *objects;
-static struct object_entry **written_list;
+static struct pack_idx_entry **written_list;
 static uint32_t nr_objects, nr_alloc, nr_result, nr_written;
 
 static int non_empty;
@@ -65,8 +65,6 @@ static int no_reuse_delta, no_reuse_object, keep_unreachable;
 static int local;
 static int incremental;
 static int allow_ofs_delta;
-static const char *pack_tmp_name, *idx_tmp_name;
-static char tmpname[PATH_MAX];
 static const char *base_name;
 static int progress = 1;
 static int window = 10;
@@ -75,7 +73,7 @@ static int depth = 50;
 static int delta_search_threads = 1;
 static int pack_to_stdout;
 static int num_preferred_base;
-static struct progress progress_state;
+static struct progress *progress_state;
 static int pack_compression_level = Z_DEFAULT_COMPRESSION;
 static int pack_compression_seen;
 
@@ -447,7 +445,7 @@ static unsigned long write_object(struct sha1file *f,
 			/* nothing */;
 		deflateEnd(&stream);
 		datalen = stream.total_out;
-		deflateEnd(&stream);
+
 	/*
 	 * The object header is a byte of 'type' followed by zero or
 	 * more bytes of length.
@@ -579,7 +577,7 @@ static off_t write_one(struct sha1file *f,
 		e->idx.offset = 0;
 		return 0;
 	}
-	written_list[nr_written++] = e;
+	written_list[nr_written++] = &e->idx;
 
 	/* make sure off_t is sufficiently large not to wrap */
 	if (offset > offset + size)
@@ -587,12 +585,6 @@ static off_t write_one(struct sha1file *f,
 	return offset + size;
 }
 
-static int open_object_dir_tmp(const char *path)
-{
-	snprintf(tmpname, sizeof(tmpname), "%s/%s", get_object_directory(), path);
-	return xmkstemp(tmpname);
-}
-
 /* forward declaration for write_pack_file */
 static int adjust_perm(const char *path, mode_t mode);
 
@@ -606,16 +598,21 @@ static void write_pack_file(void)
 	uint32_t nr_remaining = nr_result;
 
 	if (do_progress)
-		start_progress(&progress_state, "Writing %u objects...", "", nr_result);
-	written_list = xmalloc(nr_objects * sizeof(struct object_entry *));
+		progress_state = start_progress("Writing objects", nr_result);
+	written_list = xmalloc(nr_objects * sizeof(*written_list));
 
 	do {
 		unsigned char sha1[20];
+		char *pack_tmp_name = NULL;
 
 		if (pack_to_stdout) {
-			f = sha1fd(1, "<stdout>");
+			f = sha1fd_throughput(1, "<stdout>", progress_state);
 		} else {
-			int fd = open_object_dir_tmp("tmp_pack_XXXXXX");
+			char tmpname[PATH_MAX];
+			int fd;
+			snprintf(tmpname, sizeof(tmpname),
+				 "%s/tmp_pack_XXXXXX", get_object_directory());
+			fd = xmkstemp(tmpname);
 			pack_tmp_name = xstrdup(tmpname);
 			f = sha1fd(fd, pack_tmp_name);
 		}
@@ -632,8 +629,7 @@ static void write_pack_file(void)
 			if (!offset_one)
 				break;
 			offset = offset_one;
-			if (do_progress)
-				display_progress(&progress_state, written);
+			display_progress(progress_state, written);
 		}
 
 		/*
@@ -643,19 +639,20 @@ static void write_pack_file(void)
 		if (pack_to_stdout || nr_written == nr_remaining) {
 			sha1close(f, sha1, 1);
 		} else {
-			sha1close(f, sha1, 0);
-			fixup_pack_header_footer(f->fd, sha1, pack_tmp_name, nr_written);
-			close(f->fd);
+			int fd = sha1close(f, NULL, 0);
+			fixup_pack_header_footer(fd, sha1, pack_tmp_name, nr_written);
+			close(fd);
 		}
 
 		if (!pack_to_stdout) {
 			mode_t mode = umask(0);
+			char *idx_tmp_name, tmpname[PATH_MAX];
 
 			umask(mode);
 			mode = 0444 & ~mode;
 
-			idx_tmp_name = write_idx_file(NULL,
-				(struct pack_idx_entry **) written_list, nr_written, sha1);
+			idx_tmp_name = write_idx_file(NULL, written_list,
+						      nr_written, sha1);
 			snprintf(tmpname, sizeof(tmpname), "%s-%s.pack",
 				 base_name, sha1_to_hex(sha1));
 			if (adjust_perm(pack_tmp_name, mode))
@@ -672,19 +669,20 @@ static void write_pack_file(void)
 			if (rename(idx_tmp_name, tmpname))
 				die("unable to rename temporary index file: %s",
 				    strerror(errno));
+			free(idx_tmp_name);
+			free(pack_tmp_name);
 			puts(sha1_to_hex(sha1));
 		}
 
 		/* mark written objects as written to previous pack */
 		for (j = 0; j < nr_written; j++) {
-			written_list[j]->idx.offset = (off_t)-1;
+			written_list[j]->offset = (off_t)-1;
 		}
 		nr_remaining -= nr_written;
 	} while (nr_remaining && i < nr_objects);
 
 	free(written_list);
-	if (do_progress)
-		stop_progress(&progress_state);
+	stop_progress(&progress_state);
 	if (written != nr_result)
 		die("wrote %u objects while expecting %u", written, nr_result);
 	/*
@@ -852,8 +850,7 @@ static int add_object_entry(const unsigned char *sha1, enum object_type type,
 	else
 		object_ix[-1 - ix] = nr_objects;
 
-	if (progress)
-		display_progress(&progress_state, nr_objects);
+	display_progress(progress_state, nr_objects);
 
 	if (name && no_try_delta(name))
 		entry->no_try_delta = 1;
@@ -993,7 +990,7 @@ static void add_pbase_object(struct tree_desc *tree,
 			return;
 		if (name[cmplen] != '/') {
 			add_object_entry(entry.sha1,
-					 S_ISDIR(entry.mode) ? OBJ_TREE : OBJ_BLOB,
+					 object_type(entry.mode),
 					 fullname, 1);
 			return;
 		}
@@ -1248,28 +1245,37 @@ static void get_object_details(void)
 	free(sorted_by_offset);
 }
 
+/*
+ * We search for deltas in a list sorted by type, by filename hash, and then
+ * by size, so that we see progressively smaller and smaller files.
+ * That's because we prefer deltas to be from the bigger file
+ * to the smaller -- deletes are potentially cheaper, but perhaps
+ * more importantly, the bigger file is likely the more recent
+ * one.  The deepest deltas are therefore the oldest objects, which are
+ * less likely to be accessed often.
+ */
 static int type_size_sort(const void *_a, const void *_b)
 {
 	const struct object_entry *a = *(struct object_entry **)_a;
 	const struct object_entry *b = *(struct object_entry **)_b;
 
-	if (a->type < b->type)
-		return -1;
 	if (a->type > b->type)
-		return 1;
-	if (a->hash < b->hash)
 		return -1;
-	if (a->hash > b->hash)
+	if (a->type < b->type)
 		return 1;
-	if (a->preferred_base < b->preferred_base)
+	if (a->hash > b->hash)
 		return -1;
-	if (a->preferred_base > b->preferred_base)
+	if (a->hash < b->hash)
 		return 1;
-	if (a->size < b->size)
+	if (a->preferred_base > b->preferred_base)
 		return -1;
+	if (a->preferred_base < b->preferred_base)
+		return 1;
 	if (a->size > b->size)
+		return -1;
+	if (a->size < b->size)
 		return 1;
-	return a > b ? -1 : (a < b);  /* newest last */
+	return a < b ? -1 : (a > b);  /* newest first */
 }
 
 struct unpacked {
@@ -1320,14 +1326,6 @@ static pthread_mutex_t progress_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 #endif
 
-/*
- * We search for deltas _backwards_ in a list sorted by type and
- * by size, so that we see progressively smaller and smaller files.
- * That's because we prefer deltas to be from the bigger file
- * to the smaller - deletes are potentially cheaper, but perhaps
- * more importantly, the bigger file is likely the more recent
- * one.
- */
 static int try_delta(struct unpacked *trg, struct unpacked *src,
 		     unsigned max_depth, unsigned long *mem_usage)
 {
@@ -1425,10 +1423,6 @@ static int try_delta(struct unpacked *trg, struct unpacked *src,
 		}
 	}
 
-	trg_entry->delta = src_entry;
-	trg_entry->delta_size = delta_size;
-	trg->depth = src->depth + 1;
-
 	/*
 	 * Handle memory allocation outside of the cache
 	 * accounting lock.  Compiler will optimize the strangeness
@@ -1442,7 +1436,7 @@ static int try_delta(struct unpacked *trg, struct unpacked *src,
 		trg_entry->delta_data = NULL;
 	}
 	if (delta_cacheable(src_size, trg_size, delta_size)) {
-		delta_cache_size += trg_entry->delta_size;
+		delta_cache_size += delta_size;
 		cache_unlock();
 		trg_entry->delta_data = xrealloc(delta_buf, delta_size);
 	} else {
@@ -1450,6 +1444,10 @@ static int try_delta(struct unpacked *trg, struct unpacked *src,
 		free(delta_buf);
 	}
 
+	trg_entry->delta = src_entry;
+	trg_entry->delta_size = delta_size;
+	trg->depth = src->depth + 1;
+
 	return 1;
 }
 
@@ -1481,10 +1479,10 @@ static unsigned long free_unpacked(struct unpacked *n)
 	return freed_mem;
 }
 
-static void find_deltas(struct object_entry **list, unsigned list_size,
+static void find_deltas(struct object_entry **list, unsigned *list_size,
 			int window, int depth, unsigned *processed)
 {
-	uint32_t i = list_size, idx = 0, count = 0;
+	uint32_t i, idx = 0, count = 0;
 	unsigned int array_size = window * sizeof(struct unpacked);
 	struct unpacked *array;
 	unsigned long mem_usage = 0;
@@ -1492,11 +1490,23 @@ static void find_deltas(struct object_entry **list, unsigned list_size,
 	array = xmalloc(array_size);
 	memset(array, 0, array_size);
 
-	do {
-		struct object_entry *entry = list[--i];
+	for (;;) {
+		struct object_entry *entry = *list++;
 		struct unpacked *n = array + idx;
 		int j, max_depth, best_base = -1;
 
+		progress_lock();
+		if (!*list_size) {
+			progress_unlock();
+			break;
+		}
+		(*list_size)--;
+		if (!entry->preferred_base) {
+			(*processed)++;
+			display_progress(progress_state, *processed);
+		}
+		progress_unlock();
+
 		mem_usage -= free_unpacked(n);
 		n->entry = entry;
 
@@ -1514,12 +1524,6 @@ static void find_deltas(struct object_entry **list, unsigned list_size,
 		if (entry->preferred_base)
 			goto next;
 
-		progress_lock();
-		(*processed)++;
-		if (progress)
-			display_progress(&progress_state, *processed);
-		progress_unlock();
-
 		/*
 		 * If the current object is at pack edge, take the depth the
 		 * objects that depend on the current object into account
@@ -1579,7 +1583,7 @@ static void find_deltas(struct object_entry **list, unsigned list_size,
 		count++;
 		if (idx >= window)
 			idx = 0;
-	} while (i > 0);
+	}
 
 	for (i = 0; i < window; ++i) {
 		free_delta_index(array[i].index);
@@ -1590,94 +1594,180 @@ static void find_deltas(struct object_entry **list, unsigned list_size,
 
 #ifdef THREADED_DELTA_SEARCH
 
+/*
+ * The main thread waits on the condition that (at least) one of the workers
+ * has stopped working (which is indicated in the .working member of
+ * struct thread_params).
+ * When a work thread has completed its work, it sets .working to 0 and
+ * signals the main thread and waits on the condition that .data_ready
+ * becomes 1.
+ */
+
 struct thread_params {
 	pthread_t thread;
 	struct object_entry **list;
 	unsigned list_size;
+	unsigned remaining;
 	int window;
 	int depth;
+	int working;
+	int data_ready;
+	pthread_mutex_t mutex;
+	pthread_cond_t cond;
 	unsigned *processed;
 };
 
-static pthread_mutex_t data_request  = PTHREAD_MUTEX_INITIALIZER;
-static pthread_mutex_t data_ready    = PTHREAD_MUTEX_INITIALIZER;
-static pthread_mutex_t data_provider = PTHREAD_MUTEX_INITIALIZER;
-static struct thread_params *data_requester;
+static pthread_cond_t progress_cond = PTHREAD_COND_INITIALIZER;
 
 static void *threaded_find_deltas(void *arg)
 {
 	struct thread_params *me = arg;
 
-	for (;;) {
-		pthread_mutex_lock(&data_request);
-		data_requester = me;
-		pthread_mutex_unlock(&data_provider);
-		pthread_mutex_lock(&data_ready);
-		pthread_mutex_unlock(&data_request);
+	while (me->remaining) {
+		find_deltas(me->list, &me->remaining,
+			    me->window, me->depth, me->processed);
 
-		if (!me->list_size)
-			return NULL;
+		progress_lock();
+		me->working = 0;
+		pthread_cond_signal(&progress_cond);
+		progress_unlock();
 
-		find_deltas(me->list, me->list_size,
-			    me->window, me->depth, me->processed);
+		/*
+		 * We must not set ->data_ready before we wait on the
+		 * condition because the main thread may have set it to 1
+		 * before we get here.  In order to be sure that new
+		 * work is available if we see 1 in ->data_ready, it
+		 * was initialized to 0 before this thread was spawned
+		 * and we reset it to 0 right away.
+		 */
+		pthread_mutex_lock(&me->mutex);
+		while (!me->data_ready)
+			pthread_cond_wait(&me->cond, &me->mutex);
+		me->data_ready = 0;
+		pthread_mutex_unlock(&me->mutex);
 	}
+
+	/* leave ->working 1 so that this doesn't get more work assigned */
+	return NULL;
 }
 
 static void ll_find_deltas(struct object_entry **list, unsigned list_size,
 			   int window, int depth, unsigned *processed)
 {
-	struct thread_params *target, p[delta_search_threads];
-	int i, ret;
-	unsigned chunk_size;
+	struct thread_params p[delta_search_threads];
+	int i, ret, active_threads = 0;
 
 	if (delta_search_threads <= 1) {
-		find_deltas(list, list_size, window, depth, processed);
+		find_deltas(list, &list_size, window, depth, processed);
 		return;
 	}
 
-	pthread_mutex_lock(&data_provider);
-	pthread_mutex_lock(&data_ready);
-
+	/* Partition the work amongst work threads. */
 	for (i = 0; i < delta_search_threads; i++) {
+		unsigned sub_size = list_size / (delta_search_threads - i);
+
 		p[i].window = window;
 		p[i].depth = depth;
 		p[i].processed = processed;
+		p[i].working = 1;
+		p[i].data_ready = 0;
+
+		/* try to split chunks on "path" boundaries */
+		while (sub_size && sub_size < list_size &&
+		       list[sub_size]->hash &&
+		       list[sub_size]->hash == list[sub_size-1]->hash)
+			sub_size++;
+
+		p[i].list = list;
+		p[i].list_size = sub_size;
+		p[i].remaining = sub_size;
+
+		list += sub_size;
+		list_size -= sub_size;
+	}
+
+	/* Start work threads. */
+	for (i = 0; i < delta_search_threads; i++) {
+		if (!p[i].list_size)
+			continue;
+		pthread_mutex_init(&p[i].mutex, NULL);
+		pthread_cond_init(&p[i].cond, NULL);
 		ret = pthread_create(&p[i].thread, NULL,
 				     threaded_find_deltas, &p[i]);
 		if (ret)
 			die("unable to create thread: %s", strerror(ret));
+		active_threads++;
 	}
 
-	/* this should be auto-tuned somehow */
-	chunk_size = window * 1000;
+	/*
+	 * Now let's wait for work completion.  Each time a thread is done
+	 * with its work, we steal half of the remaining work from the
+	 * thread with the largest number of unprocessed objects and give
+	 * it to that newly idle thread.  This ensures good load balancing
+	 * until the remaining object list segments are simply too short
+	 * to be worth splitting anymore.
+	 */
+	while (active_threads) {
+		struct thread_params *target = NULL;
+		struct thread_params *victim = NULL;
+		unsigned sub_size = 0;
 
-	do {
-		unsigned sublist_size = chunk_size;
-		if (sublist_size > list_size)
-			sublist_size = list_size;
+		progress_lock();
+		for (;;) {
+			for (i = 0; !target && i < delta_search_threads; i++)
+				if (!p[i].working)
+					target = &p[i];
+			if (target)
+				break;
+			pthread_cond_wait(&progress_cond, &progress_mutex);
+		}
 
-		/* try to split chunks on "path" boundaries */
-		while (sublist_size < list_size && list[sublist_size]->hash &&
-		       list[sublist_size]->hash == list[sublist_size-1]->hash)
-			sublist_size++;
-
-		pthread_mutex_lock(&data_provider);
-		target = data_requester;
-		target->list = list;
-		target->list_size = sublist_size;
-		pthread_mutex_unlock(&data_ready);
-
-		list += sublist_size;
-		list_size -= sublist_size;
-		if (!sublist_size) {
+		for (i = 0; i < delta_search_threads; i++)
+			if (p[i].remaining > 2*window &&
+			    (!victim || victim->remaining < p[i].remaining))
+				victim = &p[i];
+		if (victim) {
+			sub_size = victim->remaining / 2;
+			list = victim->list + victim->list_size - sub_size;
+			while (sub_size && list[0]->hash &&
+			       list[0]->hash == list[-1]->hash) {
+				list++;
+				sub_size--;
+			}
+			if (!sub_size) {
+				/*
+				 * It is possible for some "paths" to have
+				 * so many objects that no hash boundary
+				 * might be found.  Let's just steal the
+				 * exact half in that case.
+				 */
+				sub_size = victim->remaining / 2;
+				list -= sub_size;
+			}
+			target->list = list;
+			victim->list_size -= sub_size;
+			victim->remaining -= sub_size;
+		}
+		target->list_size = sub_size;
+		target->remaining = sub_size;
+		target->working = 1;
+		progress_unlock();
+
+		pthread_mutex_lock(&target->mutex);
+		target->data_ready = 1;
+		pthread_cond_signal(&target->cond);
+		pthread_mutex_unlock(&target->mutex);
+
+		if (!sub_size) {
 			pthread_join(target->thread, NULL);
-			i--;
+			pthread_cond_destroy(&target->cond);
+			pthread_mutex_destroy(&target->mutex);
+			active_threads--;
 		}
-	} while (i);
+	}
 }
 
 #else
-#define ll_find_deltas	find_deltas
+#define ll_find_deltas(l, s, w, d, p)	find_deltas(l, &s, w, d, p)
 #endif
 
 static void prepare_pack(int window, int depth)
@@ -1714,16 +1804,14 @@ static void prepare_pack(int window, int depth)
 		delta_list[n++] = entry;
 	}
 
-	if (nr_deltas) {
+	if (nr_deltas && n > 1) {
 		unsigned nr_done = 0;
 		if (progress)
-			start_progress(&progress_state,
-				       "Deltifying %u objects...", "",
-				       nr_deltas);
+			progress_state = start_progress("Compressing objects",
+							nr_deltas);
 		qsort(delta_list, n, sizeof(*delta_list), type_size_sort);
 		ll_find_deltas(delta_list, n, window+1, depth, &nr_done);
-		if (progress)
-			stop_progress(&progress_state);
+		stop_progress(&progress_state);
 		if (nr_done != nr_deltas)
 			die("inconsistency with delta count");
 	}
@@ -1773,6 +1861,12 @@ static int git_pack_config(const char *k, const char *v)
 #endif
 		return 0;
 	}
+	if (!strcmp(k, "pack.indexversion")) {
+		pack_idx_default_version = git_config_int(k, v);
+		if (pack_idx_default_version > 2)
+			die("bad pack.indexversion=%d", pack_idx_default_version);
+		return 0;
+	}
 	return git_default_config(k, v);
 }
 
@@ -1920,7 +2014,7 @@ static void get_object_list(int ac, const char **av)
 
 	while (fgets(line, sizeof(line), stdin) != NULL) {
 		int len = strlen(line);
-		if (line[len - 1] == '\n')
+		if (len && line[len - 1] == '\n')
 			line[--len] = 0;
 		if (!len)
 			break;
@@ -2135,23 +2229,17 @@ int cmd_pack_objects(int argc, const char **argv, const char *prefix)
 	prepare_packed_git();
 
 	if (progress)
-		start_progress(&progress_state, "Generating pack...",
-			       "Counting objects: ", 0);
+		progress_state = start_progress("Counting objects", 0);
 	if (!use_internal_rev_list)
 		read_object_list_from_stdin();
 	else {
 		rp_av[rp_ac] = NULL;
 		get_object_list(rp_ac, rp_av);
 	}
-	if (progress) {
-		stop_progress(&progress_state);
-		fprintf(stderr, "Done counting %u objects.\n", nr_objects);
-	}
+	stop_progress(&progress_state);
 
 	if (non_empty && !nr_result)
 		return 0;
-	if (progress && (nr_objects != nr_result))
-		fprintf(stderr, "Result has %u objects.\n", nr_result);
 	if (nr_result)
 		prepare_pack(window, depth);
 	write_pack_file();
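
The progress hunks in this patch replace a by-value struct progress plus per-call-site "if (progress)" / "if (do_progress)" guards with an opaque handle that is simply NULL when no output is wanted, which is why the guards disappear around display_progress() and stop_progress(). A self-contained sketch of that pattern (hypothetical names and output format, not git's actual progress.c):

#include <stdio.h>
#include <stdlib.h>

struct progress {
	const char *title;
	unsigned total, last;
};

static struct progress *start_progress(const char *title, unsigned total)
{
	struct progress *p = calloc(1, sizeof(*p));

	if (p) {	/* a failed allocation just disables output */
		p->title = title;
		p->total = total;
	}
	return p;
}

static void display_progress(struct progress *p, unsigned n)
{
	if (!p)		/* NULL handle: the caller needs no guard */
		return;
	p->last = n;
	fprintf(stderr, "\r%s: %u/%u", p->title, n, p->total);
}

static void stop_progress(struct progress **pp)
{
	if (!*pp)
		return;
	fprintf(stderr, "\r%s: %u/%u, done.\n",
		(*pp)->title, (*pp)->last, (*pp)->total);
	free(*pp);
	*pp = NULL;	/* taking a ** lets the handle be cleared in place */
}

int main(void)
{
	struct progress *p = start_progress("Writing objects", 100);
	unsigned i;

	for (i = 1; i <= 100; i++)
		display_progress(p, i);
	stop_progress(&p);
	return 0;
}

Passing the handle's address to stop_progress() mirrors the stop_progress(&progress_state) calls in the diff: the implementation can free the state and reset the caller's pointer in one step.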
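The rewritten ll_find_deltas()/threaded_find_deltas() pair coordinates through two layers: workers announce idleness on the shared progress_cond under progress_mutex, and the main thread hands out the next chunk through each worker's private mutex/cond guarded by the data_ready flag, so a wakeup sent before the worker reaches its wait is not lost. A compilable toy version of the same handshake (simplified and hypothetical: a countdown stands in for find_deltas(), and a fixed number of refill rounds stands in for the work stealing):

#include <pthread.h>
#include <stdio.h>

#define THREADS 2

static pthread_mutex_t progress_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t progress_cond = PTHREAD_COND_INITIALIZER;

struct thread_params {
	pthread_t thread;
	unsigned remaining;	/* written by the main thread only while the worker sleeps */
	int working;		/* protected by progress_mutex */
	int data_ready;		/* protected by the per-thread mutex below */
	pthread_mutex_t mutex;
	pthread_cond_t cond;
};

static void *worker(void *arg)
{
	struct thread_params *me = arg;

	while (me->remaining) {
		while (me->remaining)	/* stand-in for find_deltas() */
			me->remaining--;

		/* announce idleness to the main thread... */
		pthread_mutex_lock(&progress_mutex);
		me->working = 0;
		pthread_cond_signal(&progress_cond);
		pthread_mutex_unlock(&progress_mutex);

		/* ...then sleep until a new chunk (or a 0-sized "quit") arrives */
		pthread_mutex_lock(&me->mutex);
		while (!me->data_ready)
			pthread_cond_wait(&me->cond, &me->mutex);
		me->data_ready = 0;
		pthread_mutex_unlock(&me->mutex);
	}
	return NULL;	/* ->working stays 1, so this thread is never picked again */
}

int main(void)
{
	struct thread_params p[THREADS];
	int i, rounds = 3, active = THREADS;

	for (i = 0; i < THREADS; i++) {
		p[i].remaining = 1000;
		p[i].working = 1;
		p[i].data_ready = 0;
		pthread_mutex_init(&p[i].mutex, NULL);
		pthread_cond_init(&p[i].cond, NULL);
		pthread_create(&p[i].thread, NULL, worker, &p[i]);
	}

	while (active) {
		struct thread_params *target = NULL;
		unsigned grant;

		/* wait until some worker flags itself idle */
		pthread_mutex_lock(&progress_mutex);
		while (!target) {
			for (i = 0; !target && i < THREADS; i++)
				if (!p[i].working)
					target = &p[i];
			if (!target)
				pthread_cond_wait(&progress_cond, &progress_mutex);
		}
		grant = --rounds > 0 ? 1000 : 0;	/* 0 tells the worker to exit */
		target->remaining = grant;
		target->working = 1;
		pthread_mutex_unlock(&progress_mutex);

		/* wake the chosen worker; it re-checks ->remaining on wakeup */
		pthread_mutex_lock(&target->mutex);
		target->data_ready = 1;
		pthread_cond_signal(&target->cond);
		pthread_mutex_unlock(&target->mutex);

		if (!grant) {
			pthread_join(target->thread, NULL);
			active--;
		}
	}
	puts("all workers joined");
	return 0;
}

A zero-sized assignment plays the same role as the empty steal in the patch: the woken worker sees remaining == 0, falls out of its loop, and is joined by the main thread.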
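Both the initial partitioning and the later work stealing refuse to cut a chunk between entries that share a name hash, because deltas are only searched within one thread's list segment and objects from the same path are each other's best delta candidates. A toy version of that boundary adjustment (struct layout and sample values invented for illustration):

#include <stdio.h>

struct entry { unsigned hash; };	/* stand-in for struct object_entry */

/* Grow a tentative chunk until it ends on a name-hash boundary. */
static unsigned split_at_boundary(struct entry **list, unsigned sub_size,
				  unsigned list_size)
{
	while (sub_size && sub_size < list_size &&
	       list[sub_size]->hash &&
	       list[sub_size]->hash == list[sub_size - 1]->hash)
		sub_size++;
	return sub_size;
}

int main(void)
{
	struct entry e[] = { {7}, {7}, {7}, {3}, {3}, {9} };
	struct entry *list[6];
	unsigned i;

	for (i = 0; i < 6; i++)
		list[i] = &e[i];
	printf("%u\n", split_at_boundary(list, 2, 6));	/* cut inside the run of 7s moves to 3 */
	printf("%u\n", split_at_boundary(list, 5, 6));	/* already on a boundary, stays 5 */
	return 0;
}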