igzip: Add optional threaded compression to cli tool

Change-Id: Ia29e877cfa8bef2285d8b48bb9133b2ff5b2eea0
Signed-off-by: Greg Tucker <greg.b.tucker@intel.com>
This commit is contained in:
Greg Tucker 2019-06-11 13:45:41 -07:00
parent a46da529d9
commit 0a7e3167ce
4 changed files with 370 additions and 55 deletions

View File

@ -145,9 +145,15 @@ $(all_llvm_fuzz_tests): CXXFLAGS += -fsanitize-coverage=trace-pc-guard -fsanitiz
$(all_llvm_fuzz_tests): % : %.o $(lib_name)
$(CXX) $(CXXFLAGS) $^ $(LDLIBS) $(FUZZLINK) -o $@
# Check for pthreads
have_threads ?= $(shell printf "\#include <pthread.h>\nint main(void){return 0;}\n" | $(CC) -x c - -o /dev/null -lpthread && echo y )
THREAD_LD_$(have_threads) := -lpthread
THREAD_CFLAGS_$(have_threads) := -DHAVE_THREADS
progs: $(bin_PROGRAMS)
$(bin_PROGRAMS): CFLAGS += -DVERSION=\"$(version)\"
$(bin_PROGRAMS): LDLIBS += $(THREAD_LD_y)
$(bin_PROGRAMS): CFLAGS += $(THREAD_CFLAGS_y)
sim test trace: $(addsuffix .run,$(all_unit_tests))
perf: $(addsuffix .run,$(all_perf_tests))
check: $(addsuffix .run,$(all_check_tests))

View File

@ -1,5 +1,5 @@
.\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.47.10.
.TH IGZIP "1" "May 2019" "igzip command line interface 2.26.0" "User Commands"
.TH IGZIP "1" "June 2019" "igzip command line interface 2.26.0" "User Commands"
.SH NAME
igzip \- compress or decompress files similar to gzip
.SH SYNOPSIS
@ -59,6 +59,9 @@ do not save/use file name and timestamp in compress/decompress
\fB\-t\fR, \fB\-\-test\fR
test compressed file integrity
.TP
\fB\-T\fR, \fB\-\-threads\fR
use N threads to compress if enabled
.TP
\fB\-q\fR, \fB\-\-quiet\fR
suppress warnings
.PP

View File

@ -40,6 +40,11 @@
#include <stdarg.h>
#include "igzip_lib.h" /* Normally you use isa-l.h instead for external programs */
#if defined (HAVE_THREADS)
# include <pthread.h>
# include "crc.h"
#endif
#if !defined (VERSION)
# if defined (ISAL_VERSION)
# define VERSION ISAL_VERSION
@ -163,6 +168,13 @@ struct cli_options {
int verbose_level;
int name;
int test;
int threads;
uint8_t *in_buf;
uint8_t *out_buf;
uint8_t *level_buf;
size_t in_buf_size;
size_t out_buf_size;
size_t level_buf_size;
};
struct cli_options global_options;
@ -182,9 +194,15 @@ void init_options(struct cli_options *options)
options->force = false;
options->quiet_level = 0;
options->verbose_level = 0;
options->verbose_level = 0;
options->name = NAME_DEFAULT;
options->test = NO_TEST;
options->in_buf = NULL;
options->out_buf = NULL;
options->level_buf = NULL;
options->in_buf_size = 0;
options->out_buf_size = 0;
options->level_buf_size = 0;
options->threads = 1;
};
int is_interactive(void)
@ -274,6 +292,7 @@ int usage(int exit_code)
" -N, --name save/use file name and timestamp in compress/decompress\n"
" -n, --no-name do not save/use file name and timestamp in compress/decompress\n"
" -t, --test test compressed file integrity\n"
" -T, --threads <n> use n threads to compress if enabled\n"
" -q, --quiet suppress warnings\n\n"
"with no infile, or when infile is - , read standard input\n\n",
ISAL_DEF_MAX_LEVEL);
@ -404,6 +423,156 @@ void open_out_file(FILE ** out, char *outfile_name)
}
}
#if defined(HAVE_THREADS)
#define MAX_THREADS 8
#define MAX_JOBQUEUE 16 /* must be a power of 2 */
enum job_status {
JOB_UNALLOCATED = 0,
JOB_ALLOCATED,
JOB_SUCCESS,
JOB_FAIL
};
struct thread_job {
uint8_t *next_in;
uint32_t avail_in;
uint8_t *next_out;
uint32_t avail_out;
uint32_t total_out;
uint32_t type;
uint32_t status;
};
struct thread_pool {
pthread_t threads[MAX_THREADS];
struct thread_job job[MAX_JOBQUEUE];
pthread_mutex_t mutex;
pthread_cond_t cond;
int head;
int tail;
int queue;
int shutdown;
};
// Globals for thread pool
struct thread_pool pool;
static inline int pool_has_space()
{
return ((pool.head + 1) % MAX_JOBQUEUE) != pool.tail;
}
static inline int pool_has_work()
{
return (pool.queue != pool.head);
}
int pool_get_work()
{
assert(pool.queue != pool.head);
pool.queue = (pool.queue + 1) % MAX_JOBQUEUE;
return pool.queue;
}
int pool_put_work(struct isal_zstream *stream)
{
pthread_mutex_lock(&pool.mutex);
if (!pool_has_space() || pool.shutdown) {
pthread_mutex_unlock(&pool.mutex);
return 1;
}
int idx = (pool.head + 1) % MAX_JOBQUEUE;
pool.job[idx].next_in = stream->next_in;
pool.job[idx].avail_in = stream->avail_in;
pool.job[idx].next_out = stream->next_out;
pool.job[idx].avail_out = stream->avail_out;
pool.job[idx].status = JOB_ALLOCATED;
pool.job[idx].type = stream->end_of_stream == 0 ? 0 : 1;
pool.head = idx;
pthread_cond_signal(&pool.cond);
pthread_mutex_unlock(&pool.mutex);
return 0;
}
void *thread_worker(void *none)
{
struct isal_zstream wstream;
int check;
int work_idx;
int level = global_options.level;
int level_size = level_size_buf[level];
uint8_t *level_buf = malloc_safe(level_size);
log_print(VERBOSE, "Start worker\n");
while (!pool.shutdown) {
pthread_mutex_lock(&pool.mutex);
while (!pool_has_work() && !pool.shutdown) {
pthread_cond_wait(&pool.cond, &pool.mutex);
}
if (pool.shutdown) {
pthread_mutex_unlock(&pool.mutex);
continue;
}
work_idx = pool_get_work();
pthread_cond_signal(&pool.cond);
pthread_mutex_unlock(&pool.mutex);
isal_deflate_stateless_init(&wstream);
wstream.next_in = pool.job[work_idx].next_in;
wstream.next_out = pool.job[work_idx].next_out;
wstream.avail_in = pool.job[work_idx].avail_in;
wstream.avail_out = pool.job[work_idx].avail_out;
wstream.end_of_stream = pool.job[work_idx].type;
wstream.flush = FULL_FLUSH;
wstream.level = global_options.level;
wstream.level_buf = level_buf;
wstream.level_buf_size = level_size;
check = isal_deflate_stateless(&wstream);
log_print(VERBOSE, "Worker finished job %d, out=%d\n",
work_idx, wstream.total_out);
pool.job[work_idx].total_out = wstream.total_out;
pool.job[work_idx].status = JOB_SUCCESS + check; // complete or fail
if (check)
break;
}
free(level_buf);
log_print(VERBOSE, "Worker quit\n");
pthread_exit(NULL);
}
int pool_create()
{
int i;
int nthreads = global_options.threads - 1;
pool.head = 0;
pool.tail = 0;
pool.queue = 0;
pool.shutdown = 0;
for (i = 0; i < nthreads; i++)
pthread_create(&pool.threads[i], NULL, thread_worker, NULL);
log_print(VERBOSE, "Created %d pool threads\n", nthreads);
return 0;
}
void pool_quit()
{
int i;
pthread_mutex_lock(&pool.mutex);
pool.shutdown = 1;
pthread_mutex_unlock(&pool.mutex);
pthread_cond_broadcast(&pool.cond);
for (i = 0; i < global_options.threads - 1; i++)
pthread_join(pool.threads[i], NULL);
log_print(VERBOSE, "Deleted %d pool threads\n", i);
}
#endif // defined(HAVE_THREADS)
int compress_file(void)
{
FILE *in = NULL, *out = NULL;
@ -455,13 +624,13 @@ int compress_file(void)
if (out == NULL)
goto compress_file_cleanup;
inbuf_size = BLOCK_SIZE;
outbuf_size = BLOCK_SIZE;
inbuf_size = global_options.in_buf_size;
outbuf_size = global_options.out_buf_size;
inbuf = malloc_safe(inbuf_size);
outbuf = malloc_safe(outbuf_size);
level_size = level_size_buf[level];
level_buf = malloc_safe(level_size);
inbuf = global_options.in_buf;
outbuf = global_options.out_buf;
level_size = global_options.level_buf_size;
level_buf = global_options.level_buf;
isal_gzip_header_init(&gz_hdr);
if (global_options.name == NAME_DEFAULT || global_options.name == YES_NAME) {
@ -483,32 +652,130 @@ int compress_file(void)
isal_write_gzip_header(&stream, &gz_hdr);
do {
if (stream.avail_in == 0) {
stream.next_in = inbuf;
stream.avail_in =
fread_safe(stream.next_in, 1, inbuf_size, in, infile_name);
stream.end_of_stream = feof(in);
}
if (global_options.threads > 1) {
#if defined(HAVE_THREADS)
int q;
int end_of_stream = 0;
uint32_t crc = 0;
uint64_t total_in = 0;
if (stream.next_out == NULL) {
stream.next_out = outbuf;
stream.avail_out = outbuf_size;
}
// Write the header
fwrite_safe(outbuf, 1, stream.total_out, out, outfile_name);
ret = isal_deflate(&stream);
do {
size_t nread;
size_t inbuf_used = 0;
size_t outbuf_used = 0;
uint8_t *iptr = inbuf;
uint8_t *optr = outbuf;
ret = 0;
if (ret != ISAL_DECOMP_OK) {
log_print(ERROR,
"igzip: Error encountered while compressing file %s\n",
infile_name);
goto compress_file_cleanup;
}
for (q = 0; q < MAX_JOBQUEUE - 1; q++) {
inbuf_used += BLOCK_SIZE;
outbuf_used += 2 * BLOCK_SIZE;
if (inbuf_used > inbuf_size || outbuf_used > outbuf_size)
break;
fwrite_safe(outbuf, 1, stream.next_out - outbuf, out, outfile_name);
stream.next_out = NULL;
nread = fread_safe(iptr, 1, BLOCK_SIZE, in, infile_name);
crc = crc32_gzip_refl(crc, iptr, nread);
end_of_stream = feof(in);
total_in += nread;
stream.next_in = iptr;
stream.next_out = optr;
stream.avail_in = nread;
stream.avail_out = 2 * BLOCK_SIZE;
stream.end_of_stream = end_of_stream;
ret = pool_put_work(&stream);
if (ret || end_of_stream)
break;
iptr += BLOCK_SIZE;
optr += 2 * BLOCK_SIZE;
}
while (pool.tail != pool.head) { // Unprocessed jobs
int t = (pool.tail + 1) % MAX_JOBQUEUE;
if (pool.job[t].status >= JOB_SUCCESS) { // Finished next
if (pool.job[t].status > JOB_SUCCESS) {
success = 0;
log_print(ERROR,
"igzip: Error encountered while compressing file %s\n",
infile_name);
goto compress_file_cleanup;
}
fwrite_safe(pool.job[t].next_out, 1,
pool.job[t].total_out, out, outfile_name);
pool.job[t].total_out = 0;
pool.job[t].status = 0;
pool.tail = t;
pthread_cond_broadcast(&pool.cond);
}
// Pick up a job while we wait
pthread_mutex_lock(&pool.mutex);
if (!pool_has_work()) {
pthread_mutex_unlock(&pool.mutex);
continue;
}
int work_idx = pool_get_work();
pthread_cond_signal(&pool.cond);
pthread_mutex_unlock(&pool.mutex);
isal_deflate_stateless_init(&stream);
stream.next_in = pool.job[work_idx].next_in;
stream.next_out = pool.job[work_idx].next_out;
stream.avail_in = pool.job[work_idx].avail_in;
stream.avail_out = pool.job[work_idx].avail_out;
stream.end_of_stream = pool.job[work_idx].type;
stream.flush = FULL_FLUSH;
stream.level = global_options.level;
stream.level_buf = level_buf;
stream.level_buf_size = level_size;
int check = isal_deflate_stateless(&stream);
log_print(VERBOSE, "Self finished job %d, out=%d\n",
work_idx, stream.total_out);
pool.job[work_idx].total_out = stream.total_out;
pool.job[work_idx].status = JOB_SUCCESS + check; // complete or fail
}
} while (!end_of_stream);
// Write gzip trailer
ret = fwrite_safe(&crc, sizeof(uint32_t), 1, out, outfile_name);
ret += fwrite_safe(&total_in, sizeof(uint32_t), 1, out, outfile_name);
#else // No compiled threading support but asked for threads > 1
assert(1);
#endif
} else { // Single thread
do {
if (stream.avail_in == 0) {
stream.next_in = inbuf;
stream.avail_in =
fread_safe(stream.next_in, 1, inbuf_size, in, infile_name);
stream.end_of_stream = feof(in);
}
if (stream.next_out == NULL) {
stream.next_out = outbuf;
stream.avail_out = outbuf_size;
}
ret = isal_deflate(&stream);
if (ret != ISAL_DECOMP_OK) {
log_print(ERROR,
"igzip: Error encountered while compressing file %s\n",
infile_name);
goto compress_file_cleanup;
}
fwrite_safe(outbuf, 1, stream.next_out - outbuf, out, outfile_name);
stream.next_out = NULL;
} while (!feof(in) || stream.avail_out == 0);
}
} while (!feof(in) || stream.avail_out == 0);
success = 1;
compress_file_cleanup:
@ -524,15 +791,6 @@ int compress_file(void)
if (global_options.outfile_name == NULL && outfile_name != NULL)
free(outfile_name);
if (inbuf != NULL)
free(inbuf);
if (outbuf != NULL)
free(outbuf);
if (level_buf != NULL)
free(level_buf);
return (success == 0);
}
@ -603,11 +861,10 @@ int decompress_file(void)
file_time = get_posix_filetime(in);
inbuf_size = BLOCK_SIZE;
outbuf_size = BLOCK_SIZE;
inbuf = malloc_safe(inbuf_size);
outbuf = malloc_safe(outbuf_size);
inbuf_size = global_options.in_buf_size;
outbuf_size = global_options.out_buf_size;
inbuf = global_options.in_buf;
outbuf = global_options.out_buf;
isal_gzip_header_init(&gz_hdr);
if (outfile_type == implicit) {
@ -693,19 +950,13 @@ int decompress_file(void)
if (global_options.outfile_name == NULL && outfile_name != NULL)
free(outfile_name);
if (inbuf != NULL)
free(inbuf);
if (outbuf != NULL)
free(outbuf);
return (success == 0);
}
int main(int argc, char *argv[])
{
int c;
char optstring[] = "hcdz0123456789o:S:kfqVvNnt";
char optstring[] = "hcdz0123456789o:S:kfqVvNntT:";
int long_only_flag;
int ret = 0;
int bad_option = 0;
@ -731,12 +982,12 @@ int main(int argc, char *argv[])
{"no-name", no_argument, NULL, 'n'},
{"name", no_argument, NULL, 'N'},
{"test", no_argument, NULL, 't'},
{"threads", required_argument, NULL, 'T'},
/* Possible future extensions
{"recursive, no_argument, NULL, 'r'},
{"list", no_argument, NULL, 'l'},
{"benchmark", optional_argument, NULL, 'b'},
{"benchmark_end", required_argument, NULL, 'e'},
{"threads", optional_argument, NULL, 'T'},
*/
{0, 0, 0, 0}
};
@ -808,6 +1059,14 @@ int main(int argc, char *argv[])
global_options.test = TEST;
global_options.mode = DECOMPRESS_MODE;
break;
case 'T':
#if defined(HAVE_THREADS)
c = atoi(optarg);
c = c > MAX_THREADS ? MAX_THREADS : c;
c = c < 1 ? 1 : c;
global_options.threads = c;
#endif
break;
case 'h':
usage(0);
default:
@ -839,9 +1098,24 @@ int main(int argc, char *argv[])
return 0;
}
global_options.in_buf_size = BLOCK_SIZE;
global_options.out_buf_size = BLOCK_SIZE;
#if defined(HAVE_THREADS)
if (global_options.threads > 1) {
global_options.in_buf_size += (BLOCK_SIZE * MAX_JOBQUEUE);
global_options.out_buf_size += (BLOCK_SIZE * MAX_JOBQUEUE * 2);
pool_create();
}
#endif
global_options.in_buf = malloc_safe(global_options.in_buf_size);
global_options.out_buf = malloc_safe(global_options.out_buf_size);
global_options.level_buf_size = level_size_buf[global_options.level];
global_options.level_buf = malloc_safe(global_options.level_buf_size);
if (global_options.mode == COMPRESS_MODE) {
if (optind >= argc)
compress_file();
ret |= compress_file();
while (optind < argc) {
global_options.infile_name = argv[optind];
global_options.infile_name_len = strlen(global_options.infile_name);
@ -851,7 +1125,7 @@ int main(int argc, char *argv[])
} else if (global_options.mode == DECOMPRESS_MODE) {
if (optind >= argc)
decompress_file();
ret |= decompress_file();
while (optind < argc) {
global_options.infile_name = argv[optind];
global_options.infile_name_len = strlen(global_options.infile_name);
@ -859,6 +1133,13 @@ int main(int argc, char *argv[])
optind++;
}
}
#if defined(HAVE_THREADS)
if (global_options.threads > 1)
pool_quit();
#endif
free(global_options.in_buf);
free(global_options.out_buf);
free(global_options.level_buf);
return ret;
}

View File

@ -3,9 +3,9 @@ set -o pipefail
CWD=$PWD
SRC_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
IGZIP=$SRC_DIR/igzip
IGZIP="$SRC_DIR/igzip $@"
TEST_DIR="/tmp/igzip_cli_test_$$/"
TEST_FILE=$IGZIP
TEST_FILE=$SRC_DIR/igzip
DIFF="diff -q"
mkdir -p $TEST_DIR
@ -216,5 +216,30 @@ $IGZIP -t $file1$ds &> /dev/null && ret=1
pass_check $ret "Test test"
clear_dir
# Large stream test with threading if enabled
ret=0
(for i in `seq 100`; do cat $TEST_FILE ; done) | $IGZIP -c -T 4 | $IGZIP -t || ret=1
pass_check $ret "Large stream test"
# Large stream tests with decompression and threading if enabled
if command -V md5sum >/dev/null 2>&1 && command -V dd >/dev/null 2>&1; then
ret=0
dd if=<(for i in `seq 1000`; do cat $TEST_FILE; done) iflag=fullblock bs=1M count=201 2> out.stder | tee >(md5sum > out.sum1) \
| $IGZIP -c -T 4 | $IGZIP -d | md5sum > out.sum2 \
&& $DIFF out.sum1 out.sum2 || ret=1
pass_check $ret "Large stream compresss test"
clear_dir
if test -e /dev/urandom; then
ret=0
dd if=/dev/urandom iflag=fullblock bs=1M count=200 2> out.stder | tee >(md5sum > out.sum3) \
| $IGZIP -c -T 2 | $IGZIP -d | md5sum > out.sum4 \
&& $DIFF out.sum3 out.sum4 || ret=1
pass_check $ret "Large stream random data test"
clear_dir
fi
fi
echo "Passed all cli checks"
cleanup 0