diff --git a/func/omp/CMakeLists.txt b/func/omp/CMakeLists.txt index 18ac0a5..aad409e 100644 --- a/func/omp/CMakeLists.txt +++ b/func/omp/CMakeLists.txt @@ -54,3 +54,7 @@ omp_func(wtime wtime.cpp) # Custom target for building all functions add_custom_target(omp_all_funcs DEPENDS ${ALL_OMP_FUNCS}) + +# CloudSkin Func +faasm_func(elastic_imagesim ./elastic_imagesim.cpp) +target_link_libraries(elastic_imagesim faasmp faasm) diff --git a/func/omp/elastic_imagesim.cpp b/func/omp/elastic_imagesim.cpp new file mode 100644 index 0000000..1e78afe --- /dev/null +++ b/func/omp/elastic_imagesim.cpp @@ -0,0 +1,129 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#define BUCKET_NAME "faasm" + +std::vector listS3Keys(const std::string& path) +{ + std::vector s3files; + int numKeys = __faasm_s3_get_num_keys_with_prefix(BUCKET_NAME, path.c_str()); + + char **keysBuffer = (char **)malloc(numKeys * sizeof(char *)); + int *keysBufferLens = (int *)malloc(numKeys * sizeof(int32_t)); + + __faasm_s3_list_keys_with_prefix(BUCKET_NAME, path.c_str(), + keysBuffer, keysBufferLens); + + for (int i = 0; i < numKeys; i++) { + std::string tmpString; + tmpString.assign(keysBuffer[i], keysBuffer[i] + keysBufferLens[i]); + s3files.push_back(tmpString); + } + + return s3files; +} + +/* This method performs an image similarity search with an elastic number of + * threads + */ +bool doImageSim(const std::vector& images, int numThreads) +{ + // Split image array into even slices. + auto iters = 2 * numThreads; + const std::size_t n = images.size(); + const std::size_t base = n / static_cast(iters); + const std::size_t rem = n % static_cast(iters); + + std::cout << "Hello - n: " << n << " - base: " << base << " - rem: " << rem << std::endl; + +#pragma omp parallel for num_threads(numThreads) + // We need to give the for loop room to potentially grow when numThreads is + // internally scaled-up by the runtime. Here we hard-code to a situation + // where we initially run with numThreads = nproc / 2 and elastically scale + // to numThreads = nproc + for (int i = 0; i < iters; i++) { + usleep(2 * 1000 * 1000); + + const std::size_t ui = static_cast(i); + const std::size_t start = ui * base + std::min(ui, rem); + const std::size_t len = base + (ui < rem ? 1u : 0u); + + std::cout << "Thread processing " << len << " images" << std::endl; + for (auto j = 0; j < len; j++) { + const std::size_t idx = start + j; + int callId = -1; + while (callId == -1) { + callId = faasmChainNamed("imagesim", images.at(idx).c_str(), nullptr, 0); + + if (callId == -1) { + printf("ERROR: executing image-similarity (no hosts?)\n"); + usleep(1 * 1000 * 1000); + } + } + + faasmAwaitCall(callId); + } + } + + return 0; +} + +void doSleep(int numThreads) +{ +#pragma omp parallel for num_threads(numThreads) + for (int i = 0; i < numThreads; i++) { + usleep(5 * 1000 * 1000); + } +} + +int main(int argc, char** argv) +{ + if (argc != 4) { + printf("ERROR: usage: \n"); + return 1; + } + std::string imageBucket(argv[1]); + int numThreads = std::stoi(argv[2]); + int itersPerThread = std::stoi(argv[3]); + + // If we pass an itersPerThread of 0, it means that we don't run the image + // simulation workload, instead we just sleep on each thread for a while to + // take up resources + if (itersPerThread == 0) { + doSleep(numThreads); + return 0; + } + + // Otherwise, we first need to download all keys in the bucket. + auto images = listS3Keys(imageBucket); + std::vector imageNames; + for (const auto& image : images) { + auto imageName = std::filesystem::path(image).filename().string(); + __faasm_s3_download_key(BUCKET_NAME, image.c_str(), imageName.c_str()); + imageNames.push_back(imageName); + } + // FIXME: this images yield a runtime error in the KNN search. + + std::vector imageNamesLocal; + for (const auto& entry : std::filesystem::directory_iterator("data/test_embeddings/")) { + if (entry.is_regular_file()) { + std::cout << "adding file: " << entry.path() << std::endl; + imageNamesLocal.push_back(entry.path()); + // std::cout << entry.path() << '\n'; + } + } + + for (int i = 0; i < itersPerThread; i++) { + doImageSim(imageNamesLocal, numThreads); + } + + return 0; +} diff --git a/libfaasm/core.cpp b/libfaasm/core.cpp index 0fef908..e8e1e4b 100644 --- a/libfaasm/core.cpp +++ b/libfaasm/core.cpp @@ -159,10 +159,11 @@ unsigned int faasmAwaitCallOutput(unsigned int messageId, } unsigned int faasmChainNamed(const char* name, + const char* cmdline, const uint8_t* inputData, long inputDataSize) { - return __faasm_chain_name(name, inputData, inputDataSize); + return __faasm_chain_name(name, cmdline, inputData, inputDataSize); } unsigned int faasmChain(FaasmFuncPtr funcPtr, diff --git a/libfaasm/faasm/core.h b/libfaasm/faasm/core.h index bb755eb..d4e9024 100644 --- a/libfaasm/faasm/core.h +++ b/libfaasm/faasm/core.h @@ -161,6 +161,7 @@ extern "C" * Chains a function with the given input data */ unsigned int faasmChainNamed(const char* name, + const char* cmdline, const uint8_t* inputData, long inputDataSize); diff --git a/libfaasm/faasm/host_interface.h b/libfaasm/faasm/host_interface.h index d0cd1ec..588a2cd 100644 --- a/libfaasm/faasm/host_interface.h +++ b/libfaasm/faasm/host_interface.h @@ -101,6 +101,7 @@ void __faasm_write_output(const char* output, long outputLen); HOST_IFACE_FUNC unsigned int __faasm_chain_name(const char* name, + const char* cmdline, const unsigned char* inputData, long inputDataSize); @@ -204,4 +205,10 @@ int __faasm_s3_get_key_bytes(const char* bucketName, void* keyBuffer, int* keyBufferLen, bool tolerateMissing = false); + +HOST_IFACE_FUNC +int __faasm_s3_download_key(const char* bucketName, + const char* keyName, + const char* outPath, + bool tolerateMissing = false); #endif diff --git a/libfaasm/libfaasm.imports b/libfaasm/libfaasm.imports index c651cd7..bb8abc8 100644 --- a/libfaasm/libfaasm.imports +++ b/libfaasm/libfaasm.imports @@ -51,6 +51,7 @@ __faasm_s3_list_keys __faasm_s3_list_keys_with_prefix __faasm_s3_add_key_bytes __faasm_s3_get_key_bytes +__faasm_s3_download_key # Test __faasm_host_interface_test