threaded mode with define _RUN_THREADED

threaded
Avril 4 years ago
parent 9db000e864
commit ae9efca41b
Signed by: flanchan
GPG Key ID: 284488987C31F630

2
.gitignore vendored

@ -1,3 +1,5 @@
obj/ obj/
fcmp-* fcmp-*
test/ test/
perf.*
flamegraph.svg

@ -5,14 +5,17 @@ PROJECT=fcmp
OPT_FLAGS+= -fgraphite OPT_FLAGS+= -fgraphite
FEAT_CFLAGS?= -D_RUN_THREADED=0
FEAT_LDFLAGS?= -lpthread
RELEASE_CFLAGS?= -O3 -march=native -flto $(OPT_FLAGS) RELEASE_CFLAGS?= -O3 -march=native -flto $(OPT_FLAGS)
RELEASE_LDFLAGS?= -O3 -flto RELEASE_LDFLAGS?= -O3 -flto
DEBUG_CFLAGS?= -DDEBUG -O0 -g DEBUG_CFLAGS?= -DDEBUG -O0 -g
DEBUG_LDFLAGS?= -O0 DEBUG_LDFLAGS?= -O0
CFLAGS+= -Wall -pedantic --std=gnu11 $(addprefix -I,$(INCLUDE)) CFLAGS+= $(FEAT_CFLAGS) -Wall -pedantic --std=gnu11 $(addprefix -I,$(INCLUDE))
LDFLAGS+= LDFLAGS+= $(FEAT_LDFLAGS)
OBJ = $(addprefix obj/,$(SRC:.c=.o)) OBJ = $(addprefix obj/,$(SRC:.c=.o))

@ -0,0 +1,25 @@
#ifndef _FCMP_H
#define _FCMP_H
#ifdef DEBUG
#define _FORCE_INLINE static inline __attribute__((gnu_inline))
#else
#define _FORCE_INLINE extern inline __attribute__((gnu_inline))
#endif
#ifdef DEBUG
#define __name(d) #d
#define dprintf(fmt, ...) printf("[dbg @" __FILE__ "->%s:%d] " fmt "\n", __func__, __LINE__ __VA_OPT__(,) __VA_ARGS__)
#else
#define dprintf(fmt, ...)
#endif
/// Enabled threaded scheduling
// Set to 1 to FORCE threaded scheduling, 0 to use when opportune.
//
//#define _RUN_THREADED 0
extern const char* _prog_name;
#endif /* _FCMP_H */

@ -0,0 +1,22 @@
#ifndef _SCHED_H
#define _SCHED_H
#include <vector.h>
#include <fcmp.h>
typedef struct tasklist {
size_t argc;
struct taskarg* argv;
pthread_t* tasks;
} tasklist_t;
typedef void (*sched_cb)(vec_t* restrict tasklist);
#ifdef _RUN_THREADED
bool sched_spawn(vec_t full, sched_cb callback, struct tasklist *restrict t_list);
void sched_wait(struct tasklist* restrict t_list);
bool sched_should(size_t ntasks);
#endif
#endif /* _SHCED_H */

@ -4,6 +4,8 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdbool.h> #include <stdbool.h>
#include "fcmp.h"
typedef struct { typedef struct {
size_t len, cap; size_t len, cap;
@ -12,12 +14,6 @@ typedef struct {
void* ptr; void* ptr;
} vec_t; } vec_t;
#ifdef DEBUG
#define _FORCE_INLINE static inline __attribute__((gnu_inline))
#else
#define _FORCE_INLINE extern inline __attribute__((gnu_inline))
#endif
#define VEC_DEFAULT_CAP 16 #define VEC_DEFAULT_CAP 16
vec_t vec_new_with_cap(size_t elem, size_t cap); vec_t vec_new_with_cap(size_t elem, size_t cap);

@ -2,18 +2,18 @@
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <fcmp.h>
#include <map.h> #include <map.h>
#include <vector.h> #include <vector.h>
static const char* _prog_name = "fcmp"; #ifdef _RUN_THREADED
#include <sched.h>
#ifdef DEBUG
#define __name(d) #d
#define dprintf(fmt, ...) printf("[dbg @" __FILE__ "->%s:%d] " fmt "\n", __func__, __LINE__ __VA_OPT__(,) __VA_ARGS__)
#else
#define dprintf(fmt, ...)
#endif #endif
const char* _prog_name = "fcmp";
__attribute__((noreturn, noinline)) void usage() __attribute__((noreturn, noinline)) void usage()
{ {
fprintf(stderr, "fcmp: compare files for identity\n"); fprintf(stderr, "fcmp: compare files for identity\n");
@ -56,6 +56,57 @@ static int compare_then_close(const mmap_t * restrict map1, mmap_t map2)
return rval; return rval;
} }
#ifdef _RUN_THREADED
struct t_task {
_Atomic int* othis;
int ithis;
const char* fthis;
mmap_t mthis;
const mmap_t* map1;
};
void proc_thread(vec_t* restrict v_tasks)
{
struct t_task * tasks = v_tasks->ptr;
mmap_t mrest[v_tasks->len];
#ifdef DEBUG
const char* frest[v_tasks->len];
#endif
int nrest = v_tasks->len;
const mmap_t* map1;
{
for(register int i=0;i<v_tasks->len;i++)
{
mrest[i] = tasks[i].mthis;
#ifdef DEBUG
frest[i] = tasks[i].fthis;
#endif
}
map1 = tasks[0].map1;
}
register int rval=0;
for(register int i=0;i<nrest;i++)
{
dprintf("Checking %d \"%s\"", tasks[i].ithis, frest[i]);
switch ((rval=compare_then_close(map1, mrest[i]))) {
case 0: break;
default:
// Close the rest
dprintf("Unmapping mrest from %d (len %d) while max of nrest is %d", (i+1), nrest-(i+1), nrest);
if(i<nrest-1) unmap_all(mrest+ (i+1), nrest- (i+1));
goto end;
}
dprintf("Ident %d OK", tasks[i].ithis);
}
end:
*tasks[0].othis = rval;
}
#endif
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
_prog_name = argv[0]; _prog_name = argv[0];
@ -91,7 +142,46 @@ int main(int argc, char** argv)
} }
dprintf("All map okay"); dprintf("All map okay");
register int rval=0; register int rval=0;
#ifdef _RUN_THREADED
if(sched_should(nrest) || _RUN_THREADED) {
dprintf("Running multi-threaded");
_Atomic int rvals[nrest];
vec_t vtask_args = vec_new_with_cap(sizeof(struct t_task), nrest);
struct t_task* task_args = vtask_args.ptr;
for (int i=0;i<nrest;i++) {
rvals[i] = 0;
task_args[i] = (struct t_task){
.ithis = i,
.fthis = frest[i],
.mthis = mrest[i],
.map1 = &map1,
.othis = &rvals[i],
};
}
vtask_args.len = (size_t)nrest;
tasklist_t threads;
if(!sched_spawn(vtask_args, &proc_thread, &threads)) {
fprintf(stderr, "Failed to spawn tasks\n");
abort(); //no clear way to exit gracefully from this...
}
vec_free(vtask_args);
dprintf("Children spawned");
sched_wait(&threads);
for (register int i=0;i<nrest;i++) {
if(rvals[i]) {
rval = rvals[i];
break;
}
}
goto end;
} else {
#endif
dprintf("Running single threaded");
for(register int i=0;i<nrest;i++) { for(register int i=0;i<nrest;i++) {
dprintf("Checking %d \"%s\"", i, frest[i]); dprintf("Checking %d \"%s\"", i, frest[i]);
switch ((rval=compare_then_close(&map1, mrest[i]))) { switch ((rval=compare_then_close(&map1, mrest[i]))) {
@ -106,11 +196,16 @@ int main(int argc, char** argv)
} }
end: end:
dprintf("Unmapping `map1`");
if(!unmap_and_close(map1)) { if(!unmap_and_close(map1)) {
fprintf(stderr, "Failed to unmap and close %s", f1); fprintf(stderr, "Failed to unmap and close %s", f1);
rval=-1; rval=-1;
} }
dprintf("Final rval is %d", rval); dprintf("Final rval is %d", rval);
return rval; return rval;
#ifdef _RUN_THREADED
}
#endif
} }

@ -0,0 +1,84 @@
// Scheduler
#include <unistd.h>
#include <stdint.h>
#include <stdio.h>
#include <vector.h>
#include <pthread.h>
#ifdef _RUN_THREADED
inline static size_t num_cpus() {
return sysconf( _SC_NPROCESSORS_ONLN );
}
struct taskarg {
vec_t li;
sched_cb cb;
};
static void* _spawn(void* _arg)
{
struct taskarg* restrict arg = _arg;
if(arg->li.len>0)
arg->cb(&arg->li);
vec_free(arg->li);
return NULL;
}
bool sched_should(size_t ntasks)
{
register size_t num = num_cpus();
return (num > 1 && ntasks > 1);
}
bool sched_spawn(vec_t full, sched_cb callback, struct tasklist *restrict t_list)
{
register size_t spn = num_cpus() + 1;
if (spn > full.len) spn = full.len;
dprintf("Spawning %lu worker threads", spn);
// Split tasks
*t_list = (struct tasklist){
.argc = spn,
.argv = calloc(sizeof(struct taskarg), spn),
.tasks = calloc(sizeof(pthread_t), spn),
};
struct taskarg* tasklist = t_list->argv;
for(register int i=0;i<spn;i++) tasklist[i] = (struct taskarg){.li = vec_new_with_cap(full.element, full.len), .cb = callback };
for (register int i=0;i<full.len;i++)
{
vec_push(&tasklist[i%spn].li, vec_index(&full, i));
}
for(register int i=0;i<spn;i++)
{
if(pthread_create(&t_list->tasks[i], NULL, &_spawn, &tasklist[i]))
{
perror("Failed to spawn thread");
return false;
}
dprintf("Worker thead %d of %lu OK", i, spn);
}
return true;
}
void sched_wait(struct tasklist* restrict t_list)
{
dprintf("Waiting on %lu worker threads", t_list->argc);
for (size_t i=0;i<t_list->argc;i++) {
if(pthread_join(t_list->tasks[i], NULL)) {
perror("Failed to join thread");
continue;
}
dprintf("Joined thread %lu of %lu okay", i, t_list->argc);
}
free(t_list->tasks);
free(t_list->argv);
dprintf("Freed args and thread handles okay");
}
#endif
Loading…
Cancel
Save