From ff1aa9491738c0fa5fd91bdffd83d67e18dfef7d Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Sun, 25 May 2025 22:44:01 +0000 Subject: [PATCH 1/4] cloudskin: add func --- func/omp/CMakeLists.txt | 4 +++ func/omp/elastic_imagesim.cpp | 68 +++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 func/omp/elastic_imagesim.cpp diff --git a/func/omp/CMakeLists.txt b/func/omp/CMakeLists.txt index 18ac0a5..aad409e 100644 --- a/func/omp/CMakeLists.txt +++ b/func/omp/CMakeLists.txt @@ -54,3 +54,7 @@ omp_func(wtime wtime.cpp) # Custom target for building all functions add_custom_target(omp_all_funcs DEPENDS ${ALL_OMP_FUNCS}) + +# CloudSkin Func +faasm_func(elastic_imagesim ./elastic_imagesim.cpp) +target_link_libraries(elastic_imagesim faasmp faasm) diff --git a/func/omp/elastic_imagesim.cpp b/func/omp/elastic_imagesim.cpp new file mode 100644 index 0000000..097eded --- /dev/null +++ b/func/omp/elastic_imagesim.cpp @@ -0,0 +1,68 @@ +#include +#include +#include + +#include +#include +#include +#include + +/* This method performs an image similarity search with an elastic number of + * threads + */ +bool doImageSim(int numThreads) +{ +#pragma omp parallel for num_threads(numThreads) + // We need to give the for loop room to potentially grow when numThreads is + // internally scaled-up by the runtime. Here we hard-code to a situation + // where we initially run with numThreads = nproc / 2 and elastically scale + // to numThreads = nproc + for (int i = 0; i < numThreads * 2; i++) { + usleep(2 * 1000 * 1000); + + int callId = -1; + while (callId == -1) { + callId = faasmChainNamed("imagesim", nullptr, 0); + + if (callId == -1) { + printf("ERROR: executing image-similarity (no hosts?)\n"); + usleep(1 * 1000 * 1000); + } + } + + faasmAwaitCall(callId); + } + + return 0; +} + +void doSleep(int numThreads) +{ +#pragma omp parallel for num_threads(numThreads) + for (int i = 0; i < numThreads; i++) { + usleep(2 * 1000 * 1000); + } +} + +int main(int argc, char** argv) +{ + if (argc != 3) { + printf("ERROR: usage: \n"); + return 1; + } + int numThreads = std::stoi(argv[1]); + int itersPerThread = std::stoi(argv[2]); + + // If we pass an itersPerThread of 0, it means that we don't run the image + // simulation workload, instead we just sleep on each thread for a while to + // take up resources + if (itersPerThread == 0) { + doSleep(numThreads); + } + + for (int i = 0; i < itersPerThread; i++) { + doImageSim(numThreads); + } + + return 0; +} From 28694521c66269e3ea311b39b4693f064f722a5f Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Tue, 16 Dec 2025 19:32:25 +0000 Subject: [PATCH 2/4] libfaasm: changes for cloudskin --- func/omp/elastic_imagesim.cpp | 79 ++++++++++++++++++++++++++------- libfaasm/core.cpp | 3 +- libfaasm/faasm/core.h | 1 + libfaasm/faasm/host_interface.h | 7 +++ libfaasm/libfaasm.imports | 1 + 5 files changed, 74 insertions(+), 17 deletions(-) diff --git a/func/omp/elastic_imagesim.cpp b/func/omp/elastic_imagesim.cpp index 097eded..d0f723e 100644 --- a/func/omp/elastic_imagesim.cpp +++ b/func/omp/elastic_imagesim.cpp @@ -3,34 +3,71 @@ #include #include +#include +#include #include #include #include +#define BUCKET_NAME "faasm" + +std::vector listS3Keys(const std::string& path) +{ + std::vector s3files; + int numKeys = __faasm_s3_get_num_keys_with_prefix(BUCKET_NAME, path.c_str()); + + char **keysBuffer = (char **)malloc(numKeys * sizeof(char *)); + int *keysBufferLens = (int *)malloc(numKeys * sizeof(int32_t)); + + __faasm_s3_list_keys_with_prefix(BUCKET_NAME, path.c_str(), + keysBuffer, keysBufferLens); + + for (int i = 0; i < numKeys; i++) { + std::string tmpString; + tmpString.assign(keysBuffer[i], keysBuffer[i] + keysBufferLens[i]); + s3files.push_back(tmpString); + } + + return s3files; +} + /* This method performs an image similarity search with an elastic number of * threads */ -bool doImageSim(int numThreads) +bool doImageSim(const std::vector& images, int numThreads) { + // Split image array into even slices. + auto iters = 2 * numThreads; + const std::size_t n = images.size(); + const std::size_t base = n / static_cast(iters); + const std::size_t rem = n % static_cast(iters); + #pragma omp parallel for num_threads(numThreads) // We need to give the for loop room to potentially grow when numThreads is // internally scaled-up by the runtime. Here we hard-code to a situation // where we initially run with numThreads = nproc / 2 and elastically scale // to numThreads = nproc - for (int i = 0; i < numThreads * 2; i++) { + for (int i = 0; i < iters; i++) { usleep(2 * 1000 * 1000); - int callId = -1; - while (callId == -1) { - callId = faasmChainNamed("imagesim", nullptr, 0); + const std::size_t ui = static_cast(i); + const std::size_t start = ui * base + std::min(ui, rem); + const std::size_t len = base + (ui < rem ? 1u : 0u); - if (callId == -1) { - printf("ERROR: executing image-similarity (no hosts?)\n"); - usleep(1 * 1000 * 1000); - } - } + std::cout << "Thread processing " << len << " images" << std::endl; + for (auto i = 0; i < len; i++) { + int callId = -1; + while (callId == -1) { + callId = faasmChainNamed("imagesim", images.at(i).c_str(), nullptr, 0); - faasmAwaitCall(callId); + if (callId == -1) { + printf("ERROR: executing image-similarity (no hosts?)\n"); + usleep(1 * 1000 * 1000); + } + } + + faasmAwaitCall(callId); + } } return 0; @@ -46,12 +83,13 @@ void doSleep(int numThreads) int main(int argc, char** argv) { - if (argc != 3) { - printf("ERROR: usage: \n"); + if (argc != 4) { + printf("ERROR: usage: \n"); return 1; } - int numThreads = std::stoi(argv[1]); - int itersPerThread = std::stoi(argv[2]); + std::string imageBucket(argv[1]); + int numThreads = std::stoi(argv[2]); + int itersPerThread = std::stoi(argv[3]); // If we pass an itersPerThread of 0, it means that we don't run the image // simulation workload, instead we just sleep on each thread for a while to @@ -60,8 +98,17 @@ int main(int argc, char** argv) doSleep(numThreads); } + // Otherwise, we first need to download all keys in the bucket. + auto images = listS3Keys(imageBucket); + std::vector imageNames; + for (const auto& image : images) { + auto imageName = std::filesystem::path(image).filename().string(); + __faasm_s3_download_key(BUCKET_NAME, image.c_str(), imageName.c_str()); + imageNames.push_back(imageName); + } + for (int i = 0; i < itersPerThread; i++) { - doImageSim(numThreads); + doImageSim(imageNames, numThreads); } return 0; diff --git a/libfaasm/core.cpp b/libfaasm/core.cpp index 0fef908..e8e1e4b 100644 --- a/libfaasm/core.cpp +++ b/libfaasm/core.cpp @@ -159,10 +159,11 @@ unsigned int faasmAwaitCallOutput(unsigned int messageId, } unsigned int faasmChainNamed(const char* name, + const char* cmdline, const uint8_t* inputData, long inputDataSize) { - return __faasm_chain_name(name, inputData, inputDataSize); + return __faasm_chain_name(name, cmdline, inputData, inputDataSize); } unsigned int faasmChain(FaasmFuncPtr funcPtr, diff --git a/libfaasm/faasm/core.h b/libfaasm/faasm/core.h index bb755eb..d4e9024 100644 --- a/libfaasm/faasm/core.h +++ b/libfaasm/faasm/core.h @@ -161,6 +161,7 @@ extern "C" * Chains a function with the given input data */ unsigned int faasmChainNamed(const char* name, + const char* cmdline, const uint8_t* inputData, long inputDataSize); diff --git a/libfaasm/faasm/host_interface.h b/libfaasm/faasm/host_interface.h index d0cd1ec..588a2cd 100644 --- a/libfaasm/faasm/host_interface.h +++ b/libfaasm/faasm/host_interface.h @@ -101,6 +101,7 @@ void __faasm_write_output(const char* output, long outputLen); HOST_IFACE_FUNC unsigned int __faasm_chain_name(const char* name, + const char* cmdline, const unsigned char* inputData, long inputDataSize); @@ -204,4 +205,10 @@ int __faasm_s3_get_key_bytes(const char* bucketName, void* keyBuffer, int* keyBufferLen, bool tolerateMissing = false); + +HOST_IFACE_FUNC +int __faasm_s3_download_key(const char* bucketName, + const char* keyName, + const char* outPath, + bool tolerateMissing = false); #endif diff --git a/libfaasm/libfaasm.imports b/libfaasm/libfaasm.imports index c651cd7..bb8abc8 100644 --- a/libfaasm/libfaasm.imports +++ b/libfaasm/libfaasm.imports @@ -51,6 +51,7 @@ __faasm_s3_list_keys __faasm_s3_list_keys_with_prefix __faasm_s3_add_key_bytes __faasm_s3_get_key_bytes +__faasm_s3_download_key # Test __faasm_host_interface_test From c9c667b179654b016b132aa41cf186ce18b800e2 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Mon, 22 Dec 2025 18:59:22 +0000 Subject: [PATCH 3/4] omp: fixes --- func/omp/elastic_imagesim.cpp | 55 ++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/func/omp/elastic_imagesim.cpp b/func/omp/elastic_imagesim.cpp index d0f723e..68ce978 100644 --- a/func/omp/elastic_imagesim.cpp +++ b/func/omp/elastic_imagesim.cpp @@ -36,12 +36,14 @@ std::vector listS3Keys(const std::string& path) */ bool doImageSim(const std::vector& images, int numThreads) { - // Split image array into even slices. - auto iters = 2 * numThreads; + // Split image array into even slices. + auto iters = 2 * numThreads; const std::size_t n = images.size(); const std::size_t base = n / static_cast(iters); const std::size_t rem = n % static_cast(iters); + std::cout << "Hello - n: " << n << " - base: " << base << " - rem: " << rem << std::endl; + #pragma omp parallel for num_threads(numThreads) // We need to give the for loop room to potentially grow when numThreads is // internally scaled-up by the runtime. Here we hard-code to a situation @@ -54,20 +56,21 @@ bool doImageSim(const std::vector& images, int numThreads) const std::size_t start = ui * base + std::min(ui, rem); const std::size_t len = base + (ui < rem ? 1u : 0u); - std::cout << "Thread processing " << len << " images" << std::endl; - for (auto i = 0; i < len; i++) { - int callId = -1; - while (callId == -1) { - callId = faasmChainNamed("imagesim", images.at(i).c_str(), nullptr, 0); - - if (callId == -1) { - printf("ERROR: executing image-similarity (no hosts?)\n"); - usleep(1 * 1000 * 1000); - } - } - - faasmAwaitCall(callId); - } + std::cout << "Thread processing " << len << " images" << std::endl; + for (auto j = 0; j < len; j++) { + const std::size_t idx = start + j; + int callId = -1; + while (callId == -1) { + callId = faasmChainNamed("imagesim", images.at(idx).c_str(), nullptr, 0); + + if (callId == -1) { + printf("ERROR: executing image-similarity (no hosts?)\n"); + usleep(1 * 1000 * 1000); + } + } + + faasmAwaitCall(callId); + } } return 0; @@ -100,15 +103,25 @@ int main(int argc, char** argv) // Otherwise, we first need to download all keys in the bucket. auto images = listS3Keys(imageBucket); - std::vector imageNames; + std::vector imageNames; for (const auto& image : images) { - auto imageName = std::filesystem::path(image).filename().string(); - __faasm_s3_download_key(BUCKET_NAME, image.c_str(), imageName.c_str()); - imageNames.push_back(imageName); + auto imageName = std::filesystem::path(image).filename().string(); + __faasm_s3_download_key(BUCKET_NAME, image.c_str(), imageName.c_str()); + imageNames.push_back(imageName); + } + // FIXME: this images yield a runtime error in the KNN search. + + std::vector imageNamesLocal; + for (const auto& entry : std::filesystem::directory_iterator("data/test_embeddings/")) { + if (entry.is_regular_file()) { + std::cout << "adding file: " << entry.path() << std::endl; + imageNamesLocal.push_back(entry.path()); + // std::cout << entry.path() << '\n'; + } } for (int i = 0; i < itersPerThread; i++) { - doImageSim(imageNames, numThreads); + doImageSim(imageNamesLocal, numThreads); } return 0; From 71aff1c29174cebd46ba498618679deb1e366701 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Tue, 23 Dec 2025 17:03:28 +0000 Subject: [PATCH 4/4] cloudskin: more work --- func/omp/elastic_imagesim.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/func/omp/elastic_imagesim.cpp b/func/omp/elastic_imagesim.cpp index 68ce978..1e78afe 100644 --- a/func/omp/elastic_imagesim.cpp +++ b/func/omp/elastic_imagesim.cpp @@ -80,7 +80,7 @@ void doSleep(int numThreads) { #pragma omp parallel for num_threads(numThreads) for (int i = 0; i < numThreads; i++) { - usleep(2 * 1000 * 1000); + usleep(5 * 1000 * 1000); } } @@ -99,6 +99,7 @@ int main(int argc, char** argv) // take up resources if (itersPerThread == 0) { doSleep(numThreads); + return 0; } // Otherwise, we first need to download all keys in the bucket.