Commit 277dd3dd authored by Jens Korinth's avatar Jens Korinth
Browse files

Implement API for job-attached transfers

* tapasco_jobs now define functions to attach memory transfers to jobs
  to be executed just-in-time before execution by the scheduler
* scheduler supports new constructs: if transfer length is marked > 0
  for an argument, it will allocate memory, transfer the data and
  updated the argument with the corresponding handle, as well as copy
  back and free the memory after return
* no top-level API is implemented yet, only tapasco_job level
* no malloc for local memories is implemented yet
parent 37972c5a
Pipeline #318 passed with stage
in 3 minutes and 4 seconds
......@@ -62,6 +62,15 @@ typedef enum {
TAPASCO_JOB_STATE_FINISHED,
} tapasco_job_state_t;
/** Internal structure for ad-hoc data transfers. **/
struct tapasco_transfer {
size_t len;
void *data;
tapasco_device_alloc_flag_t flags;
tapasco_handle_t handle;
};
typedef struct tapasco_transfer tapasco_transfer_t;
/**
* Internal job structure to maintain information on an execution to be
* scheduled some time in the future.
......@@ -151,6 +160,16 @@ uint32_t tapasco_jobs_get_arg32(tapasco_jobs_t const *jobs, tapasco_job_id_t con
uint64_t tapasco_jobs_get_arg64(tapasco_jobs_t const *jobs, tapasco_job_id_t const j_id,
uint32_t const arg_idx);
/**
* Returns the transfer struct for the given arg.
* @param jobs jobs context.
* @param j_id job id.
* @param arg_idx index of the argument to retrieve.
* @return pointer to tapasco_transfer_t struct.
**/
tapasco_transfer_t *tapasco_jobs_get_arg_transfer(tapasco_jobs_t *jobs,
tapasco_job_id_t const j_id, uint32_t const arg_idx);
/**
* Returns the value of an argument in a job.
* @param jobs jobs context.
......@@ -177,6 +196,22 @@ tapasco_res_t tapasco_jobs_set_arg(tapasco_jobs_t *jobs, tapasco_job_id_t const
uint32_t const arg_idx, size_t const arg_len,
void const *arg_value);
/**
* Attaches a data transfer to local memory to be run prior to execution of the
* job. Replaces argument arg_idx with the handle to the address.
* @param jobs jobs context.
* @param j_id job id.
* @param arg_idx index of the argument to retrieve.
* @param arg_len size of argument in bytes.
* @param arg_value pointer to value data.
* @param flags transfer flags (see @tapasco_device_alloc_flag_t).
* @return TAPASCO_SUCCESS, if value could be set.
**/
tapasco_res_t tapasco_jobs_set_arg_transfer(tapasco_jobs_t *jobs,
tapasco_job_id_t const j_id, uint32_t const arg_idx,
size_t const arg_len, void *arg_value,
tapasco_device_alloc_flag_t const flags);
/**
* Sets the return value of a job.
* @param jobs jobs context.
......
......@@ -47,11 +47,14 @@ struct tapasco_job {
uint64_t ret32;
uint64_t ret64;
} ret;
/** transfer array (max. 32 transfers) **/
tapasco_transfer_t transfers[TAPASCO_JOB_MAX_ARGS];
};
typedef struct tapasco_job tapasco_job_t;
/******************************************************************************/
inline static void init_job(tapasco_job_t *job, int i) {
memset(job, 0, sizeof(*job));
job->id = i + 1000;
job->args_len = 0;
job->args_sz = 0;
......@@ -135,6 +138,13 @@ inline uint64_t tapasco_jobs_get_arg64(tapasco_jobs_t const *jobs, tapasco_job_i
return jobs->q.elems[j_id - 1000].args[arg_idx].v64;
}
tapasco_transfer_t *tapasco_jobs_get_arg_transfer(tapasco_jobs_t *jobs,
tapasco_job_id_t const j_id, uint32_t const arg_idx) {
assert(jobs);
assert(arg_idx < jobs->q.elems[j_id - 1000].args_len);
return &jobs->q.elems[j_id - 1000].transfers[arg_idx];
}
inline tapasco_res_t tapasco_jobs_get_arg(tapasco_jobs_t *jobs, tapasco_job_id_t const j_id,
uint32_t const arg_idx, size_t const arg_len, void *arg_value) {
assert(jobs);
......@@ -155,8 +165,6 @@ inline tapasco_res_t tapasco_jobs_get_arg(tapasco_jobs_t *jobs, tapasco_job_id_t
inline tapasco_res_t tapasco_jobs_set_arg(tapasco_jobs_t *jobs, tapasco_job_id_t const j_id,
uint32_t const arg_idx, size_t const arg_len, void const *arg_value) {
assert(jobs);
/*printf("tapasco_jobs_set_arg: j_id = %d, arg_idx = %d, arg_len = %zd\n",
j_id, arg_idx, arg_len);*/
#ifndef NDEBUG
if (arg_len != 4 && arg_len != 8)
return TAPASCO_ERR_INVALID_ARG_SIZE;
......@@ -181,6 +189,25 @@ inline tapasco_res_t tapasco_jobs_set_arg(tapasco_jobs_t *jobs, tapasco_job_id_t
return TAPASCO_SUCCESS;
}
inline tapasco_res_t tapasco_jobs_set_arg_transfer(tapasco_jobs_t *jobs,
tapasco_job_id_t const j_id, uint32_t const arg_idx,
size_t const arg_len, void *arg_value,
tapasco_device_alloc_flag_t const flags) {
assert(jobs);
#ifndef NDEBUG
if (arg_idx >= TAPASCO_JOB_MAX_ARGS)
return TAPASCO_ERR_INVALID_ARG_INDEX;
if (j_id - 1000 > TAPASCO_JOBS_Q_SZ)
return TAPASCO_ERR_JOB_ID_NOT_FOUND;
#endif
jobs->q.elems[j_id - 1000].transfers[arg_idx].len = arg_len;
jobs->q.elems[j_id - 1000].transfers[arg_idx].data = arg_value;
jobs->q.elems[j_id - 1000].transfers[arg_idx].flags = flags;
if (jobs->q.elems[j_id - 1000].args_len < arg_idx + 1)
jobs->q.elems[j_id - 1000].args_len = arg_idx + 1;
return TAPASCO_SUCCESS;
}
inline tapasco_res_t tapasco_jobs_set_return(tapasco_jobs_t *jobs, tapasco_job_id_t const j_id,
size_t const ret_len, void const *ret_value) {
assert(jobs);
......@@ -217,7 +244,7 @@ inline tapasco_job_id_t tapasco_jobs_acquire(tapasco_jobs_t *jobs) {
inline void tapasco_jobs_release(tapasco_jobs_t *jobs, tapasco_job_id_t const j_id) {
assert(jobs);
jobs->q.elems[j_id - 1000].args_len = 0;
memset(&jobs->q.elems[j_id - 1000], 0, sizeof(tapasco_job_t));
jobs->q.elems[j_id - 1000].state = TAPASCO_JOB_STATE_READY;
tapasco_jobs_fsp_put(&jobs->q, j_id - 1000);
}
......@@ -29,6 +29,92 @@
#include <tapasco_logging.h>
#include <platform.h>
static inline
tapasco_res_t tapasco_transfer_to(tapasco_dev_ctx_t *dev_ctx,
tapasco_job_id_t const j_id, tapasco_transfer_t *t)
{
LOG(LALL_SCHEDULER, "job %lu: executing transfer to with length %zd bytes",
(unsigned long)j_id, (unsigned long)t->len);
tapasco_res_t res = tapasco_device_alloc(dev_ctx, &t->handle, t->len, t->flags);
if (res != TAPASCO_SUCCESS) {
ERR("job %lu: memory allocation failed!", (unsigned long)j_id);
return res;
}
res = tapasco_device_copy_to(dev_ctx, t->data, t->handle, t->len, t->flags);
if (res != TAPASCO_SUCCESS) {
ERR("job %lu: transfer failed - %zd bytes -> 0x%08x with flags %lu",
(unsigned long)j_id, t->len,
(unsigned long)t->handle,
(unsigned long)t->flags);
}
return res;
}
static inline
tapasco_res_t tapasco_transfer_from(tapasco_dev_ctx_t *dev_ctx,
tapasco_jobs_t *jobs, tapasco_job_id_t const j_id,
tapasco_transfer_t *t)
{
LOG(LALL_SCHEDULER, "job %lu: executing transfer from with length %zd bytes",
(unsigned long)j_id, (unsigned long)t->len);
tapasco_res_t res = tapasco_device_copy_from(dev_ctx, t->handle,
t->data, t->len, t->flags);
if (res != TAPASCO_SUCCESS) {
ERR("job %lu: transfer failed - %zd bytes <- 0x%08x with flags %lu",
(unsigned long)j_id, t->len,
(unsigned long)t->handle,
(unsigned long)t->flags);
}
tapasco_device_free(dev_ctx, t->handle, t->flags);
return res;
}
static inline
tapasco_res_t tapasco_write_arg(tapasco_dev_ctx_t *dev_ctx,
tapasco_jobs_t *jobs, tapasco_job_id_t const j_id,
tapasco_handle_t const h, uint32_t const a)
{
int const is64 = tapasco_jobs_is_arg_64bit(jobs, j_id, a);
if (is64) {
uint64_t v = tapasco_jobs_get_arg64(jobs, j_id, a);
LOG(LALL_SCHEDULER, "job %lu: writing 64b arg #%u = 0x%08lx to 0x%08x",
(unsigned long)j_id, a, (unsigned long)v, (unsigned)h);
if (platform_write_ctl(h, sizeof(v), &v, PLATFORM_CTL_FLAGS_NONE) != PLATFORM_SUCCESS)
return TAPASCO_FAILURE;
} else {
uint32_t v = tapasco_jobs_get_arg32(jobs, j_id, a);
LOG(LALL_SCHEDULER, "job %lu: writing 32b arg #%u = 0x%08lx to 0x%08x",
(unsigned long)j_id, a, (unsigned long)v, (unsigned)h);
if (platform_write_ctl(h, sizeof(v), &v, PLATFORM_CTL_FLAGS_NONE) != PLATFORM_SUCCESS)
return TAPASCO_FAILURE;
}
return TAPASCO_SUCCESS;
}
static inline
tapasco_res_t tapasco_read_arg(tapasco_dev_ctx_t *dev_ctx,
tapasco_jobs_t *jobs, tapasco_job_id_t const j_id,
tapasco_handle_t const h, uint32_t const a)
{
int const is64 = tapasco_jobs_is_arg_64bit(jobs, j_id, a);
if (is64) {
uint64_t v = 0;
if (platform_read_ctl(h, sizeof(v), &v, PLATFORM_CTL_FLAGS_NONE) != PLATFORM_SUCCESS)
return TAPASCO_FAILURE;
LOG(LALL_SCHEDULER, "job %lu: reading 64b arg #%u = 0x%08lx from 0x%08x",
(unsigned long)j_id, a, (unsigned long)v, (unsigned)h);
tapasco_jobs_set_arg(jobs, j_id, a, sizeof(v), &v);
} else {
uint32_t v = 0;
if (platform_read_ctl(h, sizeof(v), &v, PLATFORM_CTL_FLAGS_NONE) != PLATFORM_SUCCESS)
return TAPASCO_FAILURE;
LOG(LALL_SCHEDULER, "job %lu: reading 32b arg #%u = 0x%08lx from 0x%08x",
(unsigned long)j_id, a, (unsigned long)v, (unsigned)h);
tapasco_jobs_set_arg(jobs, j_id, a, sizeof(v), &v);
}
return TAPASCO_SUCCESS;
}
tapasco_res_t tapasco_scheduler_launch(
tapasco_dev_ctx_t *dev_ctx,
tapasco_jobs_t *jobs,
......@@ -55,24 +141,28 @@ tapasco_res_t tapasco_scheduler_launch(
uint32_t const num_args = tapasco_jobs_arg_count(jobs, j_id);
for (uint32_t a = 0; a < num_args; ++a) {
tapasco_res_t r = TAPASCO_SUCCESS;
tapasco_handle_t h = tapasco_address_map_func_arg_register(
dev_ctx,
slot_id,
a);
int const is64 = tapasco_jobs_is_arg_64bit(jobs, j_id, a);
if (is64) {
uint64_t v = tapasco_jobs_get_arg64(jobs, j_id, a);
LOG(LALL_SCHEDULER, "job %lu: writing 64b arg #%u = 0x%08lx to 0x%08x",
(unsigned long)j_id, a, (unsigned long)v, (unsigned)h);
if (platform_write_ctl(h, sizeof(v), &v, PLATFORM_CTL_FLAGS_NONE) != PLATFORM_SUCCESS)
tapasco_transfer_t *t = tapasco_jobs_get_arg_transfer(jobs,
j_id, a);
if (t->len > 0) {
LOG(LALL_SCHEDULER, "job %lu: transferring %zd byte arg #%u",
(unsigned long)j_id, t->len, a);
r = tapasco_transfer_to(dev_ctx, j_id, t);
if (r != TAPASCO_SUCCESS) { return r; }
LOG(LALL_SCHEDULER, "job %lu: writing handle to arg #%u (0x%08x)",
(unsigned long)j_id, a, t->handle);
if (platform_write_ctl(h, sizeof(t->handle), &t->handle,
PLATFORM_CTL_FLAGS_NONE) !=
PLATFORM_SUCCESS)
return TAPASCO_FAILURE;
} else {
uint32_t v = tapasco_jobs_get_arg32(jobs, j_id, a);
LOG(LALL_SCHEDULER, "job %lu: writing 32b arg #%u = 0x%08lx to 0x%08x",
(unsigned long)j_id, a, (unsigned long)v, (unsigned)h);
if (platform_write_ctl(h, sizeof(v), &v, PLATFORM_CTL_FLAGS_NONE) != PLATFORM_SUCCESS)
return TAPASCO_FAILURE;
tapasco_res_t r = tapasco_write_arg(dev_ctx, jobs, j_id,
a, h);
if (r != TAPASCO_SUCCESS) { return r; }
}
}
......@@ -106,23 +196,12 @@ tapasco_res_t tapasco_scheduler_launch(
dev_ctx,
slot_id,
a);
int const is64 = tapasco_jobs_is_arg_64bit(jobs, j_id, a);
if (is64) {
uint64_t v = 0;
if (platform_read_ctl(h, sizeof(v), &v, PLATFORM_CTL_FLAGS_NONE) != PLATFORM_SUCCESS)
return TAPASCO_FAILURE;
LOG(LALL_SCHEDULER, "job %lu: reading 64b arg #%u = 0x%08lx from 0x%08x",
(unsigned long)j_id, a, (unsigned long)v, (unsigned)h);
tapasco_jobs_set_arg(jobs, j_id, a, sizeof(v), &v);
} else {
uint32_t v = 0;
if (platform_read_ctl(h, sizeof(v), &v, PLATFORM_CTL_FLAGS_NONE) != PLATFORM_SUCCESS)
return TAPASCO_FAILURE;
LOG(LALL_SCHEDULER, "job %lu: reading 32b arg #%u = 0x%08lx from 0x%08x",
(unsigned long)j_id, a, (unsigned long)v, (unsigned)h);
tapasco_jobs_set_arg(jobs, j_id, a, sizeof(v), &v);
}
tapasco_res_t r = TAPASCO_SUCCESS;
tapasco_transfer_t *t = tapasco_jobs_get_arg_transfer(jobs,
j_id, a);
r = tapasco_read_arg(dev_ctx, jobs, j_id, h, a);
if (r != TAPASCO_SUCCESS) { return r; }
r = tapasco_transfer_from(dev_ctx, jobs, j_id, t);
}
// ack the interrupt
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment